Go调用NSQ简单实践
本文主要是Go语言调用NSQ的简单实践,demo分为两部分:生产者和消费者。
生产者:
/** * nsq demo - 生产者 * author: JetWu * date: 2020.05.06 */ package main import ( "fmt" nsq "github.com/nsqio/go-nsq" ) func main() { //创建生产者,连接nsqd var producer *nsq.Producer producer, err := nsq.NewProducer("192.168.56.102:4150", nsq.NewConfig()) //测试连接是否成功 err = producer.Ping() if err != nil { producer.Stop() producer = nil panic(err) } //指定topic,产生10条消息数据 topic := "nsqtest" for i := 0; i < 10; i++ { message := fmt.Sprintf("hello world %d", i) if producer != nil && message != "" { err = producer.Publish(topic, []byte(message)) if err != nil { fmt.Printf("Producer publish fail: %v", err) } fmt.Println("Producer publish success:", message) } } }
消费者:
/** * nsq demo - 消费者 * author: JetWu * date: 2020.05.06 */ package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) //消费者 type Consumer struct{} //处理接收到的消息(实现nsq.Handler) func (*Consumer) HandleMessage(msg *nsq.Message) error { fmt.Println("Receive from ", msg.NSQDAddress, ": ", string(msg.Body)) return nil } func main() { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 //创建消费者,指定topic和channel consumer, err := nsq.NewConsumer("nsqtest", "test-channel", cfg) if err != nil { panic(err) } consumer.SetLogger(nil, 0) //屏蔽系统日志 consumer.AddHandler(&Consumer{}) //添加消费者接口 //连接nsqd err = consumer.ConnectToNSQD("192.168.56.102:4150") if err != nil { panic(err) } //阻塞 for { time.Sleep(time.Second * 10) } }
测试:
1. 开启nsqlookupd和nsqd、nsqadmin,开启方法详见另一篇博文:https://www.cnblogs.com/wujuntian/p/12830817.html。
2. 启动生产者:
3. 查看nsqadmin:
4. 启动消费者:
5. 查看nsqadmin: