PHP-Resque实现轻量级消息队列

概述

Resque是一个基于Redis的处理后台作业的库,通过将多个作业放在一个或多个队列上进行后台处理.本身是由Ruby编写的,并且被用各种语言实现,原项目访问resque/resque,php版本访问chrisboulton/php-resque.

角色划分

php-resque包含如下3种角色:

  • Job: 需要被执行的任务,例如发送邮件\销库存\配货发货等
  • Queue: 消息队列,基于Redis实现
  • Worker: 负责消费消息队列中的任务,需要以守护进程运行在后台

一个Worker可以处理一个或多个队列,并且可以通过增加Worker的进程/线程来加快处理速度.

注意事项

  • Job运行时,Worker会将参数组装以数组的方式设置到对象的属性,可用$this->args访问
  • 任何异常都会导致任务失败,所以需要特别注意下异常处理
  • Job除了perform方法外还拥有两个方法:setUptearDown,分别在perform前后执行.

具体实现

安装扩展

由于涉及到进程的开辟与管理,php-resque使用了php的PCNTL函数,所以只能在Linux下运行,并且需要php编译PCNTL函数,具体的运行环境为:

  • PHP 5.3+
  • Redis 2.2+
  • 推荐使用 Composer
1
vagrant@homestead:~/code/php-resque-mq$ composer require chrisboulton/php-resque -vvv

创建Job

将需要处理的任务编写为Job,即一个独立的Class,并且这个Class必须实现一个perform方法,Worker执行的时候会自动运行这个方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?php
require './lib.php';

class SmsJob{

private $name;

private $mobile;

/**
* 预处理
*/
public function setUp(){
$this->name = $this->args['name'];
$this->mobile = $this->args['mobile'];
stdout('新注册用户信息:');
stdout('姓名:'.$this->name);
stdout('手机号:'.$this->mobile);
}

/**
* 具体任务
*/
public function perform(){
for($i=0;$i<10;$i++){
if($i === 0){
stdout('开始执行具体任务');
}
stdout('发送短信中--第'.($i+1).'秒');
if($i === 10){
stdout('任务执行完毕');
}
sleep(1);
}
}

/**
* 后续处理
*/
public function tearDown(){
stdout('任务执行完毕');
stdout();
}
}

插入队列

  • 任务入队

使用Resque::enqueue函数将Job加入Queue,包含四个参数:

  • queue: 队列名称
  • jobClass: 任务Class
  • jobArgs: 任务参数
  • trackStatus: 为true时返回任务token,用于后续跟踪任务状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?php
require './lib.php';
require './SmsJob.php';
require './vendor/autoload.php';

$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
die("写入用户信息失败,原因:".$connection->error);
}
$connection->close();
$token = Resque::enqueue('sms', SmsJob::class, array('name'=>$name, 'mobile'=>$mobile),true);
echo 'Job Token: '.$token.chr(10);

查询任务状态

根据Job Token来获取作业状态,状态有四个:

  • Resque_Job_Status::STATUS_WAITING: 等待执行
  • Resque_Job_Status::STATUS_RUNNING: 正在运行
  • Resque_Job_Status::STATUS_FAILED: 执行失败
  • Resque_Job_Status::STATUS_COMPLETE: 执行完毕
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php
require_once './lib.php';
require_once './vendor/autoload.php';

$token = $argv[1];
if(empty($token)){
die("参数错误");
}
$obj = new Resque_Job_Status($token);
if(!$obj->isTracking()) {
die("该任务没有设定状态跟踪");
}
while(true) {
$status = $obj->get();
$statusLabels = ['', '等待执行', '正在运行', '执行失败', '执行完毕'];
stdout('任务['.$token.']的当前状态:'.$statusLabels[$status]);
if($status == 4){
break;
}else{
sleep(1);
}
}

启动Worker

从Queue中取出任务并处理,当队列中有Job时,实例化Job对应的Class对象并执行perform中方法.

  • QUEUE: 要处理的队列,可以设定為 QUEUE=* 表示执行所有任务
  • LOGLEVEL: 日志等级 0:不记录 1:记录简单日志 2:记录详细日志
  • COUNT: 启动Worker的数量,默认是1
  • INTERVAL: Worker检查Queue的间隔,默认是5秒
  • PIDFILE: 如果是单个Woker,可以指定Pid文件
  • PREFIX: 前缀
  • REDIS_BACKEND: Redis的IP和端口,默认是127.0.0.1:6379

完整代码访问php-resque-mq

测试流程

  • 执行主业务并将Job插入队列
1
php register.php 小毛 18702124521
  • 查看任务状态
1
php status 26bf81f1943d1a940ffd68a287a7d16c
  • 启动Worker
1
QUEUE=sms LOGLEVEL=2 php worker.php
  • 测试结果

Resque Web

resque-web是用Ruby开发的Web管理界面,安装步骤:

  • 安装Ruby及Ruby Dev
1
vagrant@homestead:~/code$ sudo apt install ruby ruby-dev -y
  • 安装resque-web
1
vagrant@homestead:~/code$ sudo gem install resque-web -v 0.0.8
  • 启动resque-web
1
2
3
vagrant@homestead:~/code$ sudo resque-web -p 666
[2018-09-21 10:24:31 +0000] Starting 'resque-web'...
[2018-09-21 10:24:31 +0000] trying port 666...
  • 截图示例

本文作者:Jormin
本文地址https://blog.lerzen.com/2018/03/07/php-resque实现轻量级消息队列/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!

----- 到这结束咯 感谢您的阅读 -----