elasticsearch.py 3.81 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description :
"""

from typing import Any

from elasticsearch import Elasticsearch
from flask import current_app, Flask
from loguru import logger

崔为之's avatar
崔为之 committed
19 20
from application.common import ConfigHelper

崔为之's avatar
崔为之 committed
21 22 23

class FlaskElasticsearch:
    def __init__(self, app=None):
崔为之's avatar
崔为之 committed
24 25 26 27 28
        """
        Initialize a FlaskElasticsearch instance.

        :param app: Flask application instance for Elasticsearch initialization.
        """
崔为之's avatar
崔为之 committed
29 30 31 32
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
崔为之's avatar
崔为之 committed
33 34 35 36 37
        """
        Initialize the app with Elasticsearch connection.

        :param app: Flask application instance for Elasticsearch initialization.
        """
崔为之's avatar
崔为之 committed
38 39 40 41 42 43 44
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    @staticmethod
    def teardown(exception):
崔为之's avatar
崔为之 committed
45 46 47 48 49
        """
        Clear the Elasticsearch connection after each request.

        :param exception: Exception instance, if any.
        """
崔为之's avatar
崔为之 committed
50 51 52 53 54 55 56
        ctx = current_app._get_current_object()
        if hasattr(ctx, 'elasticsearch'):
            ctx.elasticsearch = None
        if exception is not None:
            raise RuntimeError(exception)

    def __getattr__(self, item: Any) -> Any:
崔为之's avatar
崔为之 committed
57 58 59 60 61 62
        """
        Lazy initialization of Elasticsearch connection on first use.

        :param item: Attribute to get from the Elasticsearch connection.
        :return: Value of the requested attribute.
        """
崔为之's avatar
崔为之 committed
63 64
        ctx = current_app._get_current_object()
        if ctx is not None:
崔为之's avatar
崔为之 committed
65
            if not hasattr(ctx, 'elasticsearch'):
崔为之's avatar
崔为之 committed
66 67 68 69 70
                cfg = self._get_config()
                ctx.elasticsearch = Elasticsearch(**cfg)
                if ctx.elasticsearch.ping():
                    logger.info('Connected to Elasticsearch')
                else:
崔为之's avatar
崔为之 committed
71 72
                    logger.error('Can not connect to Elasticsearch')
                    raise ConnectionError('Can not connect to Elasticsearch')
崔为之's avatar
崔为之 committed
73 74 75
            return getattr(ctx.elasticsearch, item)

    @staticmethod
崔为之's avatar
崔为之 committed
76 77
    def get_es_config() -> dict:
        """Get Elasticsearch configuration from the current Flask application context."""
崔为之's avatar
崔为之 committed
78 79
        with current_app.app_context():
            if current_app:
崔为之's avatar
崔为之 committed
80
                config_helper = ConfigHelper(current_app)
崔为之's avatar
崔为之 committed
81
                return config_helper.Elasticsearch
崔为之's avatar
崔为之 committed
82 83 84
            else:
                logger.error('Attempted to access application configuration outside of application context.')
                raise RuntimeError('Attempted to access application configuration outside of application context.')
崔为之's avatar
崔为之 committed
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114

    @staticmethod
    def build_es_options(cfg) -> dict:
        """
        Build Elasticsearch connection options from the configuration.

        :param cfg: Elasticsearch's configuration from the Flask application context.
        """
        host = cfg.get('Host', 'localhost')
        port = int(cfg.get('Port', 9200))
        user = cfg.get('User')
        password = cfg.get('Password')
        use_ssl = cfg.get('UseSsl', 'False') == 'True'
        verify_certs = cfg.get('VerifyCerts', 'False') == 'True'
        ca_certs = cfg.get('CaCerts')

        options = {
            'hosts': [{'host': host, 'port': port}],
            'http_auth': (user, password) if user and password else None,
            'use_ssl': use_ssl,
            'verify_certs': verify_certs,
            'ca_certs': ca_certs
        }

        return options

    def _get_config(self) -> dict:
        """Combines getting Elasticsearch configuration and building options."""
        cfg = self.get_es_config()
        return self.build_es_options(cfg)