admin 管理员组

文章数量: 1086866

简述thinkphp自带队列think

think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。

1、安装think-queue

使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue

2、配置消息队列的存储环境

执行完composer安装命令后,会自动生成一个queue.php的配置文件。
这里我们用redis驱动。当然还可以使用别的驱动,例如:database驱动(与redis相比不推荐)、sync等。

<?php /**  * 消息队列配置  * 内置驱动:redis、database、topthink、sync
*/ 
use think\facade\Env; 
return ['default'     => 'redis','prefix'      => 'prefix_','connections' => ['sync'     => ['type' => 'sync',],'database' => ['type'       => 'database','queue'      => 'default','table'      => 'jobs',],'redis'    => ['type'       => 'redis','queue'      => Env::get('queue.queue_name', ''),'host'       => Env::get('redis.redis_hostname', '127.0.0.1'),'port'       => Env::get('redis.port', 6379),'password'   => Env::get('redis.redis_password', ''),'select'     => Env::get('redis.select', 0),'timeout'    => 0,'persistent' => false,],],'failed'      => ['type'  => 'none','table' => 'failed_jobs',], 
];

3、队列生产者

修改app/api/controller/Test.php里使用Queue::later或者Queue::push发布任务。
queue::push(); //创建同步任务
Queue::later(); //创建异步任务,延时执行

<?php
namespace app\api\controller;
use think\Queue;class Test extends Controller;
{public function queue(){//获取参数$data = file_get_contents("php://input");if(empty($data)){$this->error("post is null");}$params = json_decode($data, true);/*创建新消息并推送到消息队列*/// 当前任务由哪个类负责处理$job_handler_classname = "app\jobs\Send";// 当前队列归属的队列名称$job_queue_name = "send_job_queue";// 当前任务所需的业务数据$job_data = ["ts"=>time(),"uniqid"=>uniqid(), "params"=>$params];// 将任务推送到消息队列等待对应的消费者去执行$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);if($is_pushed == false){$this->error("send job queue went wrong");}//操作成功$this->success('success');}
}

4、消息的消费和删除。

在app目录下创建jobs文件夹,并创建Send.php类。

<?php
namespace app\jobs;
use think\queue\Job;/*** 消费者类* 用于处理 send_job_queue 队列中的任务
*/
class Send
{/*** fire是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job, $data){//有效消息到达消费者时可能已经不再需要执行了if(!$this->checkJob($data)){$job->delete();return;}//执行业务处理if($this->doJob($data)){$job->delete();//任务执行成功后删除}else{//检查任务重试次数if($job->attempts() > 3){$job->delete();}}}/*** 消息在到达消费者时可能已经不需要执行了* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function checkJob($data){$ts = $data["ts"];$uniqid = $data["uniqid"];$params = $data["params"];return true;}/*** 根据消息中的数据进行实际的业务处理*/private function doJob($data){// 实际业务流程处理return true;}
}

5、监听任务并执行

work和listen两种,具体的用法和可选参数可以输入命令加 --help 查看

php think queue:listen
php think queue:work 

以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。

6、安装Supervisor

yum install -y supervisor

7、查看Supervisor安装位置

supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。

[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisord
supervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_conf
echo_supervisord_conf: /usr/bin/echo_supervisord_conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl    
supervisorctl: /usr/bin/supervisorctl
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 

8、修改配置文件

vi /etc/supervisord.conf
[include]
files = supervisord.d/*.ini

9、自定义待守护进程配置文件

在 /etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:
[program:shop-queue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/shop/think queue:listen
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/shop/public/queue/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/shop/public/queue/worker.log  ;输出日志文件

10、Surpervisor的启动

# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update

欢迎留言讨论。

本文标签: 简述thinkphp自带队列think