通過 Pulsar 源碼徹底解決重復消費問題
最近真是和 Pulsar杠上了,業務團隊反饋說是線上有個應用消息重復消費。
(資料圖片)
而且在測試環境是可以穩定復現的,根據經驗來看一般能穩定復現的都比較好解決。
定位問題接著便是定位問題了,根據之前的經驗讓業務按照這幾種情況先排查一下:
通過排查:1,2可以排除了。
沒有相關日志存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。第三個也在消費的入口和提交消息出計算了時間,最終發現都是在2s左右 ACK 的。
偽代碼如下:
Consumerconsumer=client.newConsumer().subscriptionType(SubscriptionType.Shared).enableRetry(true).topic(topic).ackTimeout(30,TimeUnit.SECONDS).subscriptionName("my-sub").messageListener(newMessageListener(){@SneakyThrows@Overridepublicvoidreceived(Consumer consumer,Message msg){log.info("msg_id{}",msg.getMessageId().toString());TimeUnit.SECONDS.sleep(2);consumer.acknowledge(msg);}}).subscribe();
那這就很奇怪了,因為代碼里配置的 ackTimeout 是 30s,理論上來說是不會存在超時導致消息重發的。
為了排除是否是超時引起的,直接將業務代碼注釋掉了,等于是消息收到后立即就 ACK,經過測試發現這樣確實就沒有重復消費了。
為了再次確認是不是和 ackTimeout 有關,直接將 .ackTimeout(30, TimeUnit.SECONDS)注釋掉后測試,發現也沒有重復消費了。
確認原因既然如此那一定是和這個配置有關了,但看代碼確實沒有超時,為了定位具體原因只有去看 client 的源碼了。
這里簡單梳理下消息的消費的流程:
根據 .receiverQueueSize(1000)的配置,默認情況下 broker 會直接給客戶端推送 1000 條消息??蛻舳藢⑦@ 1000 條消息保存到內部隊列中。如果使用同步消費 receive()時,本質上就是去 take這個內部隊列。如果是使用的是 messageListener異步消費并配置 ackTimeout,每當從隊列里獲得一條消息后便會把這條消息加入 UnAckedMessageTracker內部的一個時間輪中,定時檢測頂部是否存在消息,如果存在則會觸發重新投遞。4.1 加入時間輪后,異步調用我們自定義的事件,這個異步操作是提交到一個無界隊列中由單個線程依次排隊執行(這點是這次問題的關鍵)業務 ACK 的時候會從時間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進行重新投遞。整體流程如上圖,代碼細節如下圖:
所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker)開始倒計時的線程和回調業務邏輯的不是同一個線程。
如果業務執行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經過了 ackTimeou 的時間,從而導致了超時重發。
也就是用戶所理解的 ackTimeout周期(應該進入回調時候開始計時)和 SDK 實現的不一致造成的。
之后我再次確認同樣的代碼換為同步消費是沒有問題的,不會導致重復消費:
while(true){Messagemsg=consumer.receive();log.info("consumerMessagereceived:"+newString(msg.getData())+msg.getMessageId().toString());TimeUnit.SECONDS.sleep(2);consumer.acknowledge(msg);}
查看代碼后發現同步代碼的獲取消息和加入 UnAckedMessageTracker時間輪是同步的,也就不會出現超時的問題。
總結所以其實 是messageListener異步消費的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker處移動到回調函數中同步調用。
我查看了最新的 2.11.x版本的代碼依然沒有修復,正準備提個 PR 切換到 master 時才發現已經有相關的 PR 了,只是還沒有發版。
修復的背景和思路也是類似的,具體參考:
https://github.com/apache/pulsar/pull/18911
其實業務中并不推薦使用 ackTimeout 這個配置了,不好預估時間從而導致超時,而且我相信大部分業務配置好 ackTImeout后直到后續出問題的時候才想起來要改。所以干脆一開始就不要使用。
在 go 版本的 SDK 中直接廢棄掉了這個參數,推薦使用 nack API 替換。
往期推薦
一個詭異的 Pulsar InterruptedException 異常
Istio 升級后踩的坑
Pulsar負載均衡原理及優化
2022 年度總結
對 Pulsar 集群的壓測與優化
點分享
點收藏
點點贊
點在看
相關閱讀
-
通過 Pulsar 源碼徹底解決重復消費問題
背景最近真是和Pulsar杠上了,業務團隊反饋說是線上有個應用消息重... -
Redis7系列教程入門-認識Redis_環球熱點
大家好,我是二條。一位從事服務端研發的程序猿。從今天開始,我會... -
面試Go 被defer的幾個盲區坑了 世界視點
大家好,我是二條,一位從事后端開發的程序員。上一篇,我們了解了G... -
一文弄清混合云架構模式
當我們在說云架構的時候,通常指的并不是云平臺的自身架構,而是基... -
如果你使用 Helm,這款可視化工具不可錯過
大家好,又見面了,我是GitHub精選君!今天要給大家推薦一個GitHub... -
天天熱訊:機器學習系統架構的10個要素
這是一個AI賦能的時代,而機器學習則是實現AI的一種重要技術手段。...