前言
消息隊(duì)列(Message Queue),從廣義上講是一種消息隊(duì)列服務(wù)中間件,提供一套完整的信息生產(chǎn)、傳遞、消費(fèi)的軟件系統(tǒng)。
消息隊(duì)列所涵蓋的功能遠(yuǎn)不止于隊(duì)列(Queue),其本質(zhì)是兩個(gè)進(jìn)程傳遞信息的一種方法。兩個(gè)進(jìn)程可以分布在同一臺(tái)機(jī)器上,亦可以分布在不同的機(jī)器上。
眾所周知,進(jìn)程通信可以通過(guò) RPC(Remote Procedure Call,遠(yuǎn)程過(guò)程調(diào)用)進(jìn)行,那么我們?yōu)槭裁匆孟㈥?duì)列這種軟件服務(wù)來(lái)傳遞消息呢?
我們通過(guò)一個(gè)快遞員送快遞的栗子來(lái)描述下消息隊(duì)列的作用。
消息隊(duì)列
現(xiàn)實(shí)世界的例子
小明是一名快遞員,通常給一個(gè)客戶(hù)送快遞分為三步:
- 第一步:把快遞運(yùn)到客戶(hù)家門(mén)口;
- 第二步:敲門(mén);
- 第三步:客戶(hù)開(kāi)門(mén)取走快遞。
好了上邊是送快遞最簡(jiǎn)單的三步,讓我們想想,這簡(jiǎn)單的三步會(huì)有什么問(wèn)題?
(1)耦合
小明什么時(shí)候完成這一單,完全依賴(lài)于客戶(hù)的響應(yīng)速度。
如果客戶(hù)還沒(méi)起床,聽(tīng)見(jiàn)敲門(mén)聲再穿衣服開(kāi)門(mén),可能消耗很多時(shí)間。如果客戶(hù)沒(méi)在家呢?那就要配送失敗了,如何判斷配送失敗呢?小明需要判斷等多久開(kāi)門(mén)(超時(shí)時(shí)間),打電話判斷是否在家(健康檢查),最終郁悶的離開(kāi),下次再來(lái)一次(重試)。
小明直接與客戶(hù)交互,對(duì)客戶(hù)的狀態(tài)強(qiáng)依賴(lài),產(chǎn)生了耦合現(xiàn)象。
(2)同步影響性能
小明的配送速度受到客戶(hù)的響應(yīng)速度影響極大,有一兩個(gè)需要長(zhǎng)時(shí)間等待的快件,小明的配送效率(吞吐率)會(huì)受到很大影響。
(3)高峰期負(fù)載很高
每次到雙11、618 購(gòu)物節(jié)的時(shí)候,小明都很煩躁??爝f太多,來(lái)的比送得快,這可如何是好。一旦有客戶(hù)因?yàn)槁?lián)系不上影響了配送效率,就會(huì)影響后面客戶(hù)的配送,嚴(yán)重了還會(huì)收到投訴。
這個(gè)時(shí)候有個(gè)叫X巢的快遞柜出現(xiàn)了,小明可以把快遞放到柜子里,發(fā)條短信通知客戶(hù)過(guò)來(lái)取快遞。這樣就不強(qiáng)依賴(lài)客戶(hù)的響應(yīng),大大提高了配送效率。
這里的快遞柜就相當(dāng)于是編程世界的消息隊(duì)列,讓我們看看消息隊(duì)列到底起到了什么作用。
消息隊(duì)列解決什么問(wèn)題
- 解耦:此時(shí),小明只需要把快遞放到柜子里,不需要關(guān)心客戶(hù)是否在家,是否在睡覺(jué)??蛻?hù)也不需要一直等待給小明開(kāi)門(mén),兩個(gè)人解耦了。
- 異步:小明把快遞放到柜子里發(fā)個(gè)信息就可以去送下一件,不需同步等待結(jié)果。這樣每個(gè)快遞的處理速度(響應(yīng)時(shí)間)都變得極短,每天送的快遞數(shù)量(吞吐量)也變多了。
- 削峰:這次又到了雙十一,以前小明一天只能配送 100 個(gè)快遞,現(xiàn)在有了快遞柜,配送量(吞吐量和響應(yīng)速度)增加了好幾倍,甚至數(shù)十倍,大大提升了小明的工作效率。這下小明再也不擔(dān)心接到投訴了。
總結(jié)
讓我們簡(jiǎn)單總結(jié)一下消息隊(duì)列的作用,首先需要肯定的是使用消息組件有很多好處,其中最核心的三個(gè)是:解耦、異步、削峰。
- 解耦:生產(chǎn)端和消費(fèi)端不需要相互依賴(lài);
- 異步:生產(chǎn)端不需要等待消費(fèi)端響應(yīng),直接返回,提高了響應(yīng)時(shí)間和吞吐量;
- 削峰:打平高峰期的流量,消費(fèi)端可以以自己的速度處理,同時(shí)也無(wú)需在高峰期增加太多資源,提高資源利用率。
引入消息隊(duì)列后讓我們子系統(tǒng)間耦合性降低了,異步處理機(jī)制減少了系統(tǒng)的響應(yīng)時(shí)間,同時(shí)能夠有效的應(yīng)對(duì)請(qǐng)求峰值問(wèn)題,提升系統(tǒng)的穩(wěn)定性。但同時(shí)引入消息隊(duì)列也會(huì)帶來(lái)一些問(wèn)題。
下面我們以 RocketMQ
為例來(lái)分析引入 MQ 帶來(lái)的問(wèn)題以及解決方案。
MQ 常見(jiàn)問(wèn)題分析
消息丟失
消息丟失可以說(shuō)是 MQ 中普遍存在的問(wèn)題,不管用哪種 MQ 都無(wú)法避免。
那么有哪些場(chǎng)景會(huì)出現(xiàn)消息丟失問(wèn)題呢?
我們下面來(lái)看一下,整個(gè)消息從生產(chǎn)到消費(fèi)的過(guò)程中,哪些地方可能會(huì)導(dǎo)致丟消息,以及應(yīng)該如何避免消息丟失。
一條消息從生產(chǎn)到被消費(fèi),將會(huì)經(jīng)歷三個(gè)階段:
- 生產(chǎn)階段,生產(chǎn)者新建消息,然后通過(guò)網(wǎng)絡(luò)將消息投遞給 MQ 服務(wù)器
- 存儲(chǔ)階段,消息將會(huì)存儲(chǔ)在服務(wù)器磁盤(pán)中,如果是集群,消息會(huì)在這個(gè)階段被復(fù)制到其他的副本上
- 消費(fèi)階段, 消費(fèi)者將會(huì)從 MQ 服務(wù)器拉取消息
以上任一階段都可能會(huì)丟失消息:
- 生產(chǎn)階段:生產(chǎn)者發(fā)送消息時(shí),由于網(wǎng)絡(luò)原因,發(fā)送到 MQ 失敗了。
- 存儲(chǔ)階段:MQ 服務(wù)器持久化時(shí),服務(wù)器宕機(jī)、重啟導(dǎo)致丟失消息。
- 消費(fèi)階段:消息消費(fèi)者剛讀取消息,已經(jīng) ack 確認(rèn)了,但業(yè)務(wù)還沒(méi)處理完,服務(wù)就被重啟了。
消息丟失解決方案
生產(chǎn)階段
RocketMQ
提供了 3 種發(fā)送消息方式,分別是:
- 同步發(fā)送:生產(chǎn)者向 MQ 發(fā)送消息,阻塞當(dāng)前線程等待 MQ 服務(wù)器響應(yīng)發(fā)送結(jié)果。
- 異步發(fā)送:生產(chǎn)者首先構(gòu)建一個(gè)向服務(wù)器發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶(hù)自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。
- Oneway 發(fā)送:生產(chǎn)者發(fā)起消息發(fā)送請(qǐng)求后并不會(huì)等待服務(wù)器的響應(yīng)結(jié)果,也不會(huì)調(diào)用回調(diào)函數(shù),即不關(guān)心消息的最終發(fā)送結(jié)果。
Oneway 相對(duì)前兩種發(fā)送方式來(lái)說(shuō)是一種不可靠的消息發(fā)送方式,因此要保證消息發(fā)送的可靠性,我們只考慮同步和異步的發(fā)送方式。
(1)同步發(fā)送可靠性保證
采用同步阻塞式的發(fā)送,然后同步檢查 MQ 服務(wù)器返回的狀態(tài)來(lái)判斷消息是否持久化成功。如果發(fā)送超時(shí)或者失敗,則會(huì)自動(dòng)重試,如果重試再失敗,就會(huì)以返回值或者異常的方式告知用戶(hù)。
我們?cè)诰帉?xiě)發(fā)送消息代碼時(shí),需要注意,正確處理返回值或者捕獲異常,就可以保證這個(gè)階段的消息不會(huì)丟失。
同步發(fā)送,代碼如下:
public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}
同步發(fā)送會(huì)返回 4 個(gè)狀態(tài)碼:
SEND_OK:消息發(fā)送成功。
- 需要注意的是,消息發(fā)送到服務(wù)器后,還有兩個(gè)操作:「消息刷盤(pán)」和「消息同步到 slave 節(jié)點(diǎn)」,默認(rèn)這兩個(gè)操作都是異步的,只有把這兩個(gè)操作都改為同步,
SEND_OK
這個(gè)狀態(tài)才能真正表示發(fā)送成功。 FLUSH_DISK_TIMEOUT
:消息發(fā)送成功但是消息刷盤(pán)超時(shí)。FLUSH_SLAVE_TIMEOUT
:消息發(fā)送成功但是消息同步到 slave 節(jié)點(diǎn)時(shí)超時(shí)。SLAVE_NOT_AVAILABLE
:消息發(fā)送成功但是 broker 的 slave 節(jié)點(diǎn)不可用。
根據(jù)返回的狀態(tài)碼,可以做消息重試,這里設(shè)置的重試次數(shù)是 3。
消息重試時(shí),消費(fèi)端一定要做好冪等處理。
既然是同步發(fā)送肯定就比較耗費(fèi)一些時(shí)間,如果你的業(yè)務(wù)比較注重 RT 那就可以使用異步發(fā)送的方式。
(2)異步發(fā)送可靠性保證
異步發(fā)送時(shí),則需要在回調(diào)方法里進(jìn)行檢查。這個(gè)地方是需要特別注意的,很多丟消息的原因就是,我們使用了異步發(fā)送,卻沒(méi)有在回調(diào)中檢查發(fā)送結(jié)果。
具體的業(yè)務(wù)實(shí)現(xiàn)可以根據(jù)發(fā)送的結(jié)果信息來(lái)判斷是否需要重試來(lái)保證消息的可靠性。
異步發(fā)送,代碼如下:
public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}
異步發(fā)送,可以重寫(xiě)回調(diào)函數(shù),回調(diào)函數(shù)捕獲到 Exception 時(shí)表示發(fā)送失敗,這時(shí)可以進(jìn)行重試,這里設(shè)置的重試次數(shù)是 3。
存儲(chǔ)階段
默認(rèn)的情況下,消息隊(duì)列為了快速響應(yīng),在接受到生產(chǎn)者的請(qǐng)求,將消息保存在內(nèi)存成功之后,就會(huì)立刻返回 ACK 響應(yīng)給生產(chǎn)者。
RocketMQ
的刷盤(pán)方式分為「同步刷盤(pán)」和「異步刷盤(pán)」兩種。
- 異步刷盤(pán):消息寫(xiě)入 CommitLog 時(shí),并不會(huì)直接寫(xiě)入磁盤(pán),而是先寫(xiě)入 PageCache 緩存后返回成功,然后用后臺(tái)線程異步把消息刷入磁盤(pán)。異步刷盤(pán)提高了消息吞吐量,但是可能會(huì)有消息丟失的情況,比如斷點(diǎn)導(dǎo)致機(jī)器停機(jī),PageCache 中沒(méi)來(lái)得及刷盤(pán)的消息就會(huì)丟失。
同步刷盤(pán):消息寫(xiě)入內(nèi)存的 PageCache 后,立刻通知刷盤(pán)線程刷盤(pán),然后等待刷盤(pán)完成,如果消息未在約定的時(shí)間內(nèi)(默認(rèn) 5 s)刷盤(pán)成功,就返回FLUSH_DISK_TIMEOUT,Producer 收到這個(gè)響應(yīng)后,可以進(jìn)行重試。同步刷盤(pán)策略保證了消息的可靠性,同時(shí)降低了吞吐量,增加了延遲。
- 要開(kāi)啟同步刷盤(pán),需要增加下面配置:
flushDiskType=SYNC_FLUSH
RocketMQ 默認(rèn)的是異步刷盤(pán),就有可能導(dǎo)致消息還未刷到硬盤(pán)上就丟失了,可以通過(guò)設(shè)置為同步刷盤(pán)的方式來(lái)保證消息可靠性,這樣即使 MQ 掛了,恢復(fù)的時(shí)候也可以從磁盤(pán)中去恢復(fù)消息。
如果是 Broker 是由多個(gè)節(jié)點(diǎn)組成的集群,需要將 Broker 集群配置成:至少將消息發(fā)送到 2 個(gè)以上的節(jié)點(diǎn),再給客戶(hù)端回復(fù)發(fā)送確認(rèn)響應(yīng)。 這樣當(dāng)某個(gè) Broker 宕機(jī)時(shí),其他的 Broker 可以替代宕機(jī)的 Broker,也不會(huì)發(fā)生消息丟失。
Broker 采用集群配置時(shí),需要注意的一點(diǎn)是:消息發(fā)送到 master 節(jié)點(diǎn)后,slave 節(jié)點(diǎn)會(huì)從 master 拉取消息保持跟 master 的一致。這個(gè)過(guò)程默認(rèn)是異步的,即 master 收到消息后,不等 slave 節(jié)點(diǎn)復(fù)制消息就直接給 Producer 返回成功。
這樣會(huì)有一個(gè)問(wèn)題,如果 slave 節(jié)點(diǎn)還沒(méi)有完成消息復(fù)制,這時(shí) master 宕機(jī)了,進(jìn)行主備切換后就會(huì)有消息丟失。
為了避免這個(gè)問(wèn)題,可以采用 slave 節(jié)點(diǎn)同步復(fù)制消息,即等 slave 節(jié)點(diǎn)復(fù)制消息成功后再給 Producer 返回發(fā)送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER。
消費(fèi)階段
消費(fèi)階段采用和生產(chǎn)階段類(lèi)似的確認(rèn)機(jī)制來(lái)保證消息的可靠傳遞,客戶(hù)端從 Broker 拉取消息后,執(zhí)行用戶(hù)的消費(fèi)業(yè)務(wù)邏輯:
- 如果 Consumer 消費(fèi)成功,返回
CONSUME_SUCCESS
,提交 offset 并從 Broker 拉取下一批消息 - 如果 Consumer 消費(fèi)失敗,下次拉消息的時(shí)候還會(huì)返回同一條消息,即進(jìn)行消費(fèi)重試。
消費(fèi)重試
RocketMQ 認(rèn)為消息消費(fèi)失敗需要重試的場(chǎng)景有三種:
- 返回 RECONSUME_LATER
- 返回 null
- 拋出異常
Broker 收到這個(gè)響應(yīng)后,會(huì)把這條消息放入重試隊(duì)列,Topic 名字為%RETRY% + consumerGroup
。
注意:
Broker 默認(rèn)最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入「死信隊(duì)列」,Consumer 可以訂閱死信隊(duì)列進(jìn)行消費(fèi)。
重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
Consumer 端一定要做好冪等處理。
其實(shí)重試 3 次都失敗就可以說(shuō)明代碼有問(wèn)題,這時(shí) Consumer 可以把消息存入本地,給 Broker 返回 CONSUME_SUCCESS 來(lái)結(jié)束重試。
死信隊(duì)列:未能成功消費(fèi)的消息,消息隊(duì)列并不會(huì)立刻將消息丟棄,而是將消息發(fā)送到死信隊(duì)列,其名稱(chēng)是在原隊(duì)列名稱(chēng)前加 %DLQ%,如果消息最終進(jìn)入了死信隊(duì)列,則可以通過(guò) RocketMQ 提供的相關(guān)接口從死信隊(duì)列獲取到相應(yīng)的消息,保證了消息消費(fèi)的可靠性。
上面方案看似萬(wàn)無(wú)一失,每個(gè)階段都能保證消息的不丟失,但在分布式系統(tǒng)中,故障不可避免,作為消息生產(chǎn)端,你并不能保證 MQ 是不是弄丟了你的消息,消費(fèi)者是否消費(fèi)了你的消息,所以,本著 Design for Failure 的設(shè)計(jì)原則,我們需要一種機(jī)制,來(lái) Check 消息是否丟失了。
檢測(cè)消息丟失的方法
總體方案解決思路為:在消息生產(chǎn)端,給每個(gè)發(fā)出的消息都指定一個(gè)全局唯一 ID,或者附加一個(gè)連續(xù)遞增的版本號(hào),然后在消費(fèi)端做對(duì)應(yīng)的版本校驗(yàn)。
可以利用攔截器機(jī)制。在生產(chǎn)端發(fā)送消息之前,通過(guò)攔截器將消息版本號(hào)注入消息中(版本號(hào)可以采用連續(xù)遞增的 ID 生成,也可以通過(guò)分布式全局唯一 ID生成)。然后在消費(fèi)端收到消息后,再通過(guò)攔截器檢測(cè)版本號(hào)的連續(xù)性或消費(fèi)狀態(tài),這樣實(shí)現(xiàn)的好處是消息檢測(cè)的代碼不會(huì)侵入到業(yè)務(wù)代碼中,可以通過(guò)單獨(dú)的任務(wù)來(lái)定位丟失的消息,做進(jìn)一步的排查。
如果同時(shí)存在多個(gè)消息生產(chǎn)端和消息消費(fèi)端,通過(guò)版本號(hào)遞增的方式就很難實(shí)現(xiàn)了,因?yàn)椴荒鼙WC版本號(hào)的唯一性,此時(shí)只能通過(guò)全局唯一 ID 的方案來(lái)進(jìn)行消息檢測(cè),具體的實(shí)現(xiàn)原理和版本號(hào)遞增的方式一致。
重復(fù)消息
RocketMQ 為了保證消息的可靠性,選擇 「至少傳輸成功一次」 的消息模型。
在消息領(lǐng)域有一個(gè)對(duì)消息投遞的 QoS 定義,分為:
- 最多一次(At most once)
- 至少一次(At least once)
- 僅一次( Exactly once)
既然是至少一次,那避免不了消息重復(fù),尤其是在分布式網(wǎng)絡(luò)環(huán)境下。比如:網(wǎng)絡(luò)原因閃斷,ACK 返回失敗等等故障,確認(rèn)信息沒(méi)有傳送到消息隊(duì)列,導(dǎo)致消息隊(duì)列不知道該消息已經(jīng)被消費(fèi)了,再次將該消息分發(fā)給其他的消費(fèi)者。
那么如何解決這個(gè)問(wèn)題?
這個(gè)問(wèn)題其實(shí)可以換一種說(shuō)法,就是如何解決消費(fèi)端冪等性問(wèn)題(冪等性,就是一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同),只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。
那如何保證消息隊(duì)列消費(fèi)的冪等性? 我們還是得結(jié)合業(yè)務(wù)來(lái)思考,這里給幾個(gè)思路:
- 利用數(shù)據(jù)庫(kù)的唯一約束實(shí)現(xiàn):比如收到數(shù)據(jù)時(shí)要寫(xiě)庫(kù),通過(guò)創(chuàng)建唯一索引的方式保證冪等。
- 為更新的數(shù)據(jù)設(shè)置前置條件:給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿(mǎn)足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù)。 比如可以通過(guò)判斷狀態(tài)是否允許操作,不滿(mǎn)足的拒絕更新。
如果上面提到的兩種實(shí)現(xiàn)冪等方法都不能適用于你的場(chǎng)景,我們還有一種通用性最強(qiáng),適用范圍最廣的實(shí)現(xiàn)冪等性方法。
更通用的解決方案
終極方法:「基于消息冪等表的非事務(wù)方案」,實(shí)現(xiàn)的思路特別簡(jiǎn)單:在執(zhí)行業(yè)務(wù)代碼之前,先檢查一下是否處理過(guò)這個(gè)條消息。
具體的實(shí)現(xiàn)方法是:
首先,在數(shù)據(jù)庫(kù)中建一張消息日志表,這個(gè)表有兩個(gè)字段:「消息 ID」和「消息執(zhí)行狀態(tài)(消費(fèi)中、已消費(fèi))」。
然后給消息 ID 來(lái)創(chuàng)建一個(gè)唯一約束,這樣對(duì)于相同的消息 ID,表里至多只能存在一條記錄。
在發(fā)送消息時(shí),給每條消息指定一個(gè)全局唯一的 ID,消費(fèi)時(shí),先根據(jù)這個(gè) ID 檢查這條消息是否有被消費(fèi)過(guò),如果沒(méi)有消費(fèi)過(guò),才執(zhí)行業(yè)務(wù)代碼,然后將消費(fèi)狀態(tài)置為已消費(fèi)。
可以看到,此方案是無(wú)事務(wù)的,而是針對(duì)消息表本身做了狀態(tài)的區(qū)分:消費(fèi)中、消費(fèi)完成。只有消費(fèi)完成的消息才會(huì)被冪等處理掉。
而對(duì)于已有消費(fèi)中的消息,后面重復(fù)的消息會(huì)觸發(fā)延遲消費(fèi),這樣主要是為了控制并發(fā)場(chǎng)景下,第二條消息在第一條消息沒(méi)完成的過(guò)程中,去控制消息不丟(如果直接冪等,那么會(huì)丟失消息(同一個(gè)消息id的話),因?yàn)樯弦粭l消息如果沒(méi)有消費(fèi)完成的時(shí)候,第二條消息你已經(jīng)告訴broker成功了,那么第一條消息這時(shí)候失敗broker也不會(huì)重新投遞了)。
我們分析下這種方案是否解決了冪等問(wèn)題:
- 消息已經(jīng)消費(fèi)成功了,第二條消息將被直接冪等處理掉(消費(fèi)成功)
- 并發(fā)場(chǎng)景下的消息,依舊能滿(mǎn)足不會(huì)出現(xiàn)消息重復(fù)
- 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問(wèn)題
第一個(gè)問(wèn)題明顯解決了。
第二個(gè)問(wèn)題也已經(jīng)解決,主要是依靠插入消息表的這個(gè)動(dòng)作做控制的,因?yàn)椤赶?ID」的惟一的,后面的消息插入會(huì)由于主鍵沖突而失敗,走向延遲消費(fèi)的分支,然后后面延遲消費(fèi)的時(shí)候就會(huì)變成上面第一個(gè)場(chǎng)景的問(wèn)題。
關(guān)于第三個(gè)問(wèn)題,只要我們?cè)O(shè)計(jì)去重的消息鍵讓其支持業(yè)務(wù)的主鍵(例如訂單號(hào)、請(qǐng)求流水號(hào)等),而不僅僅是 messageId 即可。所以也不是問(wèn)題。
此方案是否有消息丟失的風(fēng)險(xiǎn)?
細(xì)心的讀者可能會(huì)發(fā)現(xiàn)這里實(shí)際上是有邏輯漏洞的,問(wèn)題出在上面聊到的三個(gè)問(wèn)題中的第 2 個(gè)問(wèn)題「并發(fā)場(chǎng)景」。
在并發(fā)場(chǎng)景下我們依賴(lài)于消息狀態(tài)做并發(fā)控制,使得第二條重復(fù)的消息會(huì)不斷延遲消費(fèi)(重試)。
但如果這時(shí)候第一條消息也由于一些異常原因(例如機(jī)器重啟了、外部異常導(dǎo)致消費(fèi)失?。](méi)有成功消費(fèi)成功呢?
也就是說(shuō)這時(shí)候延遲消費(fèi)實(shí)際上每次下來(lái)看到的都是「消費(fèi)中」的狀態(tài),最后消費(fèi)就會(huì)被視為消費(fèi)失敗而被投遞到死信隊(duì)列中。
對(duì)于此,我們解決的方法是,插入的消息表必須要帶一個(gè)最長(zhǎng)消費(fèi)過(guò)期時(shí)間,例如 10 分鐘,意思是如果一個(gè)消息處于消費(fèi)中超過(guò) 10 分鐘,就需要從消息表中刪除(需要程序自行實(shí)現(xiàn))。
所以最后這個(gè)消息的流程會(huì)是這樣的:
消息積壓
如果出現(xiàn)積壓,那一定是性能問(wèn)題,想要解決消息從生產(chǎn)到消費(fèi)上的性能問(wèn)題,就首先要知道哪些環(huán)節(jié)可能出現(xiàn)消息積壓,然后在考慮如何解決。
因?yàn)橄l(fā)送之后才會(huì)出現(xiàn)積壓的問(wèn)題,所以和消息生產(chǎn)端沒(méi)有關(guān)系,又因?yàn)榻^大部分的消息隊(duì)列單節(jié)點(diǎn)都能達(dá)到每秒鐘幾萬(wàn)的處理能力,相對(duì)于業(yè)務(wù)邏輯來(lái)說(shuō),性能不會(huì)出現(xiàn)在中間件的消息存儲(chǔ)上面。
毫無(wú)疑問(wèn),出問(wèn)題的肯定是消息消費(fèi)階段。
如果是線上突發(fā)問(wèn)題,要臨時(shí)擴(kuò)容,增加消費(fèi)端的數(shù)量,與此同時(shí),降級(jí)一些非核心的業(yè)務(wù)。通過(guò)擴(kuò)容和降級(jí)承擔(dān)流量。
其次,才是排查解決異常問(wèn)題,如通過(guò)監(jiān)控,日志等手段分析是否消費(fèi)端的業(yè)務(wù)邏輯代碼出現(xiàn)了問(wèn)題,優(yōu)化消費(fèi)端的業(yè)務(wù)處理邏輯。
最后,如果是消費(fèi)端的處理能力不足,可以通過(guò)水平擴(kuò)容來(lái)提供消費(fèi)端的并發(fā)處理能力。
在擴(kuò)容消費(fèi)者的是時(shí)候有一點(diǎn)需要注意,如果當(dāng)前 Topic 的 Message Queue 的數(shù)量大于消費(fèi)者數(shù)量,就可以對(duì)消費(fèi)者進(jìn)行擴(kuò)容,增加消費(fèi)者,來(lái)提高消費(fèi)能力,盡快把積壓的消息消費(fèi)完。如果消費(fèi)者的數(shù)量大于等于 Message Queue 的數(shù)量,增加消費(fèi)者是沒(méi)有用的。
順序消費(fèi)
我們知道,RocketMQ 在主題上是無(wú)序的。但是在有些場(chǎng)景下,使用 MQ 需要保證消息的順序性,比如在電商系統(tǒng)中:下單、付款、發(fā)貨、買(mǎi)家確認(rèn)收貨,消費(fèi)端需要嚴(yán)格按照業(yè)務(wù)狀態(tài)機(jī)的順序處理,否則,就會(huì)出現(xiàn)業(yè)務(wù)問(wèn)題。
我們發(fā)現(xiàn),消息帶上了狀態(tài),不再是一個(gè)個(gè)獨(dú)立的個(gè)體,有了上下文依賴(lài)關(guān)系!
那么 MQ 是如何來(lái)保證消息順序的?
我們通常發(fā)送消息的時(shí)候,消息發(fā)送默認(rèn)是會(huì)采用輪詢(xún)的方式發(fā)送到不同的 queue。
而消費(fèi)端消費(fèi)的時(shí)候,是會(huì)分配到多個(gè) queue 的,多個(gè) queue 是同時(shí)拉取提交消費(fèi)。
但是同一條 queue 里面,RocketMQ 的確是能保證 FIFO 的。那么要做到順序消息,應(yīng)該怎么實(shí)現(xiàn)呢——把消息確保投遞到同一條 queue。
對(duì)于 RocketMQ 來(lái)說(shuō),主要是通過(guò) Producer 和 Consumer 來(lái)保證消息順序的。
生產(chǎn)端
生產(chǎn)端提供了一個(gè)接口 MessageQueueSelector
:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
接口內(nèi)定義一個(gè) select 方法,具體參數(shù)含義:
- mqs:該 Topic 下所有的隊(duì)列分片
- msg:待發(fā)送的消息
- arg:發(fā)送消息時(shí)傳遞的參數(shù)
示例代碼
模擬訂單消息的發(fā)送,共有 3 個(gè)訂單,每個(gè)訂單都包含下單
、付款
、發(fā)貨
、買(mǎi)家確認(rèn)收貨
四個(gè)流程,對(duì)應(yīng) 4 條消息。同一個(gè)訂單的消息要求嚴(yán)格按照順序消費(fèi),不同訂單的消息可以并發(fā)執(zhí)行。
首先實(shí)現(xiàn) MessageQueueSelector
接口,定制 MessageQueue
選擇策略:
public class OrderMessageQueueSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//選擇以參數(shù)arg為索引的MessageQueue
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}
下面實(shí)現(xiàn)發(fā)送消息邏輯:
@Slf4j
@Service
public class OrderMessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
private static final String[] ORDER_MESSAGES = {"下單", "付款", "發(fā)貨", "買(mǎi)家確認(rèn)收貨"};
@PostConstruct
public void sendMessage() {
try {
//設(shè)置namesrv
producer.setNamesrvAddr(namesrvAddr);
//啟動(dòng)Producer
producer.start();
System.err.println("Order Message Producer Start...");
//創(chuàng)建3組消息,每組消息發(fā)往同一個(gè)Queue,保證消息的局部有序性
String tags = "Tags";
OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();
//注:要實(shí)現(xiàn)順序消費(fèi),必須同步發(fā)送消息
for (int i = 0; i < 3; i++) {
String orderId = "" + (i + 1);
for (int j = 0, size = ORDER_MESSAGES.length; j < size; j++) {
String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];
String keys = message;
byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message("TEST_TOPIC_NAME", tags, keys, messageBody);
producer.send(mqMsg, orderMessageQueueSelector, i);
}
}
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
使用 DefaultMQProducer 的 send() 方法,指定 MessageQueueSelector 和參數(shù),Broker 將會(huì)將邏輯上需要保證順序性的消息發(fā)往同一隊(duì)列。
注意:上面的代碼把 orderId 相同的消息都會(huì)發(fā)送到同一個(gè) MessageQueue,這樣同一個(gè) orderId 的消息是有序的,這也叫做局部有序。對(duì)應(yīng)的另一種是全局有序,這需要把所有的消息都發(fā)到同一個(gè) MessageQueue。
注:想要實(shí)現(xiàn)順序消費(fèi),發(fā)送方式必須為同步發(fā)送,異步發(fā)送無(wú)法保證消息的發(fā)送順序!
這樣同一批我們需要做到順序消費(fèi)訂單肯定會(huì)投遞到同一個(gè)隊(duì)列,同一個(gè)隊(duì)列肯定會(huì)投遞到同一個(gè)消費(fèi)實(shí)例,同一個(gè)消費(fèi)實(shí)例肯定是順序拉取并順序提交線程池的,只要保證消費(fèi)端順序消費(fèi),則大功告成!
消費(fèi)端
消費(fèi)端想要實(shí)現(xiàn)順序消費(fèi),只要設(shè)置監(jiān)聽(tīng)器實(shí)現(xiàn) MessageListenerOrderly
接口即可。
示例代碼
首先自定義 MessageListenerOrderly
接口實(shí)現(xiàn)類(lèi),實(shí)現(xiàn)順序消費(fèi):
public class OrderMessageListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeOrderlyStatus.SUCCESS;
}
//設(shè)置自動(dòng)提交
context.setAutoCommit(true);
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Handle Order Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: "
+ msg.getTags() + ",keys: " + msg.getKeys() + ",messageBody: " + messageBody);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return ConsumeOrderlyStatus.SUCCESS;
}
}
下面就是消費(fèi)邏輯:
@Service
public class OrderMessageConsumer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
//設(shè)置namesrv地址
consumer.setNamesrvAddr(namesrvAddr);
//從消息隊(duì)列頭部開(kāi)始消費(fèi)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//集群消費(fèi)模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//訂閱主題
consumer.subscribe("TEST_TOPIC_NAME", "*");
//注冊(cè)消息監(jiān)聽(tīng)器,這里因?yàn)橐獙?shí)現(xiàn)順序消費(fèi),所以必須注冊(cè)MessageListenerOrderly
consumer.registerMessageListener(new OrderMessageListener());
//啟動(dòng)消費(fèi)端
consumer.start();
System.err.println("Order Message Consumer Start...");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
要保證消息的順序性,就需要保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 消費(fèi)。簡(jiǎn)單來(lái)說(shuō)就是通過(guò)對(duì) MessageQueueLock 進(jìn)行加鎖,這樣就保證只有一個(gè)線程在處理當(dāng)前 MessageQueue。感興趣的同學(xué)可以深入研究下。
總結(jié)
在項(xiàng)目中引入 MQ 解決了我們系統(tǒng)之間的耦合度過(guò)高的問(wèn)題、提高系統(tǒng)的靈活性和峰值處理能力。但同時(shí)也帶來(lái)了一些問(wèn)題:消息丟失、重復(fù)消息和消息積壓。
消息丟失可分三個(gè)階段進(jìn)行分析:
- 生產(chǎn)階段:采用同步發(fā)送,通過(guò)正確處理返回值或者捕獲異常,保證消息可靠性;采用異步發(fā)送則需要在回調(diào)方法里進(jìn)行檢查。
- 存儲(chǔ)階段:存儲(chǔ)端的可靠性依靠持久化策略、備份(主從復(fù)制)保證。
- 消費(fèi)階段:消費(fèi)失敗可以依靠重試策略保證可靠性。
對(duì)于重復(fù)消息,我們最后也給出一個(gè)終極方案:「基于消息冪等表的非事務(wù)方案」。不依賴(lài)事務(wù)而實(shí)現(xiàn)消息的去重,那么方案就能推廣到更復(fù)雜的場(chǎng)景例如:RPC、跨庫(kù)等。
而消息積壓,絕大部分問(wèn)題出現(xiàn)在消費(fèi)端,我們可以通過(guò)水平擴(kuò)容增加 Consumer 的實(shí)例數(shù)量來(lái)解決,需要注意的是,增加并發(fā)需要同步擴(kuò)容分區(qū)數(shù)量,否則是起不到效果的。
最后介紹了順序消費(fèi),RocketMQ 采用了局部順序一致性的機(jī)制,實(shí)現(xiàn)了單個(gè)隊(duì)列中的消息嚴(yán)格有序。