宅男在线永久免费观看网直播,亚洲欧洲日产国码无码久久99,野花社区在线观看视频,亚洲人交乣女bbw,一本一本久久a久久精品综合不卡

全部
常見問題
產(chǎn)品動態(tài)
精選推薦

Redis隊列原理解析:讓你的應(yīng)用程序運行更加穩(wěn)定!

管理 管理 編輯 刪除

一、消息隊列簡介

消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括 3 個角色:

  • 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
  • 生產(chǎn)者:發(fā)送消息到消息隊列
  • 消費者:從消息隊列獲取消息并處理消息

f3db520231213175325232.png

消息隊列和阻塞隊列的區(qū)別:

① 消息隊列是在 JVM 以外的獨立服務(wù),所以不受 JVM 內(nèi)存的限制

② 消息隊列不僅僅做數(shù)據(jù)存儲,還需要確保數(shù)據(jù)安全,存入到消息隊列中的所有消息都需要做持久化,這樣不管是服務(wù)宕機還是重啟,數(shù)據(jù)都不會丟失。而且消息隊列還會在消息投遞給消費者后,要求消費者做消息確認,如果消費者沒有確認,那么這條消息就會一直存在于消息隊列中,下一次會繼續(xù)投遞給消費者,讓消費者繼續(xù)處理,直到消息被成功處理。

二、Redis 提供的消息隊列

Redis 提供了三種不同的方式來實現(xiàn)消息隊列:

  • list 結(jié)構(gòu):基于 List 結(jié)構(gòu)模擬消息隊列
  • PubSub:基本的點對點消息隊列
  • Stream:比較完善的消息隊列模型

2.1 基于 List 結(jié)構(gòu)模擬消息隊列

消息隊列(Message Queue),字面意思就是存放消息的隊列。而 Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,很容易模擬出隊列效果。

隊列(先進先出)是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP 來實現(xiàn)。

不過要注意的是,當隊列中沒有消息時 RPOP 或 LPOP 操作會返回 null,并不像 JVM 的阻塞隊列那樣會阻塞并等待下消息。因此這里應(yīng)該使用 BRPOP 或者 BLPOP 來實現(xiàn)阻塞效果。

基于 List 的消息隊列有哪些優(yōu)缺點?

優(yōu)點:

① 利用 Redis 存儲,不受限于 JVM 內(nèi)存上限

② 基于 Redis 的持久化機制,數(shù)據(jù)安全性有保證

③ 可以滿足消息有序性

缺點:

① 無法避免消息丟失。假設(shè)某個消費者從消息隊列(List 結(jié)構(gòu))中獲取到一條消息,但還未來得及處理,該消費者出現(xiàn)故障,那么這條消息就會丟失,這是因為 POP 命令是 remove and get,會將消息直接從消息隊列中直接移除,這樣其他消費者就獲取不到。

② 只支持單消費者。消息隊列(List 結(jié)構(gòu))中的消息,一旦被某個消費者取走,就會從隊列中移除,其他消費者就獲取不到了,無法實現(xiàn)一條消息被很多消費者消費的需求。

2.2 基于 PubSub 的消息隊列

PubSub(發(fā)布訂閱)是 Redis2.0 版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個 channel,生產(chǎn)者向?qū)?yīng) channel 發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

相關(guān)命令如下:

  • SUBSCRIBE channel [channel]:訂閱一個或多個頻道
  • PUBLISH channel msg:向一個頻道發(fā)送消息
  • PSUBSCRIBE pattern [pattern]:訂閱與 pattern 格式匹配的所有頻道

關(guān)于 PubSub 的具體命令使用方法可以參看官網(wǎng):?https://redis.io/commands/?group=pubsub

基于 PubSub 的消息隊列有哪些優(yōu)缺點:

優(yōu)點:

采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費。一條消息可以發(fā)給多個消費者,也可以發(fā)給一個消費者,而且也支持不同生產(chǎn)者往相同頻道發(fā)。

缺點:

① 不支持數(shù)據(jù)持久化。本身不像 List 結(jié)構(gòu)那樣支持數(shù)據(jù)持久化,List 結(jié)構(gòu)本身就是用來存儲數(shù)據(jù)的,而 PubSub 則是用來做消息發(fā)送的。因此,當發(fā)送一條消息時,但卻沒有任何消費者訂閱,那么該條消息就直接消失了。

② 無法避免消息丟失

③ 消息堆積有上限,超出時數(shù)據(jù)丟失。當發(fā)送一條消息時,如果有消費者監(jiān)聽,消費者會將發(fā)送過來的消息緩存至消息緩存區(qū),由消費者進行處理。而消費者的緩存空間是有上限的,如果超出了就會丟失。

2.3 基于 Stream 的消息隊列

Stream 是 Redis5.0 引入的一種新的數(shù)據(jù)類型,可以實現(xiàn)一個功能非常完善的消息隊列。

發(fā)送消息的命令:

1b937202312131756428041.png

最簡用法如下:

c6f6320231213175759232.png

讀取消息的方式之一:XREAD

159f920231213175856456.png

使用 XREAD 讀取消息

50c47202312131801176933.png

XREAD 阻塞方式,讀取最新的消息:

6211c202312131802599576.png

在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用 XREAD 阻塞方式來查詢最新消息,從而實現(xiàn)持續(xù)監(jiān)聽隊列的效果,偽代碼如下:

while(true) {
	// 嘗試讀取隊列中的消息,最多阻塞 2 秒
	Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
	if(msg == null) {
		continue;
	}
	// 處理消息
	handleMessage(msg);
}

注意:當我們指定起始 ID 為 $ 時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過 1 條以上的消息到達隊列,則下次獲取時也只能獲取到最新的一條,會出現(xiàn)漏讀消息的問題。

STREAM 類型消息隊列的 XREAD 命令特點:

  • 消息可回溯。消息讀完后不消失,永久保存在隊列中。
  • 一個消息可以被多個消費者讀取
  • 可以阻塞讀取
  • 有消息漏讀的風(fēng)險

2.4 基于 Stream 的消息隊列-消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組中,監(jiān)聽同一個隊列。具備下列特點:

① 消息分流:隊列中的消息會分流給組內(nèi)的不同消費者,而不是重復(fù)消費,從而加快消息處理的速度。

處于一個組內(nèi)的多個消費者實際上是競爭關(guān)系,凡是進入到這個組的消息,組內(nèi)的消費者就會競爭該消息的處理權(quán)。這種方式可以大大提高消息的處理速度,避免消息堆積。如果想要一條消息被多個消費者處理,可以添加多個消費者組。

② 消息標識:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費。

③ 消息確認:消費者獲取消息后,消息處于 pending 狀態(tài),并存入一個 pending-list。當處理完成后需要通過 XACK 來確認消息,標記消息為已處理,才會從 pending-list 移除。

創(chuàng)建消費者組:

8ec60202312131804259918.png

  • key:隊列名稱
  • groupName:消費者組名稱
  • ID:起始 ID 標識,$ 代表隊列中最后一個消息,0 代表隊列中第一個消息。
  • MKSTREAM:隊列不存在時自動創(chuàng)建隊列

其他常見命令:

bc5e4202312131805209027.png

從消費者組讀取消息:

378fd202312131805486853.png

  • group:消費者組名稱
  • consumer:消費者名稱,如果消費者不存在,會自動創(chuàng)建一個消費者
  • count:本次查詢的最大數(shù)量
  • BLOCK milliseconds:當沒有消息時最長等待時間
  • NOACK:無需手動 ACK,獲取到消息后自動確認
  • STREAMS key:指定隊列名稱
  • ID:獲取消息的起始 ID:

“>”:從下一個未消費的消息開始

其他:根據(jù)指定 id 從 pending-list 中獲取已消費但未確認的消息,例如 0,是從 pending-list 中的第一個消息開始。

使用 Java 代碼處理消費者監(jiān)聽消息的基本思路:

whilt(true){
	// 嘗試監(jiān)聽隊列,使用阻塞模式,最長等待 2000 毫秒
	// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
	// 含義:消費者組 g1 中的消費者 c1 使用阻塞式嘗試從消息隊列 s1 中讀取下一個未被消費的消息,阻塞時長為 2000 毫秒
	Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
	if(msg == null){
		continue;
	}
	try{
		// 處理消息,完成后一定要 ACK
		handleMessage(msg);
	} catch(Exception e){
		while(true){
			// XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
			// 含義:消費者組 g1 中的消費者 c1 從消息隊列 s1 的pending-list 中讀取第一個消息
			Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
			if(msg == null){ // null 說明沒有異常消息,所有消息都已確認,結(jié)束循環(huán)
				break;
			}
			try{
				// 說明有異常消息,再次處理
				handleMessage(msg);
			}catch(Exception e){
				// 再次出現(xiàn)異常,記錄日志,繼續(xù)循環(huán)
				continue;
			}
		}
	}

}

Stream 類型消息隊列的 XREADGROUP 命令特點:

  • 消息可回溯
  • 可以多消費者爭搶消息,加快消費速度
  • 可以阻塞讀取
  • 沒有消息漏讀的風(fēng)險
  • 有消息確認機制,保證消息至少被消費一次

三、Redis 消息隊列比對

41c11202312131807378772.png

四、基于 Stream 消息隊列實現(xiàn)異步秒殺

需求:

① 創(chuàng)建一個 Stream 類型的消息隊列,名為 stream.orders

② 修改之前的秒殺下單 Lua 腳本,在認定有搶購資格后,直接向 stream.orders 中添加消息,內(nèi)容包含 voucherId、userId、orderId

③ 項目啟動時,開啟一個線程任務(wù),嘗試獲取 stream.orders 中的消息,完成下單

4.1 通過命令行的方式創(chuàng)建消息隊列以及消費者組

創(chuàng)建隊列名為 stream.orders 且組名為 g1 的消費者組,消息 ID 從 0 開始

75b0e202312131808304484.png

4.2 Lua 腳本

-- 優(yōu)惠券id
local voucherId = ARGV[1]
-- 用戶id
local userId = ARGV[2]
-- 訂單id
local orderId = ARGV[3]

-- 庫存key
local stockKey = "seckill:stock:"..voucherId
-- 訂單key
local orderKey = "seckill:order:"..voucherId

-- 判斷庫存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
end

-- 判斷用戶是否已經(jīng)下過單
if(redis.call('sismember', orderKey, userId) == 1) then
    return 2
end

-- 扣減庫存
redis.call('incrby', stockKey, -1)

-- 將 userId 存入當前優(yōu)惠券的 set 集合
redis.call('sadd', orderKey, userId)

-- 將訂單信息存入到消息隊列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

4.3 代碼改進

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }


    /***
     * 創(chuàng)建線程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    /***
     * 容器啟動時,便開始創(chuàng)建獨立線程,從隊列中讀取數(shù)據(jù),創(chuàng)建訂單
     */
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {

        @Override
        public void run() {
            while(true){
                try {
                    // 獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判斷訂單信息是否為空
                    if(list == null || list.isEmpty()){
                        // 如果為 null,說明沒有消息,繼續(xù)下一次循環(huán)
                        continue;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 創(chuàng)建訂單
                    createVoucherOrder(voucherOrder);
                    // 確認消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常!", e);
                    handlePendingList();
                }
            }

        }

        private void handlePendingList() {
            while(true){
                try {
                    // 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判斷訂單信息是否為空
                    if(list == null || list.isEmpty()){
                        break;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 創(chuàng)建訂單
                    createVoucherOrder(voucherOrder);
                    // 確認消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常!", e);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }

        }
    }

    private void createVoucherOrder(VoucherOrder voucherOrder) {
        // 判斷當前優(yōu)惠券用戶是否已經(jīng)下過單
        // 用戶 id
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();

        RLock lock = redissonClient.getLock("lock:order:" + userId);
        // 獲取互斥鎖
        // 使用空參意味著不會進行重復(fù)嘗試獲取鎖
        boolean isLock = lock.tryLock();
        if (!isLock) {
            // 獲取鎖失敗,直接返回失敗或者重試
            log.error("不允許重復(fù)下單!");
            return;
        }


        try {
            // 查詢訂單
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                log.error("不允許重復(fù)下單!");
                return;
            }

            // 扣減庫存
            boolean success = seckillVoucherService.update().
                    setSql("stock = stock - 1").
                    eq("voucher_id", voucherId).
                    gt("stock", 0).
                    update();

            // 扣減失敗
            if (!success) {
                log.error("庫存不足!");
                return;
            }

            // 創(chuàng)建訂單
            save(voucherOrder);
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        UserDTO user = UserHolder.getUser();
        // 生成訂單 id
        Long orderId = redisIdWorker.nextId("order");
        // 執(zhí)行 lua 腳本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), user.getId().toString(), orderId.toString());
        int r = result.intValue();

        // 判斷結(jié)果是否為 0
        if(r != 0){
            // 不為 0 ,代表沒有購買資格
            Result.fail(r == 1 ? "庫存不足!" : "不能重復(fù)下單!");
        }

        // 返回訂單 id
        return Result.ok(orderId);
    }
}


請登錄后查看

CRMEB-慕白寒窗雪 最后編輯于2023-12-13 18:09:50

快捷回復(fù)
回復(fù)
回復(fù)
回復(fù)({{post_count}}) {{!is_user ? '我的回復(fù)' :'全部回復(fù)'}}
排序 默認正序 回復(fù)倒序 點贊倒序

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level }}

作者 管理員 企業(yè)

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
{{item.is_suggest == 1? '取消推薦': '推薦'}}
沙發(fā) 板凳 地板 {{item.floor}}#
{{item.user_info.title || '暫無簡介'}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
打賞
已打賞¥{{item.reward_price}}
{{item.like_count}}
{{item.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)

{{itemc.user_info.nickname}}

{{itemc.user_name}}

回復(fù) {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}
打賞
已打賞¥{{itemc.reward_price}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)
查看更多
打賞
已打賞¥{{reward_price}}
2928
{{like_count}}
{{collect_count}}
添加回復(fù) ({{post_count}})

相關(guān)推薦

快速安全登錄

使用微信掃碼登錄
{{item.label}} 加精
{{item.label}} {{item.label}} 板塊推薦 常見問題 產(chǎn)品動態(tài) 精選推薦 首頁頭條 首頁動態(tài) 首頁推薦
取 消 確 定
回復(fù)
回復(fù)
問題:
問題自動獲取的帖子內(nèi)容,不準確時需要手動修改. [獲取答案]
答案:
提交
bug 需求 取 消 確 定
打賞金額
當前余額:¥{{rewardUserInfo.reward_price}}
{{item.price}}元
請輸入 0.1-{{reward_max_price}} 范圍內(nèi)的數(shù)值
打賞成功
¥{{price}}
完成 確認打賞

微信登錄/注冊

切換手機號登錄

{{ bind_phone ? '綁定手機' : '手機登錄'}}

{{codeText}}
切換微信登錄/注冊
暫不綁定
CRMEB客服

CRMEB咨詢熱線 咨詢熱線

400-8888-794

微信掃碼咨詢

CRMEB開源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
返回頂部 返回頂部
CRMEB客服