我在 Golang 的生态里一直没找到好用的延时任务方案,在之前的工作中需要的时候也是自己实现了一个简易方案,它有诸多问题,比如部署复杂,稳定性不够。
直到 2022 年我知晓了 Asynq。
Asynq 这样介绍自己:
Simple, reliable & efficient distributed task queue in Go
Go 中简单、可靠、高效的分布式任务队列
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
Asynq 是一个 Go 库,用于排队任务并通过工作者异步处理这些任务。它由 Redis 支持,被设计成可扩展且容易上手。
Asynq 过去一年在公司业务中调度了超过 5000w 个任务,确实高效且稳定,并且使用的 Redis 人手都有,部署难度很低。
现在空下来研究下它的实现原理,发现比想象中更复杂也有趣。
在研究 Asynq 之前,我也尝试自己思考了下应该如何实现,这样才知道有哪些难点,以及 Asyncq 如此设计是为了解决什么问题。
实现方案:使用有序集合存储将要到期的任务,定时从集合中拿取任务,看起来我们只需要用到一个数据结构,一个 key 就可以了。
问题:
解决问题 1 可以先拿出再删除,如果没删除成功,说明已经有其他节点拿到了这个任务,只有删除成功才能执行。
问题 2 更复杂,程序逻辑无法解决这个问题,因为程序本身就不稳定, 所以仅仅使用问题 1 的解决方案是实现不了的,因为删除后数据就从 Redis 丢失了,而程序又无法可靠的进行重试。
我们还需要一个新的 key 存储等待响应的任务,在任务超时后重试。现在问题变得复杂起来了,放弃思考,抄答案吧。
带着这两个问题,我们来看看 Asyncq 它是如何实现的。
scheduled
记录异步任务。pending
队列。ZRANGEBYSCORE
定时从 scheduled
和 retry
队列查询到期的任务,并使用 ZREM
删除 任务,并插入到 pending
队列。 (!必须使用 script 才能满足原子性)RPOPLPUSH
将pending
的任务放入active
队列,并将任务放在有序集合 lease
(延时 30 s)active
和 lease
移除,然后存入归档。active
和 lease
移除,(如果没有移除成功,则不做任何处理),放入有序集合 retry
(延时 N s)active
移除,从lease
移除,添加到 pending
队列。lease
取出任务,检查最大重试次数,如果没有超过最大重试次数就执行 retry。否则就存档标记为失败。(注意这里可以不使用 script 实现原子操作,因为 retry 命令是幂等的。)%%{init: {'theme':'default'}}%% flowchart TD Event[Event] --> Delay{是否延时?} Delay -- Yes --> Scheduled[(Scheduled)] Delay -- No --> Pending[(Pending)] Scheduled -- 使用 ZRANGEBYSCORE 查询到期的任务 --> Pending Pending -- 用于判断任务状态 --> Active[(Active)] Active -- 执行方法 --> Func{执行成功?} Func -- Yes --> Archive[(Archive)] Func -- No --> CanRetry{需要重试?} CanRetry -- Yes --> Retry[(Retry)] CanRetry -- No --> Archive Pending -- 同时放入 Lease 队列为了实现 ACK --> Lease[(Lease)] Retry -- 使用 ZRANGEBYSCORE 查询到期的任务 --> Pending Lease -- 使用 ZRANGEBYSCORE 查询到期的任务 --> CanRetry{需要重试?}
流程图可能画得不够好,将就看看辅助理解。
原子性是指操作在执行过程中不可被中断或分割,要么全部执行成功,要么全部失败回滚,保证操作的完整性和一致性。 在并发环境下,原子性是确保多个线程在同时访问共享资源时,对共享资源的操作不会相互干扰从而导致数据不一致。
由于在某个动作中需要操作多个队列,为了实现原子性,则需要使用 script 命令来执行操作,比如一次出队列操作如下:
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if id then
local key = ARGV[2] .. id
redis.call("HSET", key, "state", "active")
redis.call("HDEL", key, "pending_since")
redis.call("ZADD", KEYS[4], ARGV[1], id)
return redis.call("HGET", key, "msg")
end
end
return nil`)
由于程序的崩溃可能发生在任何地方,为了保证不丢数据,和 MQ 一样,我们必须设计一个 ACK(确认 Acknowledgement)机制,如果超过一段时间程序没有应答(无论成功和失败都算应答)则需要重试。这也是整个系统最复杂的地方,可以说将复杂度扩大了几倍。
在 asyncq 中出队列时,不光使用 RPOPLPUSH
将 pending
的任务放入 active
队列,同时也会将任务放在有序集合 lease
(默认延时 30 s),然后定时从 lease
查询出过期的任务并重试。
并发可能导致判断失效而重复执行,asynq 在不同场景使用不同机制来避免重复执行:
lease
查询出过期的任务,2. 再检查重试次数之后重新放入 retry
集合,并从 active
和 lease
移除任务, 由于步骤 1 和 2 无法像出队列时做在一个 script 中,所以当多个 pod 并发查询时会查询到同一个任务,如果都往 retry
集合放入的话就会导致重复执行 为了避免这个问题,可以先从 active
和 lease
移除任务,只有当移除成功之后才会放入 retry
集合。