elasticsearch.py 3.06 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description : Flask extension exposing a lazily created Elasticsearch client
"""

from typing import Any

from elasticsearch import Elasticsearch
from flask import current_app, Flask
from loguru import logger

崔为之's avatar
崔为之 committed
19 20
from application.common import ConfigHelper

崔为之's avatar
崔为之 committed
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46

class FlaskElasticsearch:
    """Flask extension that proxies attribute access to an Elasticsearch client.

    The client is created lazily on the first attribute access made inside
    an application context, stored on the Flask application object as
    ``elasticsearch``, and reset to ``None`` by the teardown handler that
    ``init_app`` registers.
    """

    def __init__(self, app=None):
        """Optionally bind the extension to ``app`` right away.

        Args:
            app: Flask application to initialise, or ``None`` to defer the
                call to ``init_app``.
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: 'Flask') -> None:
        """Register the teardown handler on ``app``.

        Args:
            app: Flask application to attach to. ``teardown_appcontext`` is
                used when the application offers it, ``teardown_request``
                otherwise.
        """
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    @staticmethod
    def teardown(exception):
        """Drop the cached Elasticsearch client when the context ends.

        Args:
            exception: The unhandled exception that ended the context, or
                ``None``. It is logged only: Flask's documentation requires
                teardown functions not to raise, and re-raising here would
                replace the original error with a ``RuntimeError``.
        """
        app = current_app._get_current_object()
        if hasattr(app, 'elasticsearch'):
            app.elasticsearch = None
        if exception is not None:
            logger.error('Context torn down after an unhandled exception: {}', exception)

    def __getattr__(self, item: str) -> Any:
        """Forward unknown attributes to the lazily created client.

        Args:
            item: Name of the attribute to fetch from the client.

        Returns:
            The attribute ``item`` of the Elasticsearch client.

        Raises:
            AttributeError: If ``item`` is a dunder name, or the client has
                no such attribute.
            ConnectionError: If the freshly created client fails ``ping()``.
            RuntimeError: If no application context is active.
        """
        # Protocol probes (copy, pickle, inspect, hasattr(..., '__x__')) must
        # get a plain AttributeError; they should neither need an application
        # context nor open a connection.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)

        app = current_app._get_current_object()
        # teardown() leaves the attribute in place but set to None, so a
        # hasattr() test is not enough to know whether a client exists.
        client = getattr(app, 'elasticsearch', None)
        if client is None:
            client = Elasticsearch(**self._get_config())
            if not client.ping():
                logger.error('Can not connect to Elasticsearch')
                raise ConnectionError('Can not connect to Elasticsearch')
            logger.info('Connected to Elasticsearch')
            # Cache only after a successful ping so that a failed client is
            # never reused by a later access.
            app.elasticsearch = client
        return getattr(client, item)

    @staticmethod
    def _as_bool(value: Any, default: bool = False) -> bool:
        """Interpret a configuration value as a boolean.

        Args:
            value: A ``bool``, ``None``, or anything whose string form is a
                flag such as ``'True'`` / ``'false'``.
            default: Result used when ``value`` is ``None``.

        Returns:
            ``value`` itself for booleans, ``default`` for ``None``, otherwise
            whether the stripped, lower-cased string form is one of
            ``'true'``, ``'1'``, ``'yes'`` or ``'on'``.
        """
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in ('true', '1', 'yes', 'on')

    @staticmethod
    def _build_options(cfg: Any) -> dict[str, Any]:
        """Translate the ``Elasticsearch`` config section into client kwargs.

        Args:
            cfg: Object exposing ``Host``, ``Port``, ``User``, ``Password``,
                ``UseSsl``, ``VerifyCerts`` and ``CaCerts`` attributes.

        Returns:
            Keyword arguments for the ``Elasticsearch`` constructor.
        """
        host = cfg.Host or 'localhost'
        # Apply the fallback before int() so an unset port does not raise;
        # a configured port of 0 still falls back to 9200 as before.
        port = int(cfg.Port or 0) or 9200
        user = cfg.User or None
        password = cfg.Password or None
        use_ssl = FlaskElasticsearch._as_bool(cfg.UseSsl)
        # NOTE(review): an unset VerifyCerts keeps the previous result
        # (False); consider defaulting to True when UseSsl is enabled.
        verify_certs = FlaskElasticsearch._as_bool(cfg.VerifyCerts)
        ca_certs = cfg.CaCerts or None

        return dict(
            hosts=[{'host': host, 'port': port}],
            http_auth=(user, password) if user else None,
            use_ssl=use_ssl,
            verify_certs=verify_certs,
            ca_certs=ca_certs,
        )

    @staticmethod
    def _get_config():
        """Read the Elasticsearch settings of the current Flask application.

        Returns:
            Keyword arguments for the ``Elasticsearch`` constructor.

        Raises:
            RuntimeError: If no application context is active.
            KeyError: If the configuration has no ``Elasticsearch`` section.
        """
        # No nested app context is pushed here: current_app already needs an
        # active one, and leaving a nested context is expected to run the
        # application's teardown handlers in the middle of the request.
        if not current_app:
            logger.error('Attempted to access application configuration outside of application context.')
            raise RuntimeError('Attempted to access application configuration outside of application context.')

        cfg = ConfigHelper(current_app).Elasticsearch
        if cfg is None:
            raise KeyError('Key Elasticsearch is not defined')
        return FlaskElasticsearch._build_options(cfg)