Commit 9e758097 authored by 赵文静's avatar 赵文静

补充新闻

parent 614196cc
......@@ -15,6 +15,7 @@ import (
"speech-nlu-parse/pkg/errCode"
"speech-nlu-parse/pkg/logger"
"speech-nlu-parse/pkg/proto"
"speech-nlu-parse/pkg/util"
"speech-nlu-parse/service/speechNlu"
"speech-nlu-parse/service/tencentNlu"
)
......@@ -610,7 +611,8 @@ func (TencentNlu) TencentNluParseStream(stream proto.TencentNlu_TencentNluParseS
speechNluWs := speechNlu.SpeechNlpWs{}
var nlpWsConn *websocket.Conn
nlpWsConn, err = speechNluWs.SpeechWs(mac, ip)
deviceId := util.ComputerMd5(mac)
nlpWsConn, err = speechNluWs.SpeechWs(mac, ip, deviceId)
if err != nil {
global.Logger.Errorf("speechWs.SpeechWs error. %v", err)
return err
......
......@@ -176,6 +176,12 @@ func chatDomain(params *model.SpeechDomainParams) []byte {
// 目前无返回
func newsDomain(params *model.SpeechDomainParams) []byte {
//resp, err := FeedBackNews(3, "45.244.56.176")
//if err != nil {
// return nil
//}
//fmt.Println(resp)
res, err := baseParse(params)
if err != nil {
global.Logger.WithFields(logger.Fields{
......
package speechNlu
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"speech-nlu-parse/global"
"speech-nlu-parse/pkg/logger"
"time"
)
type FeedBackReq struct {
ActionType int `json:"action_type"`
Timestamp int64 `json:"timestamp"`
Imei string `json:"imei"`
Os string `json:"os"`
ClientIp string `json:"client_ip"`
Brand string `json:"brand"`
Data []map[string]interface{} `json:"data"`
}
type FeedBackReqData struct {
Sid string `json:"sid"`
Duration int `json:"duration"`
Ext struct {
District string `json:"district"`
City string `json:"city"`
Province string `json:"province"`
} `json:"ext"`
}
type FeedBackResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Source string `json:"source"`
}
func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) {
url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111"
request := FeedBackReq{
ActionType: actionType,
Timestamp: time.Now().Unix(),
ClientIp: ip,
Brand: "gree",
Data: []map[string]interface{}{
{
"sid": "m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a",
"duration": 138,
},
},
}
field := logger.Fields{
"FeedBackNews_request": request,
"FeedBackNews_url": url,
}
reqData, err := json.Marshal(request)
if err != nil {
global.Logger.WithFields(field).Error(err.Error())
return nil, err
}
start := time.Now()
client := http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(reqData))
if err != nil {
global.Logger.WithFields(field).Error(err.Error())
return nil, err
}
res, err := client.Do(req)
if err != nil {
global.Logger.WithFields(field).Error(err.Error())
return nil, err
}
defer res.Body.Close()
field["cost_time_cmd"] = fmt.Sprintf("%fs", time.Since(start).Seconds())
body, err := io.ReadAll(res.Body)
if err != nil {
global.Logger.WithFields(field).Error(err.Error())
return nil, err
}
field["GetGridConList_response"] = string(body)
var response FeedBackResp
if err := json.Unmarshal(body, &response); err != nil {
global.Logger.WithFields(field).Error(err.Error())
return nil, err
}
global.Logger.WithFields(field).Info("FeedBackNews")
return &response, nil
}
package speechNlu
import (
"fmt"
"log"
"speech-nlu-parse/global"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Consumer struct {
conn *amqp.Connection
channel *amqp.Channel
tag string
done chan error
}
//func New(amqpURI string) (Consumer, error) {
// //mq链接
// log.Printf("dialing %q", amqpURI)
// conn, err := amqp.Dial(amqpURI)
// if err != nil {
// global.Logger.Errorf("Dial: %s", err)
// consumerErr++
// return Consumer{}, err
// }
//
// log.Printf("got Connection, getting Channel")
// //创建一个rabbitmq信道
// channel, err := conn.Channel()
// if err != nil {
// return Consumer{}, err
// }
//
// defer channel.Close()
// return Consumer{
// conn: conn,
// channel: channel,
// done: make(chan error),
// }, err
//}
func failOnError(err error, msg string) {
if err != nil {
global.Logger.Errorf("%s: %s", msg, err)
return
}
}
var consumerErr int
func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) {
c := &Consumer{
conn: nil,
channel: nil,
done: make(chan error),
}
var err error
// 最大重试次数
maxRetries := 3
// 连接 RabbitMQ
log.Printf("dialing %q", amqpURI)
c.conn, err = amqp.Dial(amqpURI)
if err != nil {
global.Logger.Errorf("Dial: %s", err)
consumerErr++
if consumerErr > maxRetries {
return nil, fmt.Errorf("failed to connect after %d retries", maxRetries)
}
time.Sleep(3 * time.Second)
return NewConsumer(amqpURI, exchangeName, exchangeType) // 重试连接
}
defer c.conn.Close()
// 创建信道
log.Printf("got Connection, getting Channel")
c.channel, err = c.conn.Channel()
if err != nil {
global.Logger.Errorf("Failed to open a channel: %s", err)
return nil, err
}
defer c.channel.Close()
// 声明交换机
log.Printf("got Channel, declaring Exchange (%q)", exchangeName)
err = c.exchangeDeclare(exchangeName, exchangeType)
if err != nil {
global.Logger.Errorf("Failed to declare exchange: %s", err)
return nil, err
}
// 声明发布消息的交换机
err = c.exchangeDeclare(global.PublishSetting.ExchangeName, global.PublishSetting.ExchangeType)
if err != nil {
global.Logger.Errorf("Failed to declare exchange: %s", err)
return nil, err
}
// 定义队列和路由键绑定
type bind struct {
queue string
routingKey string
}
bindings := []bind{
{"news", "grih.HOME_MAC"},
}
// 声明队列并绑定
for _, b := range bindings {
log.Printf("declared Exchange, declaring Queue %q", b.queue)
err = c.queueDeclare(b.queue)
if err != nil {
global.Logger.Errorf("Failed to declare queue %s: %s", b.queue, err)
return nil, err
}
err = c.channel.QueueBind(b.queue, b.routingKey, exchangeName, false, nil)
if err != nil {
global.Logger.Errorf("Failed to bind queue %s: %s", b.queue, err)
return nil, err
}
}
// 设置 QoS
err = c.channel.Qos(250, 0, false)
if err != nil {
global.Logger.Errorf("Failed to set QoS: %s", err)
return nil, err
}
// 启动消费者
err = c.Consume()
if err != nil {
global.Logger.Errorf("Failed to start consumer: %s", err)
return nil, err
}
// 监听通道关闭事件
closeChan := make(chan *amqp.Error, 1)
notifyClose := c.channel.NotifyClose(closeChan)
// 处理通道关闭事件
for {
select {
case e := <-notifyClose:
global.Logger.Errorf("chan通道错误:%v", e.Error())
time.Sleep(5 * time.Second)
log.Println("休息5秒后重新连接")
// 重新连接
c, err = NewConsumer(amqpURI, exchangeName, exchangeType)
if err != nil {
global.Logger.Errorf("Failed to reconnect: %s", err)
return nil, err
}
return c, nil // 返回新的 Consumer 实例
}
}
}
func (c *Consumer) queueDeclare(queue string) error {
_, err := c.channel.QueueDeclare(queue, true, false, false, false, nil)
return err
}
func (c *Consumer) exchangeDeclare(exchangeName, exchangeType string) error {
err := c.channel.ExchangeDeclare(
exchangeName, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
)
return err
}
func (c *Consumer) Shutdown() error {
// will close() the deliveries channel
if err := c.channel.Cancel(c.tag, true); err != nil {
return fmt.Errorf("Consumer cancel failed: %s", err)
}
if err := c.conn.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
}
defer log.Printf("AMQP shutdown OK")
// wait for handle() to exit
return <-c.done
}
......@@ -10,6 +10,7 @@ import (
"speech-nlu-parse/model"
"speech-nlu-parse/pkg/errCode"
"speech-nlu-parse/pkg/logger"
"speech-nlu-parse/pkg/util"
"sync"
"time"
)
......@@ -72,14 +73,15 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) {
session = ""
}
start := time.Now()
data := SpeechNlpWsReq(reqStruct, session)
fmt.Println("*****************")
fmt.Println(data)
speechNluWs := SpeechNlpWs{}
var nlpWsConn *websocket.Conn
nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip)
deviceId := util.ComputerMd5(reqStruct.MacVoice)
nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip, deviceId)
if err != nil {
global.Logger.WithFields(logger.Fields{
"requestId": reqStruct.RequestId,
......@@ -93,7 +95,7 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) {
done := make(chan struct{})
dataChan := make(chan []model.SpeechWsResp, 1) // 缓冲通道避免阻塞
go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId)
go receiveMessage(nlpWsConn, done, dataChan, reqStruct.RequestId, start)
if err := sendRunTaskMsg(nlpWsConn, data); err != nil {
global.Logger.WithFields(logger.Fields{
......@@ -145,7 +147,7 @@ func sendRunTaskMsg(conn *websocket.Conn, v interface{}) error {
return conn.WriteMessage(websocket.TextMessage, msgJSON)
}
func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string) {
func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []model.SpeechWsResp, requestId string, start time.Time) {
var responses []model.SpeechWsResp
defer func() {
dataChan <- responses
......@@ -162,6 +164,7 @@ func receiveMessage(conn *websocket.Conn, done chan struct{}, dataChan chan<- []
global.Logger.WithFields(logger.Fields{
"requestId": requestId,
"origin-data": string(message),
"time": time.Since(start).Seconds(),
}).Info("原始消息")
//思必驰nlu回复
......
......@@ -10,7 +10,6 @@ import (
"speech-nlu-parse/global"
"speech-nlu-parse/model"
"speech-nlu-parse/pkg/logger"
"speech-nlu-parse/pkg/util"
"sync"
"time"
)
......@@ -20,7 +19,7 @@ type SpeechNlpWs struct {
mutex sync.Mutex
}
func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) {
func (s *SpeechNlpWs) SpeechWs(mac, ip, deviceId string) (*websocket.Conn, error) {
SpeechWs_Url := global.SpeechSetting.Url
SpeechWs_ProductId := global.SpeechSetting.ProductId
SpeechWs_ApiKey := global.SpeechSetting.ApiKey
......@@ -30,8 +29,6 @@ func (s *SpeechNlpWs) SpeechWs(mac, ip string) (*websocket.Conn, error) {
HandshakeTimeout: 30 * time.Second,
}
deviceId := util.ComputerMd5(mac)
header := http.Header{}
header.Set("X-Forwarded-For", ip)
......@@ -81,7 +78,7 @@ func SpeechNlpWsReq(data *model.SemanticReq, sessionId string) *SpeechWsData {
global.Logger.WithFields(logger.Fields{
"req": req,
"sessionId": sessionId,
}).Error("SpeechNlpWsReq")
}).Info("SpeechNlpWsReq")
return req
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment