#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description :
"""

import time
from typing import Any

from elasticsearch import Elasticsearch
from flask import Flask
# NOTE(review): `_app_ctx_stack` is deprecated in recent Flask releases;
# confirm the pinned Flask version still exposes it before upgrading.
from flask import _app_ctx_stack as stack_context
from flask_elasticsearch import FlaskElasticsearch as BaseFlaskElasticsearch
from loguru import logger

from application.common import ConfigHelper

# String spellings (lower-cased) that are treated as "enabled" flags.
_TRUTHY_STRINGS = frozenset({'1', 'true', 'yes', 'on'})


def _as_bool(value: Any) -> bool:
    """
    Interpret a configuration value as a boolean.

    Strings are compared case-insensitively against a small set of truthy
    spellings, so both ``'True'`` and ``'true'`` enable a flag; any other
    string (including ``'False'``) disables it. Non-string values (real
    booleans, ``None``, numbers) go through ``bool()``.

    :param value: Raw configuration value.
    :return: The boolean meaning of the value.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY_STRINGS
    return bool(value)


class FlaskElasticsearch(BaseFlaskElasticsearch):
    """
    Flask extension that lazily creates one Elasticsearch client per
    application context, built from the application's configuration.
    """

    # Number of ping attempts made before giving up on a new connection.
    MAX_CONNECT_ATTEMPTS = 5

    def __init__(self, app=None, **kwargs):
        """
        Initializes a FlaskElasticsearch instance.

        :param app: Flask app instance for Elasticsearch initialization.
        :param kwargs: Extra keyword arguments forwarded to the base class
            together with ``app``.
        """
        # Must exist before the base initializer runs, because the base
        # class may call init_app(), which overwrites it.
        self.extra_options = None
        super().__init__(app, **kwargs)

    def init_app(self, app: Flask, **kwargs) -> None:
        """
        Initialize the app with Elasticsearch connection.

        :param app: Flask application instance for Elasticsearch initialization.
        :param kwargs: Extra keyword arguments passed to the ``Elasticsearch``
            constructor when the client is created.
        """
        self.extra_options = kwargs

        # Use the new_style teardown_appcontext if it's available,
        # otherwise fall back to the request context
        if hasattr(app, "teardown_appcontext"):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    def __getattr__(self, item: str) -> Any:
        """
        Lazy initialization of Elasticsearch connection on first use.

        Only invoked for attributes not found on the extension itself; those
        are delegated to the client stored on the current application context.

        :param item: Attribute to get from the Elasticsearch connection.
        :return: Value of the requested attribute, or ``None`` when no
            application context is active.
        :raises ConnectionError: If a new client cannot reach Elasticsearch.
        """
        ctx = stack_context.top
        if ctx is None:
            # No active application context: nothing to delegate to.
            return None

        if not hasattr(ctx, 'elasticsearch'):
            # Cache the client only after it has been verified, so a failed
            # connection is never reused by later lookups in this context.
            ctx.elasticsearch = self._connect(ctx.app)
        return getattr(ctx.elasticsearch, item)

    def _connect(self, app: Flask) -> Elasticsearch:
        """
        Create an Elasticsearch client and verify it with ``ping()``.

        Retries with exponential backoff (1s, 2s, 4s, ...) between attempts.

        :param app: Flask app whose configuration describes the connection.
        :return: A client that answered a ping.
        :raises ConnectionError: If every attempt fails.
        """
        # Read from the instance dict so a missing attribute cannot re-enter
        # __getattr__; tolerate None when init_app() was never called.
        extra = vars(self).get('extra_options') or {}
        # Explicit keyword options take precedence over configured ones
        # instead of raising a duplicate-keyword TypeError.
        options = {**self._get_config(app), **extra}
        client = Elasticsearch(**options)

        attempts = self.MAX_CONNECT_ATTEMPTS
        for i in range(attempts):
            if client.ping():
                logger.info('Connected to Elasticsearch')
                return client
            logger.warning(f'Attempt {i + 1} to connect to Elasticsearch failed. Retrying...')
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the error.
            if i + 1 < attempts:
                time.sleep(2 ** i)  # Exponential backoff

        message = f'Can not connect to Elasticsearch after {attempts} attempts'
        logger.error(message)
        raise ConnectionError(message)

    @staticmethod
    def get_es_config(app: Flask) -> dict:
        """
        Get Elasticsearch configuration for the given Flask application.

        :param app: Flask app instance.
        :return: The ``Elasticsearch`` section exposed by ``ConfigHelper``.
        """
        config_helper = ConfigHelper(app)
        return config_helper.Elasticsearch

    @staticmethod
    def build_es_options(cfg) -> dict:
        """
        Builds Elasticsearch connection options from the configuration.

        :param cfg: Elasticsearch's configuration; must support ``.get()``.
            Keys read: ``Host``, ``Port``, ``User``, ``Password``, ``UseSsl``,
            ``VerifyCerts`` and ``CaCerts``.
        :return: Elasticsearch's connection options dictionary.
        """
        host = cfg.get('Host', 'localhost')
        port = int(cfg.get('Port', 9200))
        user = cfg.get('User')
        password = cfg.get('Password')
        # Flags may be real booleans or strings depending on the config
        # source, so parse them tolerantly instead of comparing to 'True'.
        use_ssl = _as_bool(cfg.get('UseSsl', False))
        verify_certs = _as_bool(cfg.get('VerifyCerts', False))
        ca_certs = cfg.get('CaCerts')

        options = {
            'hosts': [{'host': host, 'port': port}],
            # Credentials are sent only when both parts are present.
            'http_auth': (user, password) if user and password else None,
            'use_ssl': use_ssl,
            'verify_certs': verify_certs,
            'ca_certs': ca_certs,
        }
        return options

    def _get_config(self, app: Flask) -> dict:
        """
        Combines getting Elasticsearch configuration and building options.

        :param app: Flask app instance.
        :return: Elasticsearch's connection options dictionary.
        """
        cfg = self.get_es_config(app)
        return self.build_es_options(cfg)