王宁凯的个人博客

Laravel 中使用 Beanstalkd 消息队列系统发送邮件

2019.06.26

Beanstalkd是一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5百万用户的Facebook Causes应用。处理能力接收到考验,官网最新版本已是四年前的版本,不顾这丝毫不影响其性能。

环境准备

  • 系统环境 :centos7 + mysql 5.7 + nginx 1.14 + php7.2 + Laravel 5.6(仅供参考)

  • Supervisor 进程管理工具。 使用 Supervisor 接管 Beanstalkd ,前面文章我已说明如何安装配置 Supervisor 。如果前面没有阅读过或者安装过,建议去看一下 地址:Supervisor 安装配置与使用详解

  • Beanstalkd 消息队列系统。Beanstalkd 的安装也在前面的文章提到过。安装比较简单。安装参考地址:Beanstalkd - 高性能、轻量级的分布式内存队列系统

  • Laravel Beanstalkd队列系统扩展包。这里使用 pda/pheanstalk ~3.0 使用 composer require pda/pheanstalk 命令引入或者加入到 composer.json 文件中 。

创建任务

作为演示,这里主要以我博客为例子,在 Laravel 框架里使用队列发送通知邮件 。首先确保前面的环境准备好 。

生成邮件发送任务类

在你的应用程序中,队列的任务类都默认放在 app/Jobs 目录下。如果这个目录不存在,那当你运行 artisan make:job 命令时目录就会被自动创建。你可以用以下的 Artisan 命令来生成一个发送邮件的队列任务:

php artisan make:job SendEmail

执行命令会在 app/jobs 目录下生成一个SendEmail 任务类。我们需要对默认的类进行修改,使其达到我们的要求。

例如:

<?php

namespace App\Jobs;

use App\Helpers\Extensions\Tool;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class SendEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 任务最大尝试次数。
     *
     * @var int
     */
    public $tries = 5;

    /**
     * 任务运行的超时时间。
     *
     * @var int
     */
    public $timeout = 300;

    /**
     * 收件人邮箱地址
     *
     * @var
     */
    protected $email;

    /**
     * 收件人名称
     *
     * @var
     */
    protected $name;

    /**
     * 邮件标题
     *
     * @var
     */
    protected $subject;

    /**
     * 邮件内容数据
     *
     * @var $content
     */
    protected $data;

    /**
     * SendEmail constructor.
     * @param $param
     */
    public function __construct($param)
    {
        $this->email = $param['email'];
        $this->name = $param['name'];
        $this->subject = $param['subject'];
        $this->data = $param['data'];
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // 这里使用的我封装的发送邮件 助手函数
        Tool::sendEmail($this->email, $this->name, $this->subject, $this->data, 'emails.base');
    }
}

发送邮件助手函数:

    /**
     * 发送邮件函数
     *
     * @param  string $email           邮箱  如果群发 则传入数组
     * @param string $name             名称
     * @param string $subject          标题
     * @param array $data              邮件模板中用的变量 示例:['name'=>'xxx','content'=>'xxx']
     * @param string $template         邮件模板
     * @return array                   发送状态
     */
    function sendEmail($email, $name, $subject, $data = [], $template = 'emails.base')
    {
        Mail::send($template, $data, function ($message) use ($email, $name, $subject) {
            if (is_array($email)) {
                foreach ($email as $k => $v) {
                    $message->to($v, $name)->subject($subject);
                }
            } else {
                $message->to($email, $name)->subject($subject);
            }
        });
        return (count(Mail::failures()) > 0)?['status_code' => 500, 'message' => '邮件发送失败']:['status_code' => 200, 'message' => '邮件发送成功'];
    }

到这里创建任务类完成,下面是分发任务。

分发任务

创建任务类完成 后,就是任务的触发和分发了 。Laravel 的任务类提供 dispatch 方法分发它。传递给 dispatch 方法的参数将会被传递给任务的构造函数:

立即分发


use use App\Jobs\SendEmail;

// 具体方法中分发 ,注意 下面链接的 是 Beanstalkd 驱动
SendEmail::dispatch($param)->onConnection('beanstalkd');

延迟分发


use use App\Jobs\SendEmail;

// 延迟10分钟
SendEmail::dispatch($param)->delay(now()->addMinutes(10));->onConnection('beanstalkd');

Laravel 中还提供很多参数 配置,这里不再多说。具体参考 官方文档 队列

队列处理器

执行命令 php artisan queue:work 开启队列监听。

建议使用进程管理器 Supervisor 来确保队列处理器不会停止运行。

这里贴出我的 Supervisor 配置:

[program:lablog-beanstalkd-queue-1]
command=/your/php/path/php /your/laravel/path/artisan queue:work beanstalkd --daemon 
process_name=%(program_name)s_%(process_num)02d
numprocs=3
autostart=true
startsecs = 5
autorestart=true
startretries=3
user=root
stopsignal=INT
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
stdout_capture_maxbytes=10MB
redirect_stderr=true
stdout_logfile=/etc/supervisor/logs/lablog-beanstalkd-queue-1.log

在这里 numprocs 会告诉 Supervisor 运行 3 个 queue:work 进程并且管理它们,当它们关闭时会将其自动重启。当然,你应该将 command 选项中的 queue:work 部分修改为你的队列连接。

启动 supervisor 即可进行队列监听。要了解更多关于 Supervisor 的信息,查询 Supervisor 文档

Snipaste_2018-09-08_11-39-47.png

接下来皆可以 再具体逻辑中使用任务发送邮件,想要查看队列执行 的情况可以通过日志文件查看。通常如下 :

[2018-09-04 15:34:09][16] Processed:  App\Jobs\SendEmail
[2018-09-04 15:42:57][17] Processing: App\Jobs\SendEmail
[2018-09-04 15:42:58][17] Processed:  App\Jobs\SendEmail
[2018-09-04 23:25:57][1] Processing: App\Jobs\SendEmail
[2018-09-04 23:25:58][1] Processed:  App\Jobs\SendEmail
[2018-09-05 21:00:35][3] Processing: App\Jobs\SendEmail
[2018-09-05 21:00:35][4] Processing: App\Jobs\SendEmail
[2018-09-05 21:00:35][2] Processing: App\Jobs\SendEmail
[2018-09-05 21:00:37][3] Processed:  App\Jobs\SendEmail
[2018-09-05 21:00:37][4] Processed:  App\Jobs\SendEmail
[2018-09-05 21:00:37][2] Processed:  App\Jobs\SendEmail

如果想 要查看 Beanstalkd 中 队列情况,这需要安装具体的 Beanstalkd 可视化工具,这里推荐两个:具体安装使用参考原作者。

  1. ptrofimov/beanstalk_console 这个项目作为消息队列的控制台,直观的查看队列任务的执行。packagist 地址
  2. xuri/aurora aurora是一个基于Web的Beanstalk队列服务器控制台,用Go编写,适用于macOS,Linux和Windows机器。 github 地址