#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:16
# @File        : consul_loader.py
# @Description : Loads configuration fetched through ConsulConfig into a
#                dynaconf LazySettings object.
"""

from typing import Union

import requests
from dynaconf.base import LazySettings
from dynaconf.utils.parse_conf import parse_conf_data

from application.libs import ConsulConfig
from application.libs.helper import EnvVarHelper

# Passed to ``obj.update`` as ``loader_identifier`` so the values written by
# this module are tagged with their origin.
IDENTIFIER = "consul_loader"


def parse_config(data: dict, key: Union[str, None], obj: LazySettings) -> dict:
    """
    Parses the configuration data.

    When ``key`` is not None, only the sub-mapping ``data[key]`` is parsed;
    otherwise the whole of ``data`` is. Each value is passed through
    ``parse_conf_data(value, tomlfy=True, box_settings=obj)``.

    :param data: The configuration data.
    :param key: The key to be parsed in the configuration, or None to parse
        every top-level entry.
    :param obj: The LazySettings object, forwarded as ``box_settings``.
    :return: A dictionary with parsed configuration data.
    """
    section = data if key is None else data[key]

    # A distinct loop variable keeps the ``key`` parameter unshadowed.
    return {
        name: parse_conf_data(value, tomlfy=True, box_settings=obj)
        for name, value in section.items()
    }


def load(
    obj: LazySettings,
    env: Union[str, None] = None,
    silent: bool = True,
    key: Union[str, None] = None,
    validate=False,
) -> Union[bool, None]:
    """
    Loads the configuration data from Consul into ``obj``.

    :param obj: The LazySettings object to update.
    :param env: The environment. When None, nothing is loaded.
    :param silent: If True, a failure while parsing the fetched data is
        swallowed and False is returned; if False, the error is re-raised.
    :param key: The key to be loaded in the configuration.
    :param validate: Forwarded to ``obj.update`` as ``validate``.
    :return: False when parsing fails and ``silent`` is true. None in every
        other case: no Consul key configured, a connection error while
        fetching, ``env`` is None, or the settings were updated.
    :raises RuntimeError: If fetching from Consul fails with anything other
        than ``requests.exceptions.ConnectionError``.
    """
    env_helper = EnvVarHelper()

    # Work on a copy: popping 'key' below must not mutate the mapping owned
    # by the helper, otherwise a later call could find the key missing.
    env_vars = dict(env_helper.consul_vars)

    # The Consul key is looked up separately; the remaining entries are the
    # keyword arguments for the client.
    consul_key = env_vars.pop('key', None)

    # If the key is None or empty, there is nothing to load.
    if not consul_key:
        return None

    client = ConsulConfig(**env_vars)

    try:
        data = client.get(key=consul_key)
    except requests.exceptions.ConnectionError:
        # Connection failures are treated as "nothing to load".
        return None
    except Exception as e:
        # Include original exception in the traceback
        raise RuntimeError(f'Unknown error: {e}') from e

    if env is None:
        return None

    try:
        result = parse_config(data, key, obj)
    except Exception:
        if silent:
            return False
        raise

    # Marker entry recording that the Consul data was applied.
    result['Consul'] = True

    obj.update(
        result,
        loader_identifier=IDENTIFIER,
        validate=validate,
    )
    return None