#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ # @Version : Python 3.11.4 # @Software : Sublime Text 4 # @Author : StudentCWZ # @Email : StudentCWZ@outlook.com # @Date : 2023/11/21 09:10 # @File : consul.py # @Description : """ import atexit import consul from flask import Flask from application.libs.helper import ConfigHelper from application.libs.helper import EnvVarHelper class FlaskConsulService: def __init__(self, app=None): self.app = app self.client = None self.service_id = None if app is not None: self.init_app(app) def init_app(self, app: Flask): env_vars, server_cfg = self.set_server_cfg(app) self.register_service(env_vars, server_cfg) atexit.register(self.deregister_service) @staticmethod def set_server_cfg(app: Flask) -> tuple: # 实例化对象 eh = EnvVarHelper() # 从环境变量获取 consul 参数 env_vars = eh.consul_vars # 获取 consul key _ = env_vars.pop('key', None) # Initialize ConfigHelper ch = ConfigHelper(app) # Fetch the logging configuration from the app's config cfg = ch.Server return env_vars, cfg def register_service(self, env_vars: dict, cfg: ConfigHelper): service_name = cfg.get('Name', 'elasticsearch-log-parse') service_address = cfg.get('IP', 'localhost') service_port = cfg.get('Port', '5000') check_interval = cfg.get('Interval', '30s') timeout = cfg.get('Timeout', '30s') deregister_after = cfg.get('Deregister', '60s') if self.service_id is None: self.service_id = f'{service_name}-{service_address}-{service_port}' try: check = consul.Check().tcp(host=service_address, port=service_port, interval=check_interval, timeout=timeout, deregister=deregister_after) if self.client is None: self.client = consul.Consul(**env_vars) self.client.agent.service.register( name=service_name, service_id=self.service_id, address=service_address, port=service_port, check=check ) except Exception as e: print(e) def deregister_service(self): self.client.agent.service.deregister(self.service_id)