PHP+Redis实现轻量级消息队列

概述

业务实现过程中,即便没有高并发与大流量,业务的解耦与异步化也是需要考虑实现的,此时MQ就显得很重要,中小型业务开发中,RabbitMQ就显得过重,这种业务下需要的就是一个轻量级的MQ,此时用Redis就刚刚好.

流程

基于Redis的轻量级的MQ用到了Redis的三个特性:

  • List存储类型
  • BRLPOP或者BLPOP操作List
  • 发布/订阅模式

大体流程是:主业务完成操作后写入消息到MQ,此时并发布消息到指定Channel,而之前已经订阅了该Channel的Worker收到消息后就去MQ获取并消费消息,如下图:

程序实现

主业务

主业务主要功能是接收两个参数:姓名和手机号,将这两个参数作为用户信息写入用户表,写入成功后插入一条消息到消息队列register_users,并且发送消息OK到频道register_success,主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php
require './lib.php';
$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
// 开启事务
mysqli_begin_transaction($connection);
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
die("写入用户信息失败,原因:".$connection->error);
}
$redis = getRedis();
// 添加消息
$result = $redis->lpush('register_users', json_encode(array('name'=>$name, 'mobile'=>$mobile), JSON_UNESCAPED_UNICODE));
if($result === false){
mysqli_rollback($connection);
die("添加消息队列失败");
}
// 发布消息
$redis->publish('register_success', 'ok');
// 所有操作完成后提交事务
mysqli_commit($connection);
$connection->close();
$redis->close();

Worker

Worker的主要功能是订阅频道register_success,如果收到消息且消息内容是OK就从队列register_users获取并消费消息,主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?php
require './lib.php';
$redis = getRedis();
$redis->subscribe(['register_success'], function ($instance, $channelName, $message){
if($channelName == "register_success" && $message = "ok"){
$redis = getRedis();
$arr = $redis->brPop(['register_users'], 20);
if(count($arr)){
$userInfo = json_decode($arr[1], true);
stdout("新注册用户信息:");
stdout("姓名:".$userInfo['name']);
stdout("手机号:".$userInfo['mobile']);
stdout();
}
}
});

完整代码访问php-redis-mq

运行结果

本文作者:Jormin
本文地址https://blog.lerzen.com/2018/03/07/php-redis实现轻量级消息队列/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!

----- 到这结束咯 感谢您的阅读 -----