From 66eaa8891c5016dbd0ce607fc414de703384e3e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=96=87=E9=9D=99?= <1319697849@qq.com> Date: Thu, 22 May 2025 17:31:16 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=96=B0=E9=97=BBrabbitmq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 117 +++++++++++++--------------------- model/speech.go | 1 + service/speechNlu/domain.go | 1 + service/speechNlu/news.go | 6 +- service/speechNlu/rabbitmq.go | 6 +- 5 files changed, 55 insertions(+), 76 deletions(-) diff --git a/main.go b/main.go index 2523152..2209a42 100644 --- a/main.go +++ b/main.go @@ -11,11 +11,15 @@ import ( "log" "net" "net/url" + "os" + "os/signal" "runtime" "speech-nlu-parse/middleware" "speech-nlu-parse/pkg/proto" "speech-nlu-parse/service" + "speech-nlu-parse/service/speechNlu" "strings" + "syscall" "time" "gopkg.in/natefinch/lumberjack.v2" @@ -83,71 +87,30 @@ func setup() error { } func main() { - //// 初始化配置 - //if err := setup(); err != nil { - // log.Fatalf("[FATAL] Setup failed: %v", err) - //} - // - //// 初始化 RabbitMQ 消费者 - //consumer, err := speechNlu.NewConsumer( - // global.RabbitMqSetting.Url, - // global.RabbitMqSetting.ExchangeName, - // global.RabbitMqSetting.ExchangeType, - //) - //if err != nil { - // log.Fatalf("[FATAL] Create consumer failed: %v", err) - //} - //defer func() { - // if err := consumer.Shutdown(); err != nil { - // log.Printf("[WARN] Consumer shutdown error: %v", err) - // } - //}() - // - //// 初始化 gRPC - //lis, err := net.Listen("tcp", fmt.Sprintf(":%d", global.ServerSetting.Port)) - //if err != nil { - // log.Fatalf("[FATAL] Listen failed: %v", err) - //} - // - //server := grpc.NewServer( - // grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - // middleware.StreamGSError500(), - // )), - // grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - // middleware.UnaryGSError500(), - // )), - //) - //proto.RegisterTencentNluServer(server, &service.TencentNlu{}) - //grpc_health_v1.RegisterHealthServer(server, &consul.HealthImpl{}) - //reflection.Register(server) - // - //// 启动 gRPC 服务 - //go func() { - // global.Logger.Info("service is running......") - // if err := server.Serve(lis); err != nil { - // log.Fatalf("[FATAL] gRPC serve error: %v", err) - // } - //}() - // - //// 阻塞等待信号 - //stop := make(chan os.Signal, 1) - //signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - //<-stop - // - //// 优雅关闭 - //log.Println("[INFO] Shutting down...") - //server.GracefulStop() - //log.Println("[INFO] Server stopped") - var err error - // 初始化, 加载配置 - err = setup() + // 初始化配置 + if err := setup(); err != nil { + log.Fatalf("[FATAL] Setup failed: %v", err) + } + + // 初始化 RabbitMQ 消费者 + consumer, err := speechNlu.NewConsumer( + global.RabbitMqSetting.Url, + global.RabbitMqSetting.ExchangeName, + global.RabbitMqSetting.ExchangeType, + ) if err != nil { - return + log.Fatalf("[FATAL] Create consumer failed: %v", err) } + defer func() { + if err := consumer.Shutdown(); err != nil { + log.Printf("[WARN] Consumer shutdown error: %v", err) + } + }() - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", global.ServerSetting.Port)) + // 初始化 gRPC + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", global.ServerSetting.Port)) if err != nil { - log.Fatalf("listen error:%v", err) + log.Fatalf("[FATAL] Listen failed: %v", err) } server := grpc.NewServer( @@ -162,15 +125,25 @@ func main() { _ = consul.RegisterService(global.ServerSetting) //将服务注册到注册中心 grpc_health_v1.RegisterHealthServer(server, &consul.HealthImpl{}) //执行健康检查 + reflection.Register(server) - reflection.Register(server) //使用grpcurl、grpcui工具需添加该行 - - global.Logger.Info("service is running......") + // 启动 gRPC 服务 + go func() { + global.Logger.Info("service is running......") + if err := server.Serve(lis); err != nil { + log.Fatalf("[FATAL] gRPC serve error: %v", err) + } + }() - if err = server.Serve(lis); err != nil { - log.Fatalf("start service error:%v", err) - } + // 阻塞等待信号 + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + <-stop + // 优雅关闭 + log.Println("[INFO] Shutting down...") + server.GracefulStop() + log.Println("[INFO] Server stopped") } func setupFlag() error { @@ -179,7 +152,7 @@ func setupFlag() error { flag.StringVar(&serverTag, "tag", "", "注册到注册中心的标签") flag.StringVar(&ip, "ip", "", "IP") flag.IntVar(&port, "port", 0, "端口") - flag.StringVar(&consulUrl, "consul", "http://172.28.124.106:8500", "consul服务地址") + flag.StringVar(&consulUrl, "consul", "http://172.28.124.105:8500", "consul服务地址") flag.StringVar(&config, "config", "env/v2,speech_nlu_parse/conf", "指定要使用的配置文件路径") flag.BoolVar(&isVersion, "version", false, "编译信息") flag.StringVar(&consulToken, "token", "092288b5-824f-854c-39aa-a958afd9a633", "consul token") @@ -249,10 +222,10 @@ func setupSetting() error { if err != nil { return err } - //err = s.ReadSection("RabbitMq", &global.RabbitMqSetting) - //if err != nil { - // return err - //} + err = s.ReadSection("RabbitMq", &global.RabbitMqSetting) + if err != nil { + return err + } consulUrlParse, err := url.Parse(consulUrl) if err != nil { diff --git a/model/speech.go b/model/speech.go index 3294017..c0ed486 100644 --- a/model/speech.go +++ b/model/speech.go @@ -112,6 +112,7 @@ type Content struct { Dynasty string `json:"dynasty"` ResType string `json:"resType"` Source string `json:"source"` + NewsId string `json:"news_id"` } `json:"extra"` } diff --git a/service/speechNlu/domain.go b/service/speechNlu/domain.go index 7477b87..dda4466 100644 --- a/service/speechNlu/domain.go +++ b/service/speechNlu/domain.go @@ -221,6 +221,7 @@ func newsDomain(params *model.SpeechDomainParams) []byte { "url": con.LinkUrl, "title": con.Title, "newsFrom": con.Extra.Source, + "mediaId": con.Extra.NewsId, //"newsType": "", }) } diff --git a/service/speechNlu/news.go b/service/speechNlu/news.go index 54be198..febb2fa 100644 --- a/service/speechNlu/news.go +++ b/service/speechNlu/news.go @@ -38,8 +38,8 @@ type FeedBackResp struct { } func FeedBackNews(actionType, duration int, sid, uid string) (*FeedBackResp, error) { - url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111" - //url := global.SpeechSetting.FeedBackUrl + "?productId=" + global.SpeechSetting.ProductId + "&apikey=" + global.SpeechSetting.ApiKey + "&uid=" + uid + //url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111" + url := global.SpeechSetting.FeedBackUrl + "?productId=" + global.SpeechSetting.ProductId + "&apikey=" + global.SpeechSetting.ApiKey + "&uid=" + uid request := FeedBackReq{ ActionType: actionType, Timestamp: time.Now().Unix(), @@ -86,7 +86,7 @@ func FeedBackNews(actionType, duration int, sid, uid string) (*FeedBackResp, err return nil, err } - field["GetGridConList_response"] = string(body) + field["FeedBackNews_response"] = string(body) var response FeedBackResp if err := json.Unmarshal(body, &response); err != nil { diff --git a/service/speechNlu/rabbitmq.go b/service/speechNlu/rabbitmq.go index fc00da0..84d956f 100644 --- a/service/speechNlu/rabbitmq.go +++ b/service/speechNlu/rabbitmq.go @@ -194,7 +194,11 @@ func (c *Consumer) Consume() { // 处理消息逻辑 state, offset, mediaId := parseData(&resp) uid := util.EncodeMD5(resp.Mac) - //ip := "14.215.222.17" //todo:待改 + //ip := "14.215.222.17" + if state != 3 { + state = 4 //未播完 + } + offset = offset / 1000 if _, err := FeedBackNews(state, offset, mediaId, uid); err != nil { global.Logger.Errorf("Feedback error: %v", err) _ = d.Nack(false, true) -- GitLab