From dbfeba08ebca71c318a3ad05a4f6b4b17136822a Mon Sep 17 00:00:00 2001 From: 13196 <1319697849@qq.com> Date: Sun, 18 May 2025 19:22:00 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=96=B0=E9=97=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.yaml | 5 ++ global/setting.go | 1 + main.go | 14 ++++ pkg/setting/section.go | 6 ++ service/speechNlu/news.go | 6 +- service/speechNlu/rabbitmq.go | 126 ++++++++++++++++++++-------------- 6 files changed, 103 insertions(+), 55 deletions(-) diff --git a/conf/config.yaml b/conf/config.yaml index 23ac839..4164ccb 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -8,6 +8,11 @@ Server: Interval: 10 #健康检查间隔,单位:秒 Deregister: 1 #注销时间,相当于过期时间,单位:分钟 +RabbitMq: + Url: amqp://iot:qwe!23@172.28.5.42:5672/voice + ExchangeName: voice.media.ctrl + ExchangeType: direct + Logger: LogFileName: tencent-nlu-parse LogFileExt: .log diff --git a/global/setting.go b/global/setting.go index 36ccac5..64bff79 100644 --- a/global/setting.go +++ b/global/setting.go @@ -21,4 +21,5 @@ var ( ConsulObj *consul.ConsulObj TencentGwSetting *setting.TencentGwSetting SpeechSetting *setting.SpeechSettings + RabbitMqSetting *setting.RabbitMqSettings ) diff --git a/main.go b/main.go index 7ff4992..2ba69fc 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "net" "net/url" "runtime" + "speech-nlu-parse/service/speechNlu" "strings" "time" @@ -117,6 +118,15 @@ func main() { if err = server.Serve(lis); err != nil { log.Fatalf("start service error:%v", err) } + + c, err := speechNlu.NewConsumer(global.RabbitMqSetting.Url, global.RabbitMqSetting.ExchangeName, global.RabbitMqSetting.ExchangeType) + if err != nil { + log.Fatalf("%s", err) + } + + if err := c.Shutdown(); err != nil { + log.Printf("error during shutdown: %s", err) + } } func setupFlag() error { @@ -195,6 +205,10 @@ func setupSetting() error { if err != nil { return err } + err = s.ReadSection("RabbitMq", &global.RabbitMqSetting) + if err != nil { + return err + } consulUrlParse, err := url.Parse(consulUrl) if err != nil { diff --git a/pkg/setting/section.go b/pkg/setting/section.go index 8e61248..594828d 100644 --- a/pkg/setting/section.go +++ b/pkg/setting/section.go @@ -96,6 +96,12 @@ type SpeechSettings struct { Url string } +type RabbitMqSettings struct { + Url string + ExchangeName string + ExchangeType string +} + func (s *Setting) ReadSection(k string, v interface{}) error { return s.vp.UnmarshalKey(k, v) } diff --git a/service/speechNlu/news.go b/service/speechNlu/news.go index 21f7047..a4ead55 100644 --- a/service/speechNlu/news.go +++ b/service/speechNlu/news.go @@ -37,7 +37,7 @@ type FeedBackResp struct { Source string `json:"source"` } -func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) { +func FeedBackNews(actionType, duration int, ip, sid string) (*FeedBackResp, error) { url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111" request := FeedBackReq{ ActionType: actionType, @@ -46,8 +46,8 @@ func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) { Brand: "gree", Data: []map[string]interface{}{ { - "sid": "m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a", - "duration": 138, + "sid": sid, //"m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a" + "duration": duration, //138 }, }, } diff --git a/service/speechNlu/rabbitmq.go b/service/speechNlu/rabbitmq.go index 56d6260..ea228a7 100644 --- a/service/speechNlu/rabbitmq.go +++ b/service/speechNlu/rabbitmq.go @@ -1,9 +1,11 @@ package speechNlu import ( + "encoding/json" "fmt" "log" "speech-nlu-parse/global" + "speech-nlu-parse/pkg/logger" "time" amqp "github.com/rabbitmq/amqp091-go" @@ -16,31 +18,6 @@ type Consumer struct { done chan error } -//func New(amqpURI string) (Consumer, error) { -// //mq链接 -// log.Printf("dialing %q", amqpURI) -// conn, err := amqp.Dial(amqpURI) -// if err != nil { -// global.Logger.Errorf("Dial: %s", err) -// consumerErr++ -// return Consumer{}, err -// } -// -// log.Printf("got Connection, getting Channel") -// //创建一个rabbitmq信道 -// channel, err := conn.Channel() -// if err != nil { -// return Consumer{}, err -// } -// -// defer channel.Close() -// return Consumer{ -// conn: conn, -// channel: channel, -// done: make(chan error), -// }, err -//} - func failOnError(err error, msg string) { if err != nil { global.Logger.Errorf("%s: %s", msg, err) @@ -92,40 +69,25 @@ func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) return nil, err } - // 声明发布消息的交换机 - err = c.exchangeDeclare(global.PublishSetting.ExchangeName, global.PublishSetting.ExchangeType) + queue := "news" + routingKey := "news" + + // 声明队列并绑定 + log.Printf("declared Exchange, declaring Queue %q", queue) + err = c.queueDeclare(queue) if err != nil { - global.Logger.Errorf("Failed to declare exchange: %s", err) + global.Logger.Errorf("Failed to declare queue %s: %s", queue, err) return nil, err } - // 定义队列和路由键绑定 - type bind struct { - queue string - routingKey string - } - bindings := []bind{ - {"news", "grih.HOME_MAC"}, - } - - // 声明队列并绑定 - for _, b := range bindings { - log.Printf("declared Exchange, declaring Queue %q", b.queue) - err = c.queueDeclare(b.queue) - if err != nil { - global.Logger.Errorf("Failed to declare queue %s: %s", b.queue, err) - return nil, err - } - - err = c.channel.QueueBind(b.queue, b.routingKey, exchangeName, false, nil) - if err != nil { - global.Logger.Errorf("Failed to bind queue %s: %s", b.queue, err) - return nil, err - } + err = c.channel.QueueBind(queue, routingKey, exchangeName, false, nil) + if err != nil { + global.Logger.Errorf("Failed to bind queue %s: %s", queue, err) + return nil, err } // 设置 QoS - err = c.channel.Qos(250, 0, false) + err = c.channel.Qos(10, 0, false) if err != nil { global.Logger.Errorf("Failed to set QoS: %s", err) return nil, err @@ -194,3 +156,63 @@ func (c *Consumer) Shutdown() error { // wait for handle() to exit return <-c.done } + +type ReData struct { + Cols []string `json:"cols"` + Dat []interface{} `json:"dat"` +} + +func (c *Consumer) Consume() { + msgs, err := c.channel.Consume("news", "", false, false, false, false, nil) + failOnError(err, "Failed to register a consumer") + for d := range msgs { + resp := ReData{} + err := json.Unmarshal(d.Body, &resp) + if err != nil { + global.Logger.WithFields(logger.Fields{"requestId": requestId}).Errorf("subdev json.Unmarshal error: %v", err) + d.Nack(false, true) + break + } + + global.Logger.WithFields(logger.Fields{ + "requestId": requestId, + "RoutingKey": d.RoutingKey, + "Body": fmt.Sprintf("%s", d.Body), + "re": re, + }).Info("news") + + // 初始化变量 + var ( + state, offset int + mediaId, ip string + ) + //todo:还需要判断domain + + // 遍历 cols 找到索引并取值 + for i, col := range resp.Cols { + switch col { + case "state": + if val, ok := resp.Dat[i].(float64); ok { + state = int(val) + } + case "mediaId": + if val, ok := resp.Dat[i].(string); ok { + mediaId = val + } + case "offset": + if val, ok := resp.Dat[i].(float64); ok { + offset = int(val) + } + } + //todo:还需要ip和mac + feedBack, err := FeedBackNews(state, offset, ip, mediaId) + failOnError(err, "FeedBackNews err") + if feedBack.Code != 0 { + d.Nack(false, true) + break + } + d.Ack(false) + } + + } +} -- GitLab