elasticsearch.py 3.63 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/10/28 12:19
# @File        : elasticsearch.py
# @Description :
"""

from typing import Any

from elasticsearch import Elasticsearch
from flask import current_app, Flask
from loguru import logger


class FlaskElasticsearch:
    def __init__(self, app=None):
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the app"""
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    @staticmethod
    def teardown(exception):
        """Clears the Elasticsearch connection after each request."""
        ctx = current_app._get_current_object()
        if hasattr(ctx, 'elasticsearch'):
            ctx.elasticsearch = None
        if exception is not None:
            raise RuntimeError(exception)

    def __getattr__(self, item: Any) -> Any:
        """Lazy initialization of Elasticsearch connection on first use."""
        ctx = current_app._get_current_object()
        if ctx is not None:
            if not hasattr(ctx, "elasticsearch"):
                cfg = self._get_config()
                ctx.elasticsearch = Elasticsearch(**cfg)
                if ctx.elasticsearch.ping():
                    logger.info('Connected to Elasticsearch')
                else:
                    logger.error('Connected to Elasticsearch')
                    raise ConnectionError('Connected to Elasticsearch')
            return getattr(ctx.elasticsearch, item)

    @staticmethod
    def _get_config():
        """Retrieves Elasticsearch configuration from the current Flask application context."""
        with current_app.app_context():
            if current_app:
                # Retrieve configuration from current_app.config and return it
                host = current_app.config.Elasticsearch.Host if current_app.config.get(
                    'Elasticsearch') is not None else 'localhost'

                port = int(current_app.config.Elasticsearch.Port) if current_app.config.get(
                    'Elasticsearch') is not None else 9200

                user = current_app.config.Elasticsearch.User if current_app.config.get(
                    'Elasticsearch') is not None else 'user'

                password = current_app.config.Elasticsearch.Password if current_app.config.get(
                    'Elasticsearch') is not None else 'password'

                use_ssl = current_app.config.Elasticsearch.UseSsl == 'True' if current_app.config.get(
                    'Elasticsearch') is not None else False

                verify_certs = current_app.config.Elasticsearch.VerifyCerts == 'False' if current_app.config.get(
                    'Elasticsearch') is not None else True

                ca_certs = current_app.config.Elasticsearch.CaCerts if current_app.config.get(
                    'Elasticsearch') is not None else None

                es_config = dict(
                    hosts=[{'host': host, 'port': port}],
                    http_auth=None if not user else (user, password),
                    use_ssl=use_ssl,
                    verify_certs=verify_certs,
                    ca_certs=ca_certs
                )

                return es_config
            else:
                logger.error('Attempted to access application configuration outside of application context.')
                raise RuntimeError('Attempted to access application configuration outside of application context.')