我在 Golang 的生态里一直没找到好用的延时任务方案,在之前的工作中需要的时候也是自己实现了一个简易方案,它有诸多问题,比如部署复杂,稳定性不够。
直到 2022 年我知晓了 Asynq。
Asynq 这样介绍自己:
Simple, reliable & efficient distributed task queue in Go
Go 中简单、可靠、高效的分布式任务队列
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
Asynq 是一个 Go 库,用于排队任务并通过工作者异步处理这些任务。它由 Redis 支持,被设计成可扩展且容易上手。
Asynq 过去一年在公司业务中调度了超过 5000w 个任务,确实高效且稳定,并且使用的 Redis 人手都有,部署难度很低。
现在空下来研究下它的实现原理,发现比想象中更复杂也有趣。
在研究 Asynq 之前,我也尝试自己思考了下应该如何实现,这样才知道有哪些难点,以及 Asyncq 如此设计是为了解决什么问题。
实现方案:使用有序集合存储将要到期的任务,定时从集合中拿取任务,看起来我们只需要用到一个数据结构,一个 key 就可以了。
问题:
解决问题 1 可以先拿出再删除,如果没删除成功,说明已经有其他节点拿到了这个任务,只有删除成功才能执行。
问题 2 更复杂,程序逻辑无法解决这个问题,因为程序本身就不稳定, 所以仅仅使用问题 1 的解决方案是实现不了的,因为删除后数据就从 Redis 丢失了,而程序又无法可靠的进行重试。
我们还需要一个新的 key 存储等待响应的任务,在任务超时后重试。现在问题变得复杂起来了,放弃思考,抄答案吧。
带着这两个问题,我们来看看 Asyncq 它是如何实现的。
原子性是指操作在执行过程中不可被中断或分割,要么全部执行成功,要么全部失败回滚,保证操作的完整性和一致性。 在并发环境下,原子性是确保多个线程在同时访问共享资源时,对共享资源的操作不会相互干扰从而导致数据不一致。
由于在某个动作中需要操作多个队列,为了实现原子性,则需要使用 script 命令来执行操作,比如一次出队列操作如下:
var dequeueCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[2]) == 0 then local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) if id then local key = ARGV[2] .. id redis.call("HSET", key, "state", "active") redis.call("HDEL", key, "pending_since") redis.call("ZADD", KEYS[4], ARGV[1], id) return redis.call("HGET", key, "msg") end end return nil`)
由于程序的崩溃可能发生在任何地方,为了保证不丢数据,和 MQ 一样,我们必须设计一个 ACK(确认 Acknowledgement)机制,如果超过一段时间程序没有应答(无论成功和失败都算应答)则需要重试。这也是整个系统最复杂的地方,可以说将复杂度扩大了几倍。
在 asyncq 中出队列时,不光使用 RPOPLPUSH 将 pending 的任务放入 active 队列,同时也会将任务放在有序集合 lease(默认延时 30 s),然后定时从 lease 查询出过期的任务并重试。
并发可能导致判断失效而重复执行,asynq 在不同场景使用不同机制来避免重复执行:
© 2024 bysir's Blog - by
Creght