崔为之 / elp

Commit 21734a2e authored Nov 20, 2023 by 崔为之 💪🏽

Update project

parent 1b65ae3e

Showing 4 changed files with 203 additions and 48 deletions
application/dao/__init__.py   +1   −0
application/dao/log.py        +168 −0
application/services/log.py   +33  −47
application/views/log/log.py  +1   −1
application/dao/__init__.py

...
@@ -11,3 +11,4 @@
 """
 from .user import UserDao
+from .log import LogDao
application/dao/log.py  0 → 100644

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/20 09:57
# @File : log.py
# @Description :
"""
from loguru import logger

from application.utils import ElasticsearchUtil
from application.libs.helper import MySQLHelper
from application.utils import ParseUtil
from application.models import Log


class LogDao:
    """
    Data Access Object for logs.

    This class includes methods to get data from Elasticsearch, process it and save it to MySQL.
    """

    @classmethod
    def get_data_from_es(cls, index=None, dsl=None, sid=None) -> dict:
        """
        Get data from Elasticsearch by either scroll searching or direct searching.

        Args:
            index (str): The name of the Elasticsearch index.
            dsl (dict): The DSL query for Elasticsearch.
            sid (str): The scroll id for Elasticsearch scroll search.

        Returns:
            dict: The data returned from Elasticsearch.

        Raises:
            SystemError: If none of the required parameters are provided.
        """
        if sid is not None:
            return ElasticsearchUtil.scroll_search(sid)
        elif index is not None and dsl is not None:
            return ElasticsearchUtil.search(index, dsl)
        else:
            raise SystemError('Could not get data from Elasticsearch')

    @classmethod
    def get_mdata(cls, data: dict) -> list:
        """
        Get metadata from the data returned by Elasticsearch.

        Args:
            data (dict): The data returned from Elasticsearch.

        Returns:
            list: The metadata extracted from the data.

        Raises:
            SystemError: If the metadata is empty.
        """
        mdata = data.get('hits').get('hits')
        if not mdata:
            logger.error('the mdata is an empty list ...')
            raise SystemError('the mdata is an empty list ...')
        return mdata

    @classmethod
    def get_intent_from_mysql(cls, sql: str, cfg: dict) -> list:
        """
        Get the intent mapping from MySQL using the provided SQL.

        Args:
            sql (str): The SQL query to execute.
            cfg (dict): The configuration for MySQL.

        Returns:
            list: The intent mapping list.
        """
        with MySQLHelper(**cfg) as helper:
            result = helper.execute(sql)
            mapping_list = [item[0] for item in result]
            return mapping_list

    @classmethod
    def process_and_save_data(cls, lst: list, mapping_list: list):
        """
        Process the given list using the mapping list and save the result to the database.

        Args:
            lst (list): The list to process.
            mapping_list (list): The mapping list to use for processing.
        """
        if not lst:
            return
        result_generator = ParseUtil(mapping_list=mapping_list).filter(lst)
        _ = Log.batch_save(result_generator)

    @classmethod
    def parse(cls, start: str, end: str, index: str, sql: str, cfg: dict) -> int:
        """
        Parse logs from Elasticsearch and save them to MySQL.

        Args:
            start (str): The start date for the logs.
            end (str): The end date for the logs.
            index (str): The Elasticsearch index to get logs from.
            sql (str): The SQL query to get the intent mapping from MySQL.
            cfg (dict): The configuration for MySQL.

        Returns:
            int: The total number of logs parsed.

        Raises:
            SystemError: If there is an error during the process.
        """
        # Get the DSL for the given start and end dates.
        dsl = ElasticsearchUtil.dsl(start, end)
        # Get data from Elasticsearch.
        data = cls.get_data_from_es(index=index, dsl=dsl)
        # Extract metadata from the data.
        mdata = cls.get_mdata(data)
        # Get the total number of logs.
        total = data.get('hits').get('total').get('value')
        logger.debug(f'The numbers of data by searching data from ES: {total}')
        # Log the start of the searching and saving process.
        logger.debug('The data is inserting ...')
        # Get the intent mapping from MySQL.
        mapping_list = cls.get_intent_from_mysql(sql, cfg)
        # Process and save the metadata.
        cls.process_and_save_data(mdata, mapping_list)
        # Get the scroll id for scroll searching in Elasticsearch.
        scroll_id = data.get('_scroll_id')
        try:
            for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
                # Get more data from Elasticsearch using scroll searching.
                res = cls.get_data_from_es(sid=scroll_id)
                lst = res.get('hits').get('hits')
                if not lst:
                    continue
                # Process and save the data.
                cls.process_and_save_data(lst, mapping_list)
        except Exception as e:
            # Log the error and raise a SystemError.
            logger.error(f'The error: {e}')
            raise SystemError()
        else:
            # Log the success of the process.
            logger.debug('The process of inserting data succeed!')
        finally:
            # Log the end of the process.
            logger.debug('The inserting of the data finished!')
        return total
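
Taken together, LogDao.parse drains the index in pages: the initial search returns the first batch of hits plus a _scroll_id, and the loop then issues int(total / size + 1) follow-up scroll requests (for example, 25,000 hits at the fallback page size of 10,000 gives three scroll calls). Below is a minimal sketch of how a caller might drive this class; the index name, SQL text, and connection values are illustrative placeholders, not values taken from this project:

# Hypothetical driver for LogDao.parse; every literal below is a placeholder.
from application.dao import LogDao

# Assumed shape: the keys are unpacked into MySQLHelper(**cfg); the service
# diff elides part of this dict, so 'password' is a guess.
cfg = {
    'host': '127.0.0.1',
    'user': 'reader',
    'password': 'secret',
    'db': 'analytics',
    'port': 3306,
}

total = LogDao.parse(
    start='2023-11-19',                        # start of the ES date range
    end='2023-11-20',                          # end of the ES date range
    index='app-logs',                          # placeholder index name
    sql='SELECT intent FROM intent_mapping',   # placeholder mapping query
    cfg=cfg,
)
print(f'parsed {total} log records')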
application/services/log.py

...
@@ -10,24 +10,48 @@
 # @Description :
 """
 from flask import current_app
 from loguru import logger

-from application.utils import ElasticsearchUtil
-from application.libs.helper import MySQLHelper
-from application.utils import ParseUtil
-from application.models import Log
+from application.dao import LogDao
+from application.schemas import ParseLogRequestItem


 class LogService:

     @classmethod
-    def parse(cls, item) -> int:
+    def parse(cls, item: ParseLogRequestItem) -> int:
+        """
+        Parse logs from Elasticsearch and process them according to the configured SQL commands.
+
+        Args:
+            item: The item containing the start and end dates for which logs are to be parsed.
+
+        Returns:
+            The result of the log parsing as an integer.
+
+        Raises:
+            SystemError: If there is an issue with the process.
+        """
+        # Retrieve the start and end date from the given item.
         start_date = item.start
         end_date = item.end
-        es_index = current_app.config.Elasticsearch.Index
+        # Log the time interval for debugging purposes.
+        logger.debug(f'The interval of time is between {start_date} and {end_date} ...')
+        # Retrieve the Elasticsearch index from the current application's configuration.
+        index = current_app.config.Elasticsearch.Index
+        # Retrieve the MySQL configuration from the current application's configuration.
         mysql_config = current_app.config.ExtraDB
+        # Retrieve the SQL to be executed from the MySQL configuration.
         sql = mysql_config.Sql
+        # Construct a configuration dictionary for the MySQL database.
         cfg = {
             'host': mysql_config.Host,
             'user': mysql_config.User,
...
@@ -35,44 +59,6 @@ class LogService:
             'db': mysql_config.DB,
             'port': mysql_config.Port,
         }
-        logger.debug(f'The interval of time is between {start_date} and {end_date} ...')
-        dsl = ElasticsearchUtil.dsl(start_date, end_date, size=2500)
-        data = ElasticsearchUtil.search(es_index, dsl)
-        # Get mdata
-        mdata = data.get('hits').get('hits')
-        if not mdata:
-            logger.error('the mdata is an empty list ...')
-            raise SystemError('the mdata is an empty list ...')
-        # Get total
-        total = data.get('hits').get('total').get('value')
-        logger.debug(f'The numbers of data by searching data from ES: {total}')
-        with MySQLHelper(**cfg) as helper:
-            result = helper.execute(sql)
-            mapping_list = [item[0] for item in result]
-        result = ParseUtil(mapping_list=mapping_list).filter(mdata)
-        logger.debug('The first part of data is inserting ...')
-        _ = Log.batch_save(result)
-        logger.debug('The inserting of the first part data finished!')
-        logger.debug('The scrolling part of data is inserting ...')
-        # Get scroll_id
-        scroll_id = data.get('_scroll_id')
-        try:
-            for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
-                res = ElasticsearchUtil.scroll_search(scroll_id)
-                lst = res.get('hits').get('hits')
-                if not lst:
-                    continue
-                result_generator = ParseUtil(mapping_list=mapping_list).filter(lst)
-                _ = Log.batch_save(result_generator)
-        except Exception as e:
-            # Log the error message
-            logger.error(f'The error: {e}')
-            raise SystemError()
-        else:
-            logger.debug('The process of inserting scrolling part data succeed!')
-        finally:
-            logger.debug('The inserting of the scrolling part data finished!')
-        return total
+        # Parse the logs using the LogDao class and return the result.
+        return LogDao.parse(start_date, end_date, index, sql, cfg)
...
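
With this change, LogService.parse becomes a thin wrapper: it resolves configuration from the Flask app and delegates the actual ES-to-MySQL work to LogDao.parse. A hedged sketch of a call site follows; it assumes ParseLogRequestItem is a pydantic model with start and end fields (implied by the pydantic import in the view, but its definition is not shown in this commit) and that app is this project's Flask application instance:

# Hypothetical call site; the import paths follow this diff, but the
# ParseLogRequestItem field types and the app object are assumptions.
from application.schemas import ParseLogRequestItem
from application.services.log import LogService

item = ParseLogRequestItem(start='2023-11-19', end='2023-11-20')

# current_app.config is only readable inside an application context.
with app.app_context():
    total = LogService.parse(item)
    print(f'{total} log records parsed')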
application/views/log/log.py

...
@@ -10,7 +10,7 @@
 # @Description :
 """
-from flask import Blueprint, request, jsonify
+from flask import Blueprint, current_app, request, jsonify
 from flask_restful import Api
 from loguru import logger
 from pydantic import ValidationError
...
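
The only visible change in the view is the new current_app import; the handler body is collapsed out of this hunk. Purely as an illustration of what that import enables, a hypothetical handler built from the imports shown above might look like this (the blueprint name, route, and LogService wiring are guesses, not code from this commit):

# Hypothetical view body; the real handler is collapsed out of this hunk.
from flask import Blueprint, current_app, request, jsonify
from loguru import logger
from pydantic import ValidationError

from application.schemas import ParseLogRequestItem   # assumed import
from application.services.log import LogService       # assumed import

log_bp = Blueprint('log', __name__)                   # placeholder blueprint

@log_bp.route('/logs/parse', methods=['POST'])        # placeholder route
def parse_logs():
    try:
        item = ParseLogRequestItem(**request.get_json())
    except ValidationError as e:
        return jsonify({'error': e.errors()}), 400
    # current_app lets the handler reach the app without a circular import.
    logger.debug(f'parsing logs for {current_app.name} ...')
    return jsonify({'total': LogService.parse(item)})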