#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ # @Version : Python 3.11.4 # @Software : Sublime Text 4 # @Author : StudentCWZ # @Email : StudentCWZ@outlook.com # @Date : 2023/10/28 12:19 # @File : elasticsearch.py # @Description : """ import time from typing import Any from elasticsearch import Elasticsearch from flask import current_app, Flask from loguru import logger from application.common import ConfigHelper class FlaskElasticsearch: def __init__(self, app=None): """ Initialize a FlaskElasticsearch instance. :param app: Flask application instance for Elasticsearch initialization. """ if app is not None: self.init_app(app) def init_app(self, app: Flask) -> None: """ Initialize the app with Elasticsearch connection. :param app: Flask application instance for Elasticsearch initialization. """ if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: app.teardown_request(self.teardown) @staticmethod def teardown(exception): """ Clear the Elasticsearch connection after each request. :param exception: Exception instance, if any. """ ctx = current_app._get_current_object() if hasattr(ctx, 'elasticsearch'): ctx.elasticsearch = None if exception is not None: raise RuntimeError(exception) def __getattr__(self, item: Any) -> Any: """ Lazy initialization of Elasticsearch connection on first use. :param item: Attribute to get from the Elasticsearch connection. :return: Value of the requested attribute. """ ctx = current_app._get_current_object() if ctx is not None: if not hasattr(ctx, 'elasticsearch'): cfg = self._get_config() ctx.elasticsearch = Elasticsearch(**cfg) # Retry connection on failure for i in range(5): # Retry up to 5 times if ctx.elasticsearch.ping(): logger.info('Connected to Elasticsearch') break else: logger.warning(f'Attempt {i + 1} to connect to Elasticsearch failed. Retrying...') time.sleep(2 ** i) # Exponential backoff else: logger.error('Can not connect to Elasticsearch after 5 attempts') raise ConnectionError('Can not connect to Elasticsearch after 5 attempts') return getattr(ctx.elasticsearch, item) @staticmethod def get_es_config() -> dict: """Get Elasticsearch configuration from the current Flask application context.""" with current_app.app_context(): if current_app: config_helper = ConfigHelper(current_app) return config_helper.Elasticsearch else: logger.error('Attempted to access application configuration outside of application context.') raise RuntimeError('Attempted to access application configuration outside of application context.') @staticmethod def build_es_options(cfg) -> dict: """ Build Elasticsearch connection options from the configuration. :param cfg: Elasticsearch's configuration from the Flask application context. """ host = cfg.get('Host', 'localhost') port = int(cfg.get('Port', 9200)) user = cfg.get('User') password = cfg.get('Password') use_ssl = cfg.get('UseSsl', 'False') == 'True' verify_certs = cfg.get('VerifyCerts', 'False') == 'True' ca_certs = cfg.get('CaCerts') options = { 'hosts': [{'host': host, 'port': port}], 'http_auth': (user, password) if user and password else None, 'use_ssl': use_ssl, 'verify_certs': verify_certs, 'ca_certs': ca_certs } return options def _get_config(self) -> dict: """Combines getting Elasticsearch configuration and building options.""" cfg = self.get_es_config() return self.build_es_options(cfg)