Golang 使用定时任务(robfig/cron)
前两天优化一个从三方查询数据很耗时的接口,改造的方案是每半小时同步一次数据缓存到内存,Golang 下最常用的是 robfig/cron 包,使用简便,功能强大,本文对其使用做了整理记录。
英文官方文档参考:https://pkg.go.dev/github.com/robfig/cron/v3
安装
go get github.com/robfig/cron/v3@v3.0.0
引入
import "github.com/robfig/cron/v3"
示例
package main import ( "fmt" "github.com/robfig/cron/v3" ) func main() { // 创建一个默认的cron对象 c := cron.New() // 添加任务 c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") }) c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) c.AddFunc("0/5 * * * * ?", test)//5秒执行一次 //开始执行任务 c.Start() //阻塞 select {} } func test(){ fmt.Printf("test=%d",ttt) ttt++ }
预定义的 Schedules
条目 描述 等效于 @yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 1 1 * @monthly Run once a month, midnight, first of month 0 0 1 * * @weekly Run once a week, midnight between Sat/Sun 0 0 * * 0 @daily (or @midnight) Run once a day, midnight 0 0 * * * @hourly Run once an hour, beginning of hour 0 * * * *
间隔执行
固定间隔执行
@every
举例
@every 1h30m10s 表示一个小时30分钟10秒后执行,并且之后的每个时间间隔都执行。需要注意的是间隔是以运行时间点递增的,例如一个任务执行需要耗时 3 分钟,任务每五分钟执行一次,则任务运行完后,距离下次执行为 2 分钟。
另外,@every 支持的字符串由 time.ParseDuration()(http://golang.org/pkg/time/#ParseDuration) 解析,只要是其支持的格式都可解析。
精确到秒的 Cron 表达式
Cron v3 版本的表达式从六个参数调整为五个,取消了对秒的默认支持,需要精确到秒的控制可以使用 cron.WithSeconds() 解析器。
c := cron.New(cron.WithSeconds()) c.AddFunc("*/1 * * * * *", func() { fmt.Println("Every 1 Second") }) c.Start()
如果你仅仅需要每隔 N 秒运行一次 Job,可以使用 @every 这样的特殊 spec 表达式。
c := cron.New() c.AddFunc("@every 10s", func() { fmt.Println("Every 10 Seconds") }) c.AddFunc("0/5 * * * * ?", test)//5秒执行一次 c.Start() // 关闭任务 defer crontab.Stop() //阻塞 select {} func test(){ fmt.Printf("test=%d",ttt) ttt++ }
常用 Cron 标准表达式
描述 表达式 每1分钟执行一次 * * * * * 每15分钟执行一次 */15 * * * * 每一小时执行 * */1 * * * 每两个小时执行 0 */2 * * * 每小时的第3和第15分钟执行 3,15 * * * * 在上午8点到11点的第3和第15分钟执行 3,15 8-11 * * * 每晚的21:30执行 30 21 * * * 每天18:00至23:00之间每隔30分钟执行 0,30 18-23 * * * 每星期六的晚上11:00pm执行 0 23 * * 6 晚上11点到早上7点之间,每隔一小时执行 * 23-7/1 * * * 指定每天的5:30执行 30 5 * * * 每小时[第一分钟]执行 01 * * * * 每天[凌晨4:02]执行 02 4 * * *
时区设置
// 本地时区早六点 cron.New().AddFunc("0 6 * * ?", ...) // 上海时区早六点 nyc, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(nyc)) c.AddFunc("0 6 * * ?", ...) // 上海时区早六点 cron.New().AddFunc("CRON_TZ=Asia/Shanghai 0 6 * * ?", ...) // 重庆时区早六点 c := cron.New(cron.WithLocation(nyc)) c.SetLocation("Asia/Tokyo") c.AddFunc("CRON_TZ=Asia/Chongqing 0 6 * * ?", ...) Cron 方法 AddFunc() - 支持传入如 @every 这样的 spec 表达式和标准表达式及 func() 函数,它封装了 AddJob()。 AddJob() - 支持传入 spec 和标准表达式及 Job,Job 是一个接口,实现 Run() 方法的类型即是一个 Job,它封装了 Schedule()。 Schedule() - 执行标准表达式和 Job,被 AddJob() 函数调用,只支持标准表达式(即 /5 * * * )。 Entries() - 获取全部 Entry,一个 Entry 即为一个定时任务条目。 Entry() - 根据 EntryID 获取指定条目。 Remove() - 根据 EntryID 移除指定条目。 Location() - 获取 Local 时区。 Start() - 使用新的 goroutine 启动,不会阻塞当前协程,已运行的调度器重复调用会被忽略。 Run() - 在当前 goroutine 启动,会阻塞当前协程,已运行的调度器重复调用会被忽略。 Stop() - 停止调度器,返回 Context,可根据 Context 来等待任务执行完成。
Entry 结构体
type Entry struct { // ID is the cron-assigned ID of this entry, which may be used to look up a // snapshot or remove it. ID EntryID // Schedule on which this job should be run. Schedule Schedule // Next time the job will run, or the zero time if Cron has not been // started or this entry's schedule is unsatisfiable Next time.Time // Prev is the last time this job was run, or the zero time if never. Prev time.Time // WrappedJob is the thing to run when the Schedule is activated. WrappedJob Job // Job is the thing that was submitted to cron. // It is kept around so that user code that needs to get at the job later, // e.g. via Entries() can do so. Job Job }
可以获取任务上次、下次运行时间。
c = cron.New() entryId, _ := c.AddFunc("@every 15m", func() { fmt.Println("Every 15 Minutes") }) entry := c.Entry(entryId) fmt.Println(entry.Next) Job Wrappers 包装器 JobWrapper 可以对 Job 进行修饰,添加一些行为。 func SkipIfStillRunning(logger Logger) JobWrapper 如果上次任务还正在运行,那么跳过本次任务的运行并记录日记 func DelayIfStillRunning(logger Logger) JobWrapper 如果上次任务还正在运行,那么延迟执行本次任务的运行并记录日记 func Recover(logger Logger) JobWrapper 如果 Job Panic,记录日记
以上函数中 Skip 及 Recover 会记录 Info 级别的日志,而 Delay 会在延迟超过一分钟后记录 Info 日志。
使用方法
// 作用于调度器 cron.New(cron.WithChain( cron.SkipIfStillRunning(logger), )) // 作用于单个 Job job = cron.NewChain( cron.SkipIfStillRunning(logger), ).Then(job) Logger 日志记录 type Logger interface { // Info logs routine messages about cron's operation. Info(msg string, keysAndValues ...interface{}) // Error logs an error condition. Error(err error, msg string, keysAndValues ...interface{}) }
Logger 是一个接口,实现了这些接口的日志包都可被用于 Job 日志的记录工具。
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, “cron: “, log.LstdFlags)) var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, “”, 0))
如果未指定,DefaultLogger 是 Cron 的默认输出 Logger,也可以使用 DiscardLogger,不输出日志。
不输出 Job 日志
c := cron.New(cron.WithLogger(cron.DiscardLogger)) Cron 使用 logrus 记录日志示例 import ( "github.com/robfig/cron/v3" log "github.com/sirupsen/logrus" ) type LogrusLog struct { logger log.Logger } func (l *LogrusLog) Info(msg string, keysAndValues ...interface{}) { l.logger.WithFields(log.Fields{ "data": keysAndValues, }).Info(msg) } func (l *LogrusLog) Error(err error, msg string, keysAndValues ...interface{}) { l.logger.WithFields(log.Fields{ "msg": msg, "data": keysAndValues, }).Error(msg) } func main() { logrusLog := &LogrusLog{} c := cron.New(cron.WithLogger(logrusLog)) }
Option 可选项
cron.New() 方法支持传入 Option 可选项。
在上文提及过 cron.WithSeconds()、cron.WithChain() 和 cron.WithParser(),Cron 支持的 Option 函数如下
func WithChain(wrappers ...JobWrapper) Option 指定要应用于此 cron 的所有 Job 的 Job Wrapper。 func WithLocation(loc *time.Location) Option 覆盖 cron 实例的时区。 func WithLogger(logger Logger) Option 使用自定义的日志记录器 func WithParser(p ScheduleParser) Option 重写用于解释 Job 计划的解析器。 func WithSeconds() Option 重写用于解释作业计划的解析器,以将秒字段作为第一个字段。
退出 Stop() 方法等待任务执行完成示例
package main import ( "fmt" "github.com/robfig/cron/v3" "time" ) func main() { c := cron.New(cron.WithSeconds()) c.AddFunc("@every 10s", func() { fmt.Println("goroutine task run") time.Sleep(2 * time.Second) fmt.Println("-> task out") time.Sleep(2 * time.Second) fmt.Println("-> task out2") }) c.Start() fmt.Println("main sleep 12 seconds") fmt.Println("after 10 seconds, goroutine task will run") time.Sleep(12 * time.Second) for { ctx := c.Stop() select { case <-ctx.Done(): fmt.Println("all task done, stopped") return default: time.Sleep(time.Second) fmt.Println("default, wait") } } }
输出
➜ go run main.go main sleep 12 seconds after 10 seconds, goroutine task will run goroutine task run -> task out default, wait -> task out2 default, wait default, wait all task done, stopped
简单解释下这个示例,它模拟了一个调度器退出的场景,当退出调度器的时候有任务正在执行,那么会等到任务执行完成后再退出。