# -*- coding: utf-8 -*- # author:Li Mingjie time:2019/2/28 # Brief:second test import time import requests import matplotlib.pyplot as plt import pandas as pd import bottom_function.m_SQL as qb import bottom_function.data_read as dr import json # from flask import Flask # from flask import request # from flask_cors import CORS def second_test(text, classify): try: # 接口的url url = "http://api.gree.com:8088/unisound/v1/query" headers = {"Content-Type": "application/x-www-form-urlencoded"} # 接口的参数 data = { "uid": "unisound", "token": "9ff9874dd2f8b6d9e0343c22c23f4248543eec156303703b42a38488e581be42", "macWifi": "test-mac", "macVoice": "", "query": text, "classify": classify } r = requests.request("post", url, json=data, headers=headers) # 解析返回结果 second_test_log = r.text # 将平台的返回数据转成json格式 second_test_log = json.loads(second_test_log) if classify == 'gree': status = (second_test_log.get('status')) else: status = second_test_log['header']['semantic'] if status is None: status = second_test_log # 0.2秒延迟 time.sleep(0.5) # 取出二次测试后的返回日志对应的状态码 status_codes = status["code"] return status_codes except Exception as result: print("进行二次测试时出错:{}".format(result)) def second_test_plot(datatype, starttime, endtime, graphtype): csv_data = pd.DataFrame() csv_data = dr.read_data(datatype=datatype, starttime=starttime, endtime=endtime) if datatype == 'error_control': table_name = "control_error_data" classsify = 'control' elif datatype == 'error_application': table_name = "application_error_data" classsify = 'application' db = qb.Schema(host="localhost", user="560193", password="jay560193", mysqlName="semantic_data_schema", port="3306") csv_data.drop_duplicates(subset='query', keep='first', inplace=True) csv_data = csv_data.rest_index(drop=True) error_dict = {} for i in range(len(csv_data)): # query = str(csv_data.ix[i]['query'].encode('utf-8').decode('utf-8-sig')) query = csv_data.ix[i]['query'] if query.startswith(u'\ufeff'): query = query.encode('utf8')[3:].decode('utf8') if query is None: continue status_code = second_test(text=query, classify=classsify) if status_code == 0: db.delData(tableName=table_name, keyWord=query) print('delete semantic data "%s" for second test' % query) continue error_code = "error " + str(csv_data.ix[i]['code']) if error_code in error_dict.keys(): error_dict[error_code] += 1 else: error_dict.update({error_code: 1}) plt.figure(figsize=(10, 8)) if graphtype == 'pie': e = [] code_other = 0 for j in error_dict.keys(): if j != "error 501": e.append(0.1) code_other += error_dict[j] else: e.append(0) labels = ["error 501", "others"] fracs = [error_dict["error 501"], code_other] v_sum = sum(fracs) v_data = fracs for fx in range(0, len(fracs)): v_data[fx] = fracs[fx] / v_sum * 100 if v_data[fx] == 100: v_data = v_data[:1] labels = labels[:1] break fracs = v_data explode = e[:(len(fracs))] if sum(e[len(fracs):]) > 0: explode[-1] = 0.1 plt.pie(x=fracs, labels=labels, explode=explode, autopct='%3.2f%%', shadow=True, startangle=90) if graphtype == 'bar': name_list = list(error_dict.keys()) num_list = list(error_dict.values()) plt.bar(range(len(num_list)), num_list) plt.xticks(range(len(name_list)), name_list) plt.ylabel('Number', fontsize=12, labelpad=5) for x, y in zip(range(len(num_list)), num_list): plt.text(x, y, '%d' % y, ha='center', va='bottom', fontsize=9) plt.title( str(starttime) + ' to ' + str(endtime) + ' second test of error response analysis with ' + graphtype + ' graph', fontsize=15) plt.tight_layout(5) path = '/roobo/soft/phpmyadmin/second_test.jpg' plt.savefig(path) return 'http://120.79.171.145:8000/second_test.jpg' # app = Flask(__name__) # CORS(app, supports_credentials=True) # # # @app.route('/SPDAS/second_test1', methods=['POST']) # def domain(): # param = ({"data_type": [{"value": "error_control"}, {"value": "error_application"}], # "time": "2018-12-01 00:00:00/2018-12-02 00:00:00", # "graph_type": [{"value": "bar"}, {"value": "pie"}]}) # return json.JSONEncoder().encode(param) # # # @app.route('/SPDAS/second_test2', methods=['POST']) # def domain_form(): # # 需要从request对象读取表单内容: # data = request.get_data() # json_re = json.loads(data) # datatype = json_re['data_type'] # # m_time = json_re['time'] # str_time = str(m_time) # m_time = str_time.split('/') # starttime = m_time[0] # endtime = m_time[1] # graphtype = json_re['graph_type'] # # image_path = second_test_plot(datatype=datatype, starttime=starttime, endtime=endtime, graphtype=graphtype) # path = ({"test_image": image_path}) # return json.JSONEncoder().encode(path) # # # if __name__ == '__main__': # app.run(debug=True, host='10.7.19.129', port=5000)