Commit 29c0b1a9 authored by 赵文静's avatar 赵文静

补充新闻rabbitmq部分

parent dbfeba08
......@@ -9,7 +9,7 @@ Server:
Deregister: 1 #注销时间,相当于过期时间,单位:分钟
RabbitMq:
Url: amqp://iot:qwe!23@172.28.5.42:5672/voice
Url: amqp://iot:qwe!23@172.28.124.106:5672/voice
ExchangeName: voice.media.ctrl
ExchangeType: direct
......@@ -31,6 +31,12 @@ Device: #举例
DB:
Dsn: root:123456@tcp(172.28.124.105:3306)/chatroom?charset=utf8
Speech:
ProductId: 279629895
Apikey : 0c74988953dd4ed4bf31955527802cf3
Url: wss://dds.dui.ai/dds/v2/test
FeedBackUrl: http://apis.duiopen.com/feedback/letingv4
MusicDomain:
IsQQMusicRemid: true # 是否替换为自定义的提醒
QQMusicRemid: 。您的语音空调尚未激活,请尽快更新格力+APP,进入语音空调语音技能页面,点击QQ音乐进行授权,授权过程不产生任何费用。
......
......@@ -41,6 +41,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/smartystreets/goconvey v1.8.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
......
......@@ -7,9 +7,12 @@ import (
"log"
"net"
"net/url"
"os"
"os/signal"
"runtime"
"speech-nlu-parse/service/speechNlu"
"strings"
"syscall"
"time"
"speech-nlu-parse/dao"
......@@ -86,16 +89,30 @@ func setup() error {
}
func main() {
var err error
// 初始化, 加载配置
err = setup()
// 初始化配置
if err := setup(); err != nil {
log.Fatalf("[FATAL] Setup failed: %v", err)
}
// 初始化 RabbitMQ 消费者
consumer, err := speechNlu.NewConsumer(
global.RabbitMqSetting.Url,
global.RabbitMqSetting.ExchangeName,
global.RabbitMqSetting.ExchangeType,
)
if err != nil {
return
log.Fatalf("[FATAL] Create consumer failed: %v", err)
}
defer func() {
if err := consumer.Shutdown(); err != nil {
log.Printf("[WARN] Consumer shutdown error: %v", err)
}
}()
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", global.ServerSetting.Port))
// 初始化 gRPC
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", global.ServerSetting.Port))
if err != nil {
log.Fatalf("listen error:%v", err)
log.Fatalf("[FATAL] Listen failed: %v", err)
}
server := grpc.NewServer(
......@@ -107,26 +124,26 @@ func main() {
)),
)
proto.RegisterTencentNluServer(server, &service.TencentNlu{})
grpc_health_v1.RegisterHealthServer(server, &consul.HealthImpl{})
reflection.Register(server)
_ = consul.RegisterService(global.ServerSetting) //将服务注册到注册中心
grpc_health_v1.RegisterHealthServer(server, &consul.HealthImpl{}) //执行健康检查
reflection.Register(server) //使用grpcurl、grpcui工具需添加该行
// 启动 gRPC 服务
go func() {
global.Logger.Info("service is running......")
if err = server.Serve(lis); err != nil {
log.Fatalf("start service error:%v", err)
if err := server.Serve(lis); err != nil {
log.Fatalf("[FATAL] gRPC serve error: %v", err)
}
}()
c, err := speechNlu.NewConsumer(global.RabbitMqSetting.Url, global.RabbitMqSetting.ExchangeName, global.RabbitMqSetting.ExchangeType)
if err != nil {
log.Fatalf("%s", err)
}
// 阻塞等待信号
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop
if err := c.Shutdown(); err != nil {
log.Printf("error during shutdown: %s", err)
}
// 优雅关闭
log.Println("[INFO] Shutting down...")
server.GracefulStop()
log.Println("[INFO] Server stopped")
}
func setupFlag() error {
......
......@@ -94,6 +94,7 @@ type SpeechSettings struct {
ProductId string
ApiKey string
Url string
FeedBackUrl string
}
type RabbitMqSettings struct {
......
......@@ -611,8 +611,8 @@ func (TencentNlu) TencentNluParseStream(stream proto.TencentNlu_TencentNluParseS
speechNluWs := speechNlu.SpeechNlpWs{}
var nlpWsConn *websocket.Conn
deviceId := util.ComputerMd5(mac)
nlpWsConn, err = speechNluWs.SpeechWs(mac, ip, deviceId)
deviceId := util.EncodeMD5(mac)
nlpWsConn, err = speechNluWs.SpeechWs(ip, deviceId)
if err != nil {
global.Logger.Errorf("speechWs.SpeechWs error. %v", err)
return err
......
......@@ -176,12 +176,6 @@ func chatDomain(params *model.SpeechDomainParams) []byte {
// 目前无返回
func newsDomain(params *model.SpeechDomainParams) []byte {
//resp, err := FeedBackNews(3, "45.244.56.176")
//if err != nil {
// return nil
//}
//fmt.Println(resp)
res, err := baseParse(params)
if err != nil {
global.Logger.WithFields(logger.Fields{
......@@ -193,6 +187,44 @@ func newsDomain(params *model.SpeechDomainParams) []byte {
return replyWithChat(error_reply, "doudi")
}
res.Header.Semantic.Domain = "news"
res.Header.Semantic.Intent = "search"
res.Header.Semantic.SkillId = res.Header.Semantic.Domain + "." + res.Header.Semantic.Intent
res.ListItems = make([]map[string]interface{}, 0)
sliceData, isSlice := params.SpeechWsResp.Dm.Widget.Content.([]interface{})
if !isSlice {
global.Logger.WithFields(logger.Fields{
"requestId": params.RequestId,
"mac": params.Mac,
"mid": params.Mid,
"vender": params.MidType,
}).Error("params.SpeechWsResp.Dm.Widget.Content不是数组类型")
if res.ResponseText != "" {
return replyWithChat(res.ResponseText, "doudi")
}
return replyWithChat(error_reply, "doudi")
}
for _, item := range sliceData {
itemBytes, _ := json.Marshal(item)
var con model.Content
if err := json.Unmarshal(itemBytes, &con); err != nil {
global.Logger.WithFields(logger.Fields{
"requestId": params.RequestId,
"mac": params.Mac,
"mid": params.Mid,
"vender": params.MidType,
}).Errorf("json.Unmarshal元素解析失败: %v", err)
return replyWithChat(error_reply, "doudi")
}
res.ListItems = append(res.ListItems, map[string]interface{}{
"url": con.LinkUrl,
"title": con.Title,
"newsFrom": "",
"newsType": "",
})
}
return Marshal(params, res)
}
......@@ -923,8 +955,22 @@ func speechOtherDomain(params *model.SpeechDomainParams) []byte {
}
func helpDomain(params *model.SpeechDomainParams) []byte {
if !params.CheckDm() {
resultTextStr := "我可以控制空调开机、播放音乐、查询时间天气、调节温度,设置模式,例如您可以对我说,空调开机。"
return replyWithChat(resultTextStr, "chat.chat")
}
var result model.ResponseBody
result.Header.Semantic.Code = 0
result.Header.Semantic.Domain = "chat"
result.Header.Semantic.Intent = "chat"
result.Header.Semantic.Msg = speech_nlu_parse
result.Header.Semantic.SessionComplete = params.SpeechWsResp.Dm.ShouldEndSession //params.SpeechWsResp.Dm.ShouldEndSession
result.Header.Semantic.SkillId = result.Header.Semantic.Domain + "." + result.Header.Semantic.Intent
result.ResponseText = params.SpeechWsResp.Dm.Nlg
result.AsrRecongize = params.SpeechWsResp.Dm.Input
return Marshal(params, &result)
}
func Marshal(params *model.SpeechDomainParams, res *model.ResponseBody) []byte {
......
......@@ -37,12 +37,14 @@ type FeedBackResp struct {
Source string `json:"source"`
}
func FeedBackNews(actionType, duration int, ip, sid string) (*FeedBackResp, error) {
url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111"
func FeedBackNews(actionType, duration int, ip, sid, uid string) (*FeedBackResp, error) {
//url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111"
url := global.SpeechSetting.FeedBackUrl + "?productId=" + global.SpeechSetting.ProductId +
"&apikey=" + global.SpeechSetting.ApiKey + "&uid=" + uid
request := FeedBackReq{
ActionType: actionType,
Timestamp: time.Now().Unix(),
ClientIp: ip,
ClientIp: ip, //ip
Brand: "gree",
Data: []map[string]interface{}{
{
......
......@@ -6,6 +6,7 @@ import (
"log"
"speech-nlu-parse/global"
"speech-nlu-parse/pkg/logger"
"speech-nlu-parse/pkg/util"
"time"
amqp "github.com/rabbitmq/amqp091-go"
......@@ -16,179 +17,201 @@ type Consumer struct {
channel *amqp.Channel
tag string
done chan error
amqpURI string
exchangeName string
exchangeType string
closeNotifier chan *amqp.Error
}
func failOnError(err error, msg string) {
if err != nil {
global.Logger.Errorf("%s: %s", msg, err)
return
}
}
var consumerErr int
func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) {
c := &Consumer{
conn: nil,
channel: nil,
amqpURI: amqpURI,
exchangeName: exchangeName,
exchangeType: exchangeType,
done: make(chan error),
}
var err error
// 最大重试次数
maxRetries := 3
retryInterval := 3 * time.Second
// 连接 RabbitMQ
log.Printf("dialing %q", amqpURI)
c.conn, err = amqp.Dial(amqpURI)
if err != nil {
global.Logger.Errorf("Dial: %s", err)
consumerErr++
if consumerErr > maxRetries {
return nil, fmt.Errorf("failed to connect after %d retries", maxRetries)
var err error
for i := 0; i < maxRetries; i++ {
if err = c.connect(); err == nil {
break
}
time.Sleep(3 * time.Second)
return NewConsumer(amqpURI, exchangeName, exchangeType) // 重试连接
log.Printf("Connection attempt %d failed: %v", i+1, err)
time.Sleep(retryInterval)
}
defer c.conn.Close()
// 创建信道
log.Printf("got Connection, getting Channel")
c.channel, err = c.conn.Channel()
if err != nil {
global.Logger.Errorf("Failed to open a channel: %s", err)
return nil, err
return nil, fmt.Errorf("failed to connect after %d retries: %v", maxRetries, err)
}
defer c.channel.Close()
// 声明交换机
log.Printf("got Channel, declaring Exchange (%q)", exchangeName)
err = c.exchangeDeclare(exchangeName, exchangeType)
if err != nil {
global.Logger.Errorf("Failed to declare exchange: %s", err)
return nil, err
// 初始化 Exchange、Queue 和 Binding
if err := c.setup(); err != nil {
return nil, fmt.Errorf("setup failed: %v", err)
}
queue := "news"
routingKey := "news"
// 启动消费者协程
go c.Consume()
// 监听通道关闭事件
go c.handleChannelClose()
// 声明队列并绑定
log.Printf("declared Exchange, declaring Queue %q", queue)
err = c.queueDeclare(queue)
return c, nil
}
func (c *Consumer) connect() error {
var err error
c.conn, err = amqp.Dial(c.amqpURI)
if err != nil {
global.Logger.Errorf("Failed to declare queue %s: %s", queue, err)
return nil, err
return fmt.Errorf("dial error: %v", err)
}
err = c.channel.QueueBind(queue, routingKey, exchangeName, false, nil)
c.channel, err = c.conn.Channel()
if err != nil {
global.Logger.Errorf("Failed to bind queue %s: %s", queue, err)
return nil, err
c.conn.Close()
return fmt.Errorf("channel error: %v", err)
}
// 设置 QoS
err = c.channel.Qos(10, 0, false)
if err != nil {
global.Logger.Errorf("Failed to set QoS: %s", err)
return nil, err
return nil
}
func (c *Consumer) setup() error {
// 声明 Exchange
if err := c.channel.ExchangeDeclare(
c.exchangeName,
c.exchangeType,
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("declare exchange error: %v", err)
}
// 启动消费者
err = c.Consume()
if err != nil {
global.Logger.Errorf("Failed to start consumer: %s", err)
return nil, err
// 声明队列
queue := "news"
if _, err := c.channel.QueueDeclare(queue,
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("declare queue error: %v", err)
}
// 监听通道关闭事件
closeChan := make(chan *amqp.Error, 1)
notifyClose := c.channel.NotifyClose(closeChan)
// 绑定队列
routingKey := "mqhandle"
if err := c.channel.QueueBind(
queue,
routingKey,
c.exchangeName,
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("queue bind error: %v", err)
}
// 设置 QoS
if err := c.channel.Qos(10, 0, false); err != nil {
return fmt.Errorf("QoS error: %v", err)
}
return nil
}
func (c *Consumer) handleChannelClose() {
closeErr := <-c.channel.NotifyClose(make(chan *amqp.Error))
if closeErr != nil {
global.Logger.Errorf("Channel closed: %v", closeErr)
// 触发重连逻辑
c.reconnect()
}
}
// 处理通道关闭事件
func (c *Consumer) reconnect() {
log.Println("Attempting to reconnect...")
var err error
for {
select {
case e := <-notifyClose:
global.Logger.Errorf("chan通道错误:%v", e.Error())
time.Sleep(5 * time.Second)
log.Println("休息5秒后重新连接")
// 重新连接
c, err = NewConsumer(amqpURI, exchangeName, exchangeType)
if err != nil {
global.Logger.Errorf("Failed to reconnect: %s", err)
return nil, err
if err := c.connect(); err == nil {
if err := c.setup(); err == nil {
go c.Consume()
go c.handleChannelClose()
log.Println("Reconnected successfully")
return
}
return c, nil // 返回新的 Consumer 实例
}
log.Printf("Reconnect failed: %v", err)
}
}
func (c *Consumer) queueDeclare(queue string) error {
_, err := c.channel.QueueDeclare(queue, true, false, false, false, nil)
return err
func (c *Consumer) Shutdown() error {
if c.channel != nil {
if err := c.channel.Close(); err != nil {
return fmt.Errorf("channel close error: %v", err)
}
}
if c.conn != nil {
if err := c.conn.Close(); err != nil {
return fmt.Errorf("connection close error: %v", err)
}
}
return nil
}
func (c *Consumer) exchangeDeclare(exchangeName, exchangeType string) error {
err := c.channel.ExchangeDeclare(
exchangeName, // name of the exchange
exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
func (c *Consumer) Consume() {
msgs, err := c.channel.Consume(
"news",
"", // consumer
false, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
nil, // args
)
return err
}
func (c *Consumer) Shutdown() error {
// will close() the deliveries channel
if err := c.channel.Cancel(c.tag, true); err != nil {
return fmt.Errorf("Consumer cancel failed: %s", err)
if err != nil {
global.Logger.Errorf("Consume error: %v", err)
return
}
if err := c.conn.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
for d := range msgs {
var resp ReData
if err := json.Unmarshal(d.Body, &resp); err != nil {
global.Logger.Errorf("Unmarshal error: %v", err)
_ = d.Nack(false, true)
continue // 继续处理下一条消息
}
defer log.Printf("AMQP shutdown OK")
global.Logger.WithFields(logger.Fields{
"RoutingKey": d.RoutingKey,
"Body": string(d.Body),
"Response": resp,
}).Info("Received message")
// wait for handle() to exit
return <-c.done
// 处理消息逻辑
state, offset, mediaId := parseData(&resp)
uid := util.EncodeMD5(resp.Mac)
ip := "14.215.222.17" //todo:待改
if _, err := FeedBackNews(state, offset, ip, mediaId, uid); err != nil {
global.Logger.Errorf("Feedback error: %v", err)
_ = d.Nack(false, true)
continue
}
_ = d.Ack(false)
}
}
type ReData struct {
Mac string `json:"mac"`
Cols []string `json:"cols"`
Dat []interface{} `json:"dat"`
}
func (c *Consumer) Consume() {
msgs, err := c.channel.Consume("news", "", false, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
for d := range msgs {
resp := ReData{}
err := json.Unmarshal(d.Body, &resp)
if err != nil {
global.Logger.WithFields(logger.Fields{"requestId": requestId}).Errorf("subdev json.Unmarshal error: %v", err)
d.Nack(false, true)
break
}
global.Logger.WithFields(logger.Fields{
"requestId": requestId,
"RoutingKey": d.RoutingKey,
"Body": fmt.Sprintf("%s", d.Body),
"re": re,
}).Info("news")
// 初始化变量
var (
state, offset int
mediaId, ip string
)
//todo:还需要判断domain
// 遍历 cols 找到索引并取值
func parseData(resp *ReData) (state, offset int, mediaId string) {
for i, col := range resp.Cols {
switch col {
case "state":
......@@ -204,15 +227,6 @@ func (c *Consumer) Consume() {
offset = int(val)
}
}
//todo:还需要ip和mac
feedBack, err := FeedBackNews(state, offset, ip, mediaId)
failOnError(err, "FeedBackNews err")
if feedBack.Code != 0 {
d.Nack(false, true)
break
}
d.Ack(false)
}
}
return
}
......@@ -79,9 +79,9 @@ func SpeechNlu(reqStruct *model.SemanticReq) (string, *errCode.Error) {
speechNluWs := SpeechNlpWs{}
var nlpWsConn *websocket.Conn
deviceId := util.ComputerMd5(reqStruct.MacVoice)
deviceId := util.EncodeMD5(reqStruct.MacVoice)
nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.MacVoice, reqStruct.Ip, deviceId)
nlpWsConn, err = speechNluWs.SpeechWs(reqStruct.Ip, deviceId)
if err != nil {
global.Logger.WithFields(logger.Fields{
"requestId": reqStruct.RequestId,
......
......@@ -19,7 +19,7 @@ type SpeechNlpWs struct {
mutex sync.Mutex
}
func (s *SpeechNlpWs) SpeechWs(mac, ip, deviceId string) (*websocket.Conn, error) {
func (s *SpeechNlpWs) SpeechWs(ip, deviceId string) (*websocket.Conn, error) {
SpeechWs_Url := global.SpeechSetting.Url
SpeechWs_ProductId := global.SpeechSetting.ProductId
SpeechWs_ApiKey := global.SpeechSetting.ApiKey
......@@ -34,6 +34,7 @@ func (s *SpeechNlpWs) SpeechWs(mac, ip, deviceId string) (*websocket.Conn, error
SpeechUrl := SpeechWs_Url + "?serviceType=websocket&productId=" + SpeechWs_ProductId +
"&apikey=" + SpeechWs_ApiKey + "&deviceId=" + deviceId
fmt.Println(SpeechUrl)
conn, resp, err := dialer.Dial(SpeechUrl, header)
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment