golang实现rabbitmq之routing模式
上一篇介绍了golang实现rabbitmq之订阅模式上面看到订阅模式,是可以做到一个消息由多个消费者消费的,那么可不可以在一个消息由多个消费者消费的基础上还指定由哪些消息者来消费呢?
这个自然也是可以的,而这个就是笔者现在要讲的路由模式(routing模式)。这也就是路由模式的主要特点了。
//这里相对比订阅模式就多了一个routingkey的设计,也是通过这个来指定消费者的。
//创建exchange的kind需要是”direct”,不然就不是roting模式了
routing模式的代码:
package RabbitMq import ( "github.com/streadway/amqp" "log" ) //rabbitmq的路由模式。 //主要特点不仅一个消息可以被多个消费者消费还可以由生产端指定消费者。 //这里相对比订阅模式就多了一个routingkey的设计,也是通过这个来指定消费者的。 //创建exchange的kind需要是"direct",不然就不是roting模式了。 //创建rabbitmq实例,这里有了routingkey为参数了。 func NewRabbitMqRouting(exchangeName string, routingKey string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exchangeName, routingKey) var err error //获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl) rabbitmq.failOnErr(err, "创建rabbit的路由实例的时候连接出现问题") //获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "创建rabbitmq的路由实例时获取channel出错") return rabbitmq } //路由模式,产生消息。 func (r *RabbitMQ) PublishRouting(message string) { //第一步,尝试创建交换机,与pub/sub模式不同的是这里的kind需要是direct err := r.channel.ExchangeDeclare(r.ExChange, "direct", true, false, false, false, nil) r.failOnErr(err, "路由模式,尝试创建交换机失败") //第二步,发送消息 err = r.channel.Publish( r.ExChange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } //路由模式,消费消息。 func (r *RabbitMQ) ConsumerRouting() { //第一步,尝试创建交换机,注意这里的交换机类型与发布订阅模式不同,这里的是direct err := r.channel.ExchangeDeclare( r.ExChange, "direct", true, false, false, false, nil, ) r.failOnErr(err, "路由模式,创建交换机失败。") //第二步,尝试创建队列,注意这里队列名称不用写,这样就会随机产生队列名称 q, err := r.channel.QueueDeclare( "", false, false, true, false, nil, ) r.failOnErr(err, "路由模式,创建队列失败。") //第三步,绑定队列到exchange中 err = r.channel.QueueBind(q.Name, r.Key, r.ExChange, false, nil) //第四步,消费消息。 messages, err := r.channel.Consume(q.Name, "", true, false, false, false, nil) forever := make(chan bool) go func() { for d := range messages { log.Printf("小杜同学写的路由模式(routing模式)收到消息为:%s。\n", d.Body) } }() <-forever }
发布消息到routingKey为one,two,three的路由上:
package main import ( "fmt" "rabbitmq20181121/RabbitMq" "strconv" "time" ) func main() { rabbitmq1 := RabbitMq.NewRabbitMqRouting("duExchangeName", "one") rabbitmq2 := RabbitMq.NewRabbitMqRouting("duExchangeName", "two") rabbitmq3 := RabbitMq.NewRabbitMqRouting("duExchangeName", "three") for i := 0; i < 100; i++ { rabbitmq1.PublishRouting("路由模式one" + strconv.Itoa(i)) rabbitmq2.PublishRouting("路由模式two" + strconv.Itoa(i)) rabbitmq3.PublishRouting("路由模式three" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Printf("在路由模式下,routingKey为one,为two,为three的都分别生产了%d条消息\n", i) } }
来消费routingKey为one的消息:
package main import "rabbitmq20181121/RabbitMq" func main() { one := RabbitMq.NewRabbitMqRouting("duExchangeName", "one") one.ConsumerRouting() }
来消费routingKey为two的消息:
package main import "rabbitmq20181121/RabbitMq" func main() { two := RabbitMq.NewRabbitMqRouting("duExchangeName", "two") two.ConsumerRouting() }
接着run起来,会发现RoutingConsumer1只会消费routingKey为one的消息,RoutingConsumer2则只会消费routingKey为two的消息,而routingKey为three是没有被消费到的。