golang实现rabbitmq之订阅模式
上一篇介绍了golang实现rabbitmq之work模式上面简单介绍了两种模式,一个是simple模式,另外一个是work模式,他们有一个共同的特点就是一个消息只能被一个消费者消费,那么我们的消息能不能被多个消费者消费呢,这个自然是可以的,也就是我要说的订阅模式(Publish/Subscribe)。订阅模式的特别是:一个消息被投递到多个队列,一个消息能被多个消费者获取。过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是”fanout”,这样才是广播类型。
发布订阅模式的代码:
package RabbitMq import ( "fmt" "github.com/streadway/amqp" ) //这里是订阅模式的相关代码。 //订阅模式需要用到exchange。 //因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。 //另外定义exchange时,其kind类型一定要是"fanout",这样才是广播类型。 //获取订阅模式下的rabbitmq的实例 func NewRabbitMqSubscription(exchangeName string) *RabbitMQ { //创建rabbitmq实例 rabbitmq := NewRabbitMQ("", exchangeName, "") var err error //获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl) rabbitmq.failOnErr(err, "订阅模式连接rabbitmq失败。") //获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "订阅模式获取channel失败") return rabbitmq } //订阅模式发布消息 func (r *RabbitMQ) PublishSubscription(message string) { //第一步,尝试连接交换机 err := r.channel.ExchangeDeclare( r.ExChange, "fanout", //这里一定要设计为"fanout"也就是广播类型。 true, false, false, false, nil, ) r.failOnErr(err, "订阅模式发布方法中尝试连接交换机失败。") //第二步,发送消息 err = r.channel.Publish( r.ExChange, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } //订阅模式消费者 func (r *RabbitMQ) ConsumeSbuscription() { //第一步,试探性创建交换机exchange err := r.channel.ExchangeDeclare( r.ExChange, "fanout", true, false, false, false, nil, ) r.failOnErr(err, "订阅模式消费方法中创建交换机失败。") //第二步,试探性创建队列queue q, err := r.channel.QueueDeclare( "", //随机生产队列名称 false, false, true, false, nil, ) r.failOnErr(err, "订阅模式消费方法中创建创建队列失败。") //第三步,绑定队列到交换机中 err = r.channel.QueueBind( q.Name, "", //在pub/sub模式下key要为空 r.ExChange, false, nil, ) //第四步,消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range messages { fmt.Printf("小杜同学写的订阅模式收到的消息:%s\n", d.Body) } }() fmt.Println("订阅模式消费者已开启,退出请按 CTRL+C\n") <-forever }
发布者的代码:
package main import ( "fmt" "rabbitmq20181121/RabbitMq" "strconv" "time" ) func main() { rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName") for i := 0; i < 100; i++ { rabbitmq.PublishSubscription("订阅模式生产第" + strconv.Itoa(i) + "条数据") fmt.Printf("订阅模式生产第" + strconv.Itoa(i) + "条数据\n") time.Sleep(1 * time.Second) } }
建立两个一样的消费者的代码:
package main import "rabbitmq20181121/RabbitMq" func main() { rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName") rabbitmq.ConsumeSbuscription() }
接着,依旧是把发布者和两个消费者run起来,会发现两个消费者都同时消费了发布者发布的消息了。也就是发布订阅模式也成功了。