elasticsearch.py 4.28 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description :
"""

崔为之's avatar
崔为之 committed
13
import time
崔为之's avatar
崔为之 committed
14 15 16
from typing import Any

from elasticsearch import Elasticsearch
崔为之's avatar
崔为之 committed
17 18
from flask import Flask
from flask_elasticsearch import FlaskElasticsearch as BaseFlaskElasticsearch
崔为之's avatar
崔为之 committed
19 20
from loguru import logger

崔为之's avatar
崔为之 committed
21 22
from application.common import ConfigHelper

崔为之's avatar
崔为之 committed
23

崔为之's avatar
崔为之 committed
24 25 26 27 28 29 30
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
    from flask import _app_ctx_stack as stack_context
except ImportError:
    from flask import _request_ctx_stack as stack_context
崔为之's avatar
崔为之 committed
31

崔为之's avatar
崔为之 committed
32 33 34 35 36 37

class FlaskElasticsearch(BaseFlaskElasticsearch):
    """
    Flask extension exposing a lazily created Elasticsearch client.

    Attribute access on an instance is forwarded to an ``Elasticsearch``
    client stored on the top of the Flask context stack. The client is
    built on first use from ``ConfigHelper(app).Elasticsearch`` plus any
    keyword arguments given to :meth:`init_app`.
    """

    def __init__(self, app=None, **kwargs):
        """
        Initializes a FlaskElasticsearch instance.

        :param app: Flask app instance for Elasticsearch initialization.
        :param kwargs: Extra options handed to the parent initializer.
        """
        # Assigned before delegating so the attribute always exists and a
        # lookup never falls through to __getattr__. An empty dict (rather
        # than None) keeps the option merge valid when init_app() is never
        # called.
        # NOTE(review): the parent initializer presumably calls init_app()
        # when an app is supplied, which then replaces this value - confirm
        # against flask_elasticsearch.
        self.extra_options = {}
        super().__init__(app, **kwargs)

    def init_app(self, app: Flask, **kwargs) -> None:
        """
        Initialize the app with Elasticsearch connection.

        :param app: Flask application instance for Elasticsearch initialization.
        :param kwargs: Extra keyword arguments passed to the Elasticsearch
            client constructor in addition to the configured options.
        """
        self.extra_options = kwargs

        # Use the new-style teardown_appcontext if it's available,
        # otherwise fall back to the request context.
        # ``teardown`` is not defined in this class; it is expected to be
        # inherited from the base extension.
        if hasattr(app, "teardown_appcontext"):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    def __getattr__(self, item: Any) -> Any:
        """
        Lazy initialization of Elasticsearch connection on first use.

        :param item: Attribute to get from the Elasticsearch connection.
        :return: Value of the requested attribute, or None when there is
            no active Flask context.
        :raises ConnectionError: If the cluster cannot be reached.
        """
        ctx = stack_context.top
        if ctx is None:
            # Outside of any context there is nowhere to keep a client;
            # the historical behaviour of yielding None is preserved.
            return None

        if not hasattr(ctx, 'elasticsearch'):
            # Store the client only once it has answered a ping, so a failed
            # connection is retried on the next access instead of leaving an
            # unusable client cached on the context.
            ctx.elasticsearch = self._connect(ctx.app)
        return getattr(ctx.elasticsearch, item)

    def _connect(self, app: Flask, max_attempts: int = 5) -> Elasticsearch:
        """
        Build an Elasticsearch client and wait until it answers a ping.

        Failed attempts are separated by an exponentially growing pause
        (1, 2, 4, ... seconds); no pause follows the final attempt.

        :param app: Flask app instance whose configuration is used.
        :param max_attempts: Number of pings to try before giving up.
        :return: A client that responded to ``ping()``.
        :raises ConnectionError: If every ping attempt fails.
        """
        # Explicit init_app() kwargs take precedence over configured values;
        # merging avoids a "multiple values for keyword" TypeError when both
        # sources define the same option.
        options = {**self._get_config(app), **(self.extra_options or {})}
        client = Elasticsearch(**options)

        for attempt in range(1, max_attempts + 1):
            if client.ping():
                logger.info('Connected to Elasticsearch')
                return client
            if attempt < max_attempts:
                logger.warning(f'Attempt {attempt} to connect to Elasticsearch failed. Retrying...')
                time.sleep(2 ** (attempt - 1))  # Exponential backoff

        message = f'Can not connect to Elasticsearch after {max_attempts} attempts'
        logger.error(message)
        raise ConnectionError(message)

    @staticmethod
    def get_es_config(app: Flask) -> dict:
        """
        Get the Elasticsearch configuration section for a Flask application.

        :param app: Flask app instance wrapped by ``ConfigHelper``.
        :return: The ``Elasticsearch`` attribute of the config helper.
        """
        return ConfigHelper(app).Elasticsearch

    @staticmethod
    def _as_bool(value: Any, default: bool = False) -> bool:
        """
        Interpret a configuration value as a boolean.

        Real booleans are returned unchanged; other values are compared
        case-insensitively against ``true``, ``1``, ``yes`` and ``on``.

        :param value: Raw configuration value.
        :param default: Result used when the value is None.
        :return: The interpreted boolean.
        """
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in ('true', '1', 'yes', 'on')

    @staticmethod
    def build_es_options(cfg) -> dict:
        """
        Builds Elasticsearch connection options from the configuration.

        Missing or empty ``Host``/``Port`` fall back to ``localhost``/``9200``.
        ``UseSsl`` and ``VerifyCerts`` accept booleans as well as textual
        flags; credentials are only set when both ``User`` and ``Password``
        are present.

        :param cfg: Elasticsearch's configuration from the Flask application context.
        :return: Elasticsearch's connection options dictionary.
        """
        cfg = cfg or {}

        host = cfg.get('Host') or 'localhost'
        port = int(cfg.get('Port') or 9200)
        user = cfg.get('User')
        password = cfg.get('Password')
        use_ssl = FlaskElasticsearch._as_bool(cfg.get('UseSsl'))
        verify_certs = FlaskElasticsearch._as_bool(cfg.get('VerifyCerts'))
        ca_certs = cfg.get('CaCerts')

        # NOTE(review): ``http_auth`` and ``use_ssl`` look like 7.x-style
        # client options - confirm the pinned elasticsearch-py version
        # still accepts them.
        options = {
            'hosts': [{'host': host, 'port': port}],
            'http_auth': (user, password) if user and password else None,
            'use_ssl': use_ssl,
            'verify_certs': verify_certs,
            'ca_certs': ca_certs,
        }

        return options

    def _get_config(self, app: Flask) -> dict:
        """
        Combines getting Elasticsearch configuration and building options.

        :param app: Flask app instance.
        :return: Elasticsearch's connection options dictionary.
        """
        cfg = self.get_es_config(app)
        return self.build_es_options(cfg)