From 08fb93d6c83d31bf72a13a3486be0538a1d6b0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=96=87=E9=9D=99?= <1319697849@qq.com> Date: Wed, 14 May 2025 18:33:50 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85domain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/service.go | 2 +- service/speechNlu/constant.go | 10 ++-- service/speechNlu/domain.go | 85 ++++++++++++++++++++++++++++----- service/speechNlu/register.go | 3 ++ service/speechNlu/speech.go | 89 +++++++++++++++++++++++++++++++++-- service/speechNlu/speechWs.go | 16 ++++--- 6 files changed, 178 insertions(+), 27 deletions(-) diff --git a/service/service.go b/service/service.go index 6b186f5..12b763f 100644 --- a/service/service.go +++ b/service/service.go @@ -609,7 +609,7 @@ func (TencentNlu) TencentNluParseStream(stream proto.TencentNlu_TencentNluParseS speechNluWs := speechNlu.SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs() + nlpWsConn, err = speechNluWs.SpeechWs(mac) if err != nil { global.Logger.Errorf("speechWs.SpeechWs error. %v", err) return err diff --git a/service/speechNlu/constant.go b/service/speechNlu/constant.go index 3e1ac9b..dd2a9a9 100644 --- a/service/speechNlu/constant.go +++ b/service/speechNlu/constant.go @@ -85,11 +85,13 @@ const ( MUSIC = "音乐" HELP = "产品形象" STORY = "故事" - HOLIDAY = "holiday" + FINANCE = "汇率" + CONVERT = "单位换算" + HOLIDAY = "节假日查询" + + SOUND = "sound" + ALMANAC = "almanac" - SOUND = "sound" - ALMANAC = "almanac" - FINANCE = "finance" FOOD = "food" GENERALQA = "general_question_answering" COMMONQA = "common_qa" diff --git a/service/speechNlu/domain.go b/service/speechNlu/domain.go index 06f7da5..76c8ee3 100644 --- a/service/speechNlu/domain.go +++ b/service/speechNlu/domain.go @@ -51,6 +51,10 @@ func baseParse(params *model.SpeechDomainParams) (*model.ResponseBody, error) { //result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent result.ResponseText = params.SpeechWsResp.Dm.Nlg result.AsrRecongize = params.SpeechWsResp.Dm.Input + if !params.SpeechWsResp.Dm.ShouldEndSession { + fmt.Println("保存:", params.SpeechWsResp.SessionId) + UpdateSession(params.Mac, params.SpeechWsResp.SessionId) + } return &result, nil } @@ -71,7 +75,6 @@ func weatherDomain(params *model.SpeechDomainParams) []byte { res.Header.Semantic.Domain = "weather" res.Header.Semantic.Intent = "general_search" res.Header.Semantic.SkillId = res.Header.Semantic.Domain + "." + res.Header.Semantic.Intent - //todo:带屏设备的待改 return Marshal(params, res) @@ -359,10 +362,25 @@ func translateDomain(params *model.SpeechDomainParams) []byte { } func alarmDomain(params *model.SpeechDomainParams) []byte { + var result model.ResponseBody + result.Header.Semantic.Code = 0 + result.Header.Semantic.Msg = speech_nlu_parse + result.AsrRecongize = params.SpeechWsResp.Dm.Input + result.Header.Semantic.Domain = "alarm" + result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession + result.ResponseText = params.SpeechWsResp.Dm.Nlg + + if !params.SpeechWsResp.Dm.ShouldEndSession { + fmt.Println("保存:", params.SpeechWsResp.SessionId) + UpdateSession(params.Mac, params.SpeechWsResp.SessionId) + return Marshal(params, &result) + } + fmt.Println("1111111111111111111111111") + fmt.Println(params.SpeechWsResp.SessionId) + if !params.CheckDm() || !params.CheckCommandParam() { return replyWithChat(error_reply, "doudi") } - var result model.ResponseBody api := params.SpeechWsResp.Dm.Command.Api extra := params.SpeechWsResp.Dm.Command.Param.Extra object := params.SpeechWsResp.Dm.Command.Param.Object @@ -372,11 +390,6 @@ func alarmDomain(params *model.SpeechDomainParams) []byte { } else { result.Header.Semantic.Domain = "reminder_v2" } - result.Header.Semantic.Code = 0 - result.Header.Semantic.Msg = speech_nlu_parse - result.AsrRecongize = params.SpeechWsResp.Dm.Input - result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession - result.ResponseText = params.SpeechWsResp.Dm.Nlg if api == "ai.dui.dskdm.reminder.query" { result.Header.Semantic.Intent = "check" @@ -404,10 +417,6 @@ func alarmDomain(params *model.SpeechDomainParams) []byte { } else if api == "ai.dui.dskdm.reminder.remove" { result.Header.Semantic.Intent = "delete" result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent - if !params.SpeechWsResp.Dm.ShouldEndSession { - return Marshal(params, &result) - } - resAlarm, err := connect.AlarmGrpc(params, api, extra, object) if err != nil { global.Logger.WithFields(logger.Fields{ @@ -781,6 +790,60 @@ func musicDomain(params *model.SpeechDomainParams) []byte { return Marshal(params, &result) } +func financeDomain(params *model.SpeechDomainParams) []byte { + if !params.CheckDm() { + return replyWithChat(error_reply, "doudi") + } + + var result model.ResponseBody + result.Header.Semantic.Code = 0 + result.Header.Semantic.Domain = "finance" + result.Header.Semantic.Intent = "exchangecalculate" + result.Header.Semantic.Msg = speech_nlu_parse + result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession + result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent + result.ResponseText = params.SpeechWsResp.Dm.Nlg + result.AsrRecongize = params.SpeechWsResp.Dm.Input + + return Marshal(params, &result) +} + +func convertDomain(params *model.SpeechDomainParams) []byte { + if !params.CheckDm() { + return replyWithChat(error_reply, "doudi") + } + + var result model.ResponseBody + result.Header.Semantic.Code = 0 + result.Header.Semantic.Domain = "science" + result.Header.Semantic.Intent = "unit_convert" + result.Header.Semantic.Msg = speech_nlu_parse + result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession + result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent + result.ResponseText = params.SpeechWsResp.Dm.Nlg + result.AsrRecongize = params.SpeechWsResp.Dm.Input + + return Marshal(params, &result) +} + +func holidayDomain(params *model.SpeechDomainParams) []byte { + if !params.CheckDm() { + return replyWithChat(error_reply, "doudi") + } + + var result model.ResponseBody + result.Header.Semantic.Code = 0 + result.Header.Semantic.Domain = "holiday" + result.Header.Semantic.Intent = "search_holiday" + result.Header.Semantic.Msg = speech_nlu_parse + result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession + result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent + result.ResponseText = params.SpeechWsResp.Dm.Nlg + result.AsrRecongize = params.SpeechWsResp.Dm.Input + + return Marshal(params, &result) +} + func helpDomain(params *model.SpeechDomainParams) []byte { resultTextStr := "我可以控制空调开机、播放音乐、查询时间天气、调节温度,设置模式,例如您可以对我说,空调开机。" return replyWithChat(resultTextStr, "chat.chat") diff --git a/service/speechNlu/register.go b/service/speechNlu/register.go index d185f10..88e97b5 100644 --- a/service/speechNlu/register.go +++ b/service/speechNlu/register.go @@ -45,6 +45,9 @@ func register() { DomainRegister(MUSIC, musicDomain) DomainRegister(HELP, helpDomain) DomainRegister(STORY, fmDomain) + DomainRegister(FINANCE, financeDomain) + DomainRegister(CONVERT, convertDomain) + DomainRegister(HOLIDAY, holidayDomain) // //DomainRegisterV2(CHAT, chatDomainV2) diff --git a/service/speechNlu/speech.go b/service/speechNlu/speech.go index 190ff4f..85a5f96 100644 --- a/service/speechNlu/speech.go +++ b/service/speechNlu/speech.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/gorilla/websocket" + "log" "speech-nlu-parse/dao" "speech-nlu-parse/global" "speech-nlu-parse/model" @@ -18,7 +19,15 @@ func init() { register() } -var SpeechWg = &sync.WaitGroup{} +type SessionRecord struct { + SessionID string `json:"sessionId"` + LastActive int64 +} + +var ( + SessionCache = make(map[string]*SessionRecord) + CacheMutex sync.RWMutex +) func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { if reqStruct == nil { @@ -50,12 +59,27 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { } else if reqStruct.Query == "编号七零六八一" && reqStruct.MacWifi == "f4911e594443" { channelStr = string(getGreeNluChatReply("好的,计时五分钟马上开始,五,四,三,二,一", "countdown", reqStruct.Query)) } else { - data := SpeechNlpWsReq(reqStruct) + var session string + // 自动清理过期Session的守护协程 + go StartSessionJanitor(1 * time.Minute) + + // 自动携带有效SessionID(自动刷新时间戳) + if sessionID, valid := GetAndRefreshSession(reqStruct.MacVoice, 30); valid { + session = sessionID + log.Printf("检测到有效SessionID: %s", sessionID) + } else { + log.Println("Session已过期或无可用Session") + session = "" + + } + data := SpeechNlpWsReq(reqStruct, session) + fmt.Println("*****************") + fmt.Println(data) speechNluWs := SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs() + nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice) if err != nil { global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, @@ -69,7 +93,7 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { done := make(chan struct{}) dataChan := make(chan []model.SpeechWsResp, 1) // 缓冲通道避免阻塞 - go receiveMessage(nlpWsConn, done, dataChan) + go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId) if err := sendRunTaskMsg(nlpWsConn, data); err != nil { global.Logger.WithFields(logger.Fields{ @@ -121,7 +145,7 @@ func sendRunTaskMsg(conn *websocket.Conn, v interface{}) error { return conn.WriteMessage(websocket.TextMessage, msgJSON) } -func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp) { +func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string) { var responses []model.SpeechWsResp defer func() { dataChan <- responses @@ -134,6 +158,12 @@ func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- [] global.Logger.Errorf("消息接收终止:%v", err) return } + + global.Logger.WithFields(logger.Fields{ + "requestId": requestId, + "origin-data": string(message), + }).Info("原始消息") + //思必驰nlu回复 var resp model.SpeechWsResp if err := json.Unmarshal(message, &resp); err != nil { @@ -191,3 +221,52 @@ func ParseSpeechJson(speechJson model.SpeechWsResp, mac string, query, mid, requ return jsonByte } + +func UpdateSession(mac, sessionId string) { + CacheMutex.Lock() + defer CacheMutex.Unlock() + + if record, exist := SessionCache[mac]; exist { + record.LastActive = time.Now().Unix() + record.SessionID = sessionId + } else { + SessionCache[mac] = &SessionRecord{ + SessionID: sessionId, + LastActive: time.Now().Unix(), + } + } +} + +func GetAndRefreshSession(mac string, timeout int) (string, bool) { + CacheMutex.Lock() + defer CacheMutex.Unlock() + + record, exist := SessionCache[mac] + if !exist { + return "", false + } + + if time.Now().Unix()-record.LastActive > int64(timeout) { + delete(SessionCache, mac) + return "", false + } + + record.LastActive = time.Now().Unix() + return record.SessionID, true +} + +func StartSessionJanitor(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for range ticker.C { + now := time.Now().Unix() + CacheMutex.Lock() + for mac, record := range SessionCache { + if now-record.LastActive > 30 { // 硬编码超时30秒 + delete(SessionCache, mac) + } + } + CacheMutex.Unlock() + } +} diff --git a/service/speechNlu/speechWs.go b/service/speechNlu/speechWs.go index 8ba9a4a..96d8991 100644 --- a/service/speechNlu/speechWs.go +++ b/service/speechNlu/speechWs.go @@ -5,10 +5,12 @@ import ( "fmt" "github.com/gorilla/websocket" "io/ioutil" + "log" "net/http" "speech-nlu-parse/global" "speech-nlu-parse/model" "speech-nlu-parse/pkg/logger" + "speech-nlu-parse/pkg/util" "sync" "time" ) @@ -18,7 +20,7 @@ type SpeechNlpWs struct { mutex sync.Mutex } -func (s *SpeechNlpWs) SpeechWs() (*websocket.Conn, error) { +func (s *SpeechNlpWs) SpeechWs(mac string) (*websocket.Conn, error) { SpeechWs_Url := global.SpeechSetting.Url SpeechWs_ProductId := global.SpeechSetting.ProductId SpeechWs_ApiKey := global.SpeechSetting.ApiKey @@ -28,16 +30,16 @@ func (s *SpeechNlpWs) SpeechWs() (*websocket.Conn, error) { HandshakeTimeout: 30 * time.Second, } + deviceId := util.ComputerMd5(mac) + header := http.Header{} SpeechUrl := SpeechWs_Url + "?serviceType=websocket&productId=" + SpeechWs_ProductId + - "&apikey=" + SpeechWs_ApiKey - - fmt.Println(SpeechUrl) + "&apikey=" + SpeechWs_ApiKey + "&deviceId=" + deviceId conn, resp, err := dialer.Dial(SpeechUrl, header) if err != nil { global.Logger.WithFields(logger.Fields{ - //"RequestId": p.RequestId, + "url": SpeechUrl, "resp": readResp(resp), }).Errorf("[SpeechWs] Connect error : %v", err.Error()) return nil, err @@ -66,12 +68,14 @@ func readResp(resp *http.Response) string { return fmt.Sprintf("code=%d,body=%s", resp.StatusCode, string(b)) } -func SpeechNlpWsReq(data *model.SemanticReq) *SpeechWsData { +func SpeechNlpWsReq(data *model.SemanticReq, sessionId string) *SpeechWsData { req := new(SpeechWsData) req.Topic = "nlu.input.text" req.RecordId = data.Guid //sessionId待定 + req.SessionId = sessionId req.RefText = data.Query + log.Printf("请求的sessionId: %s", req.SessionId) return req } -- GitLab