go 中使用redis 中的 list 队列实现异步处理
Redis 的列表(list)结构可以轻松实现阻塞队列:使用“一端进、另一端出”的命令组合即可完成队列的设计。比如:数据的生产者可以通过 LPUSH 命令从左边插入数据,多个数据消费者可以使用 BRPOP 命令阻塞地“抢”列表尾部的数据(下文示例代码采用的是方向相反但效果等价的 RPUSH + BLPOP 组合)。下面就为大家演示一下。
先建一个redis-db.go 文件用来连接redis
package orm

import (
	"context"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

// Redis is the shared client used by the rest of the application. It is
// created once, when this package is initialised.
var Redis = initRedis()

// initRedis creates a client for the Redis server at 127.0.0.1:6379 and
// verifies the connection with a PING that is bounded by a 3-second timeout.
// If the PING fails the process is terminated via log.Fatal.
func initRedis() *redis.Client {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379", // replace with your own address/port
		Password: "",               // no password set
		DB:       0,                // use default DB
		PoolSize: 10000,            // connection pool size
	})

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// err is declared here; the original assigned to an undeclared variable,
	// which does not compile.
	if err := rdb.Ping(ctx).Err(); err != nil {
		log.Fatal("initRedis client.Ping err: ", err)
	}
	return rdb
}
redis处理工具
import ( "bufio" "carbon/orm" "context" "github.com/redis/go-redis/v9" "strings" "time" ) //redisUtil Redis操作工具类 type RedisUtil struct{ } const RedisPrefix = "yz" //stringToLines string拆分多行 func stringToLines(s string) (lines []string, err error) { scanner := bufio.NewScanner(strings.NewReader(s)) for scanner.Scan() { lines = append(lines, scanner.Text()) } err = scanner.Err() return } //stringToKV string拆分key和val func stringToKV(s string) (string, string) { ss := strings.Split(s, ":") if len(ss) < 2 { return s, "" } return ss[0], ss[1] } //Info Redis服务信息 func (ru RedisUtil) Info(sections ...string) (res map[string]string) { infoStr, err := orm.Redis.Info(context.Background(), sections...).Result() res = map[string]string{} if err != nil { return res } // string拆分多行 lines, err := stringToLines(infoStr) if err != nil { return res } // 解析成Map for i := 0; i < len(lines); i++ { if lines[i] == "" || strings.HasPrefix(lines[i], "# ") { continue } k, v := stringToKV(lines[i]) res[k] = v } return res } //DBSize 当前数据库key数量 func (ru RedisUtil) DBSize() int64 { size, err := orm.Redis.DBSize(context.Background()).Result() if err != nil { return 0 } return size } //Set 设置键值对 func (ru RedisUtil) Set(key string, value interface{}, timeSec int) bool { err := orm.Redis.Set(context.Background(), RedisPrefix+key, value, time.Duration(timeSec)*time.Second).Err() if err != nil { return false } return true } //Get 获取key的值 func (ru RedisUtil) Get(key string) string { res, err := orm.Redis.Get(context.Background(),RedisPrefix+key).Result() if err != nil { return "" } return res } //SSet 将数据放入set缓存 func (ru RedisUtil) SSet(key string, values ...interface{}) bool { err := orm.Redis.SAdd(context.Background(), RedisPrefix+key, values...).Err() if err != nil { return false } return true } //SGet 根据key获取Set中的所有值 func (ru RedisUtil) SGet(key string) []string { res, err := orm.Redis.SMembers(context.Background(), RedisPrefix+key).Result() if err != nil { return []string{} } return res } //HMSet 
设置key, 通过字典的方式设置多个field, value对 func (ru RedisUtil) HMSet(key string, mapping map[string]string, timeSec int) bool { err := orm.Redis.HSet(context.Background(), RedisPrefix+key, mapping).Err() if err != nil { return false } if timeSec > 0 { if !ru.Expire(key, timeSec) { return false } } return true } //HSet 向hash表中放入数据,如果不存在将创建 func (ru RedisUtil) HSet(key string, field string, value string, timeSec int) bool { return ru.HMSet(key, map[string]string{field: value}, timeSec) } //HGet 获取key中field域的值 func (ru RedisUtil) HGet(key string, field string) string { res, err := orm.Redis.HGet(context.Background(), RedisPrefix+key, field).Result() if err != nil { return "" } return res } //HExists 判断key中有没有field域名 func (ru RedisUtil) HExists(key string, field string) bool { res, err := orm.Redis.HExists(context.Background(), RedisPrefix+key, field).Result() if err != nil { return false } return res } //HDel 删除hash表中的值 func (ru RedisUtil) HDel(key string, fields ...string) bool { err := orm.Redis.HDel(context.Background(), RedisPrefix+key, fields...).Err() if err != nil { return false } return true } //Exists 判断多项key是否存在 func (ru RedisUtil) Exists(keys ...string) int64 { fullKeys := ru.toFullKeys(keys) cnt, err := orm.Redis.Exists(context.Background(), fullKeys...).Result() if err != nil { return -1 } return cnt } //Expire 指定缓存失效时间 func (ru RedisUtil) Expire(key string, timeSec int) bool { err := orm.Redis.Expire(context.Background(), RedisPrefix+key, time.Duration(timeSec)*time.Second).Err() if err != nil { return false } return true } //TTL 根据key获取过期时间 func (ru RedisUtil) TTL(key string) int { td, err := orm.Redis.TTL(context.Background(), RedisPrefix+key).Result() if err != nil { return 0 } return int(td / time.Second) } //Del 删除一个或多个键 func (ru RedisUtil) Del(keys ...string) bool { fullKeys := ru.toFullKeys(keys) err := orm.Redis.Del(context.Background(), fullKeys...).Err() if err != nil { return false } return true } //toFullKeys 为keys批量增加前缀 func (ru RedisUtil) 
toFullKeys(keys []string) (fullKeys []string) { for _, k := range keys { fullKeys = append(fullKeys, RedisPrefix+k) } return } func (ru RedisUtil) RPush(key string, data []byte) bool { orm.Redis.RPush(context.Background(), key, string(data)) return true } func (ru RedisUtil) BLPop(key string) *redis.StringSliceCmd { data := orm.Redis.BLPop(context.Background(),time.Second*2, key) return data }
再建一个producer.go 文件用来充当生产者
package cmd import ( "carbon/queue" "github.com/spf13/cobra" ) // moduleCmd represents the module command var producerCmd = &cobra.Command{ Use: "producer", Short: "A brief description of your command", Long: `A longer description that spans multiple lines and likely contains examples and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications. This application is a tool to generate the needed files to quickly create a Cobra application.`, Run: func(cmd *cobra.Command, args []string) { producerRun() }, } //生产订单信息 func producerRun() { for i := 0; i < 100; i++ { o:=queue.Document{ Value: "111", } o.Id=i o.Add(o) } }
再建一个consumer.go 文件用来充当消费者
package cmd

import (
	"sync"

	"carbon/queue"
	"github.com/spf13/cobra"
)

// consumerCmd is the cobra command named "consumer"; running it calls
// ConsumerRun.
var consumerCmd = &cobra.Command{
	Use:   "consumer",
	Short: "A brief description of your command",
	Long:  `A longer description that spans multiple lines and likely contains examples and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications. This application is a tool to generate the needed files to quickly create a Cobra application.`,
	Run: func(cmd *cobra.Command, args []string) {
		ConsumerRun()
	},
}

// ConsumerRun starts one goroutine that calls Document.Consumer in an endless
// loop and then blocks until that goroutine finishes. Because the loop has no
// exit condition, this function does not return.
func ConsumerRun() {
	doc := &queue.Document{}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Pair the Add above with a Done so that Wait is released if the
		// loop below is ever given a way to terminate.
		defer wg.Done()
		for {
			doc.Consumer()
		}
	}()
	wg.Wait()
}
再建一个Document.go 文件用来定义订单结构
package queue

import (
	"encoding/json"
	"fmt"
	"log"

	"carbon/utils"
)

// uploadQueueKey is the Redis list that Add pushes to and Consumer pops from.
const uploadQueueKey = "uploadOss"

// Document is the JSON payload carried by the queue.
type Document struct {
	Id    int    `json:"id"`
	Cate  string `json:"cate"`
	Name  string `json:"name"`
	Url   string `json:"url"`
	Value string `json:"value"`
	Retry int    `json:"retry"` // incremented by Consumer each time the message is popped
}

// NewOrder returns document unchanged.
func NewOrder(document Document) Document {
	return document
}

// Add is the producer side of the queue: it serialises document to JSON and
// pushes it onto the tail of the list. The receiver is not read; only the
// argument is enqueued. Failures are logged and the document is not enqueued.
func (o *Document) Add(document Document) {
	data, err := json.Marshal(NewOrder(document))
	if err != nil {
		log.Printf("queue: marshal document id=%d: %v", document.Id, err)
		return
	}
	ok := utils.RedisUtil{}.RPush(uploadQueueKey, data)
	if !ok {
		log.Printf("queue: push document id=%d to %q failed", document.Id, uploadQueueKey)
	}
}

// Consumer is the consumer side of the queue: it performs one blocking pop
// and, if a message was received, prints the reply, increments the message's
// Retry counter and pushes it back while the counter is below 3. Once the
// counter reaches 3 the message is logged and not re-queued.
func (o *Document) Consumer() {
	reply := utils.RedisUtil{}.BLPop(uploadQueueKey).Val()
	// The payload is read from index 1, so at least two elements are needed.
	// A shorter reply means nothing was popped.
	if len(reply) < 2 {
		return
	}
	payload := reply[1]

	var mess Document
	if err := json.Unmarshal([]byte(payload), &mess); err != nil {
		// Without this check a malformed payload would decode to a
		// zero-value Document and be pushed back onto the queue.
		log.Printf("queue: dropping malformed payload %q: %v", payload, err)
		return
	}
	fmt.Println(reply)

	mess.Retry++
	if mess.Retry < 3 {
		o.Add(mess)
		return
	}
	// Retries exhausted: record the message so it can be handled manually.
	log.Printf("queue: document id=%d not completed after %d attempts; manual handling required: %s", mess.Id, mess.Retry, payload)
}
最后再分别启动producer.go 和consumer.go
[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":0}] [uploadOss {"id":1,"cate":"","name":"","url":"","value":"111","retry":0}] [uploadOss {"id":2,"cate":"","name":"","url":"","value":"111","retry":0}] [uploadOss {"id":3,"cate":"","name":"","url":"","value":"111","retry":0}] [uploadOss {"id":4,"cate":"","name":"","url":"","value":"111","retry":0}] [uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":1}] [uploadOss {"id":5,"cate":"","name":"","url":"","value":"111","retry":0}]