golang实现rabbitmq之topic模式
上一篇介绍了golang实现rabbitmq之routing模式 .接着就是要说的最后一个模式,topic模式了。这个模式也是在routing模式上进一步升华而来,通过上面的介绍我们知道routing模式最大的特点是可以从生产端来指定消费端来消费消息,是通过routingKey来指定的。那么我们可不可以通过一定的规则来指定呢?比如用通配符来的指定,当然这个也是可以的。这也就是topic模式最大的特点了。
topic模式也是在routing的模式上演化而来。不同的是我们以通配符的方式来指定我们的消费者。
来看一下topic模式的代码,注意这里创建exchange的kind则是”topic”了:
package RabbitMq import ( "github.com/streadway/amqp" "log" ) //topic模式 //与routing模式不同的是这个exchange的kind是"topic"类型的。 //topic模式的特别是可以以通配符的形式来指定与之匹配的消费者。 //"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。 //创建rabbitmq实例 func NewRabbitMqTopic(exchangeName string, routingKey string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exchangeName, routingKey) var err error //获取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl) rabbitmq.failOnErr(err, "创建rabbit的topic模式时候连接出现问题") //获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "创建rabbitmq的topic实例时获取channel出错") return rabbitmq } //topic模式。生产者。 func (r *RabbitMQ) PublishTopic(message string) { //第一步,尝试创建交换机,这里的kind的类型要改为topic err := r.channel.ExchangeDeclare( r.ExChange, "topic", true, false, false, false, nil, ) r.failOnErr(err, "topic模式尝试创建exchange失败。") //第二步,发送消息。 err = r.channel.Publish( r.ExChange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } //topic模式。消费者。"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。 func (r *RabbitMQ) ConsumerTopic() { //第一步,创建交换机。这里的kind需要是“topic”类型。 err := r.channel.ExchangeDeclare( r.ExChange, "topic", true, //这里需要是true false, false, false, nil, ) r.failOnErr(err, "topic模式,消费者创建exchange失败。") //第二步,创建队列。这里不用写队列名称。 q, err := r.channel.QueueDeclare( "", false, false, true, false, nil, ) r.failOnErr(err, "topic模式,消费者创建queue失败。") //第三步,将队列绑定到交换机里。 err = r.channel.QueueBind( q.Name, r.Key, r.ExChange, false, nil, ) //第四步,消费消息。 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range messages { log.Printf("小杜同学写的topic模式收到了消息:%s。\n", d.Body) } }() <-forever }
生产端的代码:
package main import ( "fmt" "rabbitmq20181121/RabbitMq" "strconv" "time" ) func main() { one := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.Jay") two := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Persident.XIDADA") for i := 0; i < 100; i++ { one.PublishTopic("小杜同学,topic模式,Jay," + strconv.Itoa(i)) two.PublishTopic("小杜同学,topic模式,All," + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Printf("topic模式。这是小杜同学发布的消息%v \n", i) } }
消费端1的代码:
package main import "rabbitmq20181121/RabbitMq" func main() { jay := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.*") jay.ConsumerTopic() }
消费端2的代码:
package main import "rabbitmq20181121/RabbitMq" func main() { all := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "#") all.ConsumerTopic() }
结果:发现Jay只会配置到Singer来的消息,也就是topic模式也是成功了的了。