Introduction

ウェブアプリケーションを構築する際、アップロードされたCSVファイルを解析して保存するなど、通常のウェブリクエスト中に実行するには時間がかかりすぎるタスクがあるかもしれません。幸いなことに、Laravelではバックグラウンドで処理されるキュー付きジョブを簡単に作成できます。時間のかかるタスクをキューに移動させることで、アプリケーションはウェブリクエストに対して驚異的なスピードで応答し、顧客により良いユーザー体験を提供できます。

Laravelのキューは、Amazon SQSRedis、またはリレーショナルデータベースなど、さまざまなキューバックエンドにわたる統一されたキューイングAPIを提供します。

Laravelのキュー設定オプションは、アプリケーションのconfig/queue.php設定ファイルに保存されています。このファイルには、データベース、Amazon SQSRedis、およびBeanstalkdドライバーを含む、フレームワークに含まれる各キュードライバーの接続設定が見つかります。また、ジョブを即座に実行するための同期ドライバーも含まれています(ローカル開発中に使用)。キューに追加されたジョブを破棄するnullキュードライバーも含まれています。

Laravelは現在、Redisを使用したキューのための美しいダッシュボードと設定システムであるHorizonを提供しています。詳細については、完全なHorizonドキュメントを確認してください。

Connections vs. Queues

Laravelのキューを始める前に、「接続」と「キュー」の違いを理解することが重要です。config/queue.php設定ファイルには、connections設定配列があります。このオプションは、Amazon SQS、Beanstalk、またはRedisなどのバックエンドキューサービスへの接続を定義します。ただし、特定のキュー接続には、異なるスタックやキューされたジョブの山と考えられる複数の「キュー」がある場合があります。

  1. ``````php
  2. use App\Jobs\ProcessPodcast;
  3. // This job is sent to the default connection's default queue...
  4. ProcessPodcast::dispatch();
  5. // This job is sent to the default connection's "emails" queue...
  6. ProcessPodcast::dispatch()->onQueue('emails');
  7. `

一部のアプリケーションでは、複数のキューにジョブをプッシュする必要がない場合があり、代わりに1つのシンプルなキューを好むことがあります。ただし、複数のキューにジョブをプッシュすることは、ジョブの処理を優先したりセグメント化したりしたいアプリケーションにとって特に便利です。Laravelのキュー作業者は、優先順位によって処理すべきキューを指定できます。たとえば、highキューにジョブをプッシュすると、より高い処理優先度を与える作業者を実行できます:

  1. php artisan queue:work --queue=high,default

Driver Notes and Prerequisites

Database

databaseキュードライバーを使用するには、ジョブを保持するためのデータベーステーブルが必要です。通常、これはLaravelのデフォルトの0001_01_01_000002_create_jobs_table.php データベースマイグレーションに含まれています。ただし、アプリケーションにこのマイグレーションが含まれていない場合は、make:queue-table Artisanコマンドを使用して作成できます:

  1. php artisan make:queue-table
  2. php artisan migrate

Redis

  1. `````serializer`````および`````compression````` Redisオプションは、`````redis`````キュードライバーではサポートされていません。
  2. **Redisクラスタ**
  3. Redisキュー接続がRedisクラスタを使用している場合、キュー名には[key hash tag](https://redis.io/docs/reference/cluster-spec/#hash-tags)を含める必要があります。これは、特定のキューのすべてのRedisキーが同じハッシュスロットに配置されることを保証するために必要です:
  4. ``````php
  5. 'redis' => [
  6. 'driver' => 'redis',
  7. 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
  8. 'queue' => env('REDIS_QUEUE', '{default}'),
  9. 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
  10. 'block_for' => null,
  11. 'after_commit' => false,
  12. ],
  13. `

ブロッキング

Redisキューを使用する場合、block_for設定オプションを使用して、ジョブが利用可能になるまでドライバーがどれくらい待機するかを指定できます。

この値をキューの負荷に基づいて調整することで、新しいジョブのためにRedisデータベースを継続的にポーリングするよりも効率的です。たとえば、値を5に設定すると、ジョブが利用可能になるまでドライバーが5秒間ブロックすることを示します:

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
  4. 'queue' => env('REDIS_QUEUE', 'default'),
  5. 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
  6. 'block_for' => 5,
  7. 'after_commit' => false,
  8. ],
  1. <a name="other-driver-prerequisites"></a>
  2. #### Other Driver Prerequisites
  3. 次の依存関係は、リストされたキュードライバーに必要です。これらの依存関係は、Composerパッケージマネージャーを介してインストールできます:
  4. - Amazon SQS: `````aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0またはphpredis PHP拡張

Creating Jobs

Generating Job Classes

デフォルトでは、アプリケーションのすべてのキュー可能なジョブはapp/Jobsディレクトリに保存されます。app/Jobsディレクトリが存在しない場合、make:job Artisanコマンドを実行すると作成されます:

  1. php artisan make:job ProcessPodcast

生成されたクラスはIlluminate\Contracts\Queue\ShouldQueueインターフェースを実装し、ジョブが非同期で実行されるためにキューにプッシュされるべきであることをLaravelに示します。

ジョブスタブは、スタブの公開を使用してカスタマイズできます。

Class Structure

ジョブクラスは非常にシンプルで、通常はジョブがキューによって処理されるときに呼び出されるhandleメソッドのみを含みます。始めるために、例のジョブクラスを見てみましょう。この例では、ポッドキャスト配信サービスを管理していると仮定し、公開される前にアップロードされたポッドキャストファイルを処理する必要があります:

  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\Podcast;
  4. use App\Services\AudioProcessor;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Queue\Queueable;
  7. class ProcessPodcast implements ShouldQueue
  8. {
  9. use Queueable;
  10. /**
  11. * Create a new job instance.
  12. */
  13. public function __construct(
  14. public Podcast $podcast,
  15. ) {}
  16. /**
  17. * Execute the job.
  18. */
  19. public function handle(AudioProcessor $processor): void
  20. {
  21. // Process uploaded podcast...
  22. }
  23. }

この例では、キューされたジョブのコンストラクタにEloquentモデルを直接渡すことができたことに注意してください。ジョブが使用しているQueueableトレイトのおかげで、Eloquentモデルとその読み込まれたリレーションシップは、ジョブが処理されるときに優雅にシリアライズおよび非シリアライズされます。

キューされたジョブがコンストラクタにEloquentモデルを受け入れる場合、モデルの識別子のみがキューにシリアライズされます。ジョブが実際に処理されると、キューシステムは自動的にデータベースから完全なモデルインスタンスとその読み込まれたリレーションシップを再取得します。このモデルシリアライズのアプローチにより、キュードライバーに送信されるジョブペイロードがはるかに小さくなります。

handle Method Dependency Injection

  1. コンテナが`````handle`````メソッドに依存関係を注入する方法を完全に制御したい場合は、コンテナの`````bindMethod`````メソッドを使用できます。`````bindMethod`````メソッドは、ジョブとコンテナを受け取るコールバックを受け入れます。コールバック内では、`````handle`````メソッドを自由に呼び出すことができます。通常、`````App\Providers\AppServiceProvider````` [サービスプロバイダー](/read/laravel-11-x/934b4900a19307fc.md)の`````boot`````メソッドからこのメソッドを呼び出すべきです:
  2. ``````php
  3. use App\Jobs\ProcessPodcast;
  4. use App\Services\AudioProcessor;
  5. use Illuminate\Contracts\Foundation\Application;
  6. $this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
  7. return $job->handle($app->make(AudioProcessor::class));
  8. });
  9. `

生の画像コンテンツなどのバイナリデータは、キューされたジョブに渡す前にbase64_encode関数を通過させる必要があります。そうしないと、ジョブがキューに配置されるときにJSONに正しくシリアライズされない可能性があります。

Queued Relationships

すべての読み込まれたEloquentモデルのリレーションシップもジョブがキューに追加されるときにシリアライズされるため、シリアライズされたジョブ文字列は時々非常に大きくなる可能性があります。さらに、ジョブが非シリアライズされ、モデルのリレーションシップがデータベースから再取得されるとき、それらは完全に取得されます。ジョブキュー処理中にモデルがシリアライズされる前に適用された以前のリレーションシップ制約は、ジョブが非シリアライズされるときには適用されません。したがって、特定のリレーションシップのサブセットで作業したい場合は、キューされたジョブ内でそのリレーションシップを再制約する必要があります。

また、リレーションがシリアライズされないようにするには、プロパティ値を設定するときにモデルのwithoutRelationsメソッドを呼び出すことができます。このメソッドは、読み込まれたリレーションシップなしのモデルのインスタンスを返します:

  1. /**
  2. * Create a new job instance.
  3. */
  4. public function __construct(
  5. Podcast $podcast,
  6. ) {
  7. $this->podcast = $podcast->withoutRelations();
  8. }

PHPのコンストラクタプロパティプロモーションを使用していて、Eloquentモデルがリレーションをシリアライズしないことを示したい場合は、WithoutRelations属性を使用できます:

  1. use Illuminate\Queue\Attributes\WithoutRelations;
  2. /**
  3. * Create a new job instance.
  4. */
  5. public function __construct(
  6. #[WithoutRelations]
  7. public Podcast $podcast,
  8. ) {}

ジョブが単一のモデルの代わりにEloquentモデルのコレクションまたは配列を受け取る場合、そのコレクション内のモデルは、ジョブが非シリアライズされて実行されるときにリレーションシップが復元されません。これは、大量のモデルを扱うジョブで過剰なリソース使用を防ぐためです。

Unique Jobs

ユニークなジョブには、ロックをサポートするキャッシュドライバーが必要です。現在、memcachedredisdynamodbdatabasefile、およびarrayキャッシュドライバーは、原子ロックをサポートしています。さらに、ユニークジョブ制約はバッチ内のジョブには適用されません。

時には、特定のジョブのインスタンスがキューに存在することを保証したい場合があります。これを実現するには、ジョブクラスでShouldBeUniqueインターフェースを実装します。このインターフェースは、クラスに追加のメソッドを定義する必要はありません:

  1. <?php
  2. use Illuminate\Contracts\Queue\ShouldQueue;
  3. use Illuminate\Contracts\Queue\ShouldBeUnique;
  4. class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
  5. {
  6. ...
  7. }

上記の例では、UpdateSearchIndexジョブはユニークです。したがって、ジョブの別のインスタンスがすでにキューにあり、処理が完了していない場合、そのジョブはディスパッチされません。

特定の「キー」を定義してジョブをユニークにするか、ジョブがもはやユニークでなくなるタイムアウトを指定したい場合は、ジョブクラスにuniqueIdおよびuniqueForプロパティまたはメソッドを定義できます:

  1. <?php
  2. use App\Models\Product;
  3. use Illuminate\Contracts\Queue\ShouldQueue;
  4. use Illuminate\Contracts\Queue\ShouldBeUnique;
  5. class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
  6. {
  7. /**
  8. * The product instance.
  9. *
  10. * @var \App\Product
  11. */
  12. public $product;
  13. /**
  14. * The number of seconds after which the job's unique lock will be released.
  15. *
  16. * @var int
  17. */
  18. public $uniqueFor = 3600;
  19. /**
  20. * Get the unique ID for the job.
  21. */
  22. public function uniqueId(): string
  23. {
  24. return $this->product->id;
  25. }
  26. }

上記の例では、UpdateSearchIndexジョブは製品IDによってユニークです。したがって、同じ製品IDを持つジョブの新しいディスパッチは、既存のジョブが処理を完了するまで無視されます。さらに、既存のジョブが1時間以内に処理されない場合、ユニークロックは解除され、同じユニークキーを持つ別のジョブがキューにディスパッチされることができます。

アプリケーションが複数のウェブサーバーまたはコンテナからジョブをディスパッチする場合、すべてのサーバーが同じ中央キャッシュサーバーと通信していることを確認し、Laravelがジョブがユニークであるかどうかを正確に判断できるようにする必要があります。

Keeping Jobs Unique Until Processing Begins

デフォルトでは、ユニークなジョブは、ジョブが処理を完了するか、すべての再試行が失敗した後に「ロック解除」されます。ただし、ジョブが処理される前にすぐにロック解除したい場合があるかもしれません。これを実現するには、ジョブはShouldBeUniqueUntilProcessing契約を実装する必要があります。ShouldBeUnique契約の代わりに:

  1. <?php
  2. use App\Models\Product;
  3. use Illuminate\Contracts\Queue\ShouldQueue;
  4. use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
  5. class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
  6. {
  7. // ...
  8. }

Unique Job Locks

裏側では、ShouldBeUniqueジョブがディスパッチされると、LaravelはuniqueIdキーでロックを取得しようとします。ロックが取得できない場合、ジョブはディスパッチされません。このロックは、ジョブが処理を完了するか、すべての再試行が失敗したときに解除されます。デフォルトでは、Laravelはこのロックを取得するためにデフォルトのキャッシュドライバーを使用します。ただし、ロックを取得するために別のドライバーを使用したい場合は、使用すべきキャッシュドライバーを返すuniqueViaメソッドを定義できます:

  1. use Illuminate\Contracts\Cache\Repository;
  2. use Illuminate\Support\Facades\Cache;
  3. class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
  4. {
  5. ...
  6. /**
  7. * Get the cache driver for the unique job lock.
  8. */
  9. public function uniqueVia(): Repository
  10. {
  11. return Cache::driver('redis');
  12. }
  13. }

ジョブの同時処理を制限するだけが必要な場合は、WithoutOverlappingジョブミドルウェアを使用してください。

Encrypted Jobs

Laravelは、ジョブのデータのプライバシーと整合性を暗号化によって確保することを可能にします。始めるには、ジョブクラスにShouldBeEncryptedインターフェースを追加するだけです。このインターフェースがクラスに追加されると、Laravelは自動的にジョブをキューにプッシュする前に暗号化します:

  1. <?php
  2. use Illuminate\Contracts\Queue\ShouldBeEncrypted;
  3. use Illuminate\Contracts\Queue\ShouldQueue;
  4. class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
  5. {
  6. // ...
  7. }

Job Middleware

ジョブミドルウェアは、キューされたジョブの実行の周りにカスタムロジックをラップすることを可能にし、ジョブ自体のボイラープレートを減らします。たとえば、次のhandleメソッドを考えてみましょう。これは、LaravelのRedisレート制限機能を利用して、5秒ごとに1つのジョブのみを処理できるようにします:

  1. use Illuminate\Support\Facades\Redis;
  2. /**
  3. * Execute the job.
  4. */
  5. public function handle(): void
  6. {
  7. Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
  8. info('Lock obtained...');
  9. // Handle job...
  10. }, function () {
  11. // Could not obtain lock...
  12. return $this->release(5);
  13. });
  14. }

このコードは有効ですが、handleメソッドの実装は、Redisレート制限ロジックで混雑しているため、騒がしくなります。さらに、このレート制限ロジックは、レート制限したい他のジョブに対して複製する必要があります。

ハンドルメソッドでレート制限を行う代わりに、レート制限を処理するジョブミドルウェアを定義できます。Laravelにはジョブミドルウェアのデフォルトの場所がないため、アプリケーション内の任意の場所にジョブミドルウェアを配置できます。この例では、ミドルウェアをapp/Jobs/Middlewareディレクトリに配置します:

  1. <?php
  2. namespace App\Jobs\Middleware;
  3. use Closure;
  4. use Illuminate\Support\Facades\Redis;
  5. class RateLimited
  6. {
  7. /**
  8. * Process the queued job.
  9. *
  10. * @param \Closure(object): void $next
  11. */
  12. public function handle(object $job, Closure $next): void
  13. {
  14. Redis::throttle('key')
  15. ->block(0)->allow(1)->every(5)
  16. ->then(function () use ($job, $next) {
  17. // Lock obtained...
  18. $next($job);
  19. }, function () use ($job) {
  20. // Could not obtain lock...
  21. $job->release(5);
  22. });
  23. }
  24. }

ご覧のとおり、ルートミドルウェアと同様に、ジョブミドルウェアは処理中のジョブと、ジョブの処理を続行するために呼び出すべきコールバックを受け取ります。

ジョブミドルウェアを作成した後、ジョブのmiddlewareメソッドからそれらを返すことで、ジョブにアタッチできます。このメソッドは、make:job Artisanコマンドによってスカフォールドされたジョブには存在しないため、ジョブクラスに手動で追加する必要があります:

  1. use App\Jobs\Middleware\RateLimited;
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [new RateLimited];
  10. }

ジョブミドルウェアは、キュー可能なイベントリスナー、メール、通知にも割り当てることができます。

Rate Limiting

独自のレート制限ジョブミドルウェアを書く方法を示しましたが、Laravelには実際にジョブをレート制限するために利用できるレート制限ミドルウェアが含まれています。ルートレートリミッターと同様に、ジョブレートリミッターはRateLimiterファサードのforメソッドを使用して定義されます。

たとえば、ユーザーがデータを1時間に1回バックアップできるようにし、プレミアム顧客にはそのような制限を課さないようにしたい場合、RateLimiterbootメソッドのAppServiceProviderで定義できます:

  1. use Illuminate\Cache\RateLimiting\Limit;
  2. use Illuminate\Support\Facades\RateLimiter;
  3. /**
  4. * Bootstrap any application services.
  5. */
  6. public function boot(): void
  7. {
  8. RateLimiter::for('backups', function (object $job) {
  9. return $job->user->vipCustomer()
  10. ? Limit::none()
  11. : Limit::perHour(1)->by($job->user->id);
  12. });
  13. }

上記の例では、1時間のレート制限を定義しましたが、perMinuteメソッドを使用して分単位のレート制限を簡単に定義できます。さらに、レート制限のbyメソッドに任意の値を渡すことができますが、この値は通常、顧客によってレート制限をセグメント化するために使用されます:

  1. return Limit::perMinute(50)->by($job->user->id);

レート制限を定義したら、Illuminate\Queue\Middleware\RateLimitedミドルウェアを使用してジョブにレートリミッターをアタッチできます。ジョブがレート制限を超えるたびに、このミドルウェアは適切な遅延を伴ってジョブをキューに戻します。

  1. use Illuminate\Queue\Middleware\RateLimited;
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [new RateLimited('backups')];
  10. }

レート制限されたジョブをキューに戻すと、ジョブの合計attemptsが増加します。ジョブクラスのtriesおよびmaxExceptionsプロパティを適切に調整することをお勧めします。または、retryUntilメソッドを使用して、ジョブが再試行されなくなるまでの時間を定義することもできます。

ジョブがレート制限されているときに再試行されないようにするには、dontReleaseメソッドを使用できます:

  1. /**
  2. * Get the middleware the job should pass through.
  3. *
  4. * @return array<int, object>
  5. */
  6. public function middleware(): array
  7. {
  8. return [(new RateLimited('backups'))->dontRelease()];
  9. }

Redisを使用している場合は、Illuminate\Queue\Middleware\RateLimitedWithRedisミドルウェアを使用できます。これはRedisに最適化されており、基本的なレート制限ミドルウェアよりも効率的です。

Preventing Job Overlaps

Laravelには、任意のキーに基づいてジョブの重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlappingミドルウェアが含まれています。これは、キューされたジョブが1回のジョブでのみ変更されるべきリソースを変更している場合に役立ちます。

たとえば、ユーザーのクレジットスコアを更新するキューされたジョブがあり、同じユーザーIDのクレジットスコア更新ジョブの重複を防ぎたいとします。これを実現するには、ジョブのmiddlewareメソッドからWithoutOverlappingミドルウェアを返すことができます:

  1. use Illuminate\Queue\Middleware\WithoutOverlapping;
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [new WithoutOverlapping($this->user->id)];
  10. }

同じタイプの重複ジョブは、キューに戻されます。また、リリースされたジョブが再度試行されるまでに経過する必要がある秒数を指定することもできます:

  1. /**
  2. * Get the middleware the job should pass through.
  3. *
  4. * @return array<int, object>
  5. */
  6. public function middleware(): array
  7. {
  8. return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
  9. }

重複するジョブを即座に削除して再試行されないようにするには、dontReleaseメソッドを使用できます:

  1. /**
  2. * Get the middleware the job should pass through.
  3. *
  4. * @return array<int, object>
  5. */
  6. public function middleware(): array
  7. {
  8. return [(new WithoutOverlapping($this->order->id))->dontRelease()];
  9. }
  1. ``````php
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
  10. }
  11. `
  1. <a name="sharing-lock-keys"></a>
  2. #### Sharing Lock Keys Across Job Classes
  3. デフォルトでは、`````WithoutOverlapping`````ミドルウェアは同じクラスの重複ジョブのみを防ぎます。したがって、異なる2つのジョブクラスが同じロックキーを使用していても、重複を防ぐことはできません。ただし、`````shared`````メソッドを使用して、ジョブクラス間でキーを適用するようにLaravelに指示できます:
  4. ``````php
  5. use Illuminate\Queue\Middleware\WithoutOverlapping;
  6. class ProviderIsDown
  7. {
  8. // ...
  9. public function middleware(): array
  10. {
  11. return [
  12. (new WithoutOverlapping("status:{$this->provider}"))->shared(),
  13. ];
  14. }
  15. }
  16. class ProviderIsUp
  17. {
  18. // ...
  19. public function middleware(): array
  20. {
  21. return [
  22. (new WithoutOverlapping("status:{$this->provider}"))->shared(),
  23. ];
  24. }
  25. }
  26. `

Throttling Exceptions

Laravelには、例外をスロットルすることを可能にするIlluminate\Queue\Middleware\ThrottlesExceptionsミドルウェアが含まれています。ジョブが指定された数の例外をスローすると、ジョブを実行するためのすべてのさらなる試行は、指定された時間間隔が経過するまで遅延されます。このミドルウェアは、特に不安定なサードパーティサービスと対話するジョブに便利です。

たとえば、例外をスローし始めるサードパーティAPIと対話するキューされたジョブを想像してみましょう。例外をスロットルするには、ジョブのmiddlewareメソッドからThrottlesExceptionsミドルウェアを返すことができます。通常、このミドルウェアは時間ベースの試行を実装するジョブとペアにする必要があります:

  1. use DateTime;
  2. use Illuminate\Queue\Middleware\ThrottlesExceptions;
  3. /**
  4. * Get the middleware the job should pass through.
  5. *
  6. * @return array<int, object>
  7. */
  8. public function middleware(): array
  9. {
  10. return [new ThrottlesExceptions(10, 5 * 60)];
  11. }
  12. /**
  13. * Determine the time at which the job should timeout.
  14. */
  15. public function retryUntil(): DateTime
  16. {
  17. return now()->addMinutes(30);
  18. }

ミドルウェアが受け入れる最初のコンストラクタ引数は、ジョブがスロットルされる前にスローできる例外の数であり、2番目のコンストラクタ引数は、スロットルされた後にジョブが再試行されるまでに経過する必要がある秒数です。上記のコード例では、ジョブが10回連続して例外をスローすると、30分の時間制限に制約されて、5分間待機してから再試行します。

ジョブが例外をスローしますが、例外の閾値に達していない場合、ジョブは通常即座に再試行されます。ただし、backoffメソッドを呼び出して、ミドルウェアをジョブにアタッチするときに、ジョブが遅延される分数を指定できます:

  1. use Illuminate\Queue\Middleware\ThrottlesExceptions;
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
  10. }

内部的に、このミドルウェアはLaravelのキャッシュシステムを使用してレート制限を実装し、ジョブのクラス名がキャッシュの「キー」として利用されます。このキーは、ミドルウェアをジョブにアタッチするときにbyメソッドを呼び出すことでオーバーライドできます。これは、同じサードパーティサービスと対話する複数のジョブがあり、共通のスロットリング「バケット」を共有したい場合に便利です:

  1. use Illuminate\Queue\Middleware\ThrottlesExceptions;
  2. /**
  3. * Get the middleware the job should pass through.
  4. *
  5. * @return array<int, object>
  6. */
  7. public function middleware(): array
  8. {
  9. return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
  10. }

デフォルトでは、このミドルウェアはすべての例外をスロットルします。この動作は、ミドルウェアをジョブにアタッチするときにwhenメソッドを呼び出すことで変更できます。例外は、whenメソッドに提供されたクロージャがtrueを返す場合にのみスロットルされます:

  1. use Illuminate\Http\Client\HttpClientException;
  2. use Illuminate\Queue\Middleware\ThrottlesExceptions;
  3. /**
  4. * Get the middleware the job should pass through.
  5. *
  6. * @return array<int, object>
  7. */
  8. public function middleware(): array
  9. {
  10. return [(new ThrottlesExceptions(10, 10 * 60))->when(
  11. fn (Throwable $throwable) => $throwable instanceof HttpClientException
  12. )];
  13. }

スロットルされた例外をアプリケーションの例外ハンドラーに報告したい場合は、reportメソッドを呼び出すことで実行できます。オプションで、reportメソッドにクロージャを提供すると、指定されたクロージャがtrueを返す場合にのみ例外が報告されます:

  1. use Illuminate\Http\Client\HttpClientException;
  2. use Illuminate\Queue\Middleware\ThrottlesExceptions;
  3. /**
  4. * Get the middleware the job should pass through.
  5. *
  6. * @return array<int, object>
  7. */
  8. public function middleware(): array
  9. {
  10. return [(new ThrottlesExceptions(10, 10 * 60))->report(
  11. fn (Throwable $throwable) => $throwable instanceof HttpClientException
  12. )];
  13. }

Redisを使用している場合は、Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedisミドルウェアを使用できます。これはRedisに最適化されており、基本的な例外スロットリングミドルウェアよりも効率的です。

Skipping Jobs

  1. ``````php
  2. use Illuminate\Queue\Middleware\Skip;
  3. /**
  4. * Get the middleware the job should pass through.
  5. */
  6. public function middleware(): array
  7. {
  8. return [
  9. Skip::when($someCondition),
  10. ];
  11. }
  12. `
  1. ``````php
  2. use Illuminate\Queue\Middleware\Skip;
  3. /**
  4. * Get the middleware the job should pass through.
  5. */
  6. public function middleware(): array
  7. {
  8. return [
  9. Skip::when(function (): bool {
  10. return $this->shouldSkip();
  11. }),
  12. ];
  13. }
  14. `

Dispatching Jobs

ジョブクラスを書いたら、ジョブ自体のdispatchメソッドを使用してディスパッチできます。dispatchメソッドに渡される引数は、ジョブのコンストラクタに渡されます:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Models\Podcast;
  6. use Illuminate\Http\RedirectResponse;
  7. use Illuminate\Http\Request;
  8. class PodcastController extends Controller
  9. {
  10. /**
  11. * Store a new podcast.
  12. */
  13. public function store(Request $request): RedirectResponse
  14. {
  15. $podcast = Podcast::create(/* ... */);
  16. // ...
  17. ProcessPodcast::dispatch($podcast);
  18. return redirect('/podcasts');
  19. }
  20. }

条件付きでジョブをディスパッチしたい場合は、dispatchIfおよびdispatchUnlessメソッドを使用できます:

  1. ProcessPodcast::dispatchIf($accountActive, $podcast);
  2. ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

新しいLaravelアプリケーションでは、syncドライバーがデフォルトのキュードライバーです。このドライバーは、現在のリクエストのフォアグラウンドでジョブを同期的に実行します。これは、ローカル開発中に便利です。バックグラウンド処理のために実際にジョブをキューに追加したい場合は、アプリケーションのconfig/queue.php設定ファイル内で別のキュードライバーを指定できます。

Delayed Dispatching

ジョブがキュー作業者によって即座に処理されないように指定したい場合は、ジョブをディスパッチするときにdelayメソッドを使用できます。たとえば、ジョブがディスパッチされてから10分後に処理可能であることを指定しましょう:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Models\Podcast;
  6. use Illuminate\Http\RedirectResponse;
  7. use Illuminate\Http\Request;
  8. class PodcastController extends Controller
  9. {
  10. /**
  11. * Store a new podcast.
  12. */
  13. public function store(Request $request): RedirectResponse
  14. {
  15. $podcast = Podcast::create(/* ... */);
  16. // ...
  17. ProcessPodcast::dispatch($podcast)
  18. ->delay(now()->addMinutes(10));
  19. return redirect('/podcasts');
  20. }
  21. }

場合によっては、ジョブにデフォルトの遅延が設定されていることがあります。この遅延をバイパスして即座に処理のためにジョブをディスパッチする必要がある場合は、withoutDelayメソッドを使用できます:

  1. ProcessPodcast::dispatch($podcast)->withoutDelay();

Amazon SQSキューサービスには、最大遅延時間が15分あります。

Dispatching After the Response is Sent to the Browser

また、dispatchAfterResponseメソッドは、ウェブサーバーがFastCGIを使用している場合、HTTPレスポンスがユーザーのブラウザに送信された後にジョブのディスパッチを遅延させます。これにより、キューされたジョブがまだ実行されている間に、ユーザーがアプリケーションを使用し始めることができます。これは通常、メール送信など、約1秒かかるジョブにのみ使用されるべきです。これらは現在のHTTPリクエスト内で処理されるため、この方法でディスパッチされたジョブは、処理されるためにキュー作業者が実行されている必要はありません:

  1. use App\Jobs\SendNotification;
  2. SendNotification::dispatchAfterResponse();

クロージャをdispatchし、afterResponseヘルパーにdispatchメソッドをチェーンして、HTTPレスポンスがブラウザに送信された後にクロージャを実行することもできます:

  1. use App\Mail\WelcomeMessage;
  2. use Illuminate\Support\Facades\Mail;
  3. dispatch(function () {
  4. Mail::to('')->send(new WelcomeMessage);
  5. })->afterResponse();

Synchronous Dispatching

ジョブを即座に(同期的に)ディスパッチしたい場合は、dispatchSyncメソッドを使用できます。このメソッドを使用すると、ジョブはキューに追加されず、現在のプロセス内で即座に実行されます:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Models\Podcast;
  6. use Illuminate\Http\RedirectResponse;
  7. use Illuminate\Http\Request;
  8. class PodcastController extends Controller
  9. {
  10. /**
  11. * Store a new podcast.
  12. */
  13. public function store(Request $request): RedirectResponse
  14. {
  15. $podcast = Podcast::create(/* ... */);
  16. // Create podcast...
  17. ProcessPodcast::dispatchSync($podcast);
  18. return redirect('/podcasts');
  19. }
  20. }

Jobs & Database Transactions

データベーストランザクション内でジョブをディスパッチすることは完全に問題ありませんが、ジョブが実際に成功裏に実行できることを確認するために特別な注意を払う必要があります。トランザクション内でジョブをディスパッチすると、親トランザクションがコミットされる前にワーカーによってジョブが処理される可能性があります。この場合、データベーストランザクション中にモデルやデータベースレコードに加えた更新が、まだデータベースに反映されていない可能性があります。さらに、トランザクション内で作成されたモデルやデータベースレコードは、データベースに存在しない可能性があります。

幸いなことに、Laravelはこの問題を回避するためのいくつかの方法を提供しています。まず、キュー接続の設定配列でafter_commit接続オプションを設定できます:

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. // ...
  4. 'after_commit' => true,
  5. ],
  1. トランザクション中に発生した例外によりトランザクションがロールバックされた場合、そのトランザクション中にディスパッチされたジョブは破棄されます。
  2. `````after_commit`````設定オプションを`````true`````に設定すると、キューされたイベントリスナー、メール、通知、およびブロードキャストイベントも、すべてのオープンなデータベーストランザクションがコミットされた後にディスパッチされます。
  3. <a name="specifying-commit-dispatch-behavior-inline"></a>
  4. #### Specifying Commit Dispatch Behavior Inline
  5. `````after_commit`````キュー接続設定オプションを`````true`````に設定しない場合でも、特定のジョブがすべてのオープンなデータベーストランザクションがコミットされた後にディスパッチされるべきであることを示すことができます。これを実現するには、ディスパッチ操作に`````afterCommit`````メソッドをチェーンします:
  6. ``````php
  7. use App\Jobs\ProcessPodcast;
  8. ProcessPodcast::dispatch($podcast)->afterCommit();
  9. `

同様に、after_commit設定オプションがtrueに設定されている場合、特定のジョブが即座にディスパッチされるべきであることを示すことができます。オープンなデータベーストランザクションがコミットされるのを待たずに:

  1. ProcessPodcast::dispatch($podcast)->beforeCommit();

Job Chaining

ジョブチェイニングを使用すると、プライマリジョブが正常に実行された後に実行されるべきキューされたジョブのリストを指定できます。シーケンス内の1つのジョブが失敗すると、残りのジョブは実行されません。キューされたジョブチェーンを実行するには、chainファサードによって提供されるメソッドを使用できます。Laravelのコマンドバスは、キューされたジョブディスパッチングの上に構築された低レベルのコンポーネントです:

  1. use App\Jobs\OptimizePodcast;
  2. use App\Jobs\ProcessPodcast;
  3. use App\Jobs\ReleasePodcast;
  4. use Illuminate\Support\Facades\Bus;
  5. Bus::chain([
  6. new ProcessPodcast,
  7. new OptimizePodcast,
  8. new ReleasePodcast,
  9. ])->dispatch();

ジョブクラスインスタンスをチェーンするだけでなく、クロージャもチェーンできます:

  1. Bus::chain([
  2. new ProcessPodcast,
  3. new OptimizePodcast,
  4. function () {
  5. Podcast::update(/* ... */);
  6. },
  7. ])->dispatch();

ジョブ内で$this->delete()メソッドを使用してジョブを削除しても、チェーン内のジョブが処理されるのを防ぐことはありません。チェーンは、チェーン内のジョブが失敗した場合にのみ実行を停止します。

Chain Connection and Queue

チェーンされたジョブに使用される接続とキューを指定したい場合は、onConnectionおよびonQueueメソッドを使用できます。これらのメソッドは、キューされたジョブが明示的に異なる接続/キューを割り当てられない限り、使用されるキュー接続とキュー名を指定します:

  1. Bus::chain([
  2. new ProcessPodcast,
  3. new OptimizePodcast,
  4. new ReleasePodcast,
  5. ])->onConnection('redis')->onQueue('podcasts')->dispatch();

Adding Jobs to the Chain

時には、別のジョブから既存のジョブチェーンにジョブを追加または前置きする必要があるかもしれません。これをprependToChainおよびappendToChainメソッドを使用して実現できます:

  1. /**
  2. * Execute the job.
  3. */
  4. public function handle(): void
  5. {
  6. // ...
  7. // Prepend to the current chain, run job immediately after current job...
  8. $this->prependToChain(new TranscribePodcast);
  9. // Append to the current chain, run job at end of chain...
  10. $this->appendToChain(new TranscribePodcast);
  11. }

チェーンの失敗

ジョブをチェーンする際、catch メソッドを使用して、チェーン内のジョブが失敗した場合に呼び出されるクロージャを指定できます。指定されたコールバックは、ジョブの失敗を引き起こした Throwable インスタンスを受け取ります:

  1. use Illuminate\Support\Facades\Bus;
  2. use Throwable;
  3. Bus::chain([
  4. new ProcessPodcast,
  5. new OptimizePodcast,
  6. new ReleasePodcast,
  7. ])->catch(function (Throwable $e) {
  8. // A job within the chain has failed...
  9. })->dispatch();

チェーンコールバックはシリアライズされ、Laravelキューによって後で実行されるため、チェーンコールバック内で $this 変数を使用しないでください。

キューと接続のカスタマイズ

特定のキューへのディスパッチ

異なるキューにジョブをプッシュすることで、キューに入れたジョブを「カテゴリ分け」し、さまざまなキューに割り当てるワーカーの数を優先順位付けすることができます。これは、キュー設定ファイルで定義された異なるキュー「接続」にはプッシュされず、単一の接続内の特定のキューにのみプッシュされることに注意してください。キューを指定するには、ジョブをディスパッチする際に onQueue メソッドを使用します:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Models\Podcast;
  6. use Illuminate\Http\RedirectResponse;
  7. use Illuminate\Http\Request;
  8. class PodcastController extends Controller
  9. {
  10. /**
  11. * Store a new podcast.
  12. */
  13. public function store(Request $request): RedirectResponse
  14. {
  15. $podcast = Podcast::create(/* ... */);
  16. // Create podcast...
  17. ProcessPodcast::dispatch($podcast)->onQueue('processing');
  18. return redirect('/podcasts');
  19. }
  20. }

また、ジョブのコンストラクタ内で onQueue メソッドを呼び出すことで、ジョブのキューを指定することもできます:

  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Contracts\Queue\ShouldQueue;
  4. use Illuminate\Foundation\Queue\Queueable;
  5. class ProcessPodcast implements ShouldQueue
  6. {
  7. use Queueable;
  8. /**
  9. * Create a new job instance.
  10. */
  11. public function __construct()
  12. {
  13. $this->onQueue('processing');
  14. }
  15. }

特定の接続へのディスパッチ

アプリケーションが複数のキュー接続と対話する場合、onConnection メソッドを使用してジョブをプッシュする接続を指定できます:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use App\Models\Podcast;
  6. use Illuminate\Http\RedirectResponse;
  7. use Illuminate\Http\Request;
  8. class PodcastController extends Controller
  9. {
  10. /**
  11. * Store a new podcast.
  12. */
  13. public function store(Request $request): RedirectResponse
  14. {
  15. $podcast = Podcast::create(/* ... */);
  16. // Create podcast...
  17. ProcessPodcast::dispatch($podcast)->onConnection('sqs');
  18. return redirect('/podcasts');
  19. }
  20. }

onConnectiononQueue メソッドを連結して、ジョブの接続とキューを指定することもできます:

  1. ProcessPodcast::dispatch($podcast)
  2. ->onConnection('sqs')
  3. ->onQueue('processing');

また、ジョブのコンストラクタ内で onConnection メソッドを呼び出すことで、ジョブの接続を指定することもできます:

  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Contracts\Queue\ShouldQueue;
  4. use Illuminate\Foundation\Queue\Queueable;
  5. class ProcessPodcast implements ShouldQueue
  6. {
  7. use Queueable;
  8. /**
  9. * Create a new job instance.
  10. */
  11. public function __construct()
  12. {
  13. $this->onConnection('sqs');
  14. }
  15. }

最大ジョブ試行回数 / タイムアウト値の指定

最大試行回数

キューに入れたジョブの1つがエラーに遭遇している場合、無限に再試行させたくないでしょう。したがって、Laravelはジョブが試行される回数や期間を指定するさまざまな方法を提供します。

ジョブが試行される最大回数を指定する1つの方法は、Artisanコマンドラインの --tries スイッチを使用することです。これは、処理されるジョブが試行される回数を指定しない限り、ワーカーによって処理されるすべてのジョブに適用されます:

  1. php artisan queue:work --tries=3

ジョブが最大試行回数を超えると、それは「失敗した」ジョブと見なされます。失敗したジョブの処理に関する詳細は、失敗したジョードキュメントを参照してください。--tries=0queue:work コマンドに提供されると、ジョブは無限に再試行されます。

ジョブクラス自体でジョブが試行される最大回数を定義することで、より詳細なアプローチを取ることができます。ジョブに最大試行回数が指定されている場合、それはコマンドラインで提供された --tries 値よりも優先されます:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * The number of times the job may be attempted.
  7. *
  8. * @var int
  9. */
  10. public $tries = 5;
  11. }

特定のジョブの最大試行回数を動的に制御する必要がある場合、ジョブに tries メソッドを定義できます:

  1. /**
  2. * Determine number of times the job may be attempted.
  3. */
  4. public function tries(): int
  5. {
  6. return 5;
  7. }

時間ベースの試行

ジョブが失敗する前に試行される回数を定義する代わりに、ジョブがもはや試行されない時間を定義することができます。これにより、特定の時間枠内でジョブを任意の回数試行できます。ジョブがもはや試行されない時間を定義するには、ジョブクラスに retryUntil メソッドを追加します。このメソッドは DateTime インスタンスを返す必要があります:

  1. use DateTime;
  2. /**
  3. * Determine the time at which the job should timeout.
  4. */
  5. public function retryUntil(): DateTime
  6. {
  7. return now()->addMinutes(10);
  8. }

また、キューイベントリスナーtries プロパティまたは retryUntil メソッドを定義することもできます。

最大例外

時には、ジョブが多くの回数試行されることを指定したいが、特定の数の未処理の例外によって再試行がトリガーされた場合には失敗すべきであることを指定したい場合があります(release メソッドによって直接解放されるのではなく)。これを実現するために、ジョブクラスに maxExceptions プロパティを定義できます:

  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Support\Facades\Redis;
  4. class ProcessPodcast implements ShouldQueue
  5. {
  6. /**
  7. * The number of times the job may be attempted.
  8. *
  9. * @var int
  10. */
  11. public $tries = 25;
  12. /**
  13. * The maximum number of unhandled exceptions to allow before failing.
  14. *
  15. * @var int
  16. */
  17. public $maxExceptions = 3;
  18. /**
  19. * Execute the job.
  20. */
  21. public function handle(): void
  22. {
  23. Redis::throttle('key')->allow(10)->every(60)->then(function () {
  24. // Lock obtained, process the podcast...
  25. }, function () {
  26. // Unable to obtain lock...
  27. return $this->release(10);
  28. });
  29. }
  30. }

この例では、アプリケーションがRedisロックを取得できない場合、ジョブは10秒間解放され、最大25回まで再試行されます。ただし、ジョブによって3つの未処理の例外がスローされた場合、ジョブは失敗します。

タイムアウト

通常、キューに入れたジョブがどのくらいの時間がかかるかを大まかに知っています。このため、Laravelは「タイムアウト」値を指定することを許可します。デフォルトでは、タイムアウト値は60秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理される場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーはサーバーに設定されたプロセスマネージャーによって自動的に再起動されます。

ジョブが実行できる最大秒数は、Artisanコマンドラインの --timeout スイッチを使用して指定できます:

  1. php artisan queue:work --timeout=30

ジョブがタイムアウトによって最大試行回数を超えると、失敗としてマークされます。

ジョブクラス自体でジョブが実行される最大秒数を定義することもできます。ジョブにタイムアウトが指定されている場合、それはコマンドラインで指定されたタイムアウトよりも優先されます:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * The number of seconds the job can run before timing out.
  7. *
  8. * @var int
  9. */
  10. public $timeout = 120;
  11. }

時には、ソケットや外部HTTP接続などのIOブロッキングプロセスが指定されたタイムアウトを尊重しない場合があります。したがって、これらの機能を使用する際は、常にAPIを使用してタイムアウトを指定するようにしてください。たとえば、Guzzleを使用する場合は、常に接続とリクエストのタイムアウト値を指定する必要があります。

ジョブタイムアウトを指定するには、pcntl PHP拡張がインストールされている必要があります。また、ジョブの「タイムアウト」値は常にその「再試行後」の値よりも小さい必要があります。そうしないと、ジョブが実行を完了する前に再試行される可能性があります。

タイムアウト時の失敗

ジョブがタイムアウト時に失敗としてマークされるべきであることを示したい場合、ジョブクラスに $failOnTimeout プロパティを定義できます:

  1. /**
  2. * Indicate if the job should be marked as failed on timeout.
  3. *
  4. * @var bool
  5. */
  6. public $failOnTimeout = true;

エラーハンドリング

ジョブが処理されている間に例外がスローされると、ジョブは自動的にキューに戻され、再試行されます。ジョブは、アプリケーションによって許可されている最大回数まで再試行され続けます。最大試行回数は、--tries スイッチによって queue:work Artisanコマンドで定義されます。あるいは、最大試行回数はジョブクラス自体で定義できます。キューのワーカーを実行することに関する詳細は、以下を参照

ジョブを手動で解放する

時には、ジョブを手動でキューに戻して、後で再試行できるようにしたい場合があります。これを実現するには、release メソッドを呼び出します:

  1. /**
  2. * Execute the job.
  3. */
  4. public function handle(): void
  5. {
  6. // ...
  7. $this->release();
  8. }

デフォルトでは、release メソッドはジョブを即座に処理のためにキューに戻します。ただし、release メソッドに整数または日付インスタンスを渡すことで、指定された秒数が経過するまでジョブを処理可能にしないようにキューに指示することができます:

  1. $this->release(10);
  2. $this->release(now()->addSeconds(10));

ジョブを手動で失敗させる

時には、ジョブを「失敗」として手動でマークする必要があります。これを行うには、fail メソッドを呼び出します:

  1. /**
  2. * Execute the job.
  3. */
  4. public function handle(): void
  5. {
  6. // ...
  7. $this->fail();
  8. }

キャッチした例外のためにジョブを失敗としてマークしたい場合は、例外を fail メソッドに渡すことができます。または、便利のために、文字列エラーメッセージを渡すことができ、これは例外に変換されます:

  1. $this->fail($exception);
  2. $this->fail('Something went wrong.');

失敗したジョブに関する詳細は、ジョブの失敗に関するドキュメントを確認してください。

ジョブバッチ処理

Laravelのジョブバッチ処理機能を使用すると、ジョブのバッチを簡単に実行し、ジョブのバッチが実行を完了したときに何らかのアクションを実行できます。始める前に、ジョブバッチに関するメタ情報を含むテーブルを構築するためのデータベースマイグレーションを作成する必要があります。たとえば、完了率などです。このマイグレーションは、make:queue-batches-table Artisanコマンドを使用して生成できます:

  1. php artisan make:queue-batches-table
  2. php artisan migrate

バッチ可能なジョブの定義

バッチ可能なジョブを定義するには、通常通りキュー可能なジョブを作成する必要があります。ただし、ジョブクラスに Illuminate\Bus\Batchable トレイトを追加する必要があります。このトレイトは、ジョブが実行されている現在のバッチを取得するために使用できる batch メソッドへのアクセスを提供します:

  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Bus\Batchable;
  4. use Illuminate\Contracts\Queue\ShouldQueue;
  5. use Illuminate\Foundation\Queue\Queueable;
  6. class ImportCsv implements ShouldQueue
  7. {
  8. use Batchable, Queueable;
  9. /**
  10. * Execute the job.
  11. */
  12. public function handle(): void
  13. {
  14. if ($this->batch()->cancelled()) {
  15. // Determine if the batch has been cancelled...
  16. return;
  17. }
  18. // Import a portion of the CSV file...
  19. }
  20. }

バッチのディスパッチ

ジョブのバッチをディスパッチするには、batch ファサードの Bus メソッドを使用する必要があります。もちろん、バッチ処理は主に完了コールバックと組み合わせて使用されます。したがって、thencatch、および finally メソッドを使用してバッチの完了コールバックを定義できます。これらのコールバックのそれぞれは、呼び出されたときに Illuminate\Bus\Batch インスタンスを受け取ります。この例では、CSVファイルから指定された行数を処理するジョブのバッチをキューに入れていると想像します:

  1. use App\Jobs\ImportCsv;
  2. use Illuminate\Bus\Batch;
  3. use Illuminate\Support\Facades\Bus;
  4. use Throwable;
  5. $batch = Bus::batch([
  6. new ImportCsv(1, 100),
  7. new ImportCsv(101, 200),
  8. new ImportCsv(201, 300),
  9. new ImportCsv(301, 400),
  10. new ImportCsv(401, 500),
  11. ])->before(function (Batch $batch) {
  12. // The batch has been created but no jobs have been added...
  13. })->progress(function (Batch $batch) {
  14. // A single job has completed successfully...
  15. })->then(function (Batch $batch) {
  16. // All jobs completed successfully...
  17. })->catch(function (Batch $batch, Throwable $e) {
  18. // First batch job failure detected...
  19. })->finally(function (Batch $batch) {
  20. // The batch has finished executing...
  21. })->dispatch();
  22. return $batch->id;

バッチのIDは、$batch->id プロパティを介してアクセスでき、ディスパッチされた後にバッチに関する情報をLaravelコマンドバスにクエリするために使用できます。

バッチコールバックはシリアライズされ、Laravelキューによって後で実行されるため、コールバック内で $this 変数を使用しないでください。また、バッチ処理されたジョブはデータベーストランザクション内にラップされているため、暗黙的なコミットをトリガーするデータベースステートメントはジョブ内で実行しないでください。

バッチの命名

Laravel HorizonやLaravel Telescopeなどのツールは、バッチに名前が付けられている場合、バッチのよりユーザーフレンドリーなデバッグ情報を提供する場合があります。バッチに任意の名前を付けるには、バッチを定義する際に name メソッドを呼び出します:

  1. $batch = Bus::batch([
  2. // ...
  3. ])->then(function (Batch $batch) {
  4. // All jobs completed successfully...
  5. })->name('Import CSV')->dispatch();

バッチ接続とキュー

バッチ処理されたジョブに使用する接続とキューを指定したい場合は、onConnection および onQueue メソッドを使用できます。すべてのバッチ処理されたジョブは、同じ接続とキュー内で実行する必要があります:

  1. $batch = Bus::batch([
  2. // ...
  3. ])->then(function (Batch $batch) {
  4. // All jobs completed successfully...
  5. })->onConnection('redis')->onQueue('imports')->dispatch();

チェーンとバッチ

バッチ内にチェーンジョブのセットを定義するには、チェーンジョブを配列内に配置します。たとえば、2つのジョブチェーンを並行して実行し、両方のジョブチェーンが処理を完了したときにコールバックを実行することができます:

  1. use App\Jobs\ReleasePodcast;
  2. use App\Jobs\SendPodcastReleaseNotification;
  3. use Illuminate\Bus\Batch;
  4. use Illuminate\Support\Facades\Bus;
  5. Bus::batch([
  6. [
  7. new ReleasePodcast(1),
  8. new SendPodcastReleaseNotification(1),
  9. ],
  10. [
  11. new ReleasePodcast(2),
  12. new SendPodcastReleaseNotification(2),
  13. ],
  14. ])->then(function (Batch $batch) {
  15. // ...
  16. })->dispatch();

逆に、バッチ内にバッチを定義することで、チェーン内でジョブのバッチを実行できます。たとえば、最初に複数のポッドキャストを解放するバッチを実行し、その後リリース通知を送信するバッチを実行することができます:

  1. use App\Jobs\FlushPodcastCache;
  2. use App\Jobs\ReleasePodcast;
  3. use App\Jobs\SendPodcastReleaseNotification;
  4. use Illuminate\Support\Facades\Bus;
  5. Bus::chain([
  6. new FlushPodcastCache,
  7. Bus::batch([
  8. new ReleasePodcast(1),
  9. new ReleasePodcast(2),
  10. ]),
  11. Bus::batch([
  12. new SendPodcastReleaseNotification(1),
  13. new SendPodcastReleaseNotification(2),
  14. ]),
  15. ])->dispatch();

バッチにジョブを追加する

時には、バッチジョブ内からバッチに追加のジョブを追加することが有用な場合があります。このパターンは、Webリクエスト中にディスパッチするには時間がかかりすぎる可能性のある数千のジョブをバッチ処理する必要がある場合に便利です。したがって、代わりに、バッチをさらに多くのジョブで水和する「ローダー」ジョブの初期バッチをディスパッチしたい場合があります:

  1. $batch = Bus::batch([
  2. new LoadImportBatch,
  3. new LoadImportBatch,
  4. new LoadImportBatch,
  5. ])->then(function (Batch $batch) {
  6. // All jobs completed successfully...
  7. })->name('Import Contacts')->dispatch();

この例では、LoadImportBatch ジョブを使用してバッチに追加のジョブを水和します。これを実現するために、ジョブの batch メソッドを介してアクセスできるバッチインスタンスの add メソッドを使用できます:

  1. use App\Jobs\ImportContacts;
  2. use Illuminate\Support\Collection;
  3. /**
  4. * Execute the job.
  5. */
  6. public function handle(): void
  7. {
  8. if ($this->batch()->cancelled()) {
  9. return;
  10. }
  11. $this->batch()->add(Collection::times(1000, function () {
  12. return new ImportContacts;
  13. }));
  14. }

バッチにジョブを追加できるのは、同じバッチに属するジョブ内からのみです。

バッチの検査

バッチ完了コールバックに提供される Illuminate\Bus\Batch インスタンスには、特定のジョブのバッチと対話し、検査するのに役立つさまざまなプロパティとメソッドがあります:

  1. // The UUID of the batch...
  2. $batch->id;
  3. // The name of the batch (if applicable)...
  4. $batch->name;
  5. // The number of jobs assigned to the batch...
  6. $batch->totalJobs;
  7. // The number of jobs that have not been processed by the queue...
  8. $batch->pendingJobs;
  9. // The number of jobs that have failed...
  10. $batch->failedJobs;
  11. // The number of jobs that have been processed thus far...
  12. $batch->processedJobs();
  13. // The completion percentage of the batch (0-100)...
  14. $batch->progress();
  15. // Indicates if the batch has finished executing...
  16. $batch->finished();
  17. // Cancel the execution of the batch...
  18. $batch->cancel();
  19. // Indicates if the batch has been cancelled...
  20. $batch->cancelled();

ルートからのバッチの返却

すべての Illuminate\Bus\Batch インスタンスはJSONシリアライズ可能であり、アプリケーションのルートの1つから直接返すことができ、バッチに関する情報を含むJSONペイロードを取得できます。これには、完了進捗が含まれます。これにより、アプリケーションのUIでバッチの完了進捗に関する情報を表示するのが便利です。

バッチIDでバッチを取得するには、Bus ファサードの findBatch メソッドを使用できます:

  1. use Illuminate\Support\Facades\Bus;
  2. use Illuminate\Support\Facades\Route;
  3. Route::get('/batch/{batchId}', function (string $batchId) {
  4. return Bus::findBatch($batchId);
  5. });

バッチのキャンセル

時には、特定のバッチの実行をキャンセルする必要があります。これは、Illuminate\Bus\Batch インスタンスの cancel メソッドを呼び出すことで実現できます:

  1. /**
  2. * Execute the job.
  3. */
  4. public function handle(): void
  5. {
  6. if ($this->user->exceedsImportLimit()) {
  7. return $this->batch()->cancel();
  8. }
  9. if ($this->batch()->cancelled()) {
  10. return;
  11. }
  12. }

前の例で気づいたように、バッチ処理されたジョブは、実行を続ける前に対応するバッチがキャンセルされたかどうかを判断する必要があります。ただし、便利のために、ジョブに SkipIfBatchCancelled ミドルウェアを代わりに割り当てることができます。その名前が示すように、このミドルウェアは、対応するバッチがキャンセルされた場合、Laravelにジョブを処理しないように指示します:

  1. use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
  2. /**
  3. * Get the middleware the job should pass through.
  4. */
  5. public function middleware(): array
  6. {
  7. return [new SkipIfBatchCancelled];
  8. }

バッチの失敗

バッチ処理されたジョブが失敗した場合、catch コールバック(割り当てられている場合)が呼び出されます。このコールバックは、バッチ内で最初に失敗したジョブに対してのみ呼び出されます。

失敗を許可する

バッチ内のジョブが失敗した場合、Laravelは自動的にバッチを「キャンセル」としてマークします。これを無効にしたい場合は、ジョブの失敗がバッチを自動的にキャンセルとしてマークしないようにすることができます。これは、バッチをディスパッチする際に allowFailures メソッドを呼び出すことで実現できます:

  1. $batch = Bus::batch([
  2. // ...
  3. ])->then(function (Batch $batch) {
  4. // All jobs completed successfully...
  5. })->allowFailures()->dispatch();

失敗したバッチジョブの再試行

便利のために、Laravelは特定のバッチの失敗したすべてのジョブを簡単に再試行できる queue:retry-batch Artisanコマンドを提供します。queue:retry-batch コマンドは、失敗したジョブを再試行するバッチのUUIDを受け取ります:

  1. php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

バッチのプルーニング

プルーニングを行わないと、job_batches テーブルは非常に迅速にレコードが蓄積される可能性があります。これを軽減するために、queue:prune-batches Artisanコマンドを毎日実行するようにスケジュールする必要があります:

  1. use Illuminate\Support\Facades\Schedule;
  2. Schedule::command('queue:prune-batches')->daily();

デフォルトでは、24時間以上前に完了したすべてのバッチがプルーニングされます。コマンドを呼び出す際に hours オプションを使用して、バッチデータを保持する期間を決定できます。たとえば、次のコマンドは、48時間以上前に完了したすべてのバッチを削除します:

  1. use Illuminate\Support\Facades\Schedule;
  2. Schedule::command('queue:prune-batches --hours=48')->daily();

時には、jobs_batches テーブルが、成功裏に完了しなかったバッチのレコードを蓄積することがあります。たとえば、ジョブが失敗し、そのジョブが再試行されなかった場合などです。queue:prune-batches コマンドに unfinished オプションを使用して、これらの未完了のバッチレコードをプルーニングするように指示できます:

  1. use Illuminate\Support\Facades\Schedule;
  2. Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同様に、jobs_batches テーブルもキャンセルされたバッチのレコードを蓄積する可能性があります。queue:prune-batches コマンドに cancelled オプションを使用して、これらのキャンセルされたバッチレコードをプルーニングするように指示できます:

  1. use Illuminate\Support\Facades\Schedule;
  2. Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

DynamoDBにバッチを保存する

Laravelは、リレーショナルデータベースの代わりにDynamoDBにバッチメタ情報を保存するサポートも提供します。ただし、すべてのバッチレコードを保存するためにDynamoDBテーブルを手動で作成する必要があります。

通常、このテーブルは job_batches と名付けるべきですが、アプリケーションの queue 設定ファイル内の queue.batching.table 設定値に基づいてテーブルに名前を付けるべきです。

DynamoDBバッチテーブルの設定

job_batches テーブルには、application という名前の文字列プライマリパーティションキーと、id という名前の文字列プライマリソートキーが必要です。キーの application 部分には、アプリケーションの name 設定値によって定義されたアプリケーション名が含まれます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、複数のLaravelアプリケーションのジョブバッチを保存するために同じテーブルを使用できます。

さらに、自動バッチプルーニングを利用したい場合は、テーブルに ttl 属性を定義できます。

DynamoDBの設定

次に、AWS SDKをインストールして、LaravelアプリケーションがAmazon DynamoDBと通信できるようにします:

  1. composer require aws/aws-sdk-php

次に、queue.batching.driver 設定オプションの値を dynamodb に設定します。さらに、keysecret、および region 設定オプションを batching 設定配列内に定義する必要があります。これらのオプションは、AWSとの認証に使用されます。dynamodb ドライバーを使用する場合、queue.batching.database 設定オプションは不要です:

  1. 'batching' => [
  2. 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
  3. 'key' => env('AWS_ACCESS_KEY_ID'),
  4. 'secret' => env('AWS_SECRET_ACCESS_KEY'),
  5. 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
  6. 'table' => 'job_batches',
  7. ],

DynamoDBでのバッチのプルーニング

DynamoDBを使用してジョブバッチ情報を保存する場合、リレーショナルデータベースに保存されたバッチをプルーニングするために使用される通常のプルーニングコマンドは機能しません。代わりに、DynamoDBのネイティブTTL機能を利用して、古いバッチのレコードを自動的に削除できます。

DynamoDBテーブルに ttl 属性を定義した場合、Laravelにバッチレコードをプルーニングする方法を指示するための設定パラメータを定義できます。queue.batching.ttl_attribute 設定値はTTLを保持する属性の名前を定義し、queue.batching.ttl 設定値は、レコードが更新された最後の時間からの秒数を定義します:

  1. 'batching' => [
  2. 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
  3. 'key' => env('AWS_ACCESS_KEY_ID'),
  4. 'secret' => env('AWS_SECRET_ACCESS_KEY'),
  5. 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
  6. 'table' => 'job_batches',
  7. 'ttl_attribute' => 'ttl',
  8. 'ttl' => 60 * 60 * 24 * 7, // 7 days...
  9. ],

クロージャのキューイング

ジョブクラスをキューにディスパッチする代わりに、クロージャをディスパッチすることもできます。これは、現在のリクエストサイクルの外で実行する必要がある迅速で簡単なタスクに最適です。キューにクロージャをディスパッチする際、クロージャのコード内容は暗号的に署名され、転送中に変更できないようにされます:

  1. $podcast = App\Podcast::find(1);
  2. dispatch(function () use ($podcast) {
  3. $podcast->publish();
  4. });

catch メソッドを使用して、キューに入れたクロージャがすべてのキューの設定された再試行回数を使い果たした後に成功裏に完了しなかった場合に実行されるクロージャを提供できます:

  1. use Throwable;
  2. dispatch(function () use ($podcast) {
  3. $podcast->publish();
  4. })->catch(function (Throwable $e) {
  5. // This job has failed...
  6. });

catch コールバックはシリアライズされ、Laravelキューによって後で実行されるため、$this 変数を catch コールバック内で使用しないでください。

キューワーカーの実行

queue:work コマンド

Laravelには、キューワーカーを起動し、新しいジョブがキューにプッシュされるとそれを処理するArtisanコマンドが含まれています。queue:work Artisanコマンドを使用してワーカーを実行できます。queue:work コマンドが開始されると、手動で停止するか、ターミナルを閉じるまで実行され続けることに注意してください:

  1. php artisan queue:work

queue:work プロセスをバックグラウンドで永続的に実行し続けるには、Supervisorのようなプロセスマネージャを使用して、キューのワーカーが停止しないようにする必要があります。

-v フラグを指定して queue:work コマンドを呼び出すと、処理されたジョブIDがコマンドの出力に含まれるようになります:

  1. php artisan queue:work -v

キューのワーカーは長寿命のプロセスであり、起動されたアプリケーションの状態をメモリに保存します。そのため、起動された後はコードベースの変更に気づきません。したがって、デプロイプロセス中に、キューのワーカーを再起動することを確認してください。また、アプリケーションによって作成または変更された静的状態は、ジョブ間で自動的にリセットされません。

また、queue:listen コマンドを実行することもできます。queue:listen コマンドを使用すると、更新されたコードを再読み込みしたり、アプリケーションの状態をリセットしたりする際に、ワーカーを手動で再起動する必要はありません。ただし、このコマンドは queue:work コマンドよりも効率が大幅に低下します:

  1. php artisan queue:listen

複数のキューワーカーの実行

複数のワーカーをキューに割り当ててジョブを同時に処理するには、単に複数の queue:work プロセスを開始する必要があります。これは、ターミナルの複数のタブを介してローカルで行うか、プロダクションでプロセスマネージャの設定を使用して行うことができます。Supervisorを使用する場合numprocs 設定値を使用できます。

接続とキューの指定

ワーカーが使用するキュー接続を指定することもできます。work コマンドに渡される接続名は、config/queue.php 設定ファイルで定義された接続の1つに対応する必要があります:

  1. php artisan queue:work redis

デフォルトでは、queue:work コマンドは、特定の接続のデフォルトキューのジョブのみを処理します。ただし、特定の接続の特定のキューのみを処理するようにキューのワーカーをさらにカスタマイズすることもできます。たとえば、すべてのメールが emails キューで redis キュー接続で処理される場合、次のコマンドを発行して、そのキューのみを処理するワーカーを開始できます:

  1. php artisan queue:work redis --queue=emails

指定された数のジョブを処理する

--once オプションを使用して、ワーカーにキューから単一のジョブのみを処理するように指示できます:

  1. php artisan queue:work --once

--max-jobs オプションを使用して、ワーカーに指定された数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用すると便利で、指定された数のジョブを処理した後にワーカーが自動的に再起動し、蓄積されたメモリを解放します:

  1. php artisan queue:work --max-jobs=1000

すべてのキューに入れたジョブを処理してから終了する

--stop-when-empty オプションを使用して、ワーカーにすべてのジョブを処理してから優雅に終了するように指示できます。このオプションは、Dockerコンテナ内でLaravelキューを処理する際に、キューが空になった後にコンテナをシャットダウンしたい場合に便利です:

  1. php artisan queue:work --stop-when-empty

指定された秒数のジョブを処理する

--max-time オプションを使用して、ワーカーに指定された秒数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用すると便利で、指定された時間の間にジョブを処理した後にワーカーが自動的に再起動し、蓄積されたメモリを解放します:

  1. # 1時間のジョブを処理してから終了...
  2. php artisan queue:work --max-time=3600

ワーカーのスリープ時間

ジョブがキューにある場合、ワーカーはジョブの間に遅延なく処理を続けます。ただし、sleep オプションは、ジョブが利用可能でない場合にワーカーが「スリープ」する秒数を決定します。もちろん、スリープ中はワーカーは新しいジョブを処理しません:

  1. php artisan queue:work --sleep=3

メンテナンスモードとキュー

アプリケーションがメンテナンスモードにある間、キューに入れたジョブは処理されません。アプリケーションがメンテナンスモードから出ると、ジョブは通常通り処理され続けます。

キューのワーカーにメンテナンスモードが有効になっていてもジョブを処理させたい場合は、--force オプションを使用できます:

  1. php artisan queue:work --force

リソースの考慮事項

デーモンキューのワーカーは、各ジョブを処理する前にフレームワークを「再起動」しません。したがって、各ジョブが完了した後に重いリソースを解放する必要があります。たとえば、GDライブラリを使用して画像操作を行っている場合、画像の処理が完了したら imagedestroy でメモリを解放する必要があります。

キューの優先順位

時には、キューの処理の優先順位を付けたい場合があります。たとえば、config/queue.php 設定ファイルで、queueredis 接続のデフォルトに設定できます。ただし、時折、次のように high 優先度の高いキューにジョブをプッシュしたい場合があります:

  1. dispatch((new Job)->onQueue('high'));

すべての high キューのジョブが処理されることを確認してから、low キューのジョブに続くようにワーカーを開始するには、キュー名のカンマ区切りリストを work コマンドに渡します:

  1. php artisan queue:work --queue=high,low

キューのワーカーとデプロイメント

キューのワーカーは長寿命のプロセスであるため、再起動しない限り、コードの変更に気づきません。したがって、キューのワーカーを使用するアプリケーションをデプロイする最も簡単な方法は、デプロイプロセス中にワーカーを再起動することです。queue:restart コマンドを発行して、すべてのワーカーを優雅に再起動できます:

  1. php artisan queue:restart

このコマンドは、現在のジョブの処理が完了した後にすべてのキューのワーカーに優雅に終了するように指示し、既存のジョブが失われないようにします。queue:restart コマンドが実行されると、キューのワーカーは終了しますので、Supervisorのようなプロセスマネージャを実行して、キューのワーカーを自動的に再起動する必要があります。

キューはキャッシュを使用して再起動信号を保存するため、この機能を使用する前に、アプリケーションに適切にキャッシュドライバーが設定されていることを確認してください。

ジョブの有効期限とタイムアウト

ジョブの有効期限

config/queue.php 設定ファイルでは、各キュー接続が retry_after オプションを定義します。このオプションは、処理中のジョブを再試行する前にキュー接続が待機する秒数を指定します。たとえば、retry_after の値が 90 に設定されている場合、ジョブは90秒間処理され、解放または削除されない場合、キューに戻されます。通常、retry_after 値は、ジョブが処理を完了するのに合理的にかかる最大秒数に設定するべきです。

有効期限値を含まないキュー接続はAmazon SQSのみです。SQSは、AWSコンソール内で管理されるデフォルトの可視性タイムアウトに基づいてジョブを再試行します。

ワーカータイムアウト

queue:work Artisan コマンドは --timeout オプションを公開します。デフォルトでは、--timeout 値は 60 秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理されている場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーは サーバーに設定されたプロセスマネージャー によって自動的に再起動されます:

  1. php artisan queue:work --timeout=60

retry_after 設定オプションと --timeout CLI オプションは異なりますが、ジョブが失われず、ジョブが一度だけ正常に処理されることを保証するために連携して機能します。

--timeout 値は常に retry_after 設定値よりも数秒短くする必要があります。これにより、フリーズしたジョブを処理しているワーカーが、ジョブが再試行される前に常に終了されることが保証されます。--timeout オプションが retry_after 設定値よりも長い場合、ジョブが二重に処理される可能性があります。

スーパーバイザー設定

本番環境では、queue:work プロセスを実行し続ける方法が必要です。queue:work プロセスは、ワーカータイムアウトの超過や queue:restart コマンドの実行など、さまざまな理由で停止する可能性があります。

このため、queue:work プロセスが終了したときに検出し、自動的に再起動するプロセスモニターを設定する必要があります。さらに、プロセスモニターを使用すると、同時に実行したい queue:work プロセスの数を指定できます。スーパーバイザーは、Linux 環境で一般的に使用されるプロセスモニターであり、次のドキュメントでその設定方法について説明します。

スーパーバイザーのインストール

スーパーバイザーは Linux オペレーティングシステム用のプロセスモニターであり、失敗した場合は queue:work プロセスを自動的に再起動します。Ubuntu にスーパーバイザーをインストールするには、次のコマンドを使用できます:

  1. sudo apt-get install supervisor

スーパーバイザーを自分で設定および管理するのが大変に感じる場合は、Laravel Forge を使用することを検討してください。これにより、プロダクションの Laravel プロジェクト用にスーパーバイザーが自動的にインストールおよび設定されます。

スーパーバイザーの設定

スーパーバイザーの設定ファイルは通常、/etc/supervisor/conf.d ディレクトリに保存されます。このディレクトリ内に、スーパーバイザーがプロセスをどのように監視するかを指示する任意の数の設定ファイルを作成できます。たとえば、laravel-worker.conf ファイルを作成して queue:work プロセスを開始および監視しましょう:

  1. [program:laravel-worker]
  2. process_name=%(program_name)s_%(process_num)02d
  3. command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
  4. autostart=true
  5. autorestart=true
  6. stopasgroup=true
  7. killasgroup=true
  8. user=forge
  9. numprocs=8
  10. redirect_stderr=true
  11. stdout_logfile=/home/forge/app.com/worker.log
  12. stopwaitsecs=3600

この例では、numprocs ディレクティブはスーパーバイザーに 8 つの queue:work プロセスを実行し、すべてを監視し、失敗した場合は自動的に再起動するよう指示します。設定の command ディレクティブを変更して、希望するキュー接続とワーカーオプションを反映させる必要があります。

stopwaitsecs の値が、最も長く実行されるジョブによって消費される秒数よりも大きいことを確認してください。そうしないと、スーパーバイザーがジョブを処理が完了する前に終了させる可能性があります。

スーパーバイザーの起動

設定ファイルが作成されたら、次のコマンドを使用してスーパーバイザーの設定を更新し、プロセスを開始できます:

  1. sudo supervisorctl reread
  2. sudo supervisorctl update
  3. sudo supervisorctl start "laravel-worker:*"

スーパーバイザーに関する詳細は、スーパーバイザーのドキュメントを参照してください。

失敗したジョブの処理

時々、キューに入れたジョブが失敗することがあります。心配しないでください、物事は常に計画通りに進むわけではありません!Laravel には、ジョブが試行される最大回数を指定する便利な方法が含まれています。非同期ジョブがこの試行回数を超えた場合、failed_jobs データベーステーブルに挿入されます。同期的にディスパッチされたジョブが失敗した場合、これらのテーブルには保存されず、その例外はアプリケーションによって即座に処理されます。

failed_jobs テーブルを作成するためのマイグレーションは、新しい Laravel アプリケーションには通常すでに存在します。ただし、アプリケーションにこのテーブルのマイグレーションが含まれていない場合は、make:queue-failed-table コマンドを使用してマイグレーションを作成できます:

  1. php artisan make:queue-failed-table
  2. php artisan migrate

キュー ワーカー プロセスを実行する際、--tries スイッチを使用してジョブが試行される最大回数を指定できます。--tries オプションの値を指定しない場合、ジョブは一度だけ試行されるか、ジョブクラスの $tries プロパティで指定された回数だけ試行されます:

  1. php artisan queue:work redis --tries=3

--backoff オプションを使用すると、例外が発生したジョブを再試行する前に Laravel が待機する秒数を指定できます。デフォルトでは、ジョブは即座にキューに戻され、再試行される可能性があります:

  1. php artisan queue:work redis --tries=3 --backoff=3

例外が発生したジョブを再試行する前に Laravel が待機する秒数をジョブごとに設定したい場合は、ジョブクラスに backoff プロパティを定義することで実現できます:

  1. /**
  2. * The number of seconds to wait before retrying the job.
  3. *
  4. * @var int
  5. */
  6. public $backoff = 3;

ジョブのバックオフ時間を決定するためにより複雑なロジックが必要な場合は、ジョブクラスに backoff メソッドを定義できます:

  1. /**
  2. * Calculate the number of seconds to wait before retrying the job.
  3. */
  4. public function backoff(): int
  5. {
  6. return 3;
  7. }

backoff メソッドからバックオフ値の配列を返すことで、「指数的」バックオフを簡単に設定できます。この例では、最初の再試行の遅延は 1 秒、2 回目の再試行は 5 秒、3 回目の再試行は 10 秒、残りの再試行はすべて 10 秒になります:

  1. /**
  2. * Calculate the number of seconds to wait before retrying the job.
  3. *
  4. * @return array<int, int>
  5. */
  6. public function backoff(): array
  7. {
  8. return [1, 5, 10];
  9. }

失敗したジョブの後処理

特定のジョブが失敗した場合、ユーザーにアラートを送信したり、ジョブによって部分的に完了したアクションを元に戻したりしたい場合があります。これを実現するには、ジョブクラスに failed メソッドを定義できます。ジョブを失敗させた Throwable インスタンスが failed メソッドに渡されます:

  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\Podcast;
  4. use App\Services\AudioProcessor;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Queue\Queueable;
  7. use Throwable;
  8. class ProcessPodcast implements ShouldQueue
  9. {
  10. use Queueable;
  11. /**
  12. * Create a new job instance.
  13. */
  14. public function __construct(
  15. public Podcast $podcast,
  16. ) {}
  17. /**
  18. * Execute the job.
  19. */
  20. public function handle(AudioProcessor $processor): void
  21. {
  22. // Process uploaded podcast...
  23. }
  24. /**
  25. * Handle a job failure.
  26. */
  27. public function failed(?Throwable $exception): void
  28. {
  29. // Send user notification of failure, etc...
  30. }
  31. }

failed メソッドを呼び出す前に新しいジョブのインスタンスが生成されるため、handle メソッド内で発生した可能性のあるクラスプロパティの変更は失われます。

失敗したジョブの再試行

failed_jobs データベーステーブルに挿入されたすべての失敗したジョブを表示するには、queue:failed Artisan コマンドを使用できます:

  1. php artisan queue:failed

queue:failed コマンドは、ジョブ ID、接続、キュー、失敗時間、およびジョブに関するその他の情報をリストします。ジョブ ID は、失敗したジョブを再試行するために使用できます。たとえば、ID が ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece の失敗したジョブを再試行するには、次のコマンドを発行します:

  1. php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

必要に応じて、コマンドに複数の ID を渡すことができます:

  1. php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

特定のキューのすべての失敗したジョブを再試行することもできます:

  1. php artisan queue:retry --queue=name

すべての失敗したジョブを再試行するには、queue:retry コマンドを実行し、all を ID として渡します:

  1. php artisan queue:retry all

失敗したジョブを削除したい場合は、queue:forget コマンドを使用できます:

  1. php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

Horizon を使用している場合は、horizon:forget コマンドを使用して失敗したジョブを削除する必要があります。queue:forget コマンドの代わりに。

failed_jobs テーブルからすべての失敗したジョブを削除するには、queue:flush コマンドを使用できます:

  1. php artisan queue:flush

欠落しているモデルの無視

Eloquent モデルをジョブに注入する際、モデルはキューに置かれる前に自動的にシリアル化され、ジョブが処理されるときにデータベースから再取得されます。ただし、ジョブがワーカーによって処理されるのを待っている間にモデルが削除された場合、ジョブは ModelNotFoundException で失敗する可能性があります。

便利なことに、ジョブの deleteWhenMissingModels プロパティを true に設定することで、欠落しているモデルを持つジョブを自動的に削除することができます。このプロパティが true に設定されている場合、Laravel は例外を発生させることなくジョブを静かに破棄します:

  1. /**
  2. * Delete the job if its models no longer exist.
  3. *
  4. * @var bool
  5. */
  6. public $deleteWhenMissingModels = true;

失敗したジョブのプルーニング

アプリケーションの failed_jobs テーブルのレコードをプルーニングするには、queue:prune-failed Artisan コマンドを呼び出します:

  1. php artisan queue:prune-failed

デフォルトでは、24 時間以上前のすべての失敗したジョブレコードがプルーニングされます。コマンドに --hours オプションを提供すると、直近 N 時間内に挿入された失敗したジョブレコードのみが保持されます。たとえば、次のコマンドは、48 時間以上前に挿入されたすべての失敗したジョブレコードを削除します:

  1. php artisan queue:prune-failed --hours=48

DynamoDB に失敗したジョブを保存

Laravel は、失敗したジョブレコードをリレーショナルデータベーステーブルの代わりに DynamoDB に保存するサポートも提供します。ただし、すべての失敗したジョブレコードを保存するために DynamoDB テーブルを手動で作成する必要があります。通常、このテーブルは failed_jobs と名付けられますが、アプリケーションの queue 設定ファイル内の queue.failed.table 設定値に基づいてテーブルの名前を付ける必要があります。

failed_jobs テーブルには、application という名前の文字列のプライマリパーティションキーと、uuid という名前の文字列のプライマリソートキーがあります。application キーの部分には、アプリケーションの name 設定値によって定義されたアプリケーション名が含まれます。アプリケーション名は DynamoDB テーブルのキーの一部であるため、複数の Laravel アプリケーションの失敗したジョブを保存するために同じテーブルを使用できます。

さらに、Laravel アプリケーションが Amazon DynamoDB と通信できるように、AWS SDK をインストールすることを確認してください:

  1. composer require aws/aws-sdk-php

次に、queue.failed.driver 設定オプションの値を dynamodb に設定します。さらに、失敗したジョブ設定配列内に keysecretregion 設定オプションを定義する必要があります。これらのオプションは、AWS との認証に使用されます。dynamodb ドライバーを使用する場合、queue.failed.database 設定オプションは不要です:

  1. 'failed' => [
  2. 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
  3. 'key' => env('AWS_ACCESS_KEY_ID'),
  4. 'secret' => env('AWS_SECRET_ACCESS_KEY'),
  5. 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
  6. 'table' => 'failed_jobs',
  7. ],

失敗したジョブの保存を無効にする

Laravel に失敗したジョブを保存せずに破棄するよう指示するには、queue.failed.driver 設定オプションの値を null に設定します。通常、これは QUEUE_FAILED_DRIVER 環境変数を介して実行できます:

  1. QUEUE_FAILED_DRIVER=null

失敗したジョブイベント

ジョブが失敗したときに呼び出されるイベントリスナーを登録したい場合は、Queue ファサードの failing メソッドを使用できます。たとえば、Laravel に含まれる AppServiceProvider メソッドからこのイベントにクロージャを添付できます:

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobFailed;
  6. class AppServiceProvider extends ServiceProvider
  7. {
  8. /**
  9. * Register any application services.
  10. */
  11. public function register(): void
  12. {
  13. // ...
  14. }
  15. /**
  16. * Bootstrap any application services.
  17. */
  18. public function boot(): void
  19. {
  20. Queue::failing(function (JobFailed $event) {
  21. // $event->connectionName
  22. // $event->job
  23. // $event->exception
  24. });
  25. }
  26. }

キューからのジョブのクリア

Horizon を使用している場合、horizon:clear コマンドを使用してキューからジョブをクリアする必要があります。queue:clear コマンドの代わりに。

デフォルト接続のデフォルトキューからすべてのジョブを削除したい場合は、queue:clear Artisan コマンドを使用して実行できます:

  1. php artisan queue:clear

connection 引数と queue オプションを提供して、特定の接続とキューからジョブを削除することもできます:

  1. php artisan queue:clear redis --queue=emails

キューからジョブをクリアする機能は、SQS、Redis、およびデータベースキュードライバーでのみ利用可能です。さらに、SQS メッセージの削除プロセスには最大 60 秒かかるため、キューをクリアした後に SQS キューに送信されたジョブも削除される可能性があります。

キューの監視

キューに突然のジョブの流入があると、圧倒されてジョブの完了までの待機時間が長くなる可能性があります。希望する場合、Laravel はキューのジョブ数が指定された閾値を超えたときに通知することができます。

始めるには、queue:monitor コマンドを 毎分実行する ようにスケジュールする必要があります。コマンドは、監視したいキューの名前と希望するジョブ数の閾値を受け入れます:

  1. php artisan queue:monitor redis:default,redis:deployments --max=100

このコマンドをスケジュールするだけでは、キューが圧倒されている状態を通知するアラートをトリガーするには不十分です。コマンドがジョブ数が閾値を超えるキューに遭遇すると、Illuminate\Queue\Events\QueueBusy イベントが発生します。このイベントをアプリケーションの AppServiceProvider でリッスンして、あなたや開発チームに通知を送信できます:

  1. use App\Notifications\QueueHasLongWaitTime;
  2. use Illuminate\Queue\Events\QueueBusy;
  3. use Illuminate\Support\Facades\Event;
  4. use Illuminate\Support\Facades\Notification;
  5. /**
  6. * Bootstrap any application services.
  7. */
  8. public function boot(): void
  9. {
  10. Event::listen(function (QueueBusy $event) {
  11. Notification::route('mail', '')
  12. ->notify(new QueueHasLongWaitTime(
  13. $event->connection,
  14. $event->queue,
  15. $event->size
  16. ));
  17. });
  18. }

テスト

ジョブをディスパッチするコードをテストする際、ジョブのコードは直接テストでき、ディスパッチするコードとは別にテストできるため、Laravel にジョブ自体を実行しないよう指示したい場合があります。もちろん、ジョブ自体をテストするには、ジョブインスタンスを生成し、テスト内で handle メソッドを直接呼び出すことができます。

Queue ファサードの fake メソッドを使用して、キューに実際にプッシュされることのないキューに入れられたジョブを防ぐことができます。Queue ファサードの fake メソッドを呼び出した後、アプリケーションがジョブをキューにプッシュしようとしたことを確認できます:

  1. <?php
  2. use App\Jobs\AnotherJob;
  3. use App\Jobs\FinalJob;
  4. use App\Jobs\ShipOrder;
  5. use Illuminate\Support\Facades\Queue;
  6. test('orders can be shipped', function () {
  7. Queue::fake();
  8. // Perform order shipping...
  9. // Assert that no jobs were pushed...
  10. Queue::assertNothingPushed();
  11. // Assert a job was pushed to a given queue...
  12. Queue::assertPushedOn('queue-name', ShipOrder::class);
  13. // Assert a job was pushed twice...
  14. Queue::assertPushed(ShipOrder::class, 2);
  15. // Assert a job was not pushed...
  16. Queue::assertNotPushed(AnotherJob::class);
  17. // Assert that a Closure was pushed to the queue...
  18. Queue::assertClosurePushed();
  19. // Assert the total number of jobs that were pushed...
  20. Queue::assertCount(3);
  21. });
  1. <?php
  2. namespace Tests\Feature;
  3. use App\Jobs\AnotherJob;
  4. use App\Jobs\FinalJob;
  5. use App\Jobs\ShipOrder;
  6. use Illuminate\Support\Facades\Queue;
  7. use Tests\TestCase;
  8. class ExampleTest extends TestCase
  9. {
  10. public function test_orders_can_be_shipped(): void
  11. {
  12. Queue::fake();
  13. // Perform order shipping...
  14. // Assert that no jobs were pushed...
  15. Queue::assertNothingPushed();
  16. // Assert a job was pushed to a given queue...
  17. Queue::assertPushedOn('queue-name', ShipOrder::class);
  18. // Assert a job was pushed twice...
  19. Queue::assertPushed(ShipOrder::class, 2);
  20. // Assert a job was not pushed...
  21. Queue::assertNotPushed(AnotherJob::class);
  22. // Assert that a Closure was pushed to the queue...
  23. Queue::assertClosurePushed();
  24. // Assert the total number of jobs that were pushed...
  25. Queue::assertCount(3);
  26. }
  27. }

assertPushed または assertNotPushed メソッドにクロージャを渡して、特定の「真実テスト」を通過するジョブがプッシュされたことを確認できます。指定された真実テストを通過するジョブが少なくとも 1 つプッシュされた場合、アサーションは成功します:

  1. Queue::assertPushed(function (ShipOrder $job) use ($order) {
  2. return $job->order->id === $order->id;
  3. });

ジョブのサブセットをフェイクする

特定のジョブをフェイクし、他のジョブを通常通り実行させたい場合は、フェイクする必要のあるジョブのクラス名を fake メソッドに渡すことができます:

  1. test('orders can be shipped', function () {
  2. Queue::fake([
  3. ShipOrder::class,
  4. ]);
  5. // Perform order shipping...
  6. // Assert a job was pushed twice...
  7. Queue::assertPushed(ShipOrder::class, 2);
  8. });
  1. public function test_orders_can_be_shipped(): void
  2. {
  3. Queue::fake([
  4. ShipOrder::class,
  5. ]);
  6. // Perform order shipping...
  7. // Assert a job was pushed twice...
  8. Queue::assertPushed(ShipOrder::class, 2);
  9. }

指定されたジョブのセットを除くすべてのジョブをフェイクするには、except メソッドを使用します:

  1. Queue::fake()->except([
  2. ShipOrder::class,
  3. ]);

ジョブチェーンのテスト

ジョブチェーンをテストするには、Bus ファサードのフェイキング機能を利用する必要があります。Bus ファサードの assertChained メソッドを使用して、ジョブのチェーンがディスパッチされたことを確認できます。assertChained メソッドは、チェーンされたジョブの配列を最初の引数として受け取ります:

  1. use App\Jobs\RecordShipment;
  2. use App\Jobs\ShipOrder;
  3. use App\Jobs\UpdateInventory;
  4. use Illuminate\Support\Facades\Bus;
  5. Bus::fake();
  6. // ...
  7. Bus::assertChained([
  8. ShipOrder::class,
  9. RecordShipment::class,
  10. UpdateInventory::class
  11. ]);

上記の例のように、チェーンされたジョブの配列はジョブのクラス名の配列である場合があります。ただし、実際のジョブインスタンスの配列を提供することもできます。その場合、Laravel はジョブインスタンスが同じクラスであり、アプリケーションによってディスパッチされたチェーンされたジョブのプロパティ値が同じであることを確認します:

  1. Bus::assertChained([
  2. new ShipOrder,
  3. new RecordShipment,
  4. new UpdateInventory,
  5. ]);

assertDispatchedWithoutChain メソッドを使用して、ジョブがチェーンなしでプッシュされたことを確認できます:

  1. Bus::assertDispatchedWithoutChain(ShipOrder::class);

チェーンの変更のテスト

チェーンされたジョブが 既存のチェーンにジョブを追加または挿入する 場合、ジョブの assertHasChain メソッドを使用して、ジョブが期待される残りのジョブのチェーンを持っていることを確認できます:

  1. $job = new ProcessPodcast;
  2. $job->handle();
  3. $job->assertHasChain([
  4. new TranscribePodcast,
  5. new OptimizePodcast,
  6. new ReleasePodcast,
  7. ]);

assertDoesntHaveChain メソッドを使用して、ジョブの残りのチェーンが空であることを確認できます:

  1. $job->assertDoesntHaveChain();

チェーンバッチのテスト

ジョブチェーンが ジョブのバッチを含む 場合、チェーンアサーション内に Bus::chainedBatch 定義を挿入することで、チェーンバッチが期待通りであることを確認できます:

  1. use App\Jobs\ShipOrder;
  2. use App\Jobs\UpdateInventory;
  3. use Illuminate\Bus\PendingBatch;
  4. use Illuminate\Support\Facades\Bus;
  5. Bus::assertChained([
  6. new ShipOrder,
  7. Bus::chainedBatch(function (PendingBatch $batch) {
  8. return $batch->jobs->count() === 3;
  9. }),
  10. new UpdateInventory,
  11. ]);

ジョブバッチのテスト

Bus ファサードの assertBatched メソッドを使用して、ジョブのバッチがディスパッチされたことを確認できます。assertBatched メソッドに渡されるクロージャは、Illuminate\Bus\PendingBatch のインスタンスを受け取り、バッチ内のジョブを検査するために使用できます:

  1. use Illuminate\Bus\PendingBatch;
  2. use Illuminate\Support\Facades\Bus;
  3. Bus::fake();
  4. // ...
  5. Bus::assertBatched(function (PendingBatch $batch) {
  6. return $batch->name == 'import-csv' &&
  7. $batch->jobs->count() === 10;
  8. });

assertBatchCount メソッドを使用して、指定された数のバッチがディスパッチされたことを確認できます:

  1. Bus::assertBatchCount(3);

assertNothingBatched を使用して、バッチがディスパッチされなかったことを確認できます:

  1. Bus::assertNothingBatched();

ジョブ / バッチの相互作用のテスト

さらに、個々のジョブがその基盤となるバッチと相互作用することをテストする必要がある場合があります。たとえば、ジョブがバッチのさらなる処理をキャンセルしたかどうかをテストする必要がある場合があります。これを実現するには、withFakeBatch メソッドを介してジョブにフェイクバッチを割り当てる必要があります。withFakeBatch メソッドは、ジョブインスタンスとフェイクバッチを含むタプルを返します:

  1. [$job, $batch] = (new ShipOrder)->withFakeBatch();
  2. $job->handle();
  3. $this->assertTrue($batch->cancelled());
  4. $this->assertEmpty($batch->added);

ジョブ / キューの相互作用のテスト

時には、キューに入れられたジョブが 自分自身をキューに戻す ことをテストする必要があるかもしれません。また、ジョブが自分自身を削除したかどうかをテストする必要があるかもしれません。これらのキューの相互作用をテストするには、ジョブをインスタンス化し、withFakeQueueInteractions メソッドを呼び出します。

ジョブのキューの相互作用がフェイクされた後、ジョブの handle メソッドを呼び出すことができます。ジョブを呼び出した後、assertReleasedassertDeletedassertNotDeletedassertFailed、および assertNotFailed メソッドを使用して、ジョブのキューの相互作用に対してアサーションを行うことができます:

  1. use App\Jobs\ProcessPodcast;
  2. $job = (new ProcessPodcast)->withFakeQueueInteractions();
  3. $job->handle();
  4. $job->assertReleased(delay: 30);
  5. $job->assertDeleted();
  6. $job->assertNotDeleted();
  7. $job->assertFailed();
  8. $job->assertNotFailed();

ジョブイベント

before および after メソッドを使用して、Queue ファサード で、キューに入れられたジョブが処理される前後に実行されるコールバックを指定できます。これらのコールバックは、追加のロギングを行ったり、ダッシュボードの統計を増加させたりする絶好の機会です。通常、これらのメソッドは サービスプロバイダーboot メソッドから呼び出すべきです。たとえば、Laravel に含まれる AppServiceProvider を使用できます:

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobProcessed;
  6. use Illuminate\Queue\Events\JobProcessing;
  7. class AppServiceProvider extends ServiceProvider
  8. {
  9. /**
  10. * Register any application services.
  11. */
  12. public function register(): void
  13. {
  14. // ...
  15. }
  16. /**
  17. * Bootstrap any application services.
  18. */
  19. public function boot(): void
  20. {
  21. Queue::before(function (JobProcessing $event) {
  22. // $event->connectionName
  23. // $event->job
  24. // $event->job->payload()
  25. });
  26. Queue::after(function (JobProcessed $event) {
  27. // $event->connectionName
  28. // $event->job
  29. // $event->job->payload()
  30. });
  31. }
  32. }

Queue ファサードlooping メソッドを使用して、ワーカーがキューからジョブを取得しようとする前に実行されるコールバックを指定できます。たとえば、以前の失敗したジョブによって開かれたトランザクションをロールバックするためにクロージャを登録することができます:

  1. use Illuminate\Support\Facades\DB;
  2. use Illuminate\Support\Facades\Queue;
  3. Queue::looping(function () {
  4. while (DB::transactionLevel() > 0) {
  5. DB::rollBack();
  6. }
  7. });