elasticsearch.py 4.19 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description :
"""

崔为之's avatar
崔为之 committed
13
import time
崔为之's avatar
崔为之 committed
14 15 16 17 18 19
from typing import Any

from elasticsearch import Elasticsearch
from flask import current_app, Flask
from loguru import logger

崔为之's avatar
崔为之 committed
20 21
from application.common import ConfigHelper

崔为之's avatar
崔为之 committed
22 23 24

class FlaskElasticsearch:
    """
    Flask extension that lazily creates an Elasticsearch client.

    The client is stored on the Flask application object under the attribute
    ``elasticsearch``. Any attribute that is not found on this wrapper itself is
    looked up on that client, so an instance can be used like the client.
    """

    # Number of ping attempts made on a freshly created client before giving up.
    _MAX_ATTEMPTS = 5

    def __init__(self, app: 'Flask | None' = None):
        """
        Initialize a FlaskElasticsearch instance.

        :param app: Flask application instance for Elasticsearch initialization.
            When omitted, call :meth:`init_app` later.
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: 'Flask') -> None:
        """
        Register the teardown callback on the given application.

        No connection is opened here; the client is created on first attribute
        access (see :meth:`__getattr__`).

        :param app: Flask application instance for Elasticsearch initialization.
        """
        # Prefer the app-context hook; fall back to the request hook when the
        # application object does not provide it.
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    @staticmethod
    def teardown(exception):
        """
        Clear the Elasticsearch client stored on the current application.

        The exception, if any, is only logged. Raising from a teardown callback
        would prevent the remaining teardown callbacks from running and would
        replace the error response Flask has already produced.

        :param exception: Exception instance, if any.
        """
        ctx = current_app._get_current_object()
        if hasattr(ctx, 'elasticsearch'):
            ctx.elasticsearch = None
        if exception is not None:
            logger.error(f'Context torn down with exception: {exception!r}')

    def __getattr__(self, item: Any) -> Any:
        """
        Forward attribute access to the Elasticsearch client, creating it on first use.

        :param item: Attribute to get from the Elasticsearch connection.
        :return: Value of the requested attribute.
        :raises AttributeError: For dunder names, or when the client has no such attribute.
        :raises RuntimeError: When called outside of an application context.
        :raises ConnectionError: When a new client cannot reach Elasticsearch.
        """
        # Protocol probes (copy, pickle, inspect, ...) look up dunder names on the
        # instance; they must neither open a connection nor need an app context.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)

        ctx = current_app._get_current_object()
        # teardown() leaves the attribute set to None, so a plain hasattr() check
        # is not enough to tell whether a usable client exists.
        client = getattr(ctx, 'elasticsearch', None)
        if client is None:
            client = self._connect()
            ctx.elasticsearch = client
        return getattr(client, item)

    def _connect(self):
        """
        Create a client and verify it with ``ping()``, retrying with exponential backoff.

        :return: An Elasticsearch client whose ``ping()`` succeeded.
        :raises ConnectionError: When every attempt fails.
        """
        client = Elasticsearch(**self._get_config())
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            if client.ping():
                logger.info('Connected to Elasticsearch')
                return client
            if attempt < self._MAX_ATTEMPTS:
                logger.warning(f'Attempt {attempt} to connect to Elasticsearch failed. Retrying...')
                time.sleep(2 ** (attempt - 1))  # Exponential backoff: 1, 2, 4, 8 seconds
            else:
                # No sleep after the last attempt: nothing is left to wait for.
                logger.warning(f'Attempt {attempt} to connect to Elasticsearch failed.')

        message = f'Can not connect to Elasticsearch after {self._MAX_ATTEMPTS} attempts'
        logger.error(message)
        raise ConnectionError(message)

    @staticmethod
    def get_es_config() -> dict:
        """
        Get Elasticsearch configuration from the current Flask application context.

        The active context is used as is. Pushing a nested application context
        here would run every registered app-context teardown callback when it is
        popped again.

        :return: The ``Elasticsearch`` attribute of ``ConfigHelper(current_app)``
            (presumably a mapping of connection settings; confirm against ConfigHelper).
        :raises RuntimeError: When called outside of an application context.
        """
        # An unbound ``current_app`` proxy is falsy.
        if not current_app:
            message = 'Attempted to access application configuration outside of application context.'
            logger.error(message)
            raise RuntimeError(message)
        return ConfigHelper(current_app).Elasticsearch

    @staticmethod
    def _to_bool(value) -> bool:
        """Interpret a configuration flag given as a real boolean or as the text 'true' (any case)."""
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() == 'true'

    @staticmethod
    def build_es_options(cfg) -> dict:
        """
        Build Elasticsearch connection options from the configuration.

        Recognized keys: ``Host``, ``Port``, ``User``, ``Password``, ``UseSsl``,
        ``VerifyCerts`` and ``CaCerts``. The two flags accept a boolean or the
        text ``'true'`` in any letter case; anything else counts as false.

        :param cfg: Elasticsearch's configuration from the Flask application context.
            ``None`` is treated as an empty configuration.
        :return: Keyword arguments for the ``Elasticsearch`` constructor.
        """
        cfg = cfg or {}
        user = cfg.get('User')
        password = cfg.get('Password')

        return {
            'hosts': [{
                'host': cfg.get('Host', 'localhost'),
                'port': int(cfg.get('Port', 9200)),
            }],
            # Credentials are only sent when both parts are present.
            'http_auth': (user, password) if user and password else None,
            'use_ssl': FlaskElasticsearch._to_bool(cfg.get('UseSsl', False)),
            'verify_certs': FlaskElasticsearch._to_bool(cfg.get('VerifyCerts', False)),
            'ca_certs': cfg.get('CaCerts'),
        }

    def _get_config(self) -> dict:
        """Combines getting Elasticsearch configuration and building options."""
        return self.build_es_options(self.get_es_config())