#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/21 09:10
# @File        : consul.py
# @Description : This module provides an interface for registering and deregistering Flask services with Consul.
"""

import atexit

import consul
from flask import Flask
from loguru import logger

from application.libs import ServerError
from application.libs.helper import ConfigHelper
from application.libs.helper import EnvVarHelper


class FlaskConsulService:
    """Flask service class for Consul registration and deregistration.

    Attributes are populated lazily by ``register_service``:
    ``client`` holds the ``consul.Consul`` instance and ``service_id`` the
    identifier the service was registered under.
    """

    def __init__(self, app: Flask | None = None) -> None:
        """Initialize with an optional Flask application.

        Args:
            app: When given, ``init_app`` is called immediately with it.
        """
        self.app = app
        self.client = None
        self.service_id = None

        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Register the app's service with Consul and schedule deregistration.

        Reads the Consul environment variables and the server configuration,
        registers the service, then installs ``deregister_service`` as an
        ``atexit`` hook.

        Args:
            app: The Flask application whose configuration is read.

        Raises:
            ServerError: Propagated from ``register_service`` when the
                registration fails; in that case no ``atexit`` hook is
                installed.
        """
        env_vars, server_cfg = self.set_server_cfg(app)
        self.register_service(env_vars, server_cfg)
        atexit.register(self.deregister_service)

    @staticmethod
    def set_server_cfg(app: Flask) -> tuple:
        """Return the Consul environment variables and the server configuration.

        Args:
            app: The Flask application handed to ``ConfigHelper``.

        Returns:
            A ``(env_vars, cfg)`` tuple: ``env_vars`` is
            ``EnvVarHelper().consul_vars`` with its ``'key'`` entry removed,
            ``cfg`` is ``ConfigHelper(app).Server``.
        """
        # Instantiate the environment-variable helper.
        eh = EnvVarHelper()
        # Read the Consul parameters from the environment.
        env_vars = eh.consul_vars
        # Drop the 'key' entry: the remaining items are later forwarded as
        # keyword arguments to consul.Consul(**env_vars).
        env_vars.pop('key', None)
        # Initialize ConfigHelper
        ch = ConfigHelper(app)
        # Fetch the server configuration from the app's config
        cfg = ch.Server
        return env_vars, cfg

    def register_service(self, env_vars: dict, cfg: ConfigHelper) -> None:
        """Register the service with Consul.

        Args:
            env_vars: Keyword arguments used to build the ``consul.Consul``
                client when none exists yet on this instance.
            cfg: Server configuration; only ``cfg.get(key, default)`` is used
                here, for the keys ``Name``, ``IP``, ``Port``, ``Interval``,
                ``Timeout`` and ``Deregister``.

        Raises:
            ServerError: If building the check, creating the client, or the
                registration call fails. The underlying exception is logged
                and chained as the cause.
        """
        service_name = cfg.get('Name', 'elasticsearch-log-parse')
        service_address = cfg.get('IP', 'localhost')
        # NOTE(review): the fallback port is a string and is passed on
        # unchanged - confirm the configured value has the type Consul expects.
        service_port = cfg.get('Port', '5000')
        check_interval = cfg.get('Interval', '30s')
        timeout = cfg.get('Timeout', '30s')
        deregister_after = cfg.get('Deregister', '60s')

        # Keep an already assigned id so repeated registrations reuse it.
        if self.service_id is None:
            self.service_id = f'{service_name}-{service_address}-{service_port}'

        try:
            check = consul.Check().tcp(
                host=service_address,
                port=service_port,
                interval=check_interval,
                timeout=timeout,
                deregister=deregister_after,
            )

            # Create the client once and reuse it afterwards.
            if self.client is None:
                self.client = consul.Consul(**env_vars)

            self.client.agent.service.register(
                name=service_name,
                service_id=self.service_id,
                address=service_address,
                port=service_port,
                check=check,
            )
        except Exception as e:
            logger.error(f'Failed to register service, error: {e}')
            # The original built this exception but never raised it, so a
            # failed registration went unnoticed by the caller.
            raise ServerError('Failed to register service') from e

    def deregister_service(self) -> None:
        """Deregister the service from Consul.

        Does nothing when no client or service id has been set, so calling it
        before a successful registration cannot fail on ``None``.
        """
        if self.client is None or self.service_id is None:
            return
        self.client.agent.service.deregister(self.service_id)