一、消息隊列簡介
消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括 3 個角色:
- 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
- 生產(chǎn)者:發(fā)送消息到消息隊列
- 消費者:從消息隊列獲取消息并處理消息
消息隊列和阻塞隊列的區(qū)別:
① 消息隊列是在 JVM 以外的獨立服務(wù),所以不受 JVM 內(nèi)存的限制
② 消息隊列不僅僅做數(shù)據(jù)存儲,還需要確保數(shù)據(jù)安全,存入到消息隊列中的所有消息都需要做持久化,這樣不管是服務(wù)宕機還是重啟,數(shù)據(jù)都不會丟失。而且消息隊列還會在消息投遞給消費者后,要求消費者做消息確認,如果消費者沒有確認,那么這條消息就會一直存在于消息隊列中,下一次會繼續(xù)投遞給消費者,讓消費者繼續(xù)處理,直到消息被成功處理。
二、Redis 提供的消息隊列
Redis 提供了三種不同的方式來實現(xiàn)消息隊列:
- list 結(jié)構(gòu):基于 List 結(jié)構(gòu)模擬消息隊列
- PubSub:基本的點對點消息隊列
- Stream:比較完善的消息隊列模型
2.1 基于 List 結(jié)構(gòu)模擬消息隊列
消息隊列(Message Queue),字面意思就是存放消息的隊列。而 Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,很容易模擬出隊列效果。
隊列(先進先出)是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP 來實現(xiàn)。
不過要注意的是,當隊列中沒有消息時 RPOP 或 LPOP 操作會返回 null,并不像 JVM 的阻塞隊列那樣會阻塞并等待下消息。因此這里應(yīng)該使用 BRPOP 或者 BLPOP 來實現(xiàn)阻塞效果。
基于 List 的消息隊列有哪些優(yōu)缺點?
優(yōu)點:
① 利用 Redis 存儲,不受限于 JVM 內(nèi)存上限
② 基于 Redis 的持久化機制,數(shù)據(jù)安全性有保證
③ 可以滿足消息有序性
缺點:
① 無法避免消息丟失。假設(shè)某個消費者從消息隊列(List 結(jié)構(gòu))中獲取到一條消息,但還未來得及處理,該消費者出現(xiàn)故障,那么這條消息就會丟失,這是因為 POP 命令是 remove and get,會將消息直接從消息隊列中直接移除,這樣其他消費者就獲取不到。
② 只支持單消費者。消息隊列(List 結(jié)構(gòu))中的消息,一旦被某個消費者取走,就會從隊列中移除,其他消費者就獲取不到了,無法實現(xiàn)一條消息被很多消費者消費的需求。
2.2 基于 PubSub 的消息隊列
PubSub(發(fā)布訂閱)是 Redis2.0 版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個 channel,生產(chǎn)者向?qū)?yīng) channel 發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
相關(guān)命令如下:
- SUBSCRIBE channel [channel]:訂閱一個或多個頻道
- PUBLISH channel msg:向一個頻道發(fā)送消息
- PSUBSCRIBE pattern [pattern]:訂閱與 pattern 格式匹配的所有頻道
關(guān)于 PubSub 的具體命令使用方法可以參看官網(wǎng):?https://redis.io/commands/?group=pubsub
基于 PubSub 的消息隊列有哪些優(yōu)缺點:
優(yōu)點:
采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費。一條消息可以發(fā)給多個消費者,也可以發(fā)給一個消費者,而且也支持不同生產(chǎn)者往相同頻道發(fā)。
缺點:
① 不支持數(shù)據(jù)持久化。本身不像 List 結(jié)構(gòu)那樣支持數(shù)據(jù)持久化,List 結(jié)構(gòu)本身就是用來存儲數(shù)據(jù)的,而 PubSub 則是用來做消息發(fā)送的。因此,當發(fā)送一條消息時,但卻沒有任何消費者訂閱,那么該條消息就直接消失了。
② 無法避免消息丟失
③ 消息堆積有上限,超出時數(shù)據(jù)丟失。當發(fā)送一條消息時,如果有消費者監(jiān)聽,消費者會將發(fā)送過來的消息緩存至消息緩存區(qū),由消費者進行處理。而消費者的緩存空間是有上限的,如果超出了就會丟失。
2.3 基于 Stream 的消息隊列
Stream 是 Redis5.0 引入的一種新的數(shù)據(jù)類型,可以實現(xiàn)一個功能非常完善的消息隊列。
發(fā)送消息的命令:
最簡用法如下:
讀取消息的方式之一:XREAD
使用 XREAD 讀取消息
XREAD 阻塞方式,讀取最新的消息:
在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用 XREAD 阻塞方式來查詢最新消息,從而實現(xiàn)持續(xù)監(jiān)聽隊列的效果,偽代碼如下:
while(true) {
// 嘗試讀取隊列中的消息,最多阻塞 2 秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
if(msg == null) {
continue;
}
// 處理消息
handleMessage(msg);
}
注意:當我們指定起始 ID 為 $ 時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過 1 條以上的消息到達隊列,則下次獲取時也只能獲取到最新的一條,會出現(xiàn)漏讀消息的問題。
STREAM 類型消息隊列的 XREAD 命令特點:
- 消息可回溯。消息讀完后不消失,永久保存在隊列中。
- 一個消息可以被多個消費者讀取
- 可以阻塞讀取
- 有消息漏讀的風(fēng)險
2.4 基于 Stream 的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監(jiān)聽同一個隊列。具備下列特點:
① 消息分流:隊列中的消息會分流給組內(nèi)的不同消費者,而不是重復(fù)消費,從而加快消息處理的速度。
處于一個組內(nèi)的多個消費者實際上是競爭關(guān)系,凡是進入到這個組的消息,組內(nèi)的消費者就會競爭該消息的處理權(quán)。這種方式可以大大提高消息的處理速度,避免消息堆積。如果想要一條消息被多個消費者處理,可以添加多個消費者組。
② 消息標識:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費。
③ 消息確認:消費者獲取消息后,消息處于 pending 狀態(tài),并存入一個 pending-list。當處理完成后需要通過 XACK 來確認消息,標記消息為已處理,才會從 pending-list 移除。
創(chuàng)建消費者組:
- key:隊列名稱
- groupName:消費者組名稱
- ID:起始 ID 標識,$ 代表隊列中最后一個消息,0 代表隊列中第一個消息。
- MKSTREAM:隊列不存在時自動創(chuàng)建隊列
其他常見命令:
從消費者組讀取消息:
- group:消費者組名稱
- consumer:消費者名稱,如果消費者不存在,會自動創(chuàng)建一個消費者
- count:本次查詢的最大數(shù)量
- BLOCK milliseconds:當沒有消息時最長等待時間
- NOACK:無需手動 ACK,獲取到消息后自動確認
- STREAMS key:指定隊列名稱
- ID:獲取消息的起始 ID:
“>”:從下一個未消費的消息開始
其他:根據(jù)指定 id 從 pending-list 中獲取已消費但未確認的消息,例如 0,是從 pending-list 中的第一個消息開始。
使用 Java 代碼處理消費者監(jiān)聽消息的基本思路:
whilt(true){
// 嘗試監(jiān)聽隊列,使用阻塞模式,最長等待 2000 毫秒
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
// 含義:消費者組 g1 中的消費者 c1 使用阻塞式嘗試從消息隊列 s1 中讀取下一個未被消費的消息,阻塞時長為 2000 毫秒
Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){
continue;
}
try{
// 處理消息,完成后一定要 ACK
handleMessage(msg);
} catch(Exception e){
while(true){
// XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
// 含義:消費者組 g1 中的消費者 c1 從消息隊列 s1 的pending-list 中讀取第一個消息
Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){ // null 說明沒有異常消息,所有消息都已確認,結(jié)束循環(huán)
break;
}
try{
// 說明有異常消息,再次處理
handleMessage(msg);
}catch(Exception e){
// 再次出現(xiàn)異常,記錄日志,繼續(xù)循環(huán)
continue;
}
}
}
}
Stream 類型消息隊列的 XREADGROUP 命令特點:
- 消息可回溯
- 可以多消費者爭搶消息,加快消費速度
- 可以阻塞讀取
- 沒有消息漏讀的風(fēng)險
- 有消息確認機制,保證消息至少被消費一次
三、Redis 消息隊列比對
四、基于 Stream 消息隊列實現(xiàn)異步秒殺
需求:
① 創(chuàng)建一個 Stream 類型的消息隊列,名為 stream.orders
② 修改之前的秒殺下單 Lua 腳本,在認定有搶購資格后,直接向 stream.orders 中添加消息,內(nèi)容包含 voucherId、userId、orderId
③ 項目啟動時,開啟一個線程任務(wù),嘗試獲取 stream.orders 中的消息,完成下單
4.1 通過命令行的方式創(chuàng)建消息隊列以及消費者組
創(chuàng)建隊列名為 stream.orders 且組名為 g1 的消費者組,消息 ID 從 0 開始
4.2 Lua 腳本
-- 優(yōu)惠券id
local voucherId = ARGV[1]
-- 用戶id
local userId = ARGV[2]
-- 訂單id
local orderId = ARGV[3]
-- 庫存key
local stockKey = "seckill:stock:"..voucherId
-- 訂單key
local orderKey = "seckill:order:"..voucherId
-- 判斷庫存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 判斷用戶是否已經(jīng)下過單
if(redis.call('sismember', orderKey, userId) == 1) then
return 2
end
-- 扣減庫存
redis.call('incrby', stockKey, -1)
-- 將 userId 存入當前優(yōu)惠券的 set 集合
redis.call('sadd', orderKey, userId)
-- 將訂單信息存入到消息隊列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
4.3 代碼改進
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/***
* 創(chuàng)建線程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
/***
* 容器啟動時,便開始創(chuàng)建獨立線程,從隊列中讀取數(shù)據(jù),創(chuàng)建訂單
*/
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while(true){
try {
// 獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判斷訂單信息是否為空
if(list == null || list.isEmpty()){
// 如果為 null,說明沒有消息,繼續(xù)下一次循環(huán)
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 創(chuàng)建訂單
createVoucherOrder(voucherOrder);
// 確認消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("處理訂單異常!", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
// 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判斷訂單信息是否為空
if(list == null || list.isEmpty()){
break;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 創(chuàng)建訂單
createVoucherOrder(voucherOrder);
// 確認消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("處理訂單異常!", e);
try {
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
private void createVoucherOrder(VoucherOrder voucherOrder) {
// 判斷當前優(yōu)惠券用戶是否已經(jīng)下過單
// 用戶 id
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 獲取互斥鎖
// 使用空參意味著不會進行重復(fù)嘗試獲取鎖
boolean isLock = lock.tryLock();
if (!isLock) {
// 獲取鎖失敗,直接返回失敗或者重試
log.error("不允許重復(fù)下單!");
return;
}
try {
// 查詢訂單
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("不允許重復(fù)下單!");
return;
}
// 扣減庫存
boolean success = seckillVoucherService.update().
setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0).
update();
// 扣減失敗
if (!success) {
log.error("庫存不足!");
return;
}
// 創(chuàng)建訂單
save(voucherOrder);
} finally {
// 釋放鎖
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
UserDTO user = UserHolder.getUser();
// 生成訂單 id
Long orderId = redisIdWorker.nextId("order");
// 執(zhí)行 lua 腳本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), user.getId().toString(), orderId.toString());
int r = result.intValue();
// 判斷結(jié)果是否為 0
if(r != 0){
// 不為 0 ,代表沒有購買資格
Result.fail(r == 1 ? "庫存不足!" : "不能重復(fù)下單!");
}
// 返回訂單 id
return Result.ok(orderId);
}
}