#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ # @Version : Python 3.11.4 # @Software : Sublime Text 4 # @Author : StudentCWZ # @Email : StudentCWZ@outlook.com # @Date : 2023/10/28 12:16 # @File : consul_loader.py # @Description : """ import os from typing import NamedTuple, Union import requests from dynaconf.base import LazySettings from dynaconf.utils.parse_conf import parse_conf_data from application.lib import ConsulConfig IDENTIFIER = "consul_loader" class SourceMetadata(NamedTuple): loader: str identifier: str env: str merged: bool = False def load( obj: LazySettings, env: str = None, silent: bool = True, key: str = None, validate=False, ) -> Union[bool, None]: consul_host = os.environ.get('CONSUL_HOST', 'localhost') consul_port = os.environ.get('CONSUL_PORT', 8500) consul_dc = os.environ.get('CONSUL_DC', 'dc1') consul_token = os.getenv('CONSUL_TOKEN') consul_key = os.getenv('CONSUL_KEY') # 没有对应环境变量,会进入下一个加载器 if consul_key is None: return # 实例 ConsulConfig client = ConsulConfig(host=consul_host, port=consul_port, token=consul_token, dc=consul_dc) # 捕获异常 try: data = client.get(key=consul_key) except requests.exceptions.ConnectionError: print(2) # 连接错误后,则会进入下一个加载器 return except Exception as e: # 发生未知错误,才会抛出异常 raise RuntimeError(f'Unknown error: {e}') # 基于 key 获取所需配置 if key is not None: data = data[key] if env is None: return try: # 获取 consul 注册中心的配置信息 result = { key: parse_conf_data(value, tomlfy=True, box_settings=obj) for key, value in data.items() } except Exception as e: if silent: return False raise e else: result['Consul'] = True # 将 result 配置写入 dynaconf 内置配置中 obj.update( result, loader_identifier=IDENTIFIER, validate=validate, )