#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ # @Version : Python 3.11.4 # @Software : Sublime Text 4 # @Author : StudentCWZ # @Email : StudentCWZ@outlook.com # @Date : 2023/10/28 12:19 # @File : elasticsearch.py # @Description : """ from typing import Any from elasticsearch import Elasticsearch from flask import current_app, Flask from loguru import logger from application.common import ConfigHelper class FlaskElasticsearch: def __init__(self, app=None): if app is not None: self.init_app(app) def init_app(self, app: Flask) -> None: """Initialize the app""" if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: app.teardown_request(self.teardown) @staticmethod def teardown(exception): """Clears the Elasticsearch connection after each request.""" ctx = current_app._get_current_object() if hasattr(ctx, 'elasticsearch'): ctx.elasticsearch = None if exception is not None: raise RuntimeError(exception) def __getattr__(self, item: Any) -> Any: """Lazy initialization of Elasticsearch connection on first use.""" ctx = current_app._get_current_object() if ctx is not None: if not hasattr(ctx, 'elasticsearch'): cfg = self._get_config() ctx.elasticsearch = Elasticsearch(**cfg) if ctx.elasticsearch.ping(): logger.info('Connected to Elasticsearch') else: logger.error('Connected to Elasticsearch') raise ConnectionError('Connected to Elasticsearch') return getattr(ctx.elasticsearch, item) @staticmethod def _get_config(): """Retrieves Elasticsearch configuration from the current Flask application context.""" with current_app.app_context(): if current_app: config_helper = ConfigHelper(current_app) cfg = config_helper.get_config('Elasticsearch') if cfg is None: raise KeyError('Key Elasticsearch is not defined') host = cfg.Host or 'localhost' port = int(cfg.Port) or 9200 user = cfg.User or None password = cfg.Password or None use_ssl = cfg.UseSsl == 'True' or False verify_certs = cfg.VerifyCerts == 'False' ca_certs = cfg.CaCerts or None options = dict( hosts=[{'host': host, 'port': port}], http_auth=None if not user else (user, password), use_ssl=use_ssl, verify_certs=verify_certs, ca_certs=ca_certs ) return options else: logger.error('Attempted to access application configuration outside of application context.') raise RuntimeError('Attempted to access application configuration outside of application context.')