consul_loader.py 2.16 KB
Newer Older
Weizhi Cui's avatar
Weizhi Cui committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:16
# @File        : consul_loader.py
# @Description :
"""

import os
from typing import NamedTuple, Union

import requests
from dynaconf.base import LazySettings
from dynaconf.utils.parse_conf import parse_conf_data

from application.lib import ConsulConfig

IDENTIFIER = "consul_loader"


class SourceMetadata(NamedTuple):
    loader: str
    identifier: str
    env: str
    merged: bool = False


def load(
    obj: LazySettings,
    env: str = None,
    silent: bool = True,
    key: str = None,
    validate=False,
) -> Union[bool, None]:
    consul_host = os.environ.get('CONSUL_HOST', 'localhost')
    consul_port = os.environ.get('CONSUL_PORT', 8500)
    consul_dc = os.environ.get('CONSUL_DC', 'dc1')
    consul_token = os.getenv('CONSUL_TOKEN')
    consul_key = os.getenv('CONSUL_KEY')

    # 没有对应环境变量,会进入下一个加载器
    if consul_key is None:
        return

    # 实例 ConsulConfig
    client = ConsulConfig(host=consul_host, port=consul_port, token=consul_token, dc=consul_dc)
    # 捕获异常
    try:
        data = client.get(key=consul_key)
    except requests.exceptions.ConnectionError:
        print(2)
        # 连接错误后,则会进入下一个加载器
        return
    except Exception as e:
        # 发生未知错误,才会抛出异常
        raise RuntimeError(f'Unknown error: {e}')

    # 基于 key 获取所需配置
    if key is not None:
        data = data[key]

    if env is None:
        return

    try:
        # 获取 consul 注册中心的配置信息
        result = {
            key: parse_conf_data(value, tomlfy=True, box_settings=obj)
            for key, value in data.items()
        }
    except Exception as e:
        if silent:
            return False
        raise e
    else:
        result['Consul'] = True
        # 将 result 配置写入 dynaconf 内置配置中
        obj.update(
            result,
            loader_identifier=IDENTIFIER,
            validate=validate,
        )