diff --git a/conf/config.yaml b/conf/config.yaml index 23ac839afc7e6dc66ca6fbe1b4987b7f6e594ec9..a581103f5f9b575e3d74918a844dff73ded832b8 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -8,6 +8,11 @@ Server: Interval: 10 #健康检查间隔,单位:秒 Deregister: 1 #注销时间,相当于过期时间,单位:分钟 +RabbitMq: + Url: amqp://iot:qwe!23@172.28.124.106:5672/voice + ExchangeName: voice.media.ctrl + ExchangeType: direct + Logger: LogFileName: tencent-nlu-parse LogFileExt: .log @@ -26,6 +31,12 @@ Device: #举例 DB: Dsn: root:123456@tcp(172.28.124.105:3306)/chatroom?charset=utf8 +Speech: + ProductId: 279629895 + Apikey : 0c74988953dd4ed4bf31955527802cf3 + Url: wss://dds.dui.ai/dds/v2/test + FeedBackUrl: http://apis.duiopen.com/feedback/letingv4 + MusicDomain: IsQQMusicRemid: true # 是否替换为自定义的提醒 QQMusicRemid: 。您的语音空调尚未激活,请尽快更新格力+APP,进入语音空调语音技能页面,点击QQ音乐进行授权,授权过程不产生任何费用。 diff --git a/global/setting.go b/global/setting.go index 36ccac5c5eee075ccea2e9e1b318cb68f26e5baf..64bff7932d5262fcc27af7526c3c568332e7f4f9 100644 --- a/global/setting.go +++ b/global/setting.go @@ -21,4 +21,5 @@ var ( ConsulObj *consul.ConsulObj TencentGwSetting *setting.TencentGwSetting SpeechSetting *setting.SpeechSettings + RabbitMqSetting *setting.RabbitMqSettings ) diff --git a/go.mod b/go.mod index 28e1633238f87f0e3dec46823c05073f8ec0ab8a..36ce11c5cdec243254146372a150ad212fd6c365 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/smartystreets/goconvey v1.8.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect diff --git a/go.sum b/go.sum index de0f199d9eeec9531b6c8cc32f89f847e3e460dc..ff4955fc5d237b5b9cc71f5ec31eecbc167392aa 100644 --- a/go.sum +++ b/go.sum @@ -306,6 +306,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= diff --git a/main.go b/main.go index 7ff49924a3ba56ed70431201a87a321ba3a42b3a..25231529903a64452a413c143bac7c9d0fc59658 100644 --- a/main.go +++ b/main.go @@ -4,28 +4,26 @@ import ( "errors" "flag" "fmt" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" "log" "net" "net/url" "runtime" + "speech-nlu-parse/middleware" + "speech-nlu-parse/pkg/proto" + "speech-nlu-parse/service" "strings" "time" + "gopkg.in/natefinch/lumberjack.v2" "speech-nlu-parse/dao" "speech-nlu-parse/global" - "speech-nlu-parse/middleware" "speech-nlu-parse/pkg/consul" "speech-nlu-parse/pkg/logger" - "speech-nlu-parse/pkg/proto" "speech-nlu-parse/pkg/setting" - "speech-nlu-parse/service" - - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/reflection" - "gopkg.in/natefinch/lumberjack.v2" ) var ( @@ -85,6 +83,61 @@ func setup() error { } func main() { + //// 初始化配置 + //if err := setup(); err != nil { + // log.Fatalf("[FATAL] Setup failed: %v", err) + //} + // + //// 初始化 RabbitMQ 消费者 + //consumer, err := speechNlu.NewConsumer( + // global.RabbitMqSetting.Url, + // global.RabbitMqSetting.ExchangeName, + // global.RabbitMqSetting.ExchangeType, + //) + //if err != nil { + // log.Fatalf("[FATAL] Create consumer failed: %v", err) + //} + //defer func() { + // if err := consumer.Shutdown(); err != nil { + // log.Printf("[WARN] Consumer shutdown error: %v", err) + // } + //}() + // + //// 初始化 gRPC + //lis, err := net.Listen("tcp", fmt.Sprintf(":%d", global.ServerSetting.Port)) + //if err != nil { + // log.Fatalf("[FATAL] Listen failed: %v", err) + //} + // + //server := grpc.NewServer( + // grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + // middleware.StreamGSError500(), + // )), + // grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + // middleware.UnaryGSError500(), + // )), + //) + //proto.RegisterTencentNluServer(server, &service.TencentNlu{}) + //grpc_health_v1.RegisterHealthServer(server, &consul.HealthImpl{}) + //reflection.Register(server) + // + //// 启动 gRPC 服务 + //go func() { + // global.Logger.Info("service is running......") + // if err := server.Serve(lis); err != nil { + // log.Fatalf("[FATAL] gRPC serve error: %v", err) + // } + //}() + // + //// 阻塞等待信号 + //stop := make(chan os.Signal, 1) + //signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + //<-stop + // + //// 优雅关闭 + //log.Println("[INFO] Shutting down...") + //server.GracefulStop() + //log.Println("[INFO] Server stopped") var err error // 初始化, 加载配置 err = setup() @@ -117,6 +170,7 @@ func main() { if err = server.Serve(lis); err != nil { log.Fatalf("start service error:%v", err) } + } func setupFlag() error { @@ -195,6 +249,10 @@ func setupSetting() error { if err != nil { return err } + //err = s.ReadSection("RabbitMq", &global.RabbitMqSetting) + //if err != nil { + // return err + //} consulUrlParse, err := url.Parse(consulUrl) if err != nil { diff --git a/pkg/setting/section.go b/pkg/setting/section.go index 8e612484f6fe746f03945bc8ae5f6a4093609a13..38c07b71982fafdcfac0da74a0268747e88a39f0 100644 --- a/pkg/setting/section.go +++ b/pkg/setting/section.go @@ -91,9 +91,16 @@ type TencentGwSetting struct { } type SpeechSettings struct { - ProductId string - ApiKey string - Url string + ProductId string + ApiKey string + Url string + FeedBackUrl string +} + +type RabbitMqSettings struct { + Url string + ExchangeName string + ExchangeType string } func (s *Setting) ReadSection(k string, v interface{}) error { diff --git a/service/service.go b/service/service.go index 107fd68342ae52777e0538b7b535e43352b17fe8..f1c00926ab6350631adf5cf11a6bdf90d05b182a 100644 --- a/service/service.go +++ b/service/service.go @@ -15,6 +15,7 @@ import ( "speech-nlu-parse/pkg/errCode" "speech-nlu-parse/pkg/logger" "speech-nlu-parse/pkg/proto" + "speech-nlu-parse/pkg/util" "speech-nlu-parse/service/speechNlu" "speech-nlu-parse/service/tencentNlu" ) @@ -610,7 +611,8 @@ func (TencentNlu) TencentNluParseStream(stream proto.TencentNlu_TencentNluParseS speechNluWs := speechNlu.SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs(mac, ip) + deviceId := util.EncodeMD5(mac) + nlpWsConn, err = speechNluWs.SpeechWs(ip, deviceId) if err != nil { global.Logger.Errorf("speechWs.SpeechWs error. %v", err) return err diff --git a/service/speechNlu/domain.go b/service/speechNlu/domain.go index 5fd55ba99fc7f5d49394d4c68cec16a9cdaca1c2..58ed9955aaa651fdeb3e441b189a31d9bd70b749 100644 --- a/service/speechNlu/domain.go +++ b/service/speechNlu/domain.go @@ -116,7 +116,7 @@ func calendarDomain(params *model.SpeechDomainParams) []byte { "vender": params.MidType, }).Error("params.SpeechWsResp.Dm.Widget.Extra.Result不是数组类型") if res.ResponseText != "" { - return replyWithChat(res.ResponseText, "doudi") + return replyWithChat(res.ResponseText, "chat") } return replyWithChat(error_reply, "doudi") } @@ -187,6 +187,44 @@ func newsDomain(params *model.SpeechDomainParams) []byte { return replyWithChat(error_reply, "doudi") } + res.Header.Semantic.Domain = "news" + res.Header.Semantic.Intent = "search" + res.Header.Semantic.SkillId = res.Header.Semantic.Domain + "." + res.Header.Semantic.Intent + res.ListItems = make([]map[string]interface{}, 0) + sliceData, isSlice := params.SpeechWsResp.Dm.Widget.Content.([]interface{}) + if !isSlice { + global.Logger.WithFields(logger.Fields{ + "requestId": params.RequestId, + "mac": params.Mac, + "mid": params.Mid, + "vender": params.MidType, + }).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型") + if res.ResponseText != "" { + return replyWithChat(res.ResponseText, "chat") + } + return replyWithChat(error_reply, "doudi") + } + + for _, item := range sliceData { + itemBytes, _ := json.Marshal(item) + var con model.Content + if err := json.Unmarshal(itemBytes, &con); err != nil { + global.Logger.WithFields(logger.Fields{ + "requestId": params.RequestId, + "mac": params.Mac, + "mid": params.Mid, + "vender": params.MidType, + }).Errorf("json.Unmarshal元素解析失败: %v", err) + return replyWithChat(error_reply, "doudi") + } + res.ListItems = append(res.ListItems, map[string]interface{}{ + "url": con.LinkUrl, + "title": con.Title, + "newsFrom": "", + "newsType": "", + }) + } + return Marshal(params, res) } @@ -210,12 +248,16 @@ func ancientpoemDomain(params *model.SpeechDomainParams) []byte { res.ListItems = append(res.ListItems, map[string]interface{}{ "content": params.SpeechWsResp.Dm.Widget.Extra.ContentTranslation, "title": params.SpeechWsResp.Dm.Widget.Extra.Title, - "type": params.SpeechWsResp.Dm.Widget.Type, //原来是 TEXT,看看是否能播放 + "type": "TEXT", //原来是 TEXT,看看是否能播放 "url": "", "mediaId": "", }) res.Header.Semantic.SkillId = res.Header.Semantic.Domain + "." + res.Header.Semantic.Intent return Marshal(params, res) + } else if params.SpeechWsResp.Dm.IntentName == "查询诗句" { + res.Header.Semantic.Intent = "search_ancientpoem_chains" + res.Header.Semantic.SkillId = res.Header.Semantic.Domain + "." + res.Header.Semantic.Intent + return Marshal(params, res) } res.ListItems = make([]map[string]interface{}, 0) @@ -228,6 +270,9 @@ func ancientpoemDomain(params *model.SpeechDomainParams) []byte { "mid": params.Mid, "vender": params.MidType, }).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型") + if res.ResponseText != "" { + return replyWithChat(res.ResponseText, "chat") + } return replyWithChat(error_reply, "doudi") } else { for _, item := range sliceData { @@ -249,7 +294,7 @@ func ancientpoemDomain(params *model.SpeechDomainParams) []byte { "author": con.Author, "dynasty": con.Extra.Dynasty, "mediaId": "", - "type": params.SpeechWsResp.Dm.Widget.Type, //原来是 AUDIO,看看是否能播放 + "type": "AUDIO", //原来是 AUDIO,看看是否能播放 }) } } @@ -299,6 +344,9 @@ func jokeDomain(params *model.SpeechDomainParams) []byte { "mid": params.Mid, "vender": params.MidType, }).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型") + if res.ResponseText != "" { + return replyWithChat(res.ResponseText, "chat") + } return replyWithChat(error_reply, "doudi") } @@ -614,7 +662,7 @@ func fmDomain(params *model.SpeechDomainParams) []byte { "vender": params.MidType, }).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型") if res.ResponseText != "" { - return replyWithChat(res.ResponseText, "doudi") + return replyWithChat(res.ResponseText, "chat") } return replyWithChat(error_reply, "doudi") } @@ -629,7 +677,7 @@ func fmDomain(params *model.SpeechDomainParams) []byte { "mid": params.Mid, "vender": params.MidType, }).Errorf("json.Unmarshal元素解析失败: %v", err) - return replyWithChat(error_reply, "doudi") + return replyWithChat(params.SpeechWsResp.Dm.Nlg, "doudi") } res.ListItems = append(res.ListItems, map[string]interface{}{ "url": con.LinkUrl, @@ -818,7 +866,7 @@ func sleepMusicDomain(params *model.SpeechDomainParams) []byte { "vender": params.MidType, }).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型") if res.ResponseText != "" { - return replyWithChat(res.ResponseText, "doudi") + return replyWithChat(res.ResponseText, "chat") } return replyWithChat(error_reply, "doudi") } @@ -917,8 +965,22 @@ func speechOtherDomain(params *model.SpeechDomainParams) []byte { } func helpDomain(params *model.SpeechDomainParams) []byte { - resultTextStr := "我可以控制空调开机、播放音乐、查询时间天气、调节温度,设置模式,例如您可以对我说,空调开机。" - return replyWithChat(resultTextStr, "chat.chat") + if !params.CheckDm() { + resultTextStr := "我可以控制空调开机、播放音乐、查询时间天气、调节温度,设置模式,例如您可以对我说,空调开机。" + return replyWithChat(resultTextStr, "chat.chat") + } + + var result model.ResponseBody + result.Header.Semantic.Code = 0 + result.Header.Semantic.Domain = "chat" + result.Header.Semantic.Intent = "chat" + result.Header.Semantic.Msg = speech_nlu_parse + result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession + result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent + result.ResponseText = params.SpeechWsResp.Dm.Nlg + result.AsrRecongize = params.SpeechWsResp.Dm.Input + + return Marshal(params, &result) } func Marshal(params *model.SpeechDomainParams, res *model.ResponseBody) []byte { diff --git a/service/speechNlu/news.go b/service/speechNlu/news.go new file mode 100644 index 0000000000000000000000000000000000000000..89295fa9ac46cd0462caaea8ac54e9def4ffd21f --- /dev/null +++ b/service/speechNlu/news.go @@ -0,0 +1,99 @@ +package speechNlu + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "speech-nlu-parse/global" + "speech-nlu-parse/pkg/logger" + "time" +) + +type FeedBackReq struct { + ActionType int `json:"action_type"` + Timestamp int64 `json:"timestamp"` + Imei string `json:"imei"` + Os string `json:"os"` + ClientIp string `json:"client_ip"` + Brand string `json:"brand"` + Data []map[string]interface{} `json:"data"` +} + +type FeedBackReqData struct { + Sid string `json:"sid"` + Duration int `json:"duration"` + Ext struct { + District string `json:"district"` + City string `json:"city"` + Province string `json:"province"` + } `json:"ext"` +} + +type FeedBackResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Source string `json:"source"` +} + +func FeedBackNews(actionType, duration int, ip, sid, uid string) (*FeedBackResp, error) { + url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111" + //url := global.SpeechSetting.FeedBackUrl + "?productId=" + global.SpeechSetting.ProductId + "&apikey=" + global.SpeechSetting.ApiKey + "&uid=" + uid + request := FeedBackReq{ + ActionType: actionType, + Timestamp: time.Now().Unix(), + ClientIp: ip, //ip + Brand: "gree", + Data: []map[string]interface{}{ + { + "sid": sid, //"m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a" + "duration": duration, //138 + }, + }, + } + + field := logger.Fields{ + "FeedBackNews_request": request, + "FeedBackNews_url": url, + } + + reqData, err := json.Marshal(request) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + start := time.Now() + client := http.Client{Timeout: 2 * time.Second} + req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(reqData)) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + res, err := client.Do(req) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + defer res.Body.Close() + + field["cost_time_cmd"] = fmt.Sprintf("%fs", time.Since(start).Seconds()) + + body, err := io.ReadAll(res.Body) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + field["GetGridConList_response"] = string(body) + + var response FeedBackResp + if err := json.Unmarshal(body, &response); err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + global.Logger.WithFields(field).Info("FeedBackNews") + return &response, nil +} diff --git a/service/speechNlu/rabbitmq.go b/service/speechNlu/rabbitmq.go new file mode 100644 index 0000000000000000000000000000000000000000..9b69dc81edb3f0edc2c48c883123e3a1da38bdd3 --- /dev/null +++ b/service/speechNlu/rabbitmq.go @@ -0,0 +1,232 @@ +package speechNlu + +import ( + "encoding/json" + "fmt" + "log" + "speech-nlu-parse/global" + "speech-nlu-parse/pkg/logger" + "speech-nlu-parse/pkg/util" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type Consumer struct { + conn *amqp.Connection + channel *amqp.Channel + tag string + done chan error + amqpURI string + exchangeName string + exchangeType string + closeNotifier chan *amqp.Error +} + +func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) { + c := &Consumer{ + amqpURI: amqpURI, + exchangeName: exchangeName, + exchangeType: exchangeType, + done: make(chan error), + } + + maxRetries := 3 + retryInterval := 3 * time.Second + + var err error + for i := 0; i < maxRetries; i++ { + if err = c.connect(); err == nil { + break + } + log.Printf("Connection attempt %d failed: %v", i+1, err) + time.Sleep(retryInterval) + } + if err != nil { + return nil, fmt.Errorf("failed to connect after %d retries: %v", maxRetries, err) + } + + // 初始化 Exchange、Queue 和 Binding + if err := c.setup(); err != nil { + return nil, fmt.Errorf("setup failed: %v", err) + } + + // 启动消费者协程 + go c.Consume() + + // 监听通道关闭事件 + go c.handleChannelClose() + + return c, nil +} + +func (c *Consumer) connect() error { + var err error + c.conn, err = amqp.Dial(c.amqpURI) + if err != nil { + return fmt.Errorf("dial error: %v", err) + } + + c.channel, err = c.conn.Channel() + if err != nil { + c.conn.Close() + return fmt.Errorf("channel error: %v", err) + } + + return nil +} + +func (c *Consumer) setup() error { + // 声明 Exchange + if err := c.channel.ExchangeDeclare( + c.exchangeName, + c.exchangeType, + true, // durable + false, // autoDelete + false, // internal + false, // noWait + nil, // args + ); err != nil { + return fmt.Errorf("declare exchange error: %v", err) + } + + // 声明队列 + queue := "news" + if _, err := c.channel.QueueDeclare(queue, + true, // durable + false, // autoDelete + false, // exclusive + false, // noWait + nil, // args + ); err != nil { + return fmt.Errorf("declare queue error: %v", err) + } + + // 绑定队列 + routingKey := "mqhandle" + if err := c.channel.QueueBind( + queue, + routingKey, + c.exchangeName, + false, // noWait + nil, // args + ); err != nil { + return fmt.Errorf("queue bind error: %v", err) + } + + // 设置 QoS + if err := c.channel.Qos(10, 0, false); err != nil { + return fmt.Errorf("QoS error: %v", err) + } + + return nil +} + +func (c *Consumer) handleChannelClose() { + closeErr := <-c.channel.NotifyClose(make(chan *amqp.Error)) + if closeErr != nil { + global.Logger.Errorf("Channel closed: %v", closeErr) + // 触发重连逻辑 + c.reconnect() + } +} + +func (c *Consumer) reconnect() { + log.Println("Attempting to reconnect...") + var err error + for { + time.Sleep(5 * time.Second) + if err := c.connect(); err == nil { + if err := c.setup(); err == nil { + go c.Consume() + go c.handleChannelClose() + log.Println("Reconnected successfully") + return + } + } + log.Printf("Reconnect failed: %v", err) + } +} + +func (c *Consumer) Shutdown() error { + if c.channel != nil { + if err := c.channel.Close(); err != nil { + return fmt.Errorf("channel close error: %v", err) + } + } + if c.conn != nil { + if err := c.conn.Close(); err != nil { + return fmt.Errorf("connection close error: %v", err) + } + } + return nil +} + +func (c *Consumer) Consume() { + msgs, err := c.channel.Consume( + "news", + "", // consumer + false, // autoAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // args + ) + if err != nil { + global.Logger.Errorf("Consume error: %v", err) + return + } + + for d := range msgs { + var resp ReData + if err := json.Unmarshal(d.Body, &resp); err != nil { + global.Logger.Errorf("Unmarshal error: %v", err) + _ = d.Nack(false, true) + continue // 继续处理下一条消息 + } + + global.Logger.WithFields(logger.Fields{ + "RoutingKey": d.RoutingKey, + "Body": string(d.Body), + "Response": resp, + }).Info("Received message") + + // 处理消息逻辑 + state, offset, mediaId := parseData(&resp) + uid := util.EncodeMD5(resp.Mac) + ip := "14.215.222.17" //todo:待改 + if _, err := FeedBackNews(state, offset, ip, mediaId, uid); err != nil { + global.Logger.Errorf("Feedback error: %v", err) + _ = d.Nack(false, true) + continue + } + + _ = d.Ack(false) + } +} + +type ReData struct { + Mac string `json:"mac"` + Cols []string `json:"cols"` + Dat []interface{} `json:"dat"` +} + +func parseData(resp *ReData) (state, offset int, mediaId string) { + for i, col := range resp.Cols { + switch col { + case "state": + if val, ok := resp.Dat[i].(float64); ok { + state = int(val) + } + case "mediaId": + if val, ok := resp.Dat[i].(string); ok { + mediaId = val + } + case "offset": + if val, ok := resp.Dat[i].(float64); ok { + offset = int(val) + } + } + } + return +} diff --git a/service/speechNlu/speech.go b/service/speechNlu/speech.go index 0b76de93150d73801c3a87389067f8c2bf20dd40..9705018f65d3b248ea7e92f927af8df1313c7857 100644 --- a/service/speechNlu/speech.go +++ b/service/speechNlu/speech.go @@ -10,6 +10,7 @@ import ( "speech-nlu-parse/model" "speech-nlu-parse/pkg/errCode" "speech-nlu-parse/pkg/logger" + "speech-nlu-parse/pkg/util" "sync" "time" ) @@ -72,14 +73,15 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { session = "" } + start := time.Now() data := SpeechNlpWsReq(reqStruct, session) - fmt.Println("*****************") - fmt.Println(data) speechNluWs := SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip) + deviceId := util.EncodeMD5(reqStruct.MacVoice) + + nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.Ip, deviceId) if err != nil { global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, @@ -93,7 +95,7 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { done := make(chan struct{}) dataChan := make(chan []model.SpeechWsResp, 1) // 缓冲通道避免阻塞 - go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId) + go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId, start) if err := sendRunTaskMsg(nlpWsConn, data); err != nil { global.Logger.WithFields(logger.Fields{ @@ -145,7 +147,7 @@ func sendRunTaskMsg(conn *websocket.Conn, v interface{}) error { return conn.WriteMessage(websocket.TextMessage, msgJSON) } -func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string) { +func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string, start time.Time) { var responses []model.SpeechWsResp defer func() { dataChan <- responses @@ -162,6 +164,7 @@ func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- [] global.Logger.WithFields(logger.Fields{ "requestId": requestId, "origin-data": string(message), + "time": time.Since(start).Seconds(), }).Info("原始消息") //思必驰nlu回复 diff --git a/service/speechNlu/speechWs.go b/service/speechNlu/speechWs.go index 130c62168c84e74481f9f4825354729cdd3824b3..29ab5165a0b8fa9629c46be2e33298ca8df973e9 100644 --- a/service/speechNlu/speechWs.go +++ b/service/speechNlu/speechWs.go @@ -10,7 +10,6 @@ import ( "speech-nlu-parse/global" "speech-nlu-parse/model" "speech-nlu-parse/pkg/logger" - "speech-nlu-parse/pkg/util" "sync" "time" ) @@ -20,7 +19,7 @@ type SpeechNlpWs struct { mutex sync.Mutex } -func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) { +func (s *SpeechNlpWs) SpeechWs(ip, deviceId string) (*websocket.Conn, error) { SpeechWs_Url := global.SpeechSetting.Url SpeechWs_ProductId := global.SpeechSetting.ProductId SpeechWs_ApiKey := global.SpeechSetting.ApiKey @@ -30,13 +29,12 @@ func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) { HandshakeTimeout: 30 * time.Second, } - deviceId := util.ComputerMd5(mac) - header := http.Header{} header.Set("X-Forwarded-For", ip) SpeechUrl := SpeechWs_Url + "?serviceType=websocket&productId=" + SpeechWs_ProductId + "&apikey=" + SpeechWs_ApiKey + "&deviceId=" + deviceId + fmt.Println(SpeechUrl) conn, resp, err := dialer.Dial(SpeechUrl, header) if err != nil { @@ -73,7 +71,7 @@ func readResp(resp *http.Response) string { func SpeechNlpWsReq(data *model.SemanticReq, sessionId string) *SpeechWsData { req := new(SpeechWsData) req.Topic = "nlu.input.text" - req.RecordId = data.Guid + req.RecordId = data.RequestId //sessionId待定 req.SessionId = sessionId req.RefText = data.Query @@ -81,7 +79,7 @@ func SpeechNlpWsReq(data *model.SemanticReq, sessionId string) *SpeechWsData { global.Logger.WithFields(logger.Fields{ "req": req, "sessionId": sessionId, - }).Error("SpeechNlpWsReq") + }).Info("SpeechNlpWsReq") return req }