package speechNlu import ( "fmt" "log" "speech-nlu-parse/global" "time" amqp "github.com/rabbitmq/amqp091-go" ) type Consumer struct { conn *amqp.Connection channel *amqp.Channel tag string done chan error } //func New(amqpURI string) (Consumer, error) { // //mq链接 // log.Printf("dialing %q", amqpURI) // conn, err := amqp.Dial(amqpURI) // if err != nil { // global.Logger.Errorf("Dial: %s", err) // consumerErr++ // return Consumer{}, err // } // // log.Printf("got Connection, getting Channel") // //创建一个rabbitmq信道 // channel, err := conn.Channel() // if err != nil { // return Consumer{}, err // } // // defer channel.Close() // return Consumer{ // conn: conn, // channel: channel, // done: make(chan error), // }, err //} func failOnError(err error, msg string) { if err != nil { global.Logger.Errorf("%s: %s", msg, err) return } } var consumerErr int func NewConsumer(amqpURI, exchangeName, exchangeType string) (*Consumer, error) { c := &Consumer{ conn: nil, channel: nil, done: make(chan error), } var err error // 最大重试次数 maxRetries := 3 // 连接 RabbitMQ log.Printf("dialing %q", amqpURI) c.conn, err = amqp.Dial(amqpURI) if err != nil { global.Logger.Errorf("Dial: %s", err) consumerErr++ if consumerErr > maxRetries { return nil, fmt.Errorf("failed to connect after %d retries", maxRetries) } time.Sleep(3 * time.Second) return NewConsumer(amqpURI, exchangeName, exchangeType) // 重试连接 } defer c.conn.Close() // 创建信道 log.Printf("got Connection, getting Channel") c.channel, err = c.conn.Channel() if err != nil { global.Logger.Errorf("Failed to open a channel: %s", err) return nil, err } defer c.channel.Close() // 声明交换机 log.Printf("got Channel, declaring Exchange (%q)", exchangeName) err = c.exchangeDeclare(exchangeName, exchangeType) if err != nil { global.Logger.Errorf("Failed to declare exchange: %s", err) return nil, err } // 声明发布消息的交换机 err = c.exchangeDeclare(global.PublishSetting.ExchangeName, global.PublishSetting.ExchangeType) if err != nil { global.Logger.Errorf("Failed to declare exchange: %s", err) return nil, err } // 定义队列和路由键绑定 type bind struct { queue string routingKey string } bindings := []bind{ {"news", "grih.HOME_MAC"}, } // 声明队列并绑定 for _, b := range bindings { log.Printf("declared Exchange, declaring Queue %q", b.queue) err = c.queueDeclare(b.queue) if err != nil { global.Logger.Errorf("Failed to declare queue %s: %s", b.queue, err) return nil, err } err = c.channel.QueueBind(b.queue, b.routingKey, exchangeName, false, nil) if err != nil { global.Logger.Errorf("Failed to bind queue %s: %s", b.queue, err) return nil, err } } // 设置 QoS err = c.channel.Qos(250, 0, false) if err != nil { global.Logger.Errorf("Failed to set QoS: %s", err) return nil, err } // 启动消费者 err = c.Consume() if err != nil { global.Logger.Errorf("Failed to start consumer: %s", err) return nil, err } // 监听通道关闭事件 closeChan := make(chan *amqp.Error, 1) notifyClose := c.channel.NotifyClose(closeChan) // 处理通道关闭事件 for { select { case e := <-notifyClose: global.Logger.Errorf("chan通道错误:%v", e.Error()) time.Sleep(5 * time.Second) log.Println("休息5秒后重新连接") // 重新连接 c, err = NewConsumer(amqpURI, exchangeName, exchangeType) if err != nil { global.Logger.Errorf("Failed to reconnect: %s", err) return nil, err } return c, nil // 返回新的 Consumer 实例 } } } func (c *Consumer) queueDeclare(queue string) error { _, err := c.channel.QueueDeclare(queue, true, false, false, false, nil) return err } func (c *Consumer) exchangeDeclare(exchangeName, exchangeType string) error { err := c.channel.ExchangeDeclare( exchangeName, // name of the exchange exchangeType, // type true, // durable false, // delete when complete false, // internal false, // noWait nil, // arguments ) return err } func (c *Consumer) Shutdown() error { // will close() the deliveries channel if err := c.channel.Cancel(c.tag, true); err != nil { return fmt.Errorf("Consumer cancel failed: %s", err) } if err := c.conn.Close(); err != nil { return fmt.Errorf("AMQP connection close error: %s", err) } defer log.Printf("AMQP shutdown OK") // wait for handle() to exit return <-c.done }