elasticsearch.py 3.95 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description : Flask extension providing a lazily connected Elasticsearch client.
"""

崔为之's avatar
崔为之 committed
13
import time
崔为之's avatar
崔为之 committed
14 15 16
from typing import Any

from elasticsearch import Elasticsearch
崔为之's avatar
崔为之 committed
17
from flask import Flask
崔为之's avatar
崔为之 committed
18
from flask import _app_ctx_stack as stack_context
崔为之's avatar
崔为之 committed
19
from flask_elasticsearch import FlaskElasticsearch as BaseFlaskElasticsearch
崔为之's avatar
崔为之 committed
20 21
from loguru import logger

崔为之's avatar
崔为之 committed
22
from application.libs.helper import ConfigHelper
崔为之's avatar
崔为之 committed
23

崔为之's avatar
崔为之 committed
24

崔为之's avatar
崔为之 committed
25 26 27 28 29
class FlaskElasticsearch(BaseFlaskElasticsearch):
    """
    Flask extension that exposes a lazily created Elasticsearch client.

    The client is built from the configuration returned by
    ``ConfigHelper(app).Elasticsearch`` and cached on the active application
    context. Attribute access on this object is forwarded to that client.
    """

    # Number of ping attempts made before a freshly built client is
    # considered unreachable.
    CONNECT_ATTEMPTS = 5

    def __init__(self, app=None, **kwargs):
        """
        Initializes a FlaskElasticsearch instance.

        :param app: Flask app instance for Elasticsearch initialization.
        :param kwargs: Extra keyword arguments forwarded to the base class
            constructor (presumably handed on to ``init_app`` when ``app`` is
            given -- confirm against the flask_elasticsearch package).
        """
        # Assigned before delegating so the attribute always exists on the
        # instance; a missing attribute would fall through to __getattr__.
        self.extra_options = None
        super().__init__(app, **kwargs)

    def init_app(self, app: Flask, **kwargs) -> None:
        """
        Initialize the app with Elasticsearch connection.

        :param app: Flask application instance for Elasticsearch initialization.
        :param kwargs: Extra options passed to the ``Elasticsearch`` constructor
            when the client is created.
        """
        self.extra_options = kwargs

        # Use the new-style teardown_appcontext if it's available,
        # otherwise fall back to the request context.
        # ``self.teardown`` is not defined in this class; it is expected to
        # come from the base class.
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    def _connect(self, app: Flask) -> Elasticsearch:
        """
        Build an Elasticsearch client for ``app`` and verify it with ``ping()``.

        :param app: Flask app instance whose configuration is used.
        :return: A client that answered a ping.
        :raises ConnectionError: If no ping succeeds within
            ``CONNECT_ATTEMPTS`` attempts.
        """
        # extra_options stays None until init_app has run; treat that as
        # "no extra options" instead of failing on ``**None``.
        extra = self.extra_options or {}
        client = Elasticsearch(**self._get_config(app), **extra)

        attempts = self.CONNECT_ATTEMPTS
        for attempt in range(1, attempts + 1):
            if client.ping():
                return client
            # Only announce a retry and back off when another attempt follows;
            # sleeping after the final failure would just delay the error.
            if attempt < attempts:
                logger.warning(f'Attempt {attempt} to connect to Elasticsearch failed. Retrying...')
                time.sleep(2 ** (attempt - 1))  # Exponential backoff: 1, 2, 4, ...

        logger.error(f'Can not connect to Elasticsearch after {attempts} attempts')
        raise ConnectionError(f'Can not connect to Elasticsearch after {attempts} attempts')

    def __getattr__(self, item: Any) -> Any:
        """
        Lazy initialization of Elasticsearch connection on first use.

        :param item: Attribute to get from the Elasticsearch connection.
        :return: Value of the requested attribute, or ``None`` when there is
            no active application context.
        :raises ConnectionError: If the client has to be created and
            Elasticsearch cannot be reached.
        """
        ctx = stack_context.top
        if ctx is None:
            # NOTE(review): outside an application context every attribute
            # resolves to None instead of raising; kept for compatibility.
            return None

        if not hasattr(ctx, 'elasticsearch'):
            # Cache the client only after a successful ping, so a failed
            # connection never leaves an unverified client on the context.
            ctx.elasticsearch = self._connect(ctx.app)
        return getattr(ctx.elasticsearch, item)

    @staticmethod
    def get_es_config(app: Flask) -> dict:
        """
        Get Elasticsearch configuration for the given Flask application.

        :param app: Flask app instance.
        :return: The ``Elasticsearch`` attribute of ``ConfigHelper(app)``.
        """
        return ConfigHelper(app).Elasticsearch

    @staticmethod
    def build_es_options(cfg) -> dict:
        """
        Builds Elasticsearch connection options from the configuration.

        :param cfg: Elasticsearch's configuration; must support ``.get(key)``.
            Keys read: ``Host``, ``Port``, ``User``, ``Password``, ``UseSsl``,
            ``VerifyCerts`` and ``CaCerts``.
        :return: Elasticsearch's connection options dictionary.
        """

        def as_flag(value: Any) -> bool:
            # Accept real booleans as well as 'True'/'true' strings; comparing
            # only against the literal 'True' silently disabled TLS for both.
            if isinstance(value, str):
                return value.strip().lower() == 'true'
            return bool(value)

        # ``or`` also covers keys that are present but empty or None.
        host = cfg.get('Host') or 'localhost'
        port = int(cfg.get('Port') or 9200)
        user = cfg.get('User')
        password = cfg.get('Password')

        return {
            'hosts': [{'host': host, 'port': port}],
            # Credentials are sent only when both parts are configured.
            'http_auth': (user, password) if user and password else None,
            'use_ssl': as_flag(cfg.get('UseSsl')),
            'verify_certs': as_flag(cfg.get('VerifyCerts')),
            'ca_certs': cfg.get('CaCerts'),
        }

    def _get_config(self, app: Flask) -> dict:
        """
        Combines getting Elasticsearch configuration and building options.

        :param app: Flask app instance.
        :return: Elasticsearch's connection options dictionary.
        """
        return self.build_es_options(self.get_es_config(app))