go 中使用redis 中的 list 队列实现异步处理

作者: adm 分类: go 发布时间: 2023-09-04

redis的链表结构,可以轻松实现阻塞队列,可以使用左进右出的命令组合来完成队列的设计。比如:数据的生产者可以通过Lpush命令从左边插入数据,多个数据消费者,可以使用BRpop命令阻塞地“抢”列表尾部的数据。下面的示例代码使用的是与之对称的 RPush(右进)+ BLPop(左出)组合,同样是先进先出的队列。下面就为大家演示一下

先建一个redis-db.go 文件用来连接redis

package orm

import (
	"context"

	"github.com/redis/go-redis/v9"
	"log"
	"time"
)

// Redis is the package-level client shared by callers of this package. It is
// produced by initRedis when the package's variables are initialized.
var Redis = initRedis()

// initRedis builds the Redis client and verifies the connection with a PING
// that is bounded by a 3-second timeout. If the PING fails the process is
// terminated through log.Fatal.
func initRedis() *redis.Client {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379", // replace with your own host:port
		Password: "",               // no password set
		DB:       0,                // use default DB
		PoolSize: 10000,            // connection pool size
	})

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Only the error matters here; the PONG reply itself is not needed.
	if err := rdb.Ping(ctx).Err(); err != nil {
		log.Fatal("initRedis client.Ping err: ", err)
	}
	return rdb
}

再建一个redis处理工具文件(属于 utils 包,后面的代码通过 carbon/utils 引用它,因此文件开头需要加上 package utils 声明)


import (
	"bufio"
	"carbon/orm"
	"context"
	"github.com/redis/go-redis/v9"
	"strings"
	"time"
)



// RedisUtil groups helper methods for common Redis operations. It has no
// fields; its methods operate on the shared client exposed as orm.Redis.
type RedisUtil struct{}

// RedisPrefix is the string that most helpers in this file prepend to the
// caller-supplied key before talking to Redis.
const RedisPrefix = "yz"
// stringToLines breaks s into individual lines with a bufio.Scanner and
// returns them together with whatever error the scanner reported.
func stringToLines(s string) (lines []string, err error) {
	sc := bufio.NewScanner(strings.NewReader(s))
	for sc.Scan() {
		line := sc.Text()
		lines = append(lines, line)
	}
	return lines, sc.Err()
}

// stringToKV splits s at the first ':' into a key and a value. Everything
// after that first separator belongs to the value, so values that contain
// ':' themselves are kept intact. When s has no ':' at all, s is returned as
// the key with an empty value.
func stringToKV(s string) (string, string) {
	key, val, found := strings.Cut(s, ":")
	if !found {
		return s, ""
	}
	return key, val
}

// Info runs the INFO command for the given sections and parses the reply into
// a field -> value map. Blank lines and lines starting with "# " are skipped.
// If the command or the line splitting fails, an empty non-nil map is
// returned.
func (ru RedisUtil) Info(sections ...string) (res map[string]string) {
	res = make(map[string]string)

	raw, err := orm.Redis.Info(context.Background(), sections...).Result()
	if err != nil {
		return res
	}

	// Break the reply into lines.
	lines, err := stringToLines(raw)
	if err != nil {
		return res
	}

	// Turn every remaining line into a map entry.
	for _, line := range lines {
		if line == "" || strings.HasPrefix(line, "# ") {
			continue
		}
		field, value := stringToKV(line)
		res[field] = value
	}
	return res
}

// DBSize reports the number of keys returned by the DBSIZE command, or 0 when
// the command fails.
func (ru RedisUtil) DBSize() int64 {
	ctx := context.Background()
	count, err := orm.Redis.DBSize(ctx).Result()
	if err != nil {
		return 0
	}
	return count
}

// Set stores value under RedisPrefix+key with an expiration of timeSec
// seconds and reports whether the command succeeded.
func (ru RedisUtil) Set(key string, value interface{}, timeSec int) bool {
	ttl := time.Duration(timeSec) * time.Second
	cmd := orm.Redis.Set(context.Background(), RedisPrefix+key, value, ttl)
	return cmd.Err() == nil
}

// Get returns the string stored under RedisPrefix+key. Any error from the
// command yields the empty string.
func (ru RedisUtil) Get(key string) string {
	fullKey := RedisPrefix + key
	val, err := orm.Redis.Get(context.Background(), fullKey).Result()
	if err != nil {
		return ""
	}
	return val
}

// SSet adds values to the set stored under RedisPrefix+key via SADD and
// reports whether the command succeeded.
func (ru RedisUtil) SSet(key string, values ...interface{}) bool {
	cmd := orm.Redis.SAdd(context.Background(), RedisPrefix+key, values...)
	return cmd.Err() == nil
}

// SGet returns all members of the set stored under RedisPrefix+key. When the
// command fails an empty, non-nil slice is returned.
func (ru RedisUtil) SGet(key string) []string {
	fullKey := RedisPrefix + key
	members, err := orm.Redis.SMembers(context.Background(), fullKey).Result()
	if err != nil {
		return []string{}
	}
	return members
}

// HMSet writes every field/value pair of mapping into the hash stored under
// RedisPrefix+key. When timeSec is positive the key's expiration is then set
// through Expire. It returns false if either step fails.
func (ru RedisUtil) HMSet(key string, mapping map[string]string, timeSec int) bool {
	if err := orm.Redis.HSet(context.Background(), RedisPrefix+key, mapping).Err(); err != nil {
		return false
	}
	// A non-positive timeSec means no expiration is applied.
	if timeSec <= 0 {
		return true
	}
	return ru.Expire(key, timeSec)
}

// HSet stores a single field/value pair in the hash under key by delegating
// to HMSet with a one-entry map; timeSec is passed through unchanged.
func (ru RedisUtil) HSet(key string, field string, value string, timeSec int) bool {
	single := map[string]string{field: value}
	return ru.HMSet(key, single, timeSec)
}

// HGet returns the value of field in the hash stored under RedisPrefix+key.
// Any error from the command yields the empty string.
func (ru RedisUtil) HGet(key string, field string) string {
	fullKey := RedisPrefix + key
	val, err := orm.Redis.HGet(context.Background(), fullKey, field).Result()
	if err != nil {
		return ""
	}
	return val
}

// HExists reports whether field is present in the hash stored under
// RedisPrefix+key. A failed command is reported as false.
func (ru RedisUtil) HExists(key string, field string) bool {
	found, err := orm.Redis.HExists(context.Background(), RedisPrefix+key, field).Result()
	return err == nil && found
}

// HDel removes the given fields from the hash stored under RedisPrefix+key
// and reports whether the command succeeded.
func (ru RedisUtil) HDel(key string, fields ...string) bool {
	cmd := orm.Redis.HDel(context.Background(), RedisPrefix+key, fields...)
	return cmd.Err() == nil
}

// Exists prefixes every key with RedisPrefix and returns the count reported
// by the EXISTS command for them, or -1 when the command fails.
func (ru RedisUtil) Exists(keys ...string) int64 {
	n, err := orm.Redis.Exists(context.Background(), ru.toFullKeys(keys)...).Result()
	if err != nil {
		return -1
	}
	return n
}

// Expire sets the expiration of RedisPrefix+key to timeSec seconds and
// reports whether the command succeeded.
func (ru RedisUtil) Expire(key string, timeSec int) bool {
	ttl := time.Duration(timeSec) * time.Second
	cmd := orm.Redis.Expire(context.Background(), RedisPrefix+key, ttl)
	return cmd.Err() == nil
}

// TTL returns the remaining time to live of RedisPrefix+key in whole seconds,
// or 0 when the command fails.
// NOTE(review): go-redis appears to report the "no expiry"/"missing key"
// sentinels as tiny negative durations, which would truncate to 0 here —
// confirm against the go-redis documentation.
func (ru RedisUtil) TTL(key string) int {
	remaining, err := orm.Redis.TTL(context.Background(), RedisPrefix+key).Result()
	if err != nil {
		return 0
	}
	seconds := remaining / time.Second
	return int(seconds)
}

// Del prefixes every key with RedisPrefix, deletes them with a single DEL
// command and reports whether the command succeeded.
func (ru RedisUtil) Del(keys ...string) bool {
	cmd := orm.Redis.Del(context.Background(), ru.toFullKeys(keys)...)
	return cmd.Err() == nil
}

// toFullKeys returns a new slice holding each key with RedisPrefix prepended.
// The result is nil when keys is empty.
func (ru RedisUtil) toFullKeys(keys []string) (fullKeys []string) {
	for i := range keys {
		fullKeys = append(fullKeys, RedisPrefix+keys[i])
	}
	return fullKeys
}

// RPush appends data (converted to a string) to the tail of the list stored
// at key and reports whether the command succeeded. The key is used exactly
// as given; RedisPrefix is not applied here.
func (ru RedisUtil) RPush(key string, data []byte) bool {
	// Check the command result so a failed push is not reported as success.
	err := orm.Redis.RPush(context.Background(), key, string(data)).Err()
	if err != nil {
		return false
	}
	return true
}

// BLPop issues a blocking left pop on the list stored at key with a 2-second
// timeout and returns the raw command so the caller can inspect its value and
// error. The key is used exactly as given; RedisPrefix is not applied here.
func (ru RedisUtil) BLPop(key string) *redis.StringSliceCmd {
	const wait = 2 * time.Second
	return orm.Redis.BLPop(context.Background(), wait, key)
}

再建一个producer.go 文件用来充当生产者

package cmd

import (
	"carbon/queue"
	"github.com/spf13/cobra"
)

// producerCmd is the cobra command whose Use name is "producer"; running it
// calls producerRun.
var producerCmd = &cobra.Command{
	Use:   "producer",
	Short: "A brief description of your command",
	Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
	Run: func(_ *cobra.Command, _ []string) { producerRun() },
}
// producerRun produces the demo messages: it builds 100 documents with Id
// 0..99 and Value "111" and enqueues each one through Document.Add.
func producerRun() {
	const total = 100
	for id := 0; id < total; id++ {
		doc := queue.Document{Id: id, Value: "111"}
		doc.Add(doc)
	}
}

再建一个consumer.go 文件用来充当消费者

package cmd

import (
	"carbon/queue"
	"github.com/spf13/cobra"
	"sync"
)

// consumerCmd is the cobra command whose Use name is "consumer"; running it
// calls ConsumerRun.
var consumerCmd = &cobra.Command{
	Use:   "consumer",
	Short: "A brief description of your command",
	Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
	Run: func(_ *cobra.Command, _ []string) { ConsumerRun() },
}

// ConsumerRun starts one goroutine that calls Document.Consumer in an endless
// loop and then blocks the caller: the WaitGroup counter is never
// decremented, so Wait does not return.
func ConsumerRun() {
	var wg sync.WaitGroup
	wg.Add(1)

	worker := new(queue.Document)
	go func() {
		for {
			worker.Consumer()
		}
	}()

	wg.Wait()
}

再建一个Document.go 文件用来定义订单结构

package queue

import (
	"carbon/utils"
	"encoding/json"
	"fmt"
)

// Document is the message payload that is JSON-encoded onto the "uploadOss"
// list by Add and decoded again by Consumer. The JSON field names come from
// the struct tags.
// NOTE(review): the meaning of Cate, Name and Url is not visible in this
// file — presumably metadata of the item to upload; confirm with the caller.
type Document struct {
	Id    int    `json:"id"`
	Cate  string `json:"cate"`
	Name  string `json:"name"`
	Url   string `json:"url"`
	Value string `json:"value"`
	// Retry is incremented by Consumer each time the message is taken off the queue.
	Retry int    `json:"retry"`
}

// NewOrder returns the document it is given without modifying it. Because the
// argument is passed by value, the result is a copy of the caller's struct.
func NewOrder(document Document) Document { return document }

// Add is the producer side of the queue: it JSON-encodes document and pushes
// it onto the "uploadOss" list. The receiver itself is not read; only the
// argument is enqueued. The method has no error return, so a failed encode or
// push is reported on stdout instead of being dropped silently.
func (o *Document) Add(document Document) {
	data, err := json.Marshal(NewOrder(document))
	if err != nil {
		fmt.Println("queue: marshal document failed:", err)
		return
	}

	pushed := utils.RedisUtil{}.RPush("uploadOss", data)
	if !pushed {
		fmt.Println("queue: push to uploadOss failed, document id:", document.Id)
	}
}

// Consumer is the consumer side of the queue: it pops one entry from the
// "uploadOss" list (BLPop waits up to its timeout), prints the reply,
// increments the message's Retry counter and re-enqueues the message while
// Retry is below 3. A payload that is not valid JSON is reported and
// discarded rather than being re-queued as an empty Document.
func (o *Document) Consumer() {
	reply := utils.RedisUtil{}.BLPop("uploadOss").Val()
	// A successful pop yields [list name, payload]; anything shorter means
	// nothing was popped this round.
	if len(reply) < 2 {
		return
	}

	var mess Document
	if err := json.Unmarshal([]byte(reply[1]), &mess); err != nil {
		fmt.Println("queue: discard malformed message:", reply[1], err)
		return
	}
	fmt.Println(reply)

	mess.Retry++
	if mess.Retry < 3 {
		o.Add(mess)
		return
	}
	// Still not completed after 2 retries: this is where the message should
	// be written to a log for manual handling (not implemented).
}

最后再分别启动producer.go 和consumer.go 对应的命令(producer 与 consumer),消费者端会打印出如下内容:

[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":1,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":2,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":3,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":4,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":1}]
[uploadOss {"id":5,"cate":"","name":"","url":"","value":"111","retry":0}]

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!