宅男在线永久免费观看网直播,亚洲欧洲日产国码无码久久99,野花社区在线观看视频,亚洲人交乣女bbw,一本一本久久a久久精品综合不卡

全部
常見(jiàn)問(wèn)題
產(chǎn)品動(dòng)態(tài)
精選推薦

消息隊(duì)列常見(jiàn)問(wèn)題整理

管理 管理 編輯 刪除

前言

消息隊(duì)列(Message Queue),從廣義上講是一種消息隊(duì)列服務(wù)中間件,提供一套完整的信息生產(chǎn)、傳遞、消費(fèi)的軟件系統(tǒng)。

消息隊(duì)列所涵蓋的功能遠(yuǎn)不止于隊(duì)列(Queue),其本質(zhì)是兩個(gè)進(jìn)程傳遞信息的一種方法。兩個(gè)進(jìn)程可以分布在同一臺(tái)機(jī)器上,亦可以分布在不同的機(jī)器上。

眾所周知,進(jìn)程通信可以通過(guò) RPC(Remote Procedure Call,遠(yuǎn)程過(guò)程調(diào)用)進(jìn)行,那么我們?yōu)槭裁匆孟㈥?duì)列這種軟件服務(wù)來(lái)傳遞消息呢?

我們通過(guò)一個(gè)快遞員送快遞的栗子來(lái)描述下消息隊(duì)列的作用。

消息隊(duì)列

現(xiàn)實(shí)世界的例子

小明是一名快遞員,通常給一個(gè)客戶(hù)送快遞分為三步:

  1. 第一步:把快遞運(yùn)到客戶(hù)家門(mén)口;
  2. 第二步:敲門(mén);
  3. 第三步:客戶(hù)開(kāi)門(mén)取走快遞。

好了上邊是送快遞最簡(jiǎn)單的三步,讓我們想想,這簡(jiǎn)單的三步會(huì)有什么問(wèn)題?

(1)耦合

小明什么時(shí)候完成這一單,完全依賴(lài)于客戶(hù)的響應(yīng)速度。

如果客戶(hù)還沒(méi)起床,聽(tīng)見(jiàn)敲門(mén)聲再穿衣服開(kāi)門(mén),可能消耗很多時(shí)間。如果客戶(hù)沒(méi)在家呢?那就要配送失敗了,如何判斷配送失敗呢?小明需要判斷等多久開(kāi)門(mén)(超時(shí)時(shí)間),打電話判斷是否在家(健康檢查),最終郁悶的離開(kāi),下次再來(lái)一次(重試)。

小明直接與客戶(hù)交互,對(duì)客戶(hù)的狀態(tài)強(qiáng)依賴(lài),產(chǎn)生了耦合現(xiàn)象。

(2)同步影響性能

小明的配送速度受到客戶(hù)的響應(yīng)速度影響極大,有一兩個(gè)需要長(zhǎng)時(shí)間等待的快件,小明的配送效率(吞吐率)會(huì)受到很大影響。

(3)高峰期負(fù)載很高

每次到雙11、618 購(gòu)物節(jié)的時(shí)候,小明都很煩躁??爝f太多,來(lái)的比送得快,這可如何是好。一旦有客戶(hù)因?yàn)槁?lián)系不上影響了配送效率,就會(huì)影響后面客戶(hù)的配送,嚴(yán)重了還會(huì)收到投訴。

這個(gè)時(shí)候有個(gè)叫X巢的快遞柜出現(xiàn)了,小明可以把快遞放到柜子里,發(fā)條短信通知客戶(hù)過(guò)來(lái)取快遞。這樣就不強(qiáng)依賴(lài)客戶(hù)的響應(yīng),大大提高了配送效率。

這里的快遞柜就相當(dāng)于是編程世界的消息隊(duì)列,讓我們看看消息隊(duì)列到底起到了什么作用。

消息隊(duì)列解決什么問(wèn)題

  • 解耦:此時(shí),小明只需要把快遞放到柜子里,不需要關(guān)心客戶(hù)是否在家,是否在睡覺(jué)??蛻?hù)也不需要一直等待給小明開(kāi)門(mén),兩個(gè)人解耦了。
  • 異步:小明把快遞放到柜子里發(fā)個(gè)信息就可以去送下一件,不需同步等待結(jié)果。這樣每個(gè)快遞的處理速度(響應(yīng)時(shí)間)都變得極短,每天送的快遞數(shù)量(吞吐量)也變多了。
  • 削峰:這次又到了雙十一,以前小明一天只能配送 100 個(gè)快遞,現(xiàn)在有了快遞柜,配送量(吞吐量和響應(yīng)速度)增加了好幾倍,甚至數(shù)十倍,大大提升了小明的工作效率。這下小明再也不擔(dān)心接到投訴了。

總結(jié)

讓我們簡(jiǎn)單總結(jié)一下消息隊(duì)列的作用,首先需要肯定的是使用消息組件有很多好處,其中最核心的三個(gè)是:解耦、異步、削峰。

  • 解耦:生產(chǎn)端和消費(fèi)端不需要相互依賴(lài);
  • 異步:生產(chǎn)端不需要等待消費(fèi)端響應(yīng),直接返回,提高了響應(yīng)時(shí)間和吞吐量;
  • 削峰:打平高峰期的流量,消費(fèi)端可以以自己的速度處理,同時(shí)也無(wú)需在高峰期增加太多資源,提高資源利用率。

引入消息隊(duì)列后讓我們子系統(tǒng)間耦合性降低了,異步處理機(jī)制減少了系統(tǒng)的響應(yīng)時(shí)間,同時(shí)能夠有效的應(yīng)對(duì)請(qǐng)求峰值問(wèn)題,提升系統(tǒng)的穩(wěn)定性。但同時(shí)引入消息隊(duì)列也會(huì)帶來(lái)一些問(wèn)題。

下面我們以 RocketMQ 為例來(lái)分析引入 MQ 帶來(lái)的問(wèn)題以及解決方案。

MQ 常見(jiàn)問(wèn)題分析

消息丟失

消息丟失可以說(shuō)是 MQ 中普遍存在的問(wèn)題,不管用哪種 MQ 都無(wú)法避免。

那么有哪些場(chǎng)景會(huì)出現(xiàn)消息丟失問(wèn)題呢?

我們下面來(lái)看一下,整個(gè)消息從生產(chǎn)到消費(fèi)的過(guò)程中,哪些地方可能會(huì)導(dǎo)致丟消息,以及應(yīng)該如何避免消息丟失。

一條消息從生產(chǎn)到被消費(fèi),將會(huì)經(jīng)歷三個(gè)階段:

c2e8a202305271633564157.png

  • 生產(chǎn)階段,生產(chǎn)者新建消息,然后通過(guò)網(wǎng)絡(luò)將消息投遞給 MQ 服務(wù)器
  • 存儲(chǔ)階段,消息將會(huì)存儲(chǔ)在服務(wù)器磁盤(pán)中,如果是集群,消息會(huì)在這個(gè)階段被復(fù)制到其他的副本上
  • 消費(fèi)階段, 消費(fèi)者將會(huì)從 MQ 服務(wù)器拉取消息

以上任一階段都可能會(huì)丟失消息:

  • 生產(chǎn)階段:生產(chǎn)者發(fā)送消息時(shí),由于網(wǎng)絡(luò)原因,發(fā)送到 MQ 失敗了。
  • 存儲(chǔ)階段:MQ 服務(wù)器持久化時(shí),服務(wù)器宕機(jī)、重啟導(dǎo)致丟失消息。
  • 消費(fèi)階段:消息消費(fèi)者剛讀取消息,已經(jīng) ack 確認(rèn)了,但業(yè)務(wù)還沒(méi)處理完,服務(wù)就被重啟了。

消息丟失解決方案

生產(chǎn)階段

RocketMQ 提供了 3 種發(fā)送消息方式,分別是:

  • 同步發(fā)送:生產(chǎn)者向 MQ 發(fā)送消息,阻塞當(dāng)前線程等待 MQ 服務(wù)器響應(yīng)發(fā)送結(jié)果。
  • 異步發(fā)送:生產(chǎn)者首先構(gòu)建一個(gè)向服務(wù)器發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶(hù)自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。
  • Oneway 發(fā)送:生產(chǎn)者發(fā)起消息發(fā)送請(qǐng)求后并不會(huì)等待服務(wù)器的響應(yīng)結(jié)果,也不會(huì)調(diào)用回調(diào)函數(shù),即不關(guān)心消息的最終發(fā)送結(jié)果。

Oneway 相對(duì)前兩種發(fā)送方式來(lái)說(shuō)是一種不可靠的消息發(fā)送方式,因此要保證消息發(fā)送的可靠性,我們只考慮同步和異步的發(fā)送方式。

(1)同步發(fā)送可靠性保證

采用同步阻塞式的發(fā)送,然后同步檢查 MQ 服務(wù)器返回的狀態(tài)來(lái)判斷消息是否持久化成功。如果發(fā)送超時(shí)或者失敗,則會(huì)自動(dòng)重試,如果重試再失敗,就會(huì)以返回值或者異常的方式告知用戶(hù)。

我們?cè)诰帉?xiě)發(fā)送消息代碼時(shí),需要注意,正確處理返回值或者捕獲異常,就可以保證這個(gè)階段的消息不會(huì)丟失。

同步發(fā)送,代碼如下:

public void send() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");
    SendResult sendResult = null;

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    try {
        sendResult = producer.send(sendMessage);
    } catch (Exception e) {
        e.printStackTrace();
    }
    if (sendResult != null) {
        System.out.println(sendResult.getSendStatus());
    }
}

同步發(fā)送會(huì)返回 4 個(gè)狀態(tài)碼:

SEND_OK:消息發(fā)送成功。

  • 需要注意的是,消息發(fā)送到服務(wù)器后,還有兩個(gè)操作:「消息刷盤(pán)」和「消息同步到 slave 節(jié)點(diǎn)」,默認(rèn)這兩個(gè)操作都是異步的,只有把這兩個(gè)操作都改為同步,SEND_OK 這個(gè)狀態(tài)才能真正表示發(fā)送成功。
  • FLUSH_DISK_TIMEOUT:消息發(fā)送成功但是消息刷盤(pán)超時(shí)。
  • FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功但是消息同步到 slave 節(jié)點(diǎn)時(shí)超時(shí)。
  • SLAVE_NOT_AVAILABLE:消息發(fā)送成功但是 broker 的 slave 節(jié)點(diǎn)不可用。

根據(jù)返回的狀態(tài)碼,可以做消息重試,這里設(shè)置的重試次數(shù)是 3。

消息重試時(shí),消費(fèi)端一定要做好冪等處理。

既然是同步發(fā)送肯定就比較耗費(fèi)一些時(shí)間,如果你的業(yè)務(wù)比較注重 RT 那就可以使用異步發(fā)送的方式。

(2)異步發(fā)送可靠性保證

異步發(fā)送時(shí),則需要在回調(diào)方法里進(jìn)行檢查。這個(gè)地方是需要特別注意的,很多丟消息的原因就是,我們使用了異步發(fā)送,卻沒(méi)有在回調(diào)中檢查發(fā)送結(jié)果。

具體的業(yè)務(wù)實(shí)現(xiàn)可以根據(jù)發(fā)送的結(jié)果信息來(lái)判斷是否需要重試來(lái)保證消息的可靠性。

異步發(fā)送,代碼如下:

public void sendAsync() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(sendMessage, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            
        }

        @Override
        public void onException(Throwable e) {
            // TODO 可以在這里加入重試邏輯
        }
    });
}

異步發(fā)送,可以重寫(xiě)回調(diào)函數(shù),回調(diào)函數(shù)捕獲到 Exception 時(shí)表示發(fā)送失敗,這時(shí)可以進(jìn)行重試,這里設(shè)置的重試次數(shù)是 3。

存儲(chǔ)階段

默認(rèn)的情況下,消息隊(duì)列為了快速響應(yīng),在接受到生產(chǎn)者的請(qǐng)求,將消息保存在內(nèi)存成功之后,就會(huì)立刻返回 ACK 響應(yīng)給生產(chǎn)者。

RocketMQ 的刷盤(pán)方式分為「同步刷盤(pán)」和「異步刷盤(pán)」兩種。

  • 異步刷盤(pán):消息寫(xiě)入 CommitLog 時(shí),并不會(huì)直接寫(xiě)入磁盤(pán),而是先寫(xiě)入 PageCache 緩存后返回成功,然后用后臺(tái)線程異步把消息刷入磁盤(pán)。異步刷盤(pán)提高了消息吞吐量,但是可能會(huì)有消息丟失的情況,比如斷點(diǎn)導(dǎo)致機(jī)器停機(jī),PageCache 中沒(méi)來(lái)得及刷盤(pán)的消息就會(huì)丟失。

同步刷盤(pán):消息寫(xiě)入內(nèi)存的 PageCache 后,立刻通知刷盤(pán)線程刷盤(pán),然后等待刷盤(pán)完成,如果消息未在約定的時(shí)間內(nèi)(默認(rèn) 5 s)刷盤(pán)成功,就返回FLUSH_DISK_TIMEOUT,Producer 收到這個(gè)響應(yīng)后,可以進(jìn)行重試。同步刷盤(pán)策略保證了消息的可靠性,同時(shí)降低了吞吐量,增加了延遲。

  • 要開(kāi)啟同步刷盤(pán),需要增加下面配置:flushDiskType=SYNC_FLUSH

RocketMQ 默認(rèn)的是異步刷盤(pán),就有可能導(dǎo)致消息還未刷到硬盤(pán)上就丟失了,可以通過(guò)設(shè)置為同步刷盤(pán)的方式來(lái)保證消息可靠性,這樣即使 MQ 掛了,恢復(fù)的時(shí)候也可以從磁盤(pán)中去恢復(fù)消息。

如果是 Broker 是由多個(gè)節(jié)點(diǎn)組成的集群,需要將 Broker 集群配置成:至少將消息發(fā)送到 2 個(gè)以上的節(jié)點(diǎn),再給客戶(hù)端回復(fù)發(fā)送確認(rèn)響應(yīng)。 這樣當(dāng)某個(gè) Broker 宕機(jī)時(shí),其他的 Broker 可以替代宕機(jī)的 Broker,也不會(huì)發(fā)生消息丟失。

Broker 采用集群配置時(shí),需要注意的一點(diǎn)是:消息發(fā)送到 master 節(jié)點(diǎn)后,slave 節(jié)點(diǎn)會(huì)從 master 拉取消息保持跟 master 的一致。這個(gè)過(guò)程默認(rèn)是異步的,即 master 收到消息后,不等 slave 節(jié)點(diǎn)復(fù)制消息就直接給 Producer 返回成功。

這樣會(huì)有一個(gè)問(wèn)題,如果 slave 節(jié)點(diǎn)還沒(méi)有完成消息復(fù)制,這時(shí) master 宕機(jī)了,進(jìn)行主備切換后就會(huì)有消息丟失。

為了避免這個(gè)問(wèn)題,可以采用 slave 節(jié)點(diǎn)同步復(fù)制消息,即等 slave 節(jié)點(diǎn)復(fù)制消息成功后再給 Producer 返回發(fā)送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER。

消費(fèi)階段

消費(fèi)階段采用和生產(chǎn)階段類(lèi)似的確認(rèn)機(jī)制來(lái)保證消息的可靠傳遞,客戶(hù)端從 Broker 拉取消息后,執(zhí)行用戶(hù)的消費(fèi)業(yè)務(wù)邏輯:

  • 如果 Consumer 消費(fèi)成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息
  • 如果 Consumer 消費(fèi)失敗,下次拉消息的時(shí)候還會(huì)返回同一條消息,即進(jìn)行消費(fèi)重試。
消費(fèi)重試

RocketMQ 認(rèn)為消息消費(fèi)失敗需要重試的場(chǎng)景有三種:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 拋出異常

Broker 收到這個(gè)響應(yīng)后,會(huì)把這條消息放入重試隊(duì)列,Topic 名字為%RETRY% + consumerGroup

注意:

Broker 默認(rèn)最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入「死信隊(duì)列」,Consumer 可以訂閱死信隊(duì)列進(jìn)行消費(fèi)。

重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。

Consumer 端一定要做好冪等處理。

其實(shí)重試 3 次都失敗就可以說(shuō)明代碼有問(wèn)題,這時(shí) Consumer 可以把消息存入本地,給 Broker 返回 CONSUME_SUCCESS 來(lái)結(jié)束重試。

死信隊(duì)列:未能成功消費(fèi)的消息,消息隊(duì)列并不會(huì)立刻將消息丟棄,而是將消息發(fā)送到死信隊(duì)列,其名稱(chēng)是在原隊(duì)列名稱(chēng)前加 %DLQ%,如果消息最終進(jìn)入了死信隊(duì)列,則可以通過(guò) RocketMQ 提供的相關(guān)接口從死信隊(duì)列獲取到相應(yīng)的消息,保證了消息消費(fèi)的可靠性。

上面方案看似萬(wàn)無(wú)一失,每個(gè)階段都能保證消息的不丟失,但在分布式系統(tǒng)中,故障不可避免,作為消息生產(chǎn)端,你并不能保證 MQ 是不是弄丟了你的消息,消費(fèi)者是否消費(fèi)了你的消息,所以,本著 Design for Failure 的設(shè)計(jì)原則,我們需要一種機(jī)制,來(lái) Check 消息是否丟失了。

檢測(cè)消息丟失的方法

總體方案解決思路為:在消息生產(chǎn)端,給每個(gè)發(fā)出的消息都指定一個(gè)全局唯一 ID,或者附加一個(gè)連續(xù)遞增的版本號(hào),然后在消費(fèi)端做對(duì)應(yīng)的版本校驗(yàn)。

可以利用攔截器機(jī)制。在生產(chǎn)端發(fā)送消息之前,通過(guò)攔截器將消息版本號(hào)注入消息中(版本號(hào)可以采用連續(xù)遞增的 ID 生成,也可以通過(guò)分布式全局唯一 ID生成)。然后在消費(fèi)端收到消息后,再通過(guò)攔截器檢測(cè)版本號(hào)的連續(xù)性或消費(fèi)狀態(tài),這樣實(shí)現(xiàn)的好處是消息檢測(cè)的代碼不會(huì)侵入到業(yè)務(wù)代碼中,可以通過(guò)單獨(dú)的任務(wù)來(lái)定位丟失的消息,做進(jìn)一步的排查。

如果同時(shí)存在多個(gè)消息生產(chǎn)端和消息消費(fèi)端,通過(guò)版本號(hào)遞增的方式就很難實(shí)現(xiàn)了,因?yàn)椴荒鼙WC版本號(hào)的唯一性,此時(shí)只能通過(guò)全局唯一 ID 的方案來(lái)進(jìn)行消息檢測(cè),具體的實(shí)現(xiàn)原理和版本號(hào)遞增的方式一致。

重復(fù)消息

RocketMQ 為了保證消息的可靠性,選擇 「至少傳輸成功一次」 的消息模型。

在消息領(lǐng)域有一個(gè)對(duì)消息投遞的 QoS 定義,分為:

  • 最多一次(At most once)
  • 至少一次(At least once)
  • 僅一次( Exactly once)

既然是至少一次,那避免不了消息重復(fù),尤其是在分布式網(wǎng)絡(luò)環(huán)境下。比如:網(wǎng)絡(luò)原因閃斷,ACK 返回失敗等等故障,確認(rèn)信息沒(méi)有傳送到消息隊(duì)列,導(dǎo)致消息隊(duì)列不知道該消息已經(jīng)被消費(fèi)了,再次將該消息分發(fā)給其他的消費(fèi)者。

那么如何解決這個(gè)問(wèn)題?

這個(gè)問(wèn)題其實(shí)可以換一種說(shuō)法,就是如何解決消費(fèi)端冪等性問(wèn)題(冪等性,就是一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同),只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。

那如何保證消息隊(duì)列消費(fèi)的冪等性? 我們還是得結(jié)合業(yè)務(wù)來(lái)思考,這里給幾個(gè)思路:

  • 利用數(shù)據(jù)庫(kù)的唯一約束實(shí)現(xiàn):比如收到數(shù)據(jù)時(shí)要寫(xiě)庫(kù),通過(guò)創(chuàng)建唯一索引的方式保證冪等。
  • 為更新的數(shù)據(jù)設(shè)置前置條件:給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿(mǎn)足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù)。 比如可以通過(guò)判斷狀態(tài)是否允許操作,不滿(mǎn)足的拒絕更新。

如果上面提到的兩種實(shí)現(xiàn)冪等方法都不能適用于你的場(chǎng)景,我們還有一種通用性最強(qiáng),適用范圍最廣的實(shí)現(xiàn)冪等性方法。

更通用的解決方案

終極方法:「基于消息冪等表的非事務(wù)方案」,實(shí)現(xiàn)的思路特別簡(jiǎn)單:在執(zhí)行業(yè)務(wù)代碼之前,先檢查一下是否處理過(guò)這個(gè)條消息

具體的實(shí)現(xiàn)方法是:

首先,在數(shù)據(jù)庫(kù)中建一張消息日志表,這個(gè)表有兩個(gè)字段:「消息 ID」和「消息執(zhí)行狀態(tài)(消費(fèi)中、已消費(fèi))」。

然后給消息 ID 來(lái)創(chuàng)建一個(gè)唯一約束,這樣對(duì)于相同的消息 ID,表里至多只能存在一條記錄。

在發(fā)送消息時(shí),給每條消息指定一個(gè)全局唯一的 ID,消費(fèi)時(shí),先根據(jù)這個(gè) ID 檢查這條消息是否有被消費(fèi)過(guò),如果沒(méi)有消費(fèi)過(guò),才執(zhí)行業(yè)務(wù)代碼,然后將消費(fèi)狀態(tài)置為已消費(fèi)。

544ed202305271707048457.png

可以看到,此方案是無(wú)事務(wù)的,而是針對(duì)消息表本身做了狀態(tài)的區(qū)分:消費(fèi)中、消費(fèi)完成。只有消費(fèi)完成的消息才會(huì)被冪等處理掉。

而對(duì)于已有消費(fèi)中的消息,后面重復(fù)的消息會(huì)觸發(fā)延遲消費(fèi),這樣主要是為了控制并發(fā)場(chǎng)景下,第二條消息在第一條消息沒(méi)完成的過(guò)程中,去控制消息不丟(如果直接冪等,那么會(huì)丟失消息(同一個(gè)消息id的話),因?yàn)樯弦粭l消息如果沒(méi)有消費(fèi)完成的時(shí)候,第二條消息你已經(jīng)告訴broker成功了,那么第一條消息這時(shí)候失敗broker也不會(huì)重新投遞了)。

我們分析下這種方案是否解決了冪等問(wèn)題:

  1. 消息已經(jīng)消費(fèi)成功了,第二條消息將被直接冪等處理掉(消費(fèi)成功)
  2. 并發(fā)場(chǎng)景下的消息,依舊能滿(mǎn)足不會(huì)出現(xiàn)消息重復(fù)
  3. 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問(wèn)題

第一個(gè)問(wèn)題明顯解決了。

第二個(gè)問(wèn)題也已經(jīng)解決,主要是依靠插入消息表的這個(gè)動(dòng)作做控制的,因?yàn)椤赶?ID」的惟一的,后面的消息插入會(huì)由于主鍵沖突而失敗,走向延遲消費(fèi)的分支,然后后面延遲消費(fèi)的時(shí)候就會(huì)變成上面第一個(gè)場(chǎng)景的問(wèn)題。

關(guān)于第三個(gè)問(wèn)題,只要我們?cè)O(shè)計(jì)去重的消息鍵讓其支持業(yè)務(wù)的主鍵(例如訂單號(hào)、請(qǐng)求流水號(hào)等),而不僅僅是 messageId 即可。所以也不是問(wèn)題。

此方案是否有消息丟失的風(fēng)險(xiǎn)?

細(xì)心的讀者可能會(huì)發(fā)現(xiàn)這里實(shí)際上是有邏輯漏洞的,問(wèn)題出在上面聊到的三個(gè)問(wèn)題中的第 2 個(gè)問(wèn)題「并發(fā)場(chǎng)景」。

在并發(fā)場(chǎng)景下我們依賴(lài)于消息狀態(tài)做并發(fā)控制,使得第二條重復(fù)的消息會(huì)不斷延遲消費(fèi)(重試)。

但如果這時(shí)候第一條消息也由于一些異常原因(例如機(jī)器重啟了、外部異常導(dǎo)致消費(fèi)失?。](méi)有成功消費(fèi)成功呢?

也就是說(shuō)這時(shí)候延遲消費(fèi)實(shí)際上每次下來(lái)看到的都是「消費(fèi)中」的狀態(tài),最后消費(fèi)就會(huì)被視為消費(fèi)失敗而被投遞到死信隊(duì)列中。

對(duì)于此,我們解決的方法是,插入的消息表必須要帶一個(gè)最長(zhǎng)消費(fèi)過(guò)期時(shí)間,例如 10 分鐘,意思是如果一個(gè)消息處于消費(fèi)中超過(guò) 10 分鐘,就需要從消息表中刪除(需要程序自行實(shí)現(xiàn))。

所以最后這個(gè)消息的流程會(huì)是這樣的:

fa417202305271708553882.png

消息積壓

如果出現(xiàn)積壓,那一定是性能問(wèn)題,想要解決消息從生產(chǎn)到消費(fèi)上的性能問(wèn)題,就首先要知道哪些環(huán)節(jié)可能出現(xiàn)消息積壓,然后在考慮如何解決。

因?yàn)橄l(fā)送之后才會(huì)出現(xiàn)積壓的問(wèn)題,所以和消息生產(chǎn)端沒(méi)有關(guān)系,又因?yàn)榻^大部分的消息隊(duì)列單節(jié)點(diǎn)都能達(dá)到每秒鐘幾萬(wàn)的處理能力,相對(duì)于業(yè)務(wù)邏輯來(lái)說(shuō),性能不會(huì)出現(xiàn)在中間件的消息存儲(chǔ)上面。

毫無(wú)疑問(wèn),出問(wèn)題的肯定是消息消費(fèi)階段。

如果是線上突發(fā)問(wèn)題,要臨時(shí)擴(kuò)容,增加消費(fèi)端的數(shù)量,與此同時(shí),降級(jí)一些非核心的業(yè)務(wù)。通過(guò)擴(kuò)容和降級(jí)承擔(dān)流量。

其次,才是排查解決異常問(wèn)題,如通過(guò)監(jiān)控,日志等手段分析是否消費(fèi)端的業(yè)務(wù)邏輯代碼出現(xiàn)了問(wèn)題,優(yōu)化消費(fèi)端的業(yè)務(wù)處理邏輯。

最后,如果是消費(fèi)端的處理能力不足,可以通過(guò)水平擴(kuò)容來(lái)提供消費(fèi)端的并發(fā)處理能力。

在擴(kuò)容消費(fèi)者的是時(shí)候有一點(diǎn)需要注意,如果當(dāng)前 Topic 的 Message Queue 的數(shù)量大于消費(fèi)者數(shù)量,就可以對(duì)消費(fèi)者進(jìn)行擴(kuò)容,增加消費(fèi)者,來(lái)提高消費(fèi)能力,盡快把積壓的消息消費(fèi)完。如果消費(fèi)者的數(shù)量大于等于 Message Queue 的數(shù)量,增加消費(fèi)者是沒(méi)有用的。

順序消費(fèi)

我們知道,RocketMQ 在主題上是無(wú)序的。但是在有些場(chǎng)景下,使用 MQ 需要保證消息的順序性,比如在電商系統(tǒng)中:下單、付款、發(fā)貨、買(mǎi)家確認(rèn)收貨,消費(fèi)端需要嚴(yán)格按照業(yè)務(wù)狀態(tài)機(jī)的順序處理,否則,就會(huì)出現(xiàn)業(yè)務(wù)問(wèn)題。

我們發(fā)現(xiàn),消息帶上了狀態(tài),不再是一個(gè)個(gè)獨(dú)立的個(gè)體,有了上下文依賴(lài)關(guān)系!

那么 MQ 是如何來(lái)保證消息順序的?

我們通常發(fā)送消息的時(shí)候,消息發(fā)送默認(rèn)是會(huì)采用輪詢(xún)的方式發(fā)送到不同的 queue。

而消費(fèi)端消費(fèi)的時(shí)候,是會(huì)分配到多個(gè) queue 的,多個(gè) queue 是同時(shí)拉取提交消費(fèi)。

但是同一條 queue 里面,RocketMQ 的確是能保證 FIFO 的。那么要做到順序消息,應(yīng)該怎么實(shí)現(xiàn)呢——把消息確保投遞到同一條 queue。

對(duì)于 RocketMQ 來(lái)說(shuō),主要是通過(guò) Producer 和 Consumer 來(lái)保證消息順序的。

生產(chǎn)端

生產(chǎn)端提供了一個(gè)接口 MessageQueueSelector

public interface MessageQueueSelector {
   MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

接口內(nèi)定義一個(gè) select 方法,具體參數(shù)含義:

  • mqs:該 Topic 下所有的隊(duì)列分片
  • msg:待發(fā)送的消息
  • arg:發(fā)送消息時(shí)傳遞的參數(shù)

示例代碼

模擬訂單消息的發(fā)送,共有 3 個(gè)訂單,每個(gè)訂單都包含下單、付款、發(fā)貨、買(mǎi)家確認(rèn)收貨四個(gè)流程,對(duì)應(yīng) 4 條消息。同一個(gè)訂單的消息要求嚴(yán)格按照順序消費(fèi),不同訂單的消息可以并發(fā)執(zhí)行。

首先實(shí)現(xiàn) MessageQueueSelector 接口,定制 MessageQueue 選擇策略:

public class OrderMessageQueueSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        //選擇以參數(shù)arg為索引的MessageQueue
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}

下面實(shí)現(xiàn)發(fā)送消息邏輯:

@Slf4j
@Service
public class OrderMessageProducer {
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");

    private static final String[] ORDER_MESSAGES = {"下單", "付款", "發(fā)貨", "買(mǎi)家確認(rèn)收貨"};

    @PostConstruct
    public void sendMessage() {
        try {
            //設(shè)置namesrv
            producer.setNamesrvAddr(namesrvAddr);

            //啟動(dòng)Producer
            producer.start();

            System.err.println("Order Message Producer Start...");

            //創(chuàng)建3組消息,每組消息發(fā)往同一個(gè)Queue,保證消息的局部有序性
            String tags = "Tags";

            OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();

            //注:要實(shí)現(xiàn)順序消費(fèi),必須同步發(fā)送消息
            for (int i = 0; i < 3; i++) {
                String orderId = "" + (i + 1);
                for (int j = 0, size = ORDER_MESSAGES.length; j < size; j++) {
                    String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];
                    String keys = message;
                    byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);
                    Message mqMsg = new Message("TEST_TOPIC_NAME", tags, keys, messageBody);
                    producer.send(mqMsg, orderMessageQueueSelector, i);
                }
            }
        } catch (Exception e) {
            log.error("Message Producer: Send Message Error ", e);
        }
    }
}

使用 DefaultMQProducersend() 方法,指定 MessageQueueSelector 和參數(shù),Broker 將會(huì)將邏輯上需要保證順序性的消息發(fā)往同一隊(duì)列。

注意:上面的代碼把 orderId 相同的消息都會(huì)發(fā)送到同一個(gè) MessageQueue,這樣同一個(gè) orderId 的消息是有序的,這也叫做局部有序。對(duì)應(yīng)的另一種是全局有序,這需要把所有的消息都發(fā)到同一個(gè) MessageQueue。

注:想要實(shí)現(xiàn)順序消費(fèi),發(fā)送方式必須為同步發(fā)送,異步發(fā)送無(wú)法保證消息的發(fā)送順序!

這樣同一批我們需要做到順序消費(fèi)訂單肯定會(huì)投遞到同一個(gè)隊(duì)列,同一個(gè)隊(duì)列肯定會(huì)投遞到同一個(gè)消費(fèi)實(shí)例,同一個(gè)消費(fèi)實(shí)例肯定是順序拉取并順序提交線程池的,只要保證消費(fèi)端順序消費(fèi),則大功告成!

消費(fèi)端

消費(fèi)端想要實(shí)現(xiàn)順序消費(fèi),只要設(shè)置監(jiān)聽(tīng)器實(shí)現(xiàn) MessageListenerOrderly 接口即可。

示例代碼

首先自定義 MessageListenerOrderly 接口實(shí)現(xiàn)類(lèi),實(shí)現(xiàn)順序消費(fèi):

public class OrderMessageListener implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (CollectionUtils.isEmpty(msgs)){
            return ConsumeOrderlyStatus.SUCCESS;
        }
        //設(shè)置自動(dòng)提交
        context.setAutoCommit(true);
        msgs.stream()
                .forEach(msg -> {
                    try {
                        String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.err.println("Handle Order Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: "
                                + msg.getTags() + ",keys: " + msg.getKeys() + ",messageBody: " + messageBody);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

下面就是消費(fèi)邏輯:

@Service
public class OrderMessageConsumer {
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");

    @PostConstruct
    public void start() {
        try {
            //設(shè)置namesrv地址
            consumer.setNamesrvAddr(namesrvAddr);

            //從消息隊(duì)列頭部開(kāi)始消費(fèi)
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            //集群消費(fèi)模式
            consumer.setMessageModel(MessageModel.CLUSTERING);

            //訂閱主題
            consumer.subscribe("TEST_TOPIC_NAME", "*");

            //注冊(cè)消息監(jiān)聽(tīng)器,這里因?yàn)橐獙?shí)現(xiàn)順序消費(fèi),所以必須注冊(cè)MessageListenerOrderly
            consumer.registerMessageListener(new OrderMessageListener());

            //啟動(dòng)消費(fèi)端
            consumer.start();

            System.err.println("Order Message Consumer Start...");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }
}

要保證消息的順序性,就需要保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 消費(fèi)。簡(jiǎn)單來(lái)說(shuō)就是通過(guò)對(duì) MessageQueueLock 進(jìn)行加鎖,這樣就保證只有一個(gè)線程在處理當(dāng)前 MessageQueue。感興趣的同學(xué)可以深入研究下。

總結(jié)

在項(xiàng)目中引入 MQ 解決了我們系統(tǒng)之間的耦合度過(guò)高的問(wèn)題、提高系統(tǒng)的靈活性和峰值處理能力。但同時(shí)也帶來(lái)了一些問(wèn)題:消息丟失、重復(fù)消息和消息積壓。

消息丟失可分三個(gè)階段進(jìn)行分析:

  • 生產(chǎn)階段:采用同步發(fā)送,通過(guò)正確處理返回值或者捕獲異常,保證消息可靠性;采用異步發(fā)送則需要在回調(diào)方法里進(jìn)行檢查。
  • 存儲(chǔ)階段:存儲(chǔ)端的可靠性依靠持久化策略、備份(主從復(fù)制)保證。
  • 消費(fèi)階段:消費(fèi)失敗可以依靠重試策略保證可靠性。

對(duì)于重復(fù)消息,我們最后也給出一個(gè)終極方案:「基于消息冪等表的非事務(wù)方案」。不依賴(lài)事務(wù)而實(shí)現(xiàn)消息的去重,那么方案就能推廣到更復(fù)雜的場(chǎng)景例如:RPC、跨庫(kù)等。

而消息積壓,絕大部分問(wèn)題出現(xiàn)在消費(fèi)端,我們可以通過(guò)水平擴(kuò)容增加 Consumer 的實(shí)例數(shù)量來(lái)解決,需要注意的是,增加并發(fā)需要同步擴(kuò)容分區(qū)數(shù)量,否則是起不到效果的。

最后介紹了順序消費(fèi),RocketMQ 采用了局部順序一致性的機(jī)制,實(shí)現(xiàn)了單個(gè)隊(duì)列中的消息嚴(yán)格有序。


請(qǐng)登錄后查看

CRMEB-慕白寒窗雪 最后編輯于2023-05-27 17:14:09

快捷回復(fù)
回復(fù)
回復(fù)
回復(fù)({{post_count}}) {{!is_user ? '我的回復(fù)' :'全部回復(fù)'}}
排序 默認(rèn)正序 回復(fù)倒序 點(diǎn)贊倒序

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level }}

作者 管理員 企業(yè)

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
{{item.is_suggest == 1? '取消推薦': '推薦'}}
沙發(fā) 板凳 地板 {{item.floor}}#
{{item.user_info.title || '暫無(wú)簡(jiǎn)介'}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
打賞
已打賞¥{{item.reward_price}}
{{item.like_count}}
{{item.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)

{{itemc.user_info.nickname}}

{{itemc.user_name}}

回復(fù) {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}
打賞
已打賞¥{{itemc.reward_price}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)
查看更多
打賞
已打賞¥{{reward_price}}
2892
{{like_count}}
{{collect_count}}
添加回復(fù) ({{post_count}})

相關(guān)推薦

快速安全登錄

使用微信掃碼登錄
{{item.label}} 加精
{{item.label}} {{item.label}} 板塊推薦 常見(jiàn)問(wèn)題 產(chǎn)品動(dòng)態(tài) 精選推薦 首頁(yè)頭條 首頁(yè)動(dòng)態(tài) 首頁(yè)推薦
取 消 確 定
回復(fù)
回復(fù)
問(wèn)題:
問(wèn)題自動(dòng)獲取的帖子內(nèi)容,不準(zhǔn)確時(shí)需要手動(dòng)修改. [獲取答案]
答案:
提交
bug 需求 取 消 確 定
打賞金額
當(dāng)前余額:¥{{rewardUserInfo.reward_price}}
{{item.price}}元
請(qǐng)輸入 0.1-{{reward_max_price}} 范圍內(nèi)的數(shù)值
打賞成功
¥{{price}}
完成 確認(rèn)打賞

微信登錄/注冊(cè)

切換手機(jī)號(hào)登錄

{{ bind_phone ? '綁定手機(jī)' : '手機(jī)登錄'}}

{{codeText}}
切換微信登錄/注冊(cè)
暫不綁定
CRMEB客服

CRMEB咨詢(xún)熱線 咨詢(xún)熱線

400-8888-794

微信掃碼咨詢(xún)

CRMEB開(kāi)源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
返回頂部 返回頂部
CRMEB客服