Yii2-Queue实现轻量级消息队列

概述

Yii2-Queue是Yii2官方制作的一个消息队列,提供多个缺点:Syncronous, File, DB, Redis, RabbitMQ, AMQP Interop, Beanstalk, Gearman等,使用Yii2开发的时候使用该扩展比较合适.

驱动配置:

Syncronous

如果打开 handle 属性,则在使用过程中同步执行任务,开发和调试阶段使用.

1
2
3
4
5
6
7
8
return [
'components' => [
'queue' => [
'class' => \yii\queue\sync\Queue::class,
'handle' => false, // 任务是否立即执行
],
],
];

File

以文件的方式来存储消息队列

1
2
3
4
5
6
7
8
9
10
11
12
return [
'bootstrap' => [
'queue', // 把这个组件注册到控制台
],
'components' => [
'queue' => [
'class' => \yii\queue\<driver>\Queue::class,
'as log' => \yii\queue\LogBehavior::class,
// 驱动的其他选项
],
],
];

DB

使用数据库来存储消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
return [
'bootstrap' => [
'queue', // 把这个组件注册到控制台
],
'components' => [
'db' => [
'class' => \yii\db\Connection::class,
// ...
],
'queue' => [
'class' => \yii\queue\db\Queue::class,
'db' => 'db', // DB 连接组件或它的配置
'tableName' => '{{%queue}}', // 表名
'channel' => 'default', // Queue channel key
'mutex' => \yii\mutex\MysqlMutex::class, // Mutex that used to sync queries
],
],
];
  • 手动创建数据表

首先在数据库中先添加数据表,假设表名是queue,以下是创建数据表的Sql语句.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE `queue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`channel` varchar(255) NOT NULL,
`job` blob NOT NULL,
`pushed_at` int(11) NOT NULL,
`ttr` int(11) NOT NULL,
`delay` int(11) NOT NULL DEFAULT 0,
`priority` int(11) unsigned NOT NULL DEFAULT 1024,
`reserved_at` int(11) DEFAULT NULL,
`attempt` int(11) DEFAULT NULL,
`done_at` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `channel` (`channel`),
KEY `reserved_at` (`reserved_at`),
KEY `priority` (`priority`)
) ENGINE=InnoDB
  • 使用数据迁移

也可以使用数据迁移来创建表,迁移文件存储目录是 path/to/extension/src/drivers/db/migrations.,使用迁移文件的话需要在配置中增加如下配置:

1
2
3
4
5
6
7
8
9
10
11
'controllerMap' => [
// ...
'migrate' => [
'class' => 'yii\console\controllers\MigrateController',
'migrationPath' => null,
'migrationNamespaces' => [
// ...
'yii\queue\db\migrations',
],
],
],

然后使用数据迁移命令:

1
php yii migrate/up

Redis

使用Redis来存储消息队列,该驱动需要安装yiisoft/yii2-redis来配合使用,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
return [
'bootstrap' => [
'queue', // 把这个组件注册到控制台
],
'components' => [
'redis' => [
'class' => \yii\redis\Connection::class,
// ...
],
'queue' => [
'class' => \yii\queue\redis\Queue::class,
'redis' => 'redis', // 连接组件或它的配置
'channel' => 'queue', // Queue channel key
],
],
];

RabbitMQ

这个驱动自2.0.2版本已被弃用,并且将会在2.1版本被移除,官方建议使用AMQP Interop驱动.

AMQP Interop

基于AMQP协议的驱动,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
return [
'bootstrap' => [
'queue', // The component registers own console commands
],
'components' => [
'queue' => [
'class' => \yii\queue\amqp_interop\Queue::class,
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'queueName' => 'queue',
'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,

// or
'dsn' => 'amqp://guest:guest@localhost:5672/%2F',

// or, same as above
'dsn' => 'amqp:',
],
],
];

Beanstalk

队列存放于Beanstalk,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
return [
'bootstrap' => [
'queue', // 把这个组件注册到控制台
],
'components' => [
'queue' => [
'class' => \yii\queue\beanstalk\Queue::class,
'host' => 'localhost',
'port' => 11300,
'tube' => 'queue',
],
],
];

Gearman

队列存放于Gearman,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
return [
'bootstrap' => [
'queue', // 把这个组件注册到控制台
],
'components' => [
'queue' => [
'class' => \yii\queue\gearman\Queue::class,
'host' => 'localhost',
'port' => 4730,
'channel' => 'queue',
],
],
];

工作流程

Job

每个任务都是一个单独的类,需要实现接口\yii\queue\JobInterface,并且实现execute函数.

任务被放入队列中,并且在不同进程中从队列中获取,如果不确定在worker的作业环境中是否可用,则应在执行任务时避免外部依赖。所有处理任务的数据都应该放到作业对象的属性中,并连同它一起发送到队列中.

如果需要处理 ActiveRecord ,那么发送它的ID而不是对象本身。在处理时必须从DB提取它。

Queue

配置好驱动后,可以使用queue组件来操作队列

具体的组件名以实际配置为准,有可能项目中需要配置多个队列

加入队列

1
2
// 将作业推送到队列并获得其ID
$id = Yii::$app->queue->push(new SomeJob());

延迟队列

1
$id = Yii::$app->queue->delay(5 * 60)->push(new SomeJob());

任务状态

1
2
3
4
5
6
7
8
// 这个作业等待执行。
Yii::$app->queue->isWaiting($id);

// Worker 从队列获取作业,并执行它。
Yii::$app->queue->isReserved($id);

// Worker 作业执行完成。
Yii::$app->queue->isDone($id);

记录日志

此组件提供了使用日志 LogBehavior 记录队列事件,使用时需要在配置中增加如下配置:

1
2
3
4
5
6
7
8
return [
'components' => [
'queue' => [
'class' => \yii\queue\redis\Queue::class,
'as log' => \yii\queue\LogBehavior::class
],
],
];

事件监听

队列可以触发以下事件:

Event name Event class Triggered on
Queue::EVENT_BEFORE_PUSH PushEvent Adding job to queue using Queue::push() method
Queue::EVENT_AFTER_PUSH PushEvent Adding job to queue using Queue::push() method
Queue::EVENT_BEFORE_EXEC ExecEvent Before each job execution
Queue::EVENT_AFTER_EXEC ExecEvent After each success job execution
Queue::EVENT_AFTER_ERROR ExecEvent When uncaught exception occurred during the job execution
cli\Queue:EVENT_WORKER_START WorkerEvent When worker has been started
cli\Queue:EVENT_WORKER_LOOP WorkerEvent Each iteration between requests to queue
cli\Queue:EVENT_WORKER_STOP WorkerEvent When worker has been stopped

使用示例:

1
2
3
4
5
6
Yii::$app->queue->on(Queue::EVENT_AFTER_ERROR, function ($event) {
if ($event->error instanceof TemporaryUnprocessableJobException) {
$queue = $event->sender;
$queue->delay(7200)->push($event->job);
}
});

Worker

  • yii queue/listen [timeout]

启动一个守护进程,它可以无限查询队列.如果有新的任务,他们立即得到并执行.timeout 是下一次查询队列的时间 当命令正确地通过supervisor来实现时,这种方法是最有效的.可选参数有三个:

–verbose,-v: 将执行状态输出到控制台

–isolate: 详细模式执行作业,如果启用,将打印每个作业的执行结果

–color: 高亮显示输出结果

  • yii queue/run

获取并执行循环中的任务,直到队列为空,适合于cron搭配

  • yii queue/info

打印关于队列状态的信息

  • yii queue/clear

清除一个队列

  • yii queue/remove [id]

移除一个任务

Worker启动管理

Supervisor

Supervisor 是Linux的进程监视器,它会自动启动您的控制台进程,它的配置文件存于目录 /etc/supervisor/conf.d 下,配置示例:

1
2
3
4
5
6
7
8
9
[program:yii-queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php7.2 /home/vagrant/code/yii2-queue-mq/yii queue/listen --verbose=1 --color=0
autostart=true
autorestart=true
user=homestead
numprocs=4
redirect_stderr=true
stdout_logfile=/home/vagrant/code/yii2-queue-mq/log/yii-queue-worker.log

上面的配置会启动4个 queue/listen Worker 并记录日志,启动 Supervisor:

1
sudo /etc/init.d/supervisor restart

Cron

使用系统自带的计划任务 crontab 来配合命令来处理,配置示例:

1
* * * * * /usr/bin/php7.2 /home/vagrant/code/yii2-queue-mq/yii queue/run

上面的配置 cron 将每分钟启动一次命令

###

错误与重试

任务执行过程中有可能会失败,可以通过三种方法重试.

Sync 驱动不会重试失败的工作, Gearman 不支持重试

选项配置

通过在组件中增加 ttrattempts来设置,这种设置作用范围是全局

  • ttr: 设置在队列中保留工作的时间,如果一份作业在这段时间没有执行,它将返回队列进行重试
  • attempts 选项设置了最大的尝试次数,如果尝试已经结束,作业作还没有完成,它将从队列中移除
1
2
3
4
5
6
7
'components' => [
'queue' => [
'class' => \yii\queue\<driver>\Queue::class,
'ttr' => 5 * 60, // Max time for anything job handling
'attempts' => 3, // Max number of attempts
],
]

实现重试接口

可以通过为Job实现 RetryableJobInterfacegetTtr()canRetry() 接口来配置任务保留时间和最大尝试次数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class SomeJob extends BaseObject implements RetryableJobInterface
{
public function execute($queue)
{
//...
}

public function getTtr()
{
return 15 * 60;
}

public function canRetry($attempt, $error)
{
return ($attempt < 5) && ($error instanceof TemporaryException);
}
}

监听事件

通过监听 Queue::EVENT_BEFORE_PUSHQueue::EVENT_AFTER_ERROR 事件来分别设置 TTR是否可以重试.

事件处理程序在 RetryableJobInterface 方法之后执行,因此具有最高优先级

  • Queue::EVENT_BEFORE_PUSH: 设置TTR
1
2
3
4
5
Yii::$app->queue->on(Queue::EVENT_BEFORE_PUSH, function (PushEvent $event) {
if ($event->job instanceof SomeJob) {
$event->ttr = 300;
}
});
  • Queue::EVENT_AFTER_ERROR: 设置是否可以重试
1
2
3
4
5
Yii::$app->queue->on(Queue::EVENT_AFTER_ERROR, function (ExecEvent $event) {
if ($event->job instanceof SomeJob) {
$event->retry = ($event->attempt < 5) && ($event->error instanceof TemporaryException);
}
});

调试

通过向 Yii2 调试模块中增加个面板,面板显示计数器和队列任务列表,需要安装 yiisoft/yii2-debug.

1
2
3
4
5
6
7
8
9
10
return [
'modules' => [
'debug' => [
'class' => \yii\debug\Module::class,
'panels' => [
'queue' => \yii\queue\debug\Panel::class,
],
],
],
];

代码中使用

本次以Redis驱动为例,使用这个扩展,完成后台批量为用户发送推广邮件的功能.

安装所需扩展

1
require --prefer-dist yiisoft/yii2-queue

编写EmailJob

EmailJob继承了BaseJob,而BaseJob实现了JobInterfaceRetryableJobInterface接口,并覆盖了execute,getTtr,canRetry三个函数.

为了完整看到测试日志,EmailJob在发送邮件前sleep了5秒.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php

namespace common\jobs;

use yii\base\ViewNotFoundException;

/**
* Class EmailJob
* @property array $args
* @package common\jobs
*/
class EmailJob extends BaseJob
{

public $view, $viewParams, $to, $subject, $body;

public function execute($queue)
{
parent::execute($queue);
sleep(5);
$response = \Yii::$app->mailer->compose($this->view, $this->viewParams)
->setTo($this->to)
->setSubject($this->subject)
->send();
$this->stdout($response);
}

public function getTtr()
{
return 15 * 60;
}

public function canRetry($attempt, $error)
{
$this->stdout($attempt.'---'.$error);
return ($attempt < 5) && ($error instanceof ViewNotFoundException);
}
}

模拟添加队列

读取用户表中的所有用户,并未他们都发送一封测试邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$queue = \Yii::$app->queue;
$users = User::find()->asArray()->all();
foreach ($users as $user){
$config = [
'view' => 'mqTest',
'viewParams' => [
'username' => explode('@', $user->email)[0],
],
'to' => $user->email,
'subject' => '测试邮件',
];
$jobID = $queue->push(new EmailJob($config));
$this->log($jobID);
}

完成代码见jormin/yii2-queue-mq

测试运行

  • 启动测试脚本,像队列中添加EmailJob
1
vagrant@homestead:~/code/yii2-queue-mq$ php yii mq
  • 每隔一秒查询下Job的状态
1
vagrant@homestead:~/code/yii2-queue-mq$ php yii mq/status 1
  • 查看队列当前信息
1
vagrant@homestead:~/code/yii2-queue-mq$ php yii queue/info
  • 开始执行队列,并注意队列中Job的执行状态
1
vagrant@homestead:~/code/yii2-queue-mq$ php yii queue/run --verbose --isolate --color

测试结果

本文作者:Jormin
本文地址https://blog.lerzen.com/2018/03/08/yii2-queue实现轻量级消息队列/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!

----- 到这结束咯 感谢您的阅读 -----