second_test.py 5.6 KB
Newer Older
李明杰's avatar
李明杰 committed
1 2 3 4 5 6 7 8 9 10 11
# -*- coding: utf-8 -*-
# author:Li Mingjie time:2019/2/28
# Brief:second test

import time
import requests
import matplotlib.pyplot as plt
import pandas as pd
import bottom_function.m_SQL as qb
import bottom_function.data_read as dr
import json
李明杰's avatar
李明杰 committed
12 13 14 15 16


# from flask import Flask
# from flask import request
# from flask_cors import CORS
李明杰's avatar
李明杰 committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59


def second_test(text, classify, timeout=10):
    """Re-send a query to the semantic platform and return its status code.

    The query is POSTed to the platform and the JSON response is parsed.
    For ``classify == 'gree'`` the status object is the top-level
    ``'status'`` entry; for any other value it is ``['header']['semantic']``,
    falling back to the whole response when that entry is ``None``.

    Args:
        text: The query text to send.
        classify: Classification value forwarded to the platform; also
            selects how the response is interpreted (see above).
        timeout: Seconds to wait for the HTTP request before giving up.
            Without it a stalled server would block the caller forever.

    Returns:
        The ``"code"`` entry of the status object, or ``None`` when the
        request, the JSON parsing, or the lookup fails (the error is printed).
    """
    # Endpoint URL.
    # NOTE(review): the URL and token are hard-coded; consider moving them
    # to configuration.
    url = "http://api.gree.com:8088/unisound/v1/query"
    # NOTE(review): the body is sent as JSON (json=...) although the header
    # declares form-urlencoded -- confirm which one the server expects.
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    # Request parameters.
    data = {
        "uid": "unisound",
        "token": "9ff9874dd2f8b6d9e0343c22c23f4248543eec156303703b42a38488e581be42",
        "macWifi": "test-mac",
        "macVoice": "",
        "query": text,
        "classify": classify
    }
    try:
        r = requests.request("post", url, json=data, headers=headers, timeout=timeout)
        # Parse the platform response text as JSON.
        second_test_log = json.loads(r.text)
        if classify == 'gree':
            status = second_test_log.get('status')
        else:
            status = second_test_log['header']['semantic']
            if status is None:
                status = second_test_log
            # 0.5 second delay after each non-gree request
            # (presumably to throttle calls -- TODO confirm).
            time.sleep(0.5)
        # Status code reported for this second test.
        return status["code"]
    except Exception as result:
        # Best-effort: callers treat a missing code as "still failing".
        print("进行二次测试时出错:{}".format(result))
        return None


def second_test_plot(datatype, starttime, endtime, graphtype):
    """Re-test stored error queries and plot the errors that remain.

    Every distinct query read for the given data type and time range is
    re-sent through ``second_test``. Queries that now return status code 0
    are passed to ``db.delData`` for the matching error table; the rest are
    counted per original error code and drawn as a pie or bar chart.

    Args:
        datatype: ``'error_control'`` or ``'error_application'``.
        starttime: Start of the time range, forwarded to ``dr.read_data``.
        endtime: End of the time range, forwarded to ``dr.read_data``.
        graphtype: ``'pie'`` or ``'bar'``; any other value yields a figure
            with only a title.

    Returns:
        The URL under which the saved image is served.

    Raises:
        ValueError: If ``datatype`` is not one of the supported values.
    """
    # Error table and classify value used for each supported data type.
    targets = {
        'error_control': ("control_error_data", 'control'),
        'error_application': ("application_error_data", 'application'),
    }
    if datatype not in targets:
        raise ValueError("unsupported datatype: {!r}".format(datatype))
    table_name, classify = targets[datatype]

    # The data is expected to provide 'query' and 'code' columns.
    csv_data = dr.read_data(datatype=datatype, starttime=starttime, endtime=endtime)
    db = qb.Schema(host="localhost", user="560193", password="jay560193", mysqlName="semantic_data_schema", port="3306")
    # Work on a de-duplicated copy so the frame returned by read_data is
    # not mutated in place.
    csv_data = csv_data.drop_duplicates(subset='query', keep='first').reset_index(drop=True)

    error_dict = {}
    for query, code in zip(csv_data['query'], csv_data['code']):
        # Missing values (None / NaN) cannot be re-tested.
        if not isinstance(query, str):
            continue
        # Drop a leading byte-order mark left over from the data source.
        if query.startswith(u'\ufeff'):
            query = query[1:]
        status_code = second_test(text=query, classify=classify)
        if status_code == 0:
            # The query succeeds now, so remove it from the error table.
            db.delData(tableName=table_name, keyWord=query)
            print('delete semantic data "%s" for second test' % query)
            continue
        error_code = "error " + str(code)
        error_dict[error_code] = error_dict.get(error_code, 0) + 1

    plt.figure(figsize=(10, 8))
    try:
        if graphtype == 'pie':
            labels, fracs, explode = _pie_slices(error_dict)
            # An empty pie cannot be drawn; leave the figure blank instead.
            if fracs:
                plt.pie(x=fracs, labels=labels, explode=explode, autopct='%3.2f%%', shadow=True, startangle=90)
        elif graphtype == 'bar':
            name_list = list(error_dict.keys())
            num_list = list(error_dict.values())
            plt.bar(range(len(num_list)), num_list)
            plt.xticks(range(len(name_list)), name_list)
            plt.ylabel('Number', fontsize=12, labelpad=5)
            # Write each count above its bar.
            for x, y in enumerate(num_list):
                plt.text(x, y, '%d' % y, ha='center', va='bottom', fontsize=9)

        plt.title(
            '{} to {} second test of error response analysis with {} graph'.format(starttime, endtime, graphtype),
            fontsize=15)
        # pad must be passed by keyword on current matplotlib releases.
        plt.tight_layout(pad=5)
        path = '/roobo/soft/phpmyadmin/second_test.jpg'
        plt.savefig(path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close()
    return 'http://120.79.171.145:8000/second_test.jpg'


def _pie_slices(error_dict):
    """Return (labels, percentages, explode) for the 'error 501' vs. 'others' pie."""
    count_501 = error_dict.get("error 501", 0)
    count_other = sum(error_dict.values()) - count_501
    total = count_501 + count_other
    # Keep only non-empty slices so a 100% share is drawn as a single slice.
    slices = [(label, count) for label, count in (("error 501", count_501), ("others", count_other)) if count > 0]
    labels = [label for label, _ in slices]
    fracs = [count / total * 100 for _, count in slices]
    # Offset the "others" slice only when it sits next to "error 501".
    explode = [0.1 if label == "others" and len(slices) > 1 else 0 for label in labels]
    return labels, fracs, explode

# app = Flask(__name__)
# CORS(app, supports_credentials=True)
#
#
# @app.route('/SPDAS/second_test1', methods=['POST'])
# def domain():
#     param = ({"data_type": [{"value": "error_control"}, {"value": "error_application"}],
#               "time": "2018-12-01 00:00:00/2018-12-02 00:00:00",
#               "graph_type": [{"value": "bar"}, {"value": "pie"}]})
#     return json.JSONEncoder().encode(param)
#
#
# @app.route('/SPDAS/second_test2', methods=['POST'])
# def domain_form():
#     # Read the form content from the request object:
#     data = request.get_data()
#     json_re = json.loads(data)
#     datatype = json_re['data_type']
#
#     m_time = json_re['time']
#     str_time = str(m_time)
#     m_time = str_time.split('/')
#     starttime = m_time[0]
#     endtime = m_time[1]
#     graphtype = json_re['graph_type']
#
#     image_path = second_test_plot(datatype=datatype, starttime=starttime, endtime=endtime, graphtype=graphtype)
#     path = ({"test_image": image_path})
#     return json.JSONEncoder().encode(path)
#
#
# if __name__ == '__main__':
#     app.run(debug=True, host='10.7.19.129', port=5000)