Laravel实现队列的示例代码

Laravel框架
721
0
0
2023-08-05
目录
  • 一:队列配置
  • 1:队列相关配置
  • 2:不同队列依赖
  • 二:创建队列任务
  • 三:任务分发
  • 1:默认分发
  • 2:延时分发
  • 3:指定队列分发
  • 4:指定驱动分发
  • 5:指定驱动和队列分发
  • 四:任务处理
  • 五:失败任务处理
  • 六:使用Supervisor管理队列
  • 1:Supervisor安装
  • 2:配置Supervisor
  • 3:启动Supervisor
  • 补充

一:队列配置

队列的配置文件放置在config/queue.php文件中,laravel框架中支持的队列驱动有:sync, database, beanstalkd, sqs, redis,null对应着:同步(本地使用)驱动,数据库驱动,beanstalkd ,Amazon SQS ,redis,null 队列驱动用于那些放弃队列的任务

1:队列相关配置

(1):队列驱动配置

'default' => env('QUEUE_DRIVER', 'sync'),//队列驱动设置

(2):不同驱动相关配置

'connections' => [
    syns驱动配置
    'sync' => [
        'driver' => 'sync',
    ],
    数据库驱动配置
    'database' => [
        'driver' => 'database',
        'table' => 'jobs',//数据库驱动配置使用的数据库
        'queue' => 'default',
        'retry_after' =>,//指定了任务最多处理多少秒后就被当做失败重试,比如说,如果这个选项设置为 90,那么当这个任务持续执行了 90 秒而没有被删除,那么它将被释放回队列
    ],
    //beanstalkd驱动配置
    'beanstalkd' => [
        'driver' => 'beanstalkd',
        'host' => 'localhost',//使用beanstalkd驱动地址
        'queue' => 'default',
        'retry_after' =>,//指定了任务最多处理多少秒后就被当做失败重试,比如说,如果这个选项设置为 90,那么当这个任务持续执行了 90 秒而没有被删除,那么它将被释放回队列
        'block_for' =>,
    ],
    //sqs驱动配置
    'sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-.amazonaws.com/your-account-id'),
        'queue' => env('SQS_QUEUE', 'your-queue-name'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-'),
    ],
    //redis驱动配置
    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',//使用哪个连接的redis,redis配置是在config/database.php文件中
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' =>,
        'block_for' => null,
    ],
 
],

2:不同队列依赖

(1):数据库驱动

使用数据库驱动需要生成一个队列驱动表

php artisan queue:table
php artisan migrate

执行上面的命令之后会发现数据库中会增加一个jobs表

(2):redis驱动

使用redis驱动需要安装一个predis/predis 拓展

composer require predis/predis

(3):Amazon SQS驱动

使用Amazon SQS驱动时需要安装aws/aws-sdk-php拓展

composer require aws/aws-sdk-php

(4):Beanstalkd驱动

使用Beanstalkd驱动需要安装pda/pheanstalk拓展

composer require pda/pheanstalk

二:创建队列任务

php artisan make:job TestJobs

执行上面的命令创建一个队列任务类,这时候会发现在app/jobs目录下生成一个TestJobs.php文件

简单的队列任务类实例:

<?php
namespace App\Jobs;
use App\Models\blog\User;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class TestJobs implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    /**
     * 任务最大尝试次数。
     *
     * @var int
     */
    public $tries =;
    /**
     * 任务运行的超时时间。
     * 指定了 Laravel 队列处理器最多执行多长时间后就应该被关闭掉
     * --timeout 应该永远都要比 retry_after 短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。
     * 如果你的 --timeout 选项大于 retry_after 配置选项,你的任务可能被执行两次
     *
     * @var int
     */
    public $timeout =;
    public $info;
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($info)
    {
        //
        $this->info = $info;
    }
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
        $user = new User();
        $user->user_no = $this->info['user_no'];
        $user->user_name = $this->info['user_name'];
        $user->save();
    }
}

三:任务分发

1:默认分发

$info = [
    'user_no'=>'',
    'user_name'=>'testName'
];
TestJobs::dispatch($info);

2:延时分发

TestJobs::dispatch($info)->delay(Carbon::now()->addMinutes());//表示延时十分钟分发任务

3:指定队列分发

TestJobs::dispatch($info)->onQueue('processing');//表示使用默认驱动的processing队列

4:指定驱动分发

TestJobs::dispatch($info)->onConnection('redis');//使用redis驱动的默认队列

5:指定驱动和队列分发

TestJobs::dispatch($info)->onConnection('redis')->onQueue('processing');//使用redis驱动的processing队列

四:任务处理

php artisan queue:work

执行后我们会发现user表中发现多了一条user_no为006,user_name为testName数据,但是如果你指定了驱动和队列的话,这时候执行php artisan queue:work,你就会发现数据库中没有数据加进去,这是因为php artisan queue:work命令是对默认驱动和'default'队列监听,这时候就要使用:

php artisan queue:work redis --queue="processing"  //redis表示指定驱动 processing表示指定队列

五:失败任务处理

php artisan queue:failed-table
php artisan migrate

执行上面命令后会在数据库中增加一张failed_jobs表,专门用于存储失败的任务信息,在TestJobs类中添加一个failed方法处理失败队列

/**
 * 要处理的失败任务。
 *
 * @param  Exception  $exception
 * @return void
 */
public function failed(Exception $exception)
{
    // 给用户发送失败通知,等等...
}

如果你想要注册一个只要当队列任务失败时就会被调用的事件,我们可以在 Laravel 的 app/Providers/AppServiceProvider.php文件中对这个事件附加一个回调函数即可

/**
 * 启动任意应用程序的服务。
 *
 * @return void
 */
public function boot()
{
    Queue::failing(function (JobFailed $event) {
        // $event->connectionName
        // $event->job
        // $event->exception
    });
}

六:使用Supervisor管理队列

一旦使用queue:work 命令,它将一直运行,直到你手动停止或者你关闭控制台,如果你想要让queue:work 命令永久在后台运行,这时候可以使用进程监控工具Supervisor来实现永久在后台运行

1:Supervisor安装

Supervisor安装可以参考文末补充内容

2:配置Supervisor

(1):配置supervisord.conf

在/etc/supervisord.conf文件的最后一行增加

files = supervisord.d/*.ini

(2):队列进程配置

在/etc/supervisord.d/目录下创建一个.ini文件

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work database --sleep=3 --tries=3 #/home/forge/app.com为项目地址
autostart=true
autorestart=true
user=forge
numprocs=8 #numprocs 命令会要求 Supervisor 运行并监控 8 个 queue:work 进程
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

3:启动Supervisor

systemctl start supervisord.service
supervisorctl update
supervisorctl reload
supervisorctl start laravel-worker

补充

Linux下使用supervisor实现进程管理

最近在linux下写了一个脚本需要在linux后台一直运行,这里我使用了supervisor来实现脚本进程管理

supervisor安装

1:yum安装

yum install supervisor

2:pip安装

pip install supervisor

supervisor常用命令

supervisorctl status        //查看所有进程的状态
supervisorctl stop xx       //停止指定进程(all为所有进程)
supervisorctl start xx      //启动指定进程(all为所有进程)
supervisorctl restart       //重启
supervisorctl update        //配置文件修改后使用该命令加载新的配置
supervisorctl reload        //重新启动配置中的所有程序
systemctl start supervisord.service     //启动supervisor并加载默认配置文件
systemctl enable supervisord.service    //将supervisor加入开机启动项

将指定命令加入进程管理实例

1:supervisor配置

supervisor配置文件:/etc/supervisord.conf

子进程配置文件路径:/etc/supervisord.d/ (子进程的配置文件为ini格式)

我们增加一个命令到进程中只需要在子进程配置文件目录下创建一个ini进程文件进行配置即可

例:

vim /etc/supervisord.d/test.ini

在test.ini文件中加入如下命令:

[program:test]  #项目进程名称
dircetory=/XXX  #进程目录
command=XXX  #进程命令
autostart = true  #在supervisord启动的时候是否自动启动
autorestart=false  #程序退出后是否自动重启
#日志输出 
stderr_logfile=/tmp/client_stderr.log
stdout_logfile=/tmp/client_stdout.log
user=www  #脚本运行的用户身份 

2:将test进程加入进程管理

systemctl start supervisord.service
supervisorctl update
supervisorctl reload
supervisorctl start test

根据如上布置就可以实现将指定脚本加入进程管理