diff --git a/service/service.go b/service/service.go index 107fd68342ae52777e0538b7b535e43352b17fe8..6ef20f33ca2b07e1c6e90d2b629436c248329a51 100644 --- a/service/service.go +++ b/service/service.go @@ -15,6 +15,7 @@ import ( "speech-nlu-parse/pkg/errCode" "speech-nlu-parse/pkg/logger" "speech-nlu-parse/pkg/proto" + "speech-nlu-parse/pkg/util" "speech-nlu-parse/service/speechNlu" "speech-nlu-parse/service/tencentNlu" ) @@ -610,7 +611,8 @@ func (TencentNlu) TencentNluParseStream(stream proto.TencentNlu_TencentNluParseS speechNluWs := speechNlu.SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs(mac, ip) + deviceId := util.ComputerMd5(mac) + nlpWsConn, err = speechNluWs.SpeechWs(mac, ip, deviceId) if err != nil { global.Logger.Errorf("speechWs.SpeechWs error. %v", err) return err diff --git a/service/speechNlu/domain.go b/service/speechNlu/domain.go index 5fd55ba99fc7f5d49394d4c68cec16a9cdaca1c2..ee6d3fce9d9d28239b7eb325a2a5f297adb3cfc3 100644 --- a/service/speechNlu/domain.go +++ b/service/speechNlu/domain.go @@ -176,6 +176,12 @@ func chatDomain(params *model.SpeechDomainParams) []byte { // 目前无返回 func newsDomain(params *model.SpeechDomainParams) []byte { + //resp, err := FeedBackNews(3, "45.244.56.176") + //if err != nil { + // return nil + //} + //fmt.Println(resp) + res, err := baseParse(params) if err != nil { global.Logger.WithFields(logger.Fields{ diff --git a/service/speechNlu/news.go b/service/speechNlu/news.go new file mode 100644 index 0000000000000000000000000000000000000000..21f704752af958f49aee28ae7683b61d0080ad4e --- /dev/null +++ b/service/speechNlu/news.go @@ -0,0 +1,98 @@ +package speechNlu + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "speech-nlu-parse/global" + "speech-nlu-parse/pkg/logger" + "time" +) + +type FeedBackReq struct { + ActionType int `json:"action_type"` + Timestamp int64 `json:"timestamp"` + Imei string `json:"imei"` + Os string `json:"os"` + ClientIp string `json:"client_ip"` + Brand string `json:"brand"` + Data []map[string]interface{} `json:"data"` +} + +type FeedBackReqData struct { + Sid string `json:"sid"` + Duration int `json:"duration"` + Ext struct { + District string `json:"district"` + City string `json:"city"` + Province string `json:"province"` + } `json:"ext"` +} + +type FeedBackResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Source string `json:"source"` +} + +func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) { + url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111" + request := FeedBackReq{ + ActionType: actionType, + Timestamp: time.Now().Unix(), + ClientIp: ip, + Brand: "gree", + Data: []map[string]interface{}{ + { + "sid": "m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a", + "duration": 138, + }, + }, + } + + field := logger.Fields{ + "FeedBackNews_request": request, + "FeedBackNews_url": url, + } + + reqData, err := json.Marshal(request) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + start := time.Now() + client := http.Client{Timeout: 2 * time.Second} + req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(reqData)) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + res, err := client.Do(req) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + defer res.Body.Close() + + field["cost_time_cmd"] = fmt.Sprintf("%fs", time.Since(start).Seconds()) + + body, err := io.ReadAll(res.Body) + if err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + field["GetGridConList_response"] = string(body) + + var response FeedBackResp + if err := json.Unmarshal(body, &response); err != nil { + global.Logger.WithFields(field).Error(err.Error()) + return nil, err + } + + global.Logger.WithFields(field).Info("FeedBackNews") + return &response, nil +} diff --git a/service/speechNlu/rabbitmq.go b/service/speechNlu/rabbitmq.go new file mode 100644 index 0000000000000000000000000000000000000000..56d626000da0bb8fa0b7fec480b63377282d2fea --- /dev/null +++ b/service/speechNlu/rabbitmq.go @@ -0,0 +1,196 @@ +package speechNlu + +import ( + "fmt" + "log" + "speech-nlu-parse/global" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type Consumer struct { + conn *amqp.Connection + channel *amqp.Channel + tag string + done chan error +} + +//func New(amqpURI string) (Consumer, error) { +// //mq链接 +// log.Printf("dialing %q", amqpURI) +// conn, err := amqp.Dial(amqpURI) +// if err != nil { +// global.Logger.Errorf("Dial: %s", err) +// consumerErr++ +// return Consumer{}, err +// } +// +// log.Printf("got Connection, getting Channel") +// //创建一个rabbitmq信道 +// channel, err := conn.Channel() +// if err != nil { +// return Consumer{}, err +// } +// +// defer channel.Close() +// return Consumer{ +// conn: conn, +// channel: channel, +// done: make(chan error), +// }, err +//} + +func failOnError(err error, msg string) { + if err != nil { + global.Logger.Errorf("%s: %s", msg, err) + return + } +} + +var consumerErr int + +func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) { + c := &Consumer{ + conn: nil, + channel: nil, + done: make(chan error), + } + + var err error + + // 最大重试次数 + maxRetries := 3 + + // 连接 RabbitMQ + log.Printf("dialing %q", amqpURI) + c.conn, err = amqp.Dial(amqpURI) + if err != nil { + global.Logger.Errorf("Dial: %s", err) + consumerErr++ + if consumerErr > maxRetries { + return nil, fmt.Errorf("failed to connect after %d retries", maxRetries) + } + time.Sleep(3 * time.Second) + return NewConsumer(amqpURI, exchangeName, exchangeType) // 重试连接 + } + defer c.conn.Close() + // 创建信道 + log.Printf("got Connection, getting Channel") + c.channel, err = c.conn.Channel() + if err != nil { + global.Logger.Errorf("Failed to open a channel: %s", err) + return nil, err + } + defer c.channel.Close() + + // 声明交换机 + log.Printf("got Channel, declaring Exchange (%q)", exchangeName) + err = c.exchangeDeclare(exchangeName, exchangeType) + if err != nil { + global.Logger.Errorf("Failed to declare exchange: %s", err) + return nil, err + } + + // 声明发布消息的交换机 + err = c.exchangeDeclare(global.PublishSetting.ExchangeName, global.PublishSetting.ExchangeType) + if err != nil { + global.Logger.Errorf("Failed to declare exchange: %s", err) + return nil, err + } + + // 定义队列和路由键绑定 + type bind struct { + queue string + routingKey string + } + bindings := []bind{ + {"news", "grih.HOME_MAC"}, + } + + // 声明队列并绑定 + for _, b := range bindings { + log.Printf("declared Exchange, declaring Queue %q", b.queue) + err = c.queueDeclare(b.queue) + if err != nil { + global.Logger.Errorf("Failed to declare queue %s: %s", b.queue, err) + return nil, err + } + + err = c.channel.QueueBind(b.queue, b.routingKey, exchangeName, false, nil) + if err != nil { + global.Logger.Errorf("Failed to bind queue %s: %s", b.queue, err) + return nil, err + } + } + + // 设置 QoS + err = c.channel.Qos(250, 0, false) + if err != nil { + global.Logger.Errorf("Failed to set QoS: %s", err) + return nil, err + } + + // 启动消费者 + err = c.Consume() + if err != nil { + global.Logger.Errorf("Failed to start consumer: %s", err) + return nil, err + } + + // 监听通道关闭事件 + closeChan := make(chan *amqp.Error, 1) + notifyClose := c.channel.NotifyClose(closeChan) + + // 处理通道关闭事件 + for { + select { + case e := <-notifyClose: + global.Logger.Errorf("chan通道错误:%v", e.Error()) + time.Sleep(5 * time.Second) + log.Println("休息5秒后重新连接") + + // 重新连接 + c, err = NewConsumer(amqpURI, exchangeName, exchangeType) + if err != nil { + global.Logger.Errorf("Failed to reconnect: %s", err) + return nil, err + } + return c, nil // 返回新的 Consumer 实例 + } + } +} + +func (c *Consumer) queueDeclare(queue string) error { + _, err := c.channel.QueueDeclare(queue, true, false, false, false, nil) + return err +} + +func (c *Consumer) exchangeDeclare(exchangeName, exchangeType string) error { + err := c.channel.ExchangeDeclare( + exchangeName, // name of the exchange + exchangeType, // type + true, // durable + false, // delete when complete + false, // internal + false, // noWait + nil, // arguments + ) + return err +} + +func (c *Consumer) Shutdown() error { + // will close() the deliveries channel + if err := c.channel.Cancel(c.tag, true); err != nil { + return fmt.Errorf("Consumer cancel failed: %s", err) + } + + if err := c.conn.Close(); err != nil { + return fmt.Errorf("AMQP connection close error: %s", err) + } + + defer log.Printf("AMQP shutdown OK") + + // wait for handle() to exit + return <-c.done +} diff --git a/service/speechNlu/speech.go b/service/speechNlu/speech.go index 0b76de93150d73801c3a87389067f8c2bf20dd40..890af522872b84f258b0741099e6e48db1e93b3c 100644 --- a/service/speechNlu/speech.go +++ b/service/speechNlu/speech.go @@ -10,6 +10,7 @@ import ( "speech-nlu-parse/model" "speech-nlu-parse/pkg/errCode" "speech-nlu-parse/pkg/logger" + "speech-nlu-parse/pkg/util" "sync" "time" ) @@ -72,14 +73,15 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { session = "" } + start := time.Now() data := SpeechNlpWsReq(reqStruct, session) - fmt.Println("*****************") - fmt.Println(data) speechNluWs := SpeechNlpWs{} var nlpWsConn *websocket.Conn - nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip) + deviceId := util.ComputerMd5(reqStruct.MacVoice) + + nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip, deviceId) if err != nil { global.Logger.WithFields(logger.Fields{ "requestId": reqStruct.RequestId, @@ -93,7 +95,7 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) { done := make(chan struct{}) dataChan := make(chan []model.SpeechWsResp, 1) // 缓冲通道避免阻塞 - go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId) + go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId, start) if err := sendRunTaskMsg(nlpWsConn, data); err != nil { global.Logger.WithFields(logger.Fields{ @@ -145,7 +147,7 @@ func sendRunTaskMsg(conn *websocket.Conn, v interface{}) error { return conn.WriteMessage(websocket.TextMessage, msgJSON) } -func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string) { +func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string, start time.Time) { var responses []model.SpeechWsResp defer func() { dataChan <- responses @@ -162,6 +164,7 @@ func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- [] global.Logger.WithFields(logger.Fields{ "requestId": requestId, "origin-data": string(message), + "time": time.Since(start).Seconds(), }).Info("原始消息") //思必驰nlu回复 diff --git a/service/speechNlu/speechWs.go b/service/speechNlu/speechWs.go index 130c62168c84e74481f9f4825354729cdd3824b3..b935c6c63dd4c138e3f0c8cc301fa12c7db46e97 100644 --- a/service/speechNlu/speechWs.go +++ b/service/speechNlu/speechWs.go @@ -10,7 +10,6 @@ import ( "speech-nlu-parse/global" "speech-nlu-parse/model" "speech-nlu-parse/pkg/logger" - "speech-nlu-parse/pkg/util" "sync" "time" ) @@ -20,7 +19,7 @@ type SpeechNlpWs struct { mutex sync.Mutex } -func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) { +func (s *SpeechNlpWs) SpeechWs(mac, ip, deviceId string) (*websocket.Conn, error) { SpeechWs_Url := global.SpeechSetting.Url SpeechWs_ProductId := global.SpeechSetting.ProductId SpeechWs_ApiKey := global.SpeechSetting.ApiKey @@ -30,8 +29,6 @@ func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) { HandshakeTimeout: 30 * time.Second, } - deviceId := util.ComputerMd5(mac) - header := http.Header{} header.Set("X-Forwarded-For", ip) @@ -81,7 +78,7 @@ func SpeechNlpWsReq(data *model.SemanticReq, sessionId string) *SpeechWsData { global.Logger.WithFields(logger.Fields{ "req": req, "sessionId": sessionId, - }).Error("SpeechNlpWsReq") + }).Info("SpeechNlpWsReq") return req }