package speechNlu import ( "encoding/json" "fmt" "github.com/gorilla/websocket" "log" "speech-nlu-parse/dao" "speech-nlu-parse/global" "speech-nlu-parse/model" "speech-nlu-parse/pkg/errCode" "speech-nlu-parse/pkg/logger" "speech-nlu-parse/pkg/util" "sync" "time" ) func init() { // 注册 register() } type SessionRecord struct { SessionID string `json:"sessionId"` LastActive int64 } var ( SessionCache = make(map[string]*SessionRecord) CacheMutex sync.RWMutex ) func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { if reqStruct == nil { return string(replyWithChat(error_reply, "doudi")), errCode.RequestFormatError } // 结果 var channelStr string //ts := time.Now() err, content, url := dao.GetTestContent(reqStruct.MacWifi, reqStruct.Query) //tc := time.Since(ts) //fmt.Println(time.Now().String()[:19], err, reqStruct.MacWifi, reqStruct.Query, tc) if err == nil { channelStr = string(getTencentSongReply(url, content, reqStruct.Query)) } else { if reqStruct.Query == "声道测试" || reqStruct.Query == "音响音质测试" || reqStruct.Query == "音箱音质测试" { // channelStr = string(getTencentSongReply("", reqStruct.Query)) // err, content, url := dao.GetAnyTestContent(reqStruct.MacWifi, reqStruct.Query) url = global.OthersSetting.SoundTestUrl content = "" channelStr = string(getTencentSongReply(url, content, reqStruct.Query)) } else if reqStruct.Query == "重定向测试" { url = "http://nlu.gree.com/temp/music" content = "" channelStr = string(getTencentSongReply(url, content, reqStruct.Query)) } else if reqStruct.Query == "编号七零六八一" && reqStruct.MacWifi == "f4911e594443" { channelStr = string(getGreeNluChatReply("好的,计时五分钟马上开始,五,四,三,二,一", "countdown", reqStruct.Query)) } else { var session string // 自动清理过期Session的守护协程 go StartSessionJanitor(1 * time.Minute) // 自动携带有效SessionID(自动刷新时间戳) if sessionID, valid := GetAndRefreshSession(reqStruct.MacVoice, 30); valid { session = sessionID log.Printf("检测到有效SessionID: %s", sessionID) } else { log.Println("Session已过期或无可用Session") session = "" } start := time.Now() data := SpeechNlpWsReq(reqStruct, session) speechNluWs := SpeechNlpWs{} var nlpWsConn *websocket.Conn deviceId := util.ComputerMd5(reqStruct.MacVoice) nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip, deviceId) if err != nil { global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, "reqStruct": reqStruct, }).Errorf("speechNluWs.SpeechWs error: %v", err) return string(replyWithChat(error_reply, "doudi")), errCode.InternalServiceError } speechNluWs.NlpWsConn = nlpWsConn defer nlpWsConn.Close() done := make(chan struct{}) dataChan := make(chan []model.SpeechWsResp, 1) // 缓冲通道避免阻塞 go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId, start) if err := sendRunTaskMsg(nlpWsConn, data); err != nil { global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, "reqStruct": reqStruct, }).Errorf("发送run-task指令失败:%v", err) return string(replyWithChat(error_reply, "doudi")), errCode.InternalServiceError } var responses []model.SpeechWsResp select { case <-done: responses = <-dataChan for _, resp := range responses { semanticRespByteData := ParseSpeechJson(resp, reqStruct.MacWifi, reqStruct.OriginQuery, reqStruct.Mid, reqStruct.RequestId, reqStruct.Vender) return string(semanticRespByteData), errCode.Success } case <-time.After(5 * time.Minute): select { case responses = <-dataChan: for _, resp := range responses { fmt.Printf("%+v\n", resp) semanticRespByteData := ParseSpeechJson(resp, reqStruct.MacWifi, reqStruct.OriginQuery, reqStruct.Mid, reqStruct.RequestId, reqStruct.Vender) return string(semanticRespByteData), errCode.Success } default: global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, "reqStruct": reqStruct, }).Error("未接收到任何数据") return string(replyWithChat(error_reply, "doudi")), errCode.InternalServiceError } } return string(replyWithChat(error_reply, "doudi")), errCode.InternalServiceError } } return channelStr, errCode.Success } func sendRunTaskMsg(conn *websocket.Conn, v interface{}) error { msgJSON, err := json.Marshal(v) if err != nil { return err } return conn.WriteMessage(websocket.TextMessage, msgJSON) } func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string, start time.Time) { var responses []model.SpeechWsResp defer func() { dataChan <- responses close(done) }() for { _, message, err := conn.ReadMessage() if err != nil { global.Logger.Errorf("消息接收终止:%v", err) return } global.Logger.WithFields(logger.Fields{ "requestId": requestId, "origin-data": string(message), "time": time.Since(start).Seconds(), }).Info("原始消息") //思必驰nlu回复 var resp model.SpeechWsResp if err := json.Unmarshal(message, &resp); err != nil { global.Logger.WithFields(logger.Fields{ "origin-data": string(message), }).Errorf("json.Unmarshal error:%v", err) continue } responses = append(responses, resp) //resp.Dm.ShouldEndSession不能判定是否结束 //if resp.Dm.ShouldEndSession { // fmt.Println("\n收到会话结束标志,终止接收") // return //} if resp.Dm != nil { return } } } func ParseSpeechJson(speechJson model.SpeechWsResp, mac string, query, mid, requestId, vender string) []byte { //思必驰返回的domain是中文,待改 domain := speechJson.Skill var jsonByte []byte //reply_when_tencent_empty = replyMessage[util.GetRandom(len(replyMessage))] params := &model.SpeechDomainParams{ SpeechWsResp: &speechJson, Query: query, RequestId: requestId, Mac: mac, Domain: domain, Mid: mid, MidType: vender, } // 根据mid屏蔽部分domain, 并返回 //if util.IsContain(domain, global.GetLimitedSetting(mid).ShieldedDomainList) { // jsonByte = shieldedDomain() // return jsonByte //} //jsonByte = jokeDomain(speechJson) if value, ok := getHandler(domain); ok { jsonByte = value(params) } else { // 其他domain进行兜底 fmt.Println("兜底") jsonByte = otherDomain(params) } return jsonByte } func UpdateSession(mac, sessionId string) { CacheMutex.Lock() defer CacheMutex.Unlock() if record, exist := SessionCache[mac]; exist { record.LastActive = time.Now().Unix() record.SessionID = sessionId } else { SessionCache[mac] = &SessionRecord{ SessionID: sessionId, LastActive: time.Now().Unix(), } } } func GetAndRefreshSession(mac string, timeout int) (string, bool) { CacheMutex.Lock() defer CacheMutex.Unlock() record, exist := SessionCache[mac] if !exist { return "", false } if time.Now().Unix()-record.LastActive > int64(timeout) { delete(SessionCache, mac) return "", false } record.LastActive = time.Now().Unix() return record.SessionID, true } func StartSessionJanitor(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { now := time.Now().Unix() CacheMutex.Lock() for mac, record := range SessionCache { if now-record.LastActive > 30 { // 硬编码超时30秒 delete(SessionCache, mac) } } CacheMutex.Unlock() } }