Commit dbfeba08 authored by 赵文静's avatar 赵文静

补充新闻

parent 9e758097
......@@ -8,6 +8,11 @@ Server:
Interval: 10 #健康检查间隔,单位:秒
Deregister: 1 #注销时间,相当于过期时间,单位:分钟
RabbitMq:
Url: amqp://iot:qwe!23@172.28.5.42:5672/voice
ExchangeName: voice.media.ctrl
ExchangeType: direct
Logger:
LogFileName: tencent-nlu-parse
LogFileExt: .log
......
......@@ -21,4 +21,5 @@ var (
ConsulObj *consul.ConsulObj
TencentGwSetting *setting.TencentGwSetting
SpeechSetting *setting.SpeechSettings
RabbitMqSetting *setting.RabbitMqSettings
)
......@@ -8,6 +8,7 @@ import (
"net"
"net/url"
"runtime"
"speech-nlu-parse/service/speechNlu"
"strings"
"time"
......@@ -117,6 +118,15 @@ func main() {
if err = server.Serve(lis); err != nil {
log.Fatalf("start service error:%v", err)
}
c, err := speechNlu.NewConsumer(global.RabbitMqSetting.Url, global.RabbitMqSetting.ExchangeName, global.RabbitMqSetting.ExchangeType)
if err != nil {
log.Fatalf("%s", err)
}
if err := c.Shutdown(); err != nil {
log.Printf("error during shutdown: %s", err)
}
}
func setupFlag() error {
......@@ -195,6 +205,10 @@ func setupSetting() error {
if err != nil {
return err
}
err = s.ReadSection("RabbitMq", &global.RabbitMqSetting)
if err != nil {
return err
}
consulUrlParse, err := url.Parse(consulUrl)
if err != nil {
......
......@@ -96,6 +96,12 @@ type SpeechSettings struct {
Url string
}
type RabbitMqSettings struct {
Url string
ExchangeName string
ExchangeType string
}
func (s *Setting) ReadSection(k string, v interface{}) error {
return s.vp.UnmarshalKey(k, v)
}
......@@ -37,7 +37,7 @@ type FeedBackResp struct {
Source string `json:"source"`
}
func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) {
func FeedBackNews(actionType, duration int, ip, sid string) (*FeedBackResp, error) {
url := "http://apis.duiopen.com/feedback/letingv4?productId=279629895&apikey=0c74988953dd4ed4bf31955527802cf3&uid=111"
request := FeedBackReq{
ActionType: actionType,
......@@ -46,8 +46,8 @@ func FeedBackNews(actionType int, ip string) (*FeedBackResp, error) {
Brand: "gree",
Data: []map[string]interface{}{
{
"sid": "m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a",
"duration": 138,
"sid": sid, //"m6J2LW-4z-H2dtxea5Sv6_voziweppX1K_aGb7fKtdjq3y36awwXd_NeoKgBXD7a"
"duration": duration, //138
},
},
}
......
package speechNlu
import (
"encoding/json"
"fmt"
"log"
"speech-nlu-parse/global"
"speech-nlu-parse/pkg/logger"
"time"
amqp "github.com/rabbitmq/amqp091-go"
......@@ -16,31 +18,6 @@ type Consumer struct {
done chan error
}
//func New(amqpURI string) (Consumer, error) {
// //mq链接
// log.Printf("dialing %q", amqpURI)
// conn, err := amqp.Dial(amqpURI)
// if err != nil {
// global.Logger.Errorf("Dial: %s", err)
// consumerErr++
// return Consumer{}, err
// }
//
// log.Printf("got Connection, getting Channel")
// //创建一个rabbitmq信道
// channel, err := conn.Channel()
// if err != nil {
// return Consumer{}, err
// }
//
// defer channel.Close()
// return Consumer{
// conn: conn,
// channel: channel,
// done: make(chan error),
// }, err
//}
func failOnError(err error, msg string) {
if err != nil {
global.Logger.Errorf("%s: %s", msg, err)
......@@ -92,40 +69,25 @@ func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error)
return nil, err
}
// 声明发布消息的交换机
err = c.exchangeDeclare(global.PublishSetting.ExchangeName, global.PublishSetting.ExchangeType)
queue := "news"
routingKey := "news"
// 声明队列并绑定
log.Printf("declared Exchange, declaring Queue %q", queue)
err = c.queueDeclare(queue)
if err != nil {
global.Logger.Errorf("Failed to declare exchange: %s", err)
global.Logger.Errorf("Failed to declare queue %s: %s", queue, err)
return nil, err
}
// 定义队列和路由键绑定
type bind struct {
queue string
routingKey string
}
bindings := []bind{
{"news", "grih.HOME_MAC"},
}
// 声明队列并绑定
for _, b := range bindings {
log.Printf("declared Exchange, declaring Queue %q", b.queue)
err = c.queueDeclare(b.queue)
if err != nil {
global.Logger.Errorf("Failed to declare queue %s: %s", b.queue, err)
return nil, err
}
err = c.channel.QueueBind(b.queue, b.routingKey, exchangeName, false, nil)
if err != nil {
global.Logger.Errorf("Failed to bind queue %s: %s", b.queue, err)
return nil, err
}
err = c.channel.QueueBind(queue, routingKey, exchangeName, false, nil)
if err != nil {
global.Logger.Errorf("Failed to bind queue %s: %s", queue, err)
return nil, err
}
// 设置 QoS
err = c.channel.Qos(250, 0, false)
err = c.channel.Qos(10, 0, false)
if err != nil {
global.Logger.Errorf("Failed to set QoS: %s", err)
return nil, err
......@@ -194,3 +156,63 @@ func (c *Consumer) Shutdown() error {
// wait for handle() to exit
return <-c.done
}
type ReData struct {
Cols []string `json:"cols"`
Dat []interface{} `json:"dat"`
}
func (c *Consumer) Consume() {
msgs, err := c.channel.Consume("news", "", false, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
for d := range msgs {
resp := ReData{}
err := json.Unmarshal(d.Body, &resp)
if err != nil {
global.Logger.WithFields(logger.Fields{"requestId": requestId}).Errorf("subdev json.Unmarshal error: %v", err)
d.Nack(false, true)
break
}
global.Logger.WithFields(logger.Fields{
"requestId": requestId,
"RoutingKey": d.RoutingKey,
"Body": fmt.Sprintf("%s", d.Body),
"re": re,
}).Info("news")
// 初始化变量
var (
state, offset int
mediaId, ip string
)
//todo:还需要判断domain
// 遍历 cols 找到索引并取值
for i, col := range resp.Cols {
switch col {
case "state":
if val, ok := resp.Dat[i].(float64); ok {
state = int(val)
}
case "mediaId":
if val, ok := resp.Dat[i].(string); ok {
mediaId = val
}
case "offset":
if val, ok := resp.Dat[i].(float64); ok {
offset = int(val)
}
}
//todo:还需要ip和mac
feedBack, err := FeedBackNews(state, offset, ip, mediaId)
failOnError(err, "FeedBackNews err")
if feedBack.Code != 0 {
d.Nack(false, true)
break
}
d.Ack(false)
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment