consul_loader.py 1.76 KB
Newer Older
Weizhi Cui's avatar
Weizhi Cui committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:16
# @File        : consul_loader.py
# @Description : Dynaconf loader that fetches settings from Consul via ConsulConfig.
"""

import os
from typing import NamedTuple, Union

import requests
from dynaconf.base import LazySettings
from dynaconf.utils.parse_conf import parse_conf_data

from application.lib import ConsulConfig

IDENTIFIER = "consul_loader"


崔为之's avatar
崔为之 committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
def get_env_vars() -> dict:
    """
    Collect the Consul connection settings from the process environment.

    Reads ``CONSUL_HOST``, ``CONSUL_PORT``, ``CONSUL_DC`` and ``CONSUL_TOKEN``,
    falling back to ``'localhost'``, ``8500``, ``'dc1'`` and ``None``
    respectively when a variable is not set.

    :return: dict with the keys ``host``, ``port``, ``dc`` and ``token``.
    :raises ValueError: if ``CONSUL_PORT`` is set to a non-integer value.
    """
    return {
        'host': os.environ.get('CONSUL_HOST', 'localhost'),
        # Environment values are always strings; coerce so that 'port' is an
        # int whether it came from the environment or from the default.
        # An empty CONSUL_PORT is treated the same as an unset one.
        'port': int(os.environ.get('CONSUL_PORT') or 8500),
        'dc': os.environ.get('CONSUL_DC', 'dc1'),
        'token': os.getenv('CONSUL_TOKEN'),
    }


def parse_config(data: dict, key: str, obj: LazySettings) -> dict:
    """
    Run every value of a configuration mapping through ``parse_conf_data``.

    :param data: mapping of raw configuration values.
    :param key: when not ``None``, only the sub-mapping ``data[key]`` is parsed;
        otherwise ``data`` itself is parsed.
    :param obj: settings object handed to ``parse_conf_data`` as
        ``box_settings``.
    :return: new dict with the same keys as the selected mapping and the
        parsed values.
    """
    section = data if key is None else data[key]

    parsed = {}
    for name, raw in section.items():
        parsed[name] = parse_conf_data(raw, tomlfy=True, box_settings=obj)
    return parsed
Weizhi Cui's avatar
Weizhi Cui committed
42 43 44 45 46 47 48 49 50


def load(
    obj: LazySettings,
    env: str = None,
    silent: bool = True,
    key: str = None,
    validate=False,
) -> Union[bool, None]:
    """
    Fetch configuration from Consul and merge it into the settings object.

    The Consul key to read is taken from the ``CONSUL_KEY`` environment
    variable; when it is not set the loader does nothing.

    :param obj: settings object that receives the loaded values.
    :param env: current environment name; when ``None`` nothing is merged.
    :param silent: when true, a failure while parsing the fetched data makes
        the function return ``False`` instead of raising.
    :param key: when not ``None``, only ``data[key]`` of the fetched data is
        loaded.
    :param validate: forwarded to ``obj.update``.
    :return: ``False`` when parsing failed and ``silent`` is true, otherwise
        ``None``.
    :raises RuntimeError: if fetching from Consul fails with anything other
        than a connection error.
    """
    consul_key = os.getenv('CONSUL_KEY')
    if consul_key is None:
        return None

    # Connection settings are only needed once we know there is a key to read.
    client = ConsulConfig(**get_env_vars())

    try:
        data = client.get(key=consul_key)
    except requests.exceptions.ConnectionError:
        # Consul not reachable: skip this loader without failing.
        return None
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise RuntimeError(f'Unknown error: {e}') from e

    if env is None:
        return None

    try:
        result = parse_config(data, key, obj)
    except Exception:
        if silent:
            return False
        # Bare raise re-raises the active exception with its traceback intact.
        raise

    # Flag recording that values from this loader were merged.
    result['Consul'] = True
    obj.update(
        result,
        loader_identifier=IDENTIFIER,
        validate=validate,
    )
    return None