延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那么,是在什么場景下我才需要這樣的隊列呢?
1. 背景
我們先看看以下業(yè)務場景:
當訂單一直處于未支付狀態(tài)時,如何及時的關閉訂單如何定期檢查處于退款狀態(tài)的訂單是否已經退款成功在訂單長時間沒有收到下游系統(tǒng)的狀態(tài)通知的時候,如何實現階梯式的同步訂單狀態(tài)的策略在系統(tǒng)通知上游系統(tǒng)支付成功終態(tài)時,上游系統(tǒng)返回通知失敗,如何進行異步通知實行分頻率發(fā)送:15s 3m 10m 30m 30m 1h 2h 6h 15h
1.1 解決方案
最簡單的方式,定時掃表。例如對于訂單支付失效要求比較高的,每2s掃表一次檢查過期的訂單進行主動關單操作。優(yōu)點是簡單,缺點是每分鐘全局掃表,浪費資源,如果遇到表數據訂單量即將過期的訂單量很大,會造成關單延遲。
使用rabbitmq或者其他mq改造實現延遲隊列,優(yōu)點是,開源,現成的穩(wěn)定的實現方案,缺點是:mq是一個消息中間件,如果團隊技術棧本來就有mq,那還好,如果不是,那為了延遲隊列而去部署一套mq成本有點大
使用redis的zset、list的特性,我們可以利用redis來實現一個延遲隊列redisdelayqueue
2. 設計目標
實時性:允許存在一定時間的秒級誤差高可用性:支持單機、支持集群支持消息刪除:業(yè)務會隨時刪除指定消息消息可靠性:保證至少被消費一次消息持久化:基于redis自身的持久化特性,如果redis數據丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續(xù)優(yōu)化將消息持久化到mangodb中
3. 設計方案
設計主要包含以下幾點:
將整個redis當做消息池,以kv形式存儲消息使用zset做優(yōu)先隊列,按照score維持優(yōu)先級使用list結構,以先進先出的方式消費zset和list存儲消息地址(對應消息池的每個key)自定義路由對象,存儲zset和list名稱,以點對點的方式將消息從zset路由到正確的list使用定時器維護路由根據ttl規(guī)則實現消息延遲
3.1 設計圖
還是基于有贊的延遲隊列設計,進行優(yōu)化改造及代碼實現。有贊設計
3.2 數據結構
zing:delay_queue:job_pool 是一個hash_table結構,里面存儲了所有延遲隊列的信息。kv結構:k=prefix projectname field = topic jobid v=conent;v由客戶端傳入的數據,消費的時候回傳zing:delay_queue:bucket 延遲隊列的有序集合zset,存放k=id和需要的執(zhí)行時間戳,根據時間戳排序zing:delay_queue:queue list結構,每個topic一個list,list存放的都是當前需要被消費的job
圖片僅供參考,基本可以描述整個流程的執(zhí)行過程,圖片源于文末的參考博客中
3.3 任務的生命周期
新增一個job,會在zing:delay_queue:job_pool中插入一條數據,記錄了業(yè)務方消費方。zing:delay_queue:bucket也會插入一條記錄,記錄執(zhí)行的時間戳搬運線程會去zing:delay_queue:bucket中查找哪些執(zhí)行時間戳的runtimemillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的topic是什么,然后將這些任務push到topic對應的列表zing:delay_queue:queue中每個topic的list都會有一個監(jiān)聽線程去批量獲取list中的待消費數據,獲取到的數據全部扔給這個topic的消費線程池消費線程池執(zhí)行會去zing:delay_queue:job_pool查找數據結構,返回給回調結構,執(zhí)行回調方法。
3.4 設計要點
3.4.1 基本概念
job:需要異步處理的任務,是延遲隊列里的基本單元topic:一組相同類型job的集合(隊列)。供消費者來訂閱
3.4.2 消息結構
每個job必須包含以下幾個屬性
jobid:job的唯一標識。用來檢索和刪除指定的job信息topic:job類型??梢岳斫獬删唧w的業(yè)務名稱delay:job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)body:job的內容,供消費者做具體的業(yè)務處理,以json格式存儲retry:失敗重試次數url:通知url
3.5 設計細節(jié)
3.5.1 如何快速消費zing:delay_queue:queue
最簡單的實現方式就是使用定時器進行秒級掃描,為了保證消息執(zhí)行的時效性,可以設置每1s請求redis一次,判斷隊列中是否有待消費的job。但是這樣會存在一個問題,如果queue中一直沒有可消費的job,那頻繁的掃描就失去了意義,也浪費了資源,幸好list中有一個blpop阻塞原語,如果list中有數據就會立馬返回,如果沒有數據就會一直阻塞在那里,直到有數據返回,可以設置阻塞的超時時間,超時會返回null;具體的實現方式及策略會在代碼中進行具體的實現介紹
3.5.2 避免定時導致的消息重復搬運及消費
使用redis的分布式鎖來控制消息的搬運,從而避免消息被重復搬運導致的問題使用分布式鎖來保證定時器的執(zhí)行頻率
4. 核心代碼實現
4.1 技術說明
技術棧:springboot,redisson,redis,分布式鎖,定時器
注意:本項目沒有實現設計方案中的多queue消費,只開啟了一個queue,這個待以后優(yōu)化
4.2 核心實體
4.2.1 job新增對象
/ * 消息結構 * * @author 睜眼看世界 * @date 2020年1月15日 */@datapublic class job implements serializable { private static final long serialversionuid = 1l; / * job的唯一標識。用來檢索和刪除指定的job信息 */ @notblank private string jobid; / * job類型。可以理解成具體的業(yè)務名稱 */ @notblank private string topic; / * job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間) */ private long delay; / * job的內容,供消費者做具體的業(yè)務處理,以json格式存儲 */ @notblank private string body; / * 失敗重試次數 */ private int retry = 0; / * 通知url */ @notblank private string url;}4.2.2 job刪除對象
/ * 消息結構 * * @author 睜眼看世界 * @date 2020年1月15日 */@datapublic class jobdie implements serializable { private static final long serialversionuid = 1l; / * job的唯一標識。用來檢索和刪除指定的job信息 */ @notblank private string jobid; / * job類型??梢岳斫獬删唧w的業(yè)務名稱 */ @notblank private string topic;}4.3 搬運線程
/ * 搬運線程 * * @author 睜眼看世界 * @date 2020年1月17日 */@slf4j@componentpublic class carryjobscheduled { @autowired private redissonclient redissonclient; / * 啟動定時開啟搬運job信息 */ @scheduled(cron = "*/1 * * * * *") public void carryjobtoqueue() { system.out.println("carryjobtoqueue --->"); rlock lock = redissonclient.getlock(redisqueuekey.carry_thread_lock); try { boolean lockflag = lock.trylock(lock_wait_time, lock_release_time, timeunit.seconds); if (!lockflag) { throw new businessexception(errormessageenum.acquire_lock_fail); } rscoredsortedset<object> bucketset = redissonclient.gets