消息隊列的概念、原理和場景
在高并發(fā)的時候,程序往往無法做到及時的處理。我們引入一個中間的系統(tǒng),來進(jìn)行分流和減壓。
所以從本質(zhì)上講:消息隊列就是一個隊列結(jié)構(gòu)的中間件。也就是說,你把消息和內(nèi)容放入這個容器之后就可以直接返回,不用等它后期處理的結(jié)果。另外會有一個程序,讀取這些數(shù)據(jù)并按照順序處理。
1、隊列結(jié)構(gòu)的中間件
2、消息放入后,不必立即處理
3、由訂閱者/消費者按順序處理
也就是說:當(dāng)遇到一個比較大或者耗時比較長的環(huán)節(jié)的時候,而同時你的業(yè)務(wù)又不需要立即知道這個環(huán)節(jié)的結(jié)果,使用消息隊列是好的選擇。
知識付費的拼團(tuán)功能使用的就是消息隊列功能;把每個拼團(tuán)訂單都儲存在消息隊列中,拼團(tuán)完成或拼團(tuán)結(jié)束就可以自動處理這個訂單。
application\index\controller\PushJob
/**
* 一個使用了隊列的 action
*/
public static function actionWithDoPinkJob(array $data, string $name = '')
{
try {
// 1.當(dāng)前任務(wù)將由哪個類來負(fù)責(zé)處理。
// 當(dāng)輪到該任務(wù)時,系統(tǒng)將生成一個該類的實例,并調(diào)用其 fire 方法
$jobHandlerClassName = 'app\index\job\PullDoPink';
// 2.當(dāng)前任務(wù)歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建
$jobQueueName = Config::get('queue_name', '') ? Config::get('queue_name', '') : 'doPinkJobQueue';
// 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
// ( jobData 為對象時,存儲其public屬性的鍵值對 )
if ($name) {
$jobData = ['pinkInfo' => $data, 'time' => date('Y-m-d H:i:s'), 'doName' => $name];
$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
} else {
$jobData = ['pinkInfo' => $data, 'time' => date('Y-m-d H:i:s')];
if (!isset($data['pink_time']) || !$data['pink_time']) return true;
$timewait = $data['pink_time'] + 300;
//$jobData = [ 'pinkInfo' => 'hahah', 'time' => date('Y-m-d H:i:s'), 'b' => 21] ;
//$timewait = 20;
// 4.將該任務(wù)推送到消息隊列,等待對應(yīng)的消費者去執(zhí)行
$isPushed = Queue::later($timewait, $jobHandlerClassName, $jobData, $jobQueueName);
//$isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName );
// database 驅(qū)動時,返回值為 1|false ; redis 驅(qū)動時,返回值為 隨機(jī)字符串|false
}
if ($isPushed !== false) {
return 1;
} else {
return 1;
}
} catch (ErrorException $e) {
echo $e->getMessage();
}
}
application\index\job\PullDoPink
/**
* fire方法是消息隊列默認(rèn)調(diào)用的方法
* @param Job $job 當(dāng)前的任務(wù)對象
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
*/
public function fire(Job $job, $data)
{
// 有些消息在到達(dá)消費者時,可能已經(jīng)不再需要執(zhí)行了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if (!$isJobStillNeedToBeDone) {
$job->delete();
return;
}
if (isset($data['doName']) && $data['doName']) {
$doName = $data['doName'];
$isJobDone = $this->$doName($data);
} else
$isJobDone = $this->doPinkJob($data);
if ($isJobDone) {
// 如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
$job->delete();
//print("<info>Hello Job has been done and deleted"."</info>\n");
} else {
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務(wù)已經(jīng)重試了幾次了
// print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也可以重新發(fā)布這個任務(wù)
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay為延遲時間,表示該任務(wù)延遲2秒后再執(zhí)行
}
}
}
/**
* 有些消息在到達(dá)消費者時,可能已經(jīng)不再需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data)
{
return true;
}
拼團(tuán)使用,在訂單生成完成后,把參數(shù)加入$do_job_pink數(shù)組中
PushJob::actionWithDoPinkJob($do_job_pink);
在application\index\job\PullDoPink下加如下面的方法用來接受處理數(shù)據(jù)
/**
* 根據(jù)消息中的數(shù)據(jù)進(jìn)行實際的業(yè)務(wù)處理...
*/
private function doPinkJob($data)
{
$pink_id = $data['pinkInfo']['pink_id'];
if ($pink_id) {
$pink_info = \app\wap\model\store\StorePink::where(['id' => $pink_id, 'k_id' => 0, 'status' => 1])->find();
if ($pink_info ? $pink_info = $pink_info->toArray() : []) {
list($pinkAll, $pinkT, $count, $idAll) = \app\wap\model\store\StorePink::getPinkMemberAndPinkK($pink_info);
\app\wap\model\store\StorePink::PinkFail($pink_info['uid'], $idAll, $pinkAll, $pinkT, $count, 1, [], true, true);
}
}
return true;
}
如果需要加新的消息隊列可以設(shè)置不同的名稱即可
PushJob::actionWithDoPinkJob($do_job_pink,’名稱’);
application\index\job\PullDoPink:
private function 名稱($data)
{
return true;
}
完成后重新啟動消息隊列