#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:16
# @File        : consul_loader.py
# @Description : Dynaconf loader that fetches settings from Consul.
"""

import os
from typing import NamedTuple, Optional, Union

import requests
from dynaconf.base import LazySettings
from dynaconf.utils.parse_conf import parse_conf_data

from application.lib import ConsulConfig

# Passed to ``obj.update`` as ``loader_identifier`` so values written by this
# loader are tagged with it.
IDENTIFIER = "consul_loader"


def get_env_vars() -> dict:
    """Collect the Consul connection parameters from the environment.

    Returns:
        A dict with the keys ``host``, ``port``, ``dc`` and ``token``, read
        from ``CONSUL_HOST``, ``CONSUL_PORT``, ``CONSUL_DC`` and
        ``CONSUL_TOKEN``. ``token`` is ``None`` when the variable is unset.
    """
    return {
        'host': os.environ.get('CONSUL_HOST', 'localhost'),
        # NOTE(review): this is a str when CONSUL_PORT is set but the int 8500
        # when it is not -- confirm ConsulConfig accepts both before
        # normalising the type here.
        'port': os.environ.get('CONSUL_PORT', 8500),
        'dc': os.environ.get('CONSUL_DC', 'dc1'),
        'token': os.getenv('CONSUL_TOKEN'),
    }


def parse_config(data: dict, key: Optional[str], obj: LazySettings) -> dict:
    """Run every value of *data* through dynaconf's ``parse_conf_data``.

    Args:
        data: Mapping of raw configuration values.
        key: When not ``None``, only the sub-mapping ``data[key]`` is parsed.
        obj: Settings object forwarded to ``parse_conf_data`` as
            ``box_settings``.

    Returns:
        A new dict with the same keys as the selected mapping and the parsed
        values.

    Raises:
        KeyError: If *key* is given but missing from *data*.
    """
    section = data if key is None else data[key]
    # Distinct loop names: the original comprehension reused ``key`` and
    # shadowed the parameter of the same name.
    return {
        name: parse_conf_data(raw, tomlfy=True, box_settings=obj)
        for name, raw in section.items()
    }


def load(
    obj: LazySettings,
    env: Optional[str] = None,
    silent: bool = True,
    key: Optional[str] = None,
    validate: bool = False,
) -> Union[bool, None]:
    """Fetch configuration from Consul and merge it into *obj*.

    The Consul key to read is taken from the ``CONSUL_KEY`` environment
    variable; the connection parameters come from ``get_env_vars``.

    Args:
        obj: Settings object to update.
        env: Current environment name. When ``None`` nothing is written to
            *obj*.
        silent: When true, a failure while parsing the fetched data makes the
            function return ``False`` instead of raising.
        key: Optional key selecting a sub-mapping of the fetched data.
        validate: Forwarded to ``obj.update``.

    Returns:
        ``False`` when parsing fails and *silent* is true; ``None`` in every
        other case (including a successful update).

    Raises:
        RuntimeError: If fetching from Consul fails with anything other than
            ``requests.exceptions.ConnectionError``.
        Exception: Whatever parsing raised, when *silent* is false.
    """
    consul_key = os.getenv('CONSUL_KEY')
    if consul_key is None:
        # Nothing to load; skip building a client altogether.
        return

    client = ConsulConfig(**get_env_vars())
    try:
        data = client.get(key=consul_key)
    except requests.exceptions.ConnectionError:
        # Consul being unreachable is tolerated: leave the settings untouched.
        return
    except Exception as e:
        # Chain explicitly so the original failure stays attached as the cause.
        raise RuntimeError(f'Unknown error: {e}') from e

    # NOTE(review): this check runs only after the fetch above, so a Consul
    # round-trip is made even when env is None -- confirm whether that is
    # intended before moving it earlier.
    if env is None:
        return

    try:
        result = parse_config(data, key, obj)
    except Exception:
        if silent:
            return False
        raise

    # Extra entry added alongside the parsed values; presumably a marker that
    # Consul settings were loaded -- confirm against consumers.
    result['Consul'] = True
    obj.update(
        result,
        loader_identifier=IDENTIFIER,
        validate=validate,
    )