package speechNlu

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"speech-nlu-parse/global"
	"speech-nlu-parse/pkg/logger"
	"speech-nlu-parse/pkg/util"

	amqp "github.com/rabbitmq/amqp091-go"
)

const (
	// consumerQueueName is the queue declared by setup and read by Consume.
	consumerQueueName = "news"
	// consumerRoutingKey is the key used to bind consumerQueueName to the
	// exchange.
	consumerRoutingKey = "mqhandle"
)

// Consumer owns an AMQP connection and channel and consumes messages from
// the "news" queue.
type Consumer struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	// tag, done and closeNotifier are not read by any code visible in this
	// file; they are kept for compatibility with the rest of the package.
	tag           string
	done          chan error
	amqpURI       string
	exchangeName  string
	exchangeType  string
	closeNotifier chan *amqp.Error
}

// NewConsumer dials amqpURI (up to three attempts, three seconds apart),
// declares the exchange, queue and binding, and then starts two goroutines:
// one that consumes messages and one that triggers a reconnect when the
// channel is closed with an error.
//
// It returns an error if every connection attempt fails or if the topology
// setup fails; in the latter case the freshly opened connection is closed
// before returning.
func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) {
	c := &Consumer{
		amqpURI:      amqpURI,
		exchangeName: exchangeName,
		exchangeType: exchangeType,
		done:         make(chan error),
	}

	const (
		maxRetries    = 3
		retryInterval = 3 * time.Second
	)

	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err = c.connect(); err == nil {
			break
		}
		log.Printf("Connection attempt %d failed: %v", attempt, err)
		// Sleeping after the final attempt would only delay the error.
		if attempt < maxRetries {
			time.Sleep(retryInterval)
		}
	}
	if err != nil {
		return nil, fmt.Errorf("failed to connect after %d retries: %w", maxRetries, err)
	}

	// Declare the exchange, the queue and the binding.
	if err := c.setup(); err != nil {
		// Closing the connection also tears down its channel. The close
		// error is ignored because the setup error is the one that matters.
		_ = c.conn.Close()
		return nil, fmt.Errorf("setup failed: %w", err)
	}

	// Start the consumer goroutine.
	go c.Consume()
	// Watch for the channel being closed.
	go c.handleChannelClose()

	return c, nil
}

// connect dials the broker and opens a channel. c.conn and c.channel are
// only replaced when both steps succeed, so a failed attempt never leaves
// the Consumer pointing at a half-initialised connection.
func (c *Consumer) connect() error {
	conn, err := amqp.Dial(c.amqpURI)
	if err != nil {
		return fmt.Errorf("dial error: %w", err)
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the channel error is what gets reported.
		_ = conn.Close()
		return fmt.Errorf("channel error: %w", err)
	}

	c.conn, c.channel = conn, ch
	return nil
}

// setup declares the durable exchange and queue, binds the queue to the
// exchange and sets the channel prefetch count to 10.
func (c *Consumer) setup() error {
	// Declare the exchange.
	if err := c.channel.ExchangeDeclare(
		c.exchangeName,
		c.exchangeType,
		true,  // durable
		false, // autoDelete
		false, // internal
		false, // noWait
		nil,   // args
	); err != nil {
		return fmt.Errorf("declare exchange error: %w", err)
	}

	// Declare the queue.
	if _, err := c.channel.QueueDeclare(
		consumerQueueName,
		true,  // durable
		false, // autoDelete
		false, // exclusive
		false, // noWait
		nil,   // args
	); err != nil {
		return fmt.Errorf("declare queue error: %w", err)
	}

	// Bind the queue to the exchange.
	if err := c.channel.QueueBind(
		consumerQueueName,
		consumerRoutingKey,
		c.exchangeName,
		false, // noWait
		nil,   // args
	); err != nil {
		return fmt.Errorf("queue bind error: %w", err)
	}

	// Limit the number of unacknowledged deliveries in flight.
	if err := c.channel.Qos(10, 0, false); err != nil {
		return fmt.Errorf("QoS error: %w", err)
	}

	return nil
}

// handleChannelClose blocks until the current channel is closed. When the
// close carries an error it logs it and starts the reconnect loop; a close
// without an error (nil received) does nothing.
func (c *Consumer) handleChannelClose() {
	closeErr := <-c.channel.NotifyClose(make(chan *amqp.Error, 1))
	if closeErr == nil {
		return
	}

	global.Logger.Errorf("Channel closed: %v", closeErr)
	// Kick off the reconnect logic.
	c.reconnect()
}

// reconnect retries connect and setup every five seconds until both
// succeed, then restarts the consumer and close-watcher goroutines. It does
// not return until reconnection succeeds.
func (c *Consumer) reconnect() {
	log.Println("Attempting to reconnect...")

	// A channel-level error can leave the previous connection open. Close it
	// so it is not leaked when connect replaces c.conn. The error is ignored
	// because the connection may already be closed.
	if c.conn != nil {
		_ = c.conn.Close()
	}

	for {
		time.Sleep(5 * time.Second)

		err := c.connect()
		if err == nil {
			if err = c.setup(); err == nil {
				go c.Consume()
				go c.handleChannelClose()
				log.Println("Reconnected successfully")
				return
			}
			// connect succeeded but setup did not: drop the new connection
			// before the next attempt opens another one.
			_ = c.conn.Close()
		}
		log.Printf("Reconnect failed: %v", err)
	}
}

// Shutdown closes the channel and then the connection. Both closes are
// always attempted; if both fail, the channel error is returned.
func (c *Consumer) Shutdown() error {
	var chErr, connErr error

	if c.channel != nil {
		if err := c.channel.Close(); err != nil {
			chErr = fmt.Errorf("channel close error: %w", err)
		}
	}
	if c.conn != nil {
		if err := c.conn.Close(); err != nil {
			connErr = fmt.Errorf("connection close error: %w", err)
		}
	}

	if chErr != nil {
		return chErr
	}
	return connErr
}

// Consume registers a manual-ack consumer on the "news" queue and handles
// deliveries until the delivery channel is closed.
//
// For each delivery the body is decoded into ReData and passed to
// FeedBackNews. Deliveries whose body is not valid JSON are rejected without
// requeue, because redelivering them can never succeed; deliveries for which
// FeedBackNews fails are requeued.
func (c *Consumer) Consume() {
	msgs, err := c.channel.Consume(
		consumerQueueName,
		"",    // consumer
		false, // autoAck
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,   // args
	)
	if err != nil {
		global.Logger.Errorf("Consume error: %v", err)
		return
	}

	for d := range msgs {
		var resp ReData
		if err := json.Unmarshal(d.Body, &resp); err != nil {
			// The body is logged because the message is discarded here.
			global.Logger.Errorf("Unmarshal error: %v, body: %q", err, d.Body)
			if nackErr := d.Nack(false, false); nackErr != nil {
				global.Logger.Errorf("Nack error: %v", nackErr)
			}
			continue // move on to the next message
		}

		global.Logger.WithFields(logger.Fields{
			"RoutingKey": d.RoutingKey,
			"Body":       string(d.Body),
			"Response":   resp,
		}).Info("Received message")

		// Message handling.
		state, offset, mediaId := parseData(&resp)
		uid := util.EncodeMD5(resp.Mac)
		ip := "14.215.222.17" // TODO: placeholder, to be replaced

		if _, err := FeedBackNews(state, offset, ip, mediaId, uid); err != nil {
			global.Logger.Errorf("Feedback error: %v", err)
			if nackErr := d.Nack(false, true); nackErr != nil {
				global.Logger.Errorf("Nack error: %v", nackErr)
			}
			continue
		}

		if ackErr := d.Ack(false); ackErr != nil {
			global.Logger.Errorf("Ack error: %v", ackErr)
		}
	}
}

// ReData is the JSON payload of a consumed message. Cols names the columns
// and Dat holds the value for the column at the same index.
type ReData struct {
	Mac  string        `json:"mac"`
	Cols []string      `json:"cols"`
	Dat  []interface{} `json:"dat"`
}

// parseData extracts the "state", "offset" and "mediaId" columns from resp.
// A column that is absent, has no matching entry in Dat, or holds a value of
// an unexpected type leaves the corresponding result at its zero value.
func parseData(resp *ReData) (state, offset int, mediaId string) {
	if resp == nil {
		return 0, 0, ""
	}

	for i, col := range resp.Cols {
		// Cols may be longer than Dat in a malformed message; stop instead
		// of indexing past the end of Dat.
		if i >= len(resp.Dat) {
			break
		}

		switch col {
		case "state":
			// encoding/json decodes JSON numbers into float64.
			if val, ok := resp.Dat[i].(float64); ok {
				state = int(val)
			}
		case "mediaId":
			if val, ok := resp.Dat[i].(string); ok {
				mediaId = val
			}
		case "offset":
			if val, ok := resp.Dat[i].(float64); ok {
				offset = int(val)
			}
		}
	}

	return state, offset, mediaId
}