consul.py 2.41 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/21 09:10
# @File        : consul.py
# @Description : Flask extension that registers the service with Consul and deregisters it at exit
"""


import atexit
import logging

import consul
from flask import current_app, Flask

from application.libs.helper import ConfigHelper
from application.libs.helper import EnvVarHelper


class FlaskConsulService:
    """Flask extension that registers the application as a Consul service.

    On ``init_app`` the service is registered with a Consul agent together
    with a TCP health check, and ``deregister_service`` is scheduled via
    ``atexit`` so the entry is removed when the interpreter exits.

    Registration and deregistration are best-effort: failures are logged
    and never propagate to the caller.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, app=None):
        """Create the extension and, when *app* is given, initialise it.

        Args:
            app: Optional Flask application to register immediately.
        """
        self.app = app
        # Consul client; created lazily on the first registration attempt.
        self.client = None
        # Identifier under which the service is registered in Consul.
        self.service_id = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app: "Flask"):
        """Register the service for *app* and schedule deregistration at exit.

        Args:
            app: The Flask application whose configuration is read.
        """
        env_vars, server_cfg = self.set_server_cfg(app)
        self.register_service(env_vars, server_cfg)
        atexit.register(self.deregister_service)

    @staticmethod
    def set_server_cfg(app: "Flask") -> tuple:
        """Collect Consul client arguments and the server configuration.

        Args:
            app: The Flask application handed to ``ConfigHelper``.

        Returns:
            A ``(env_vars, cfg)`` tuple: the Consul variables from
            ``EnvVarHelper`` with the ``'key'`` entry removed, and the
            ``Server`` attribute of ``ConfigHelper(app)``.
        """
        # Copy the Consul variables so that dropping 'key' below does not
        # mutate whatever mapping EnvVarHelper hands out.
        env_vars = dict(EnvVarHelper().consul_vars)
        # 'key' is not passed on to the Consul client (the result is
        # unpacked into consul.Consul(**env_vars)), so it is discarded here.
        env_vars.pop('key', None)

        # Fetch the server section of the application's configuration.
        cfg = ConfigHelper(app).Server

        return env_vars, cfg

    def register_service(self, env_vars: dict, cfg: "ConfigHelper"):
        """Register the service and its TCP health check with Consul.

        Args:
            env_vars: Keyword arguments for ``consul.Consul``; only used
                when no client has been created yet.
            cfg: Server configuration exposing ``get(key, default)``.
                Keys read: ``Name``, ``IP``, ``Port``, ``Interval``,
                ``Timeout`` and ``Deregister``.

        Failures (invalid port, unreachable agent, ...) are logged and
        swallowed so that they cannot prevent the application from starting.
        """
        service_name = cfg.get('Name', 'elasticsearch-log-parse')
        service_address = cfg.get('IP', 'localhost')
        raw_port = cfg.get('Port', 5000)
        check_interval = cfg.get('Interval', '30s')
        timeout = cfg.get('Timeout', '30s')
        deregister_after = cfg.get('Deregister', '60s')

        # The port must be an integer for both the health check and the
        # service entry; configuration sources commonly deliver it as text.
        try:
            service_port = int(raw_port)
        except (TypeError, ValueError):
            self._logger.error(
                'Invalid service port %r; skipping Consul registration', raw_port)
            return

        if self.service_id is None:
            self.service_id = f'{service_name}-{service_address}-{service_port}'

        try:
            check = consul.Check().tcp(
                host=service_address,
                port=service_port,
                interval=check_interval,
                timeout=timeout,
                deregister=deregister_after,
            )
            if self.client is None:
                self.client = consul.Consul(**env_vars)
            self.client.agent.service.register(
                name=service_name,
                service_id=self.service_id,
                address=service_address,
                port=service_port,
                check=check,
            )
        except Exception:
            # Best-effort boundary: keep the application running, but keep
            # the traceback instead of printing only the message.
            self._logger.exception(
                'Failed to register service %r with Consul', self.service_id)

    def deregister_service(self):
        """Remove the service from Consul; safe to call when never registered.

        This runs as an ``atexit`` hook, so it must not raise: a missing
        client or service id is a no-op and Consul errors are only logged.
        """
        if self.client is None or self.service_id is None:
            return
        try:
            self.client.agent.service.deregister(self.service_id)
        except Exception:
            self._logger.exception(
                'Failed to deregister service %r from Consul', self.service_id)