実務でQueueに絡んだ処理をする機会があったのでメモ。
Queueとは
倉庫管理システムを作ってた時によく出たネタですが、時間がかかる処理があったとして、その処理は同期的に処理するのではなく、ストックしておき、あとで処理(あるいはajaxのように単純に非同期)したいなんてケースがあります。
こういう時に使われるのがQueueという概念です。
上の例の方が私のよりわかりやすい・・・(汗)
Queueで大事なこととしては
「キューを記録しておく場所」、「処理を登録する場面」と「実際の処理を行う場面」の認識でしょうか。
これを理解するのに少し時間が掛かりました・・(汗)
Laravelでこれを実行して見ます。
Laravelでのキュー処理
全体的な処理ですが、下記リンクがわかりやすくこれが最小構成で一番わかりやすいです。このブログも8割以上ここのパクリです(笑)
Laravelの処理をQueueにスタックしてレスポンスのチューニングを行おう
キューを記録しておく場所
Laravelでストック場所として定義しておけるのは下記5つです。
- sync・・defaultではこれ。同期処理らしく積極的に使う意味はないと思われます。
- database・・DBに登録。
- Beanstalk・・調査中(汗)
- Amazon SQS・・AWSのサービスです。
- Redis・・KVSのサービスです。
databaseがわかりやすいのでこれで見ておきます。
laravel5.5のキュー投入によるジョブ処理を導入する
下記2つのファイルを設定しておきましょう。
.env
1 |
QUEUE_DRIVER=database |
app/queue.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
<?php return [ /* |-------------------------------------------------------------------------- | Default Queue Driver |-------------------------------------------------------------------------- | | Laravel's queue API supports an assortment of back-ends via a single | API, giving you convenient access to each back-end using the same | syntax for each one. Here you may set the default queue driver. | | Supported: "sync", "database", "beanstalkd", "sqs", "redis", "null" | */ #ここで選択される設定を決めます 'default' => env('QUEUE_DRIVER', 'database'), /* |-------------------------------------------------------------------------- | Queue Connections |-------------------------------------------------------------------------- | | Here you may configure the connection information for each server that | is used by your application. A default configuration has been added | for each back-end shipped with Laravel. You are free to add more. | */ 'connections' => [ 'sync' => [ 'driver' => 'sync', ], 'database' => [ 'driver' => 'database', 'table' => 'jobs', 'queue' => 'default', 'retry_after' => 90, ], 'beanstalkd' => [ 'driver' => 'beanstalkd', 'host' => 'localhost', 'queue' => 'default', 'retry_after' => 90, ], 'sqs' => [ 'driver' => 'sqs', 'key' => 'your-public-key', 'secret' => 'your-secret-key', 'prefix' => 'https://sqs.us-east-1.amazonaws.com/your-account-id', 'queue' => 'your-queue-name', 'region' => 'us-east-1', ], 'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, ], ], /* |-------------------------------------------------------------------------- | Failed Queue Jobs |-------------------------------------------------------------------------- | | These options configure the behavior of failed queue job logging so you | can control which database and table are used to store the jobs that | have failed. You may change them to any database / table you wish. | */ 'failed' => [ 'database' => env('DB_CONNECTION', 'mysql'), 'table' => 'failed_jobs', ], ]; |
ちなみにdatabaseの場合、当然テーブルを作る必要があり、下記コマンドで下記のようなテーブルを自動作成しておくことができます。
1 2 3 |
$ php artisan queue:table $ php artisan queue:failed-table $ php artisan migrate |
処理を登録する場面(当然まだ処理は行いません)
次は使われる処理を登録します。
ちなみに一つ一つの処理をJobと呼びます。
LaravelではJobを下記コマンドですぐに作れます。
1 |
php artisan make:job TestJob |
app/Jobs/TestJob.php
2秒たってログを吐くという簡単な処理を書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
namespace App\Jobs; use App\Jobs\Job; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Bus\SelfHandling; use Illuminate\Contracts\Queue\ShouldQueue; class TestJob extends Job implements SelfHandling, ShouldQueue { use InteractsWithQueue, SerializesModels; /** * Create a new job instance. * * @return void */ public function __construct() { // } /** * Execute the job. * * @return void */ public function handle() { sleep(2); $exeTime = date('Y-m-d H:i:s'); \Log::info("処理実行 $exeTime"); } } |
そしてこの処理を登録します。
例えばwebであるURL(http://sample.com/queue)にアクセスをしたら登録するとします。
1 2 3 4 5 |
Route::get('/queue', function () { #ここで登録を行います。 Queue::push(new \App\Jobs\TestJob()); return 'At ' . date('Y-m-d H:i:s') . ' - queue pushed.'; }); |
DBを見るとJobが登録されているのがわかります。
1 2 3 4 5 6 7 8 9 10 |
mysql> select * from jobs \G; *************************** 1. row *************************** id: 2 queue: default payload: {"displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"App\\Jobs\\SampleJob","command":"O:18:\"App\\Jobs\\SampleJob\":7:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:7:\"chained\";a:0:{}}"}} attempts: 0 reserved_at: NULL available_at: 1532172265 created_at: 1532172265 1 row in set (0.00 sec) |
これでJobが登録されているのがわかるかと思います。
実際の処理を行う場面
ストックしたジョブを行う場面を見ていきます。
1 |
php artisan queue:work |
上記コマンドを実行するとDBに登録している処理が実行されます。
コンソールには下記のように処理が図れます。
1 2 |
[2018-07-21 20:34:02] Processing: App\Jobs\TestJob [2018-07-21 20:34:07] Processed: App\Jobs\TestJob |
ログを見ていると意図した文字が吐かれているのと、処理が行われているのがわかるかと思います。DBをみてあげるとJobが削除されているのがわかると思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
[2018-07-21 20:34:02] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172842,1532172752],2.62] [2018-07-21 20:34:02] local.DEBUG: SQL LOG:: ["update `jobs` set `reserved_at` = ?, `attempts` = ? where `id` = ?",[1532172842,1,2],0.7] [2018-07-21 20:34:07] local.INFO: 処理開始 - 2018-07-21 20:34:02 ... 処理終了 - 2018-07-21 20:34:07 [2018-07-21 20:34:07] local.DEBUG: SQL LOG:: ["select * from `jobs` where `id` = ? limit 1 for update",[2],0.75] [2018-07-21 20:34:07] local.DEBUG: SQL LOG:: ["delete from `jobs` where `id` = ?",[2],0.61] [2018-07-21 20:34:07] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172847,1532172757],0.56] [2018-07-21 20:34:10] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172850,1532172760],0.92] [2018-07-21 20:34:13] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172853,1532172763],0.77] [2018-07-21 20:34:16] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172856,1532172766],0.92] [2018-07-21 20:34:19] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172859,1532172769],0.76] [2018-07-21 20:34:23] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172863,1532172773],0.91] [2018-07-21 20:34:26] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172866,1532172776],0.9] [2018-07-21 20:34:29] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172869,1532172779],0.76] [2018-07-21 20:34:32] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172872,1532172782],0.79] [2018-07-21 20:34:35] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172875,1532172785],1.06] [2018-07-21 20:34:38] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172878,1532172788],0.88] [2018-07-21 20:34:41] local.DEBUG: SQL LOG:: ["select * from `jobs` where `queue` = ? and ((`reserved_at` is null and `available_at` <= ?) or (`reserved_at` <= ?)) order by `id` asc limit 1 for update",["default",1532172881,1532172791],0.96] |
ちなみに私が携わった処理の場合、登録は別処理だったため、結局この処理は使いませんでしたが、大変参考になりましたね・・・
[…] Queueの登録と実際の処理に関して […]
[…] Queueの登録と実際の処理に関して […]