Introduction
ウェブアプリケーションを構築する際、アップロードされたCSVファイルを解析して保存するなど、通常のウェブリクエスト中に実行するには時間がかかりすぎるタスクがあるかもしれません。幸いなことに、Laravelではバックグラウンドで処理されるキュー付きジョブを簡単に作成できます。時間のかかるタスクをキューに移動させることで、アプリケーションはウェブリクエストに対して驚異的なスピードで応答し、顧客により良いユーザー体験を提供できます。
Laravelのキューは、Amazon SQS、Redis、またはリレーショナルデータベースなど、さまざまなキューバックエンドにわたる統一されたキューイングAPIを提供します。
Laravelのキュー設定オプションは、アプリケーションのconfig/queue.php
設定ファイルに保存されています。このファイルには、データベース、Amazon SQS、Redis、およびBeanstalkdドライバーを含む、フレームワークに含まれる各キュードライバーの接続設定が見つかります。また、ジョブを即座に実行するための同期ドライバーも含まれています(ローカル開発中に使用)。キューに追加されたジョブを破棄するnull
キュードライバーも含まれています。
Laravelは現在、Redisを使用したキューのための美しいダッシュボードと設定システムであるHorizonを提供しています。詳細については、完全なHorizonドキュメントを確認してください。
Connections vs. Queues
Laravelのキューを始める前に、「接続」と「キュー」の違いを理解することが重要です。config/queue.php
設定ファイルには、connections
設定配列があります。このオプションは、Amazon SQS、Beanstalk、またはRedisなどのバックエンドキューサービスへの接続を定義します。ただし、特定のキュー接続には、異なるスタックやキューされたジョブの山と考えられる複数の「キュー」がある場合があります。
``````php
use App\Jobs\ProcessPodcast;
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');
`
一部のアプリケーションでは、複数のキューにジョブをプッシュする必要がない場合があり、代わりに1つのシンプルなキューを好むことがあります。ただし、複数のキューにジョブをプッシュすることは、ジョブの処理を優先したりセグメント化したりしたいアプリケーションにとって特に便利です。Laravelのキュー作業者は、優先順位によって処理すべきキューを指定できます。たとえば、high
キューにジョブをプッシュすると、より高い処理優先度を与える作業者を実行できます:
php artisan queue:work --queue=high,default
Driver Notes and Prerequisites
Database
database
キュードライバーを使用するには、ジョブを保持するためのデータベーステーブルが必要です。通常、これはLaravelのデフォルトの0001_01_01_000002_create_jobs_table.php
データベースマイグレーションに含まれています。ただし、アプリケーションにこのマイグレーションが含まれていない場合は、make:queue-table
Artisanコマンドを使用して作成できます:
php artisan make:queue-table
php artisan migrate
Redis
`````serializer`````および`````compression````` Redisオプションは、`````redis`````キュードライバーではサポートされていません。
**Redisクラスタ**
Redisキュー接続がRedisクラスタを使用している場合、キュー名には[key hash tag](https://redis.io/docs/reference/cluster-spec/#hash-tags)を含める必要があります。これは、特定のキューのすべてのRedisキーが同じハッシュスロットに配置されることを保証するために必要です:
``````php
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
`
ブロッキング
Redisキューを使用する場合、block_for
設定オプションを使用して、ジョブが利用可能になるまでドライバーがどれくらい待機するかを指定できます。
この値をキューの負荷に基づいて調整することで、新しいジョブのためにRedisデータベースを継続的にポーリングするよりも効率的です。たとえば、値を5
に設定すると、ジョブが利用可能になるまでドライバーが5秒間ブロックすることを示します:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
<a name="other-driver-prerequisites"></a>
#### Other Driver Prerequisites
次の依存関係は、リストされたキュードライバーに必要です。これらの依存関係は、Composerパッケージマネージャーを介してインストールできます:
- Amazon SQS: `````aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
またはphpredis PHP拡張
Creating Jobs
Generating Job Classes
デフォルトでは、アプリケーションのすべてのキュー可能なジョブはapp/Jobs
ディレクトリに保存されます。app/Jobs
ディレクトリが存在しない場合、make:job
Artisanコマンドを実行すると作成されます:
php artisan make:job ProcessPodcast
生成されたクラスはIlluminate\Contracts\Queue\ShouldQueue
インターフェースを実装し、ジョブが非同期で実行されるためにキューにプッシュされるべきであることをLaravelに示します。
ジョブスタブは、スタブの公開を使用してカスタマイズできます。
Class Structure
ジョブクラスは非常にシンプルで、通常はジョブがキューによって処理されるときに呼び出されるhandle
メソッドのみを含みます。始めるために、例のジョブクラスを見てみましょう。この例では、ポッドキャスト配信サービスを管理していると仮定し、公開される前にアップロードされたポッドキャストファイルを処理する必要があります:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}
この例では、キューされたジョブのコンストラクタにEloquentモデルを直接渡すことができたことに注意してください。ジョブが使用しているQueueable
トレイトのおかげで、Eloquentモデルとその読み込まれたリレーションシップは、ジョブが処理されるときに優雅にシリアライズおよび非シリアライズされます。
キューされたジョブがコンストラクタにEloquentモデルを受け入れる場合、モデルの識別子のみがキューにシリアライズされます。ジョブが実際に処理されると、キューシステムは自動的にデータベースから完全なモデルインスタンスとその読み込まれたリレーションシップを再取得します。このモデルシリアライズのアプローチにより、キュードライバーに送信されるジョブペイロードがはるかに小さくなります。
handle Method Dependency Injection
コンテナが`````handle`````メソッドに依存関係を注入する方法を完全に制御したい場合は、コンテナの`````bindMethod`````メソッドを使用できます。`````bindMethod`````メソッドは、ジョブとコンテナを受け取るコールバックを受け入れます。コールバック内では、`````handle`````メソッドを自由に呼び出すことができます。通常、`````App\Providers\AppServiceProvider````` [サービスプロバイダー](/read/laravel-11-x/934b4900a19307fc.md)の`````boot`````メソッドからこのメソッドを呼び出すべきです:
``````php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
`
生の画像コンテンツなどのバイナリデータは、キューされたジョブに渡す前にbase64_encode
関数を通過させる必要があります。そうしないと、ジョブがキューに配置されるときにJSONに正しくシリアライズされない可能性があります。
Queued Relationships
すべての読み込まれたEloquentモデルのリレーションシップもジョブがキューに追加されるときにシリアライズされるため、シリアライズされたジョブ文字列は時々非常に大きくなる可能性があります。さらに、ジョブが非シリアライズされ、モデルのリレーションシップがデータベースから再取得されるとき、それらは完全に取得されます。ジョブキュー処理中にモデルがシリアライズされる前に適用された以前のリレーションシップ制約は、ジョブが非シリアライズされるときには適用されません。したがって、特定のリレーションシップのサブセットで作業したい場合は、キューされたジョブ内でそのリレーションシップを再制約する必要があります。
また、リレーションがシリアライズされないようにするには、プロパティ値を設定するときにモデルのwithoutRelations
メソッドを呼び出すことができます。このメソッドは、読み込まれたリレーションシップなしのモデルのインスタンスを返します:
/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}
PHPのコンストラクタプロパティプロモーションを使用していて、Eloquentモデルがリレーションをシリアライズしないことを示したい場合は、WithoutRelations
属性を使用できます:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}
ジョブが単一のモデルの代わりにEloquentモデルのコレクションまたは配列を受け取る場合、そのコレクション内のモデルは、ジョブが非シリアライズされて実行されるときにリレーションシップが復元されません。これは、大量のモデルを扱うジョブで過剰なリソース使用を防ぐためです。
Unique Jobs
ユニークなジョブには、ロックをサポートするキャッシュドライバーが必要です。現在、memcached
、redis
、dynamodb
、database
、file
、およびarray
キャッシュドライバーは、原子ロックをサポートしています。さらに、ユニークジョブ制約はバッチ内のジョブには適用されません。
時には、特定のジョブのインスタンスがキューに存在することを保証したい場合があります。これを実現するには、ジョブクラスでShouldBeUnique
インターフェースを実装します。このインターフェースは、クラスに追加のメソッドを定義する必要はありません:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
上記の例では、UpdateSearchIndex
ジョブはユニークです。したがって、ジョブの別のインスタンスがすでにキューにあり、処理が完了していない場合、そのジョブはディスパッチされません。
特定の「キー」を定義してジョブをユニークにするか、ジョブがもはやユニークでなくなるタイムアウトを指定したい場合は、ジョブクラスにuniqueId
およびuniqueFor
プロパティまたはメソッドを定義できます:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
上記の例では、UpdateSearchIndex
ジョブは製品IDによってユニークです。したがって、同じ製品IDを持つジョブの新しいディスパッチは、既存のジョブが処理を完了するまで無視されます。さらに、既存のジョブが1時間以内に処理されない場合、ユニークロックは解除され、同じユニークキーを持つ別のジョブがキューにディスパッチされることができます。
アプリケーションが複数のウェブサーバーまたはコンテナからジョブをディスパッチする場合、すべてのサーバーが同じ中央キャッシュサーバーと通信していることを確認し、Laravelがジョブがユニークであるかどうかを正確に判断できるようにする必要があります。
Keeping Jobs Unique Until Processing Begins
デフォルトでは、ユニークなジョブは、ジョブが処理を完了するか、すべての再試行が失敗した後に「ロック解除」されます。ただし、ジョブが処理される前にすぐにロック解除したい場合があるかもしれません。これを実現するには、ジョブはShouldBeUniqueUntilProcessing
契約を実装する必要があります。ShouldBeUnique
契約の代わりに:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
Unique Job Locks
裏側では、ShouldBeUnique
ジョブがディスパッチされると、LaravelはuniqueId
キーでロックを取得しようとします。ロックが取得できない場合、ジョブはディスパッチされません。このロックは、ジョブが処理を完了するか、すべての再試行が失敗したときに解除されます。デフォルトでは、Laravelはこのロックを取得するためにデフォルトのキャッシュドライバーを使用します。ただし、ロックを取得するために別のドライバーを使用したい場合は、使用すべきキャッシュドライバーを返すuniqueVia
メソッドを定義できます:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
ジョブの同時処理を制限するだけが必要な場合は、WithoutOverlapping
ジョブミドルウェアを使用してください。
Encrypted Jobs
Laravelは、ジョブのデータのプライバシーと整合性を暗号化によって確保することを可能にします。始めるには、ジョブクラスにShouldBeEncrypted
インターフェースを追加するだけです。このインターフェースがクラスに追加されると、Laravelは自動的にジョブをキューにプッシュする前に暗号化します:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
Job Middleware
ジョブミドルウェアは、キューされたジョブの実行の周りにカスタムロジックをラップすることを可能にし、ジョブ自体のボイラープレートを減らします。たとえば、次のhandle
メソッドを考えてみましょう。これは、LaravelのRedisレート制限機能を利用して、5秒ごとに1つのジョブのみを処理できるようにします:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}
このコードは有効ですが、handle
メソッドの実装は、Redisレート制限ロジックで混雑しているため、騒がしくなります。さらに、このレート制限ロジックは、レート制限したい他のジョブに対して複製する必要があります。
ハンドルメソッドでレート制限を行う代わりに、レート制限を処理するジョブミドルウェアを定義できます。Laravelにはジョブミドルウェアのデフォルトの場所がないため、アプリケーション内の任意の場所にジョブミドルウェアを配置できます。この例では、ミドルウェアをapp/Jobs/Middleware
ディレクトリに配置します:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
ご覧のとおり、ルートミドルウェアと同様に、ジョブミドルウェアは処理中のジョブと、ジョブの処理を続行するために呼び出すべきコールバックを受け取ります。
ジョブミドルウェアを作成した後、ジョブのmiddleware
メソッドからそれらを返すことで、ジョブにアタッチできます。このメソッドは、make:job
Artisanコマンドによってスカフォールドされたジョブには存在しないため、ジョブクラスに手動で追加する必要があります:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
ジョブミドルウェアは、キュー可能なイベントリスナー、メール、通知にも割り当てることができます。
Rate Limiting
独自のレート制限ジョブミドルウェアを書く方法を示しましたが、Laravelには実際にジョブをレート制限するために利用できるレート制限ミドルウェアが含まれています。ルートレートリミッターと同様に、ジョブレートリミッターはRateLimiter
ファサードのfor
メソッドを使用して定義されます。
たとえば、ユーザーがデータを1時間に1回バックアップできるようにし、プレミアム顧客にはそのような制限を課さないようにしたい場合、RateLimiter
をboot
メソッドのAppServiceProvider
で定義できます:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
上記の例では、1時間のレート制限を定義しましたが、perMinute
メソッドを使用して分単位のレート制限を簡単に定義できます。さらに、レート制限のby
メソッドに任意の値を渡すことができますが、この値は通常、顧客によってレート制限をセグメント化するために使用されます:
return Limit::perMinute(50)->by($job->user->id);
レート制限を定義したら、Illuminate\Queue\Middleware\RateLimited
ミドルウェアを使用してジョブにレートリミッターをアタッチできます。ジョブがレート制限を超えるたびに、このミドルウェアは適切な遅延を伴ってジョブをキューに戻します。
use Illuminate\Queue\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
レート制限されたジョブをキューに戻すと、ジョブの合計attempts
が増加します。ジョブクラスのtries
およびmaxExceptions
プロパティを適切に調整することをお勧めします。または、retryUntil
メソッドを使用して、ジョブが再試行されなくなるまでの時間を定義することもできます。
ジョブがレート制限されているときに再試行されないようにするには、dontRelease
メソッドを使用できます:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
Redisを使用している場合は、Illuminate\Queue\Middleware\RateLimitedWithRedis
ミドルウェアを使用できます。これはRedisに最適化されており、基本的なレート制限ミドルウェアよりも効率的です。
Preventing Job Overlaps
Laravelには、任意のキーに基づいてジョブの重複を防ぐことができるIlluminate\Queue\Middleware\WithoutOverlapping
ミドルウェアが含まれています。これは、キューされたジョブが1回のジョブでのみ変更されるべきリソースを変更している場合に役立ちます。
たとえば、ユーザーのクレジットスコアを更新するキューされたジョブがあり、同じユーザーIDのクレジットスコア更新ジョブの重複を防ぎたいとします。これを実現するには、ジョブのmiddleware
メソッドからWithoutOverlapping
ミドルウェアを返すことができます:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
同じタイプの重複ジョブは、キューに戻されます。また、リリースされたジョブが再度試行されるまでに経過する必要がある秒数を指定することもできます:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
重複するジョブを即座に削除して再試行されないようにするには、dontRelease
メソッドを使用できます:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
``````php
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
`
<a name="sharing-lock-keys"></a>
#### Sharing Lock Keys Across Job Classes
デフォルトでは、`````WithoutOverlapping`````ミドルウェアは同じクラスの重複ジョブのみを防ぎます。したがって、異なる2つのジョブクラスが同じロックキーを使用していても、重複を防ぐことはできません。ただし、`````shared`````メソッドを使用して、ジョブクラス間でキーを適用するようにLaravelに指示できます:
``````php
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
`
Throttling Exceptions
Laravelには、例外をスロットルすることを可能にするIlluminate\Queue\Middleware\ThrottlesExceptions
ミドルウェアが含まれています。ジョブが指定された数の例外をスローすると、ジョブを実行するためのすべてのさらなる試行は、指定された時間間隔が経過するまで遅延されます。このミドルウェアは、特に不安定なサードパーティサービスと対話するジョブに便利です。
たとえば、例外をスローし始めるサードパーティAPIと対話するキューされたジョブを想像してみましょう。例外をスロットルするには、ジョブのmiddleware
メソッドからThrottlesExceptions
ミドルウェアを返すことができます。通常、このミドルウェアは時間ベースの試行を実装するジョブとペアにする必要があります:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
ミドルウェアが受け入れる最初のコンストラクタ引数は、ジョブがスロットルされる前にスローできる例外の数であり、2番目のコンストラクタ引数は、スロットルされた後にジョブが再試行されるまでに経過する必要がある秒数です。上記のコード例では、ジョブが10回連続して例外をスローすると、30分の時間制限に制約されて、5分間待機してから再試行します。
ジョブが例外をスローしますが、例外の閾値に達していない場合、ジョブは通常即座に再試行されます。ただし、backoff
メソッドを呼び出して、ミドルウェアをジョブにアタッチするときに、ジョブが遅延される分数を指定できます:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}
内部的に、このミドルウェアはLaravelのキャッシュシステムを使用してレート制限を実装し、ジョブのクラス名がキャッシュの「キー」として利用されます。このキーは、ミドルウェアをジョブにアタッチするときにby
メソッドを呼び出すことでオーバーライドできます。これは、同じサードパーティサービスと対話する複数のジョブがあり、共通のスロットリング「バケット」を共有したい場合に便利です:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}
デフォルトでは、このミドルウェアはすべての例外をスロットルします。この動作は、ミドルウェアをジョブにアタッチするときにwhen
メソッドを呼び出すことで変更できます。例外は、when
メソッドに提供されたクロージャがtrue
を返す場合にのみスロットルされます:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
スロットルされた例外をアプリケーションの例外ハンドラーに報告したい場合は、report
メソッドを呼び出すことで実行できます。オプションで、report
メソッドにクロージャを提供すると、指定されたクロージャがtrue
を返す場合にのみ例外が報告されます:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
Redisを使用している場合は、Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
ミドルウェアを使用できます。これはRedisに最適化されており、基本的な例外スロットリングミドルウェアよりも効率的です。
Skipping Jobs
``````php
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($someCondition),
];
}
`
``````php
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}
`
Dispatching Jobs
ジョブクラスを書いたら、ジョブ自体のdispatch
メソッドを使用してディスパッチできます。dispatch
メソッドに渡される引数は、ジョブのコンストラクタに渡されます:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
条件付きでジョブをディスパッチしたい場合は、dispatchIf
およびdispatchUnless
メソッドを使用できます:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
新しいLaravelアプリケーションでは、sync
ドライバーがデフォルトのキュードライバーです。このドライバーは、現在のリクエストのフォアグラウンドでジョブを同期的に実行します。これは、ローカル開発中に便利です。バックグラウンド処理のために実際にジョブをキューに追加したい場合は、アプリケーションのconfig/queue.php
設定ファイル内で別のキュードライバーを指定できます。
Delayed Dispatching
ジョブがキュー作業者によって即座に処理されないように指定したい場合は、ジョブをディスパッチするときにdelay
メソッドを使用できます。たとえば、ジョブがディスパッチされてから10分後に処理可能であることを指定しましょう:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
場合によっては、ジョブにデフォルトの遅延が設定されていることがあります。この遅延をバイパスして即座に処理のためにジョブをディスパッチする必要がある場合は、withoutDelay
メソッドを使用できます:
ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQSキューサービスには、最大遅延時間が15分あります。
Dispatching After the Response is Sent to the Browser
また、dispatchAfterResponse
メソッドは、ウェブサーバーがFastCGIを使用している場合、HTTPレスポンスがユーザーのブラウザに送信された後にジョブのディスパッチを遅延させます。これにより、キューされたジョブがまだ実行されている間に、ユーザーがアプリケーションを使用し始めることができます。これは通常、メール送信など、約1秒かかるジョブにのみ使用されるべきです。これらは現在のHTTPリクエスト内で処理されるため、この方法でディスパッチされたジョブは、処理されるためにキュー作業者が実行されている必要はありません:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
クロージャをdispatch
し、afterResponse
ヘルパーにdispatch
メソッドをチェーンして、HTTPレスポンスがブラウザに送信された後にクロージャを実行することもできます:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('')->send(new WelcomeMessage);
})->afterResponse();
Synchronous Dispatching
ジョブを即座に(同期的に)ディスパッチしたい場合は、dispatchSync
メソッドを使用できます。このメソッドを使用すると、ジョブはキューに追加されず、現在のプロセス内で即座に実行されます:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
Jobs & Database Transactions
データベーストランザクション内でジョブをディスパッチすることは完全に問題ありませんが、ジョブが実際に成功裏に実行できることを確認するために特別な注意を払う必要があります。トランザクション内でジョブをディスパッチすると、親トランザクションがコミットされる前にワーカーによってジョブが処理される可能性があります。この場合、データベーストランザクション中にモデルやデータベースレコードに加えた更新が、まだデータベースに反映されていない可能性があります。さらに、トランザクション内で作成されたモデルやデータベースレコードは、データベースに存在しない可能性があります。
幸いなことに、Laravelはこの問題を回避するためのいくつかの方法を提供しています。まず、キュー接続の設定配列でafter_commit
接続オプションを設定できます:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
トランザクション中に発生した例外によりトランザクションがロールバックされた場合、そのトランザクション中にディスパッチされたジョブは破棄されます。
`````after_commit`````設定オプションを`````true`````に設定すると、キューされたイベントリスナー、メール、通知、およびブロードキャストイベントも、すべてのオープンなデータベーストランザクションがコミットされた後にディスパッチされます。
<a name="specifying-commit-dispatch-behavior-inline"></a>
#### Specifying Commit Dispatch Behavior Inline
`````after_commit`````キュー接続設定オプションを`````true`````に設定しない場合でも、特定のジョブがすべてのオープンなデータベーストランザクションがコミットされた後にディスパッチされるべきであることを示すことができます。これを実現するには、ディスパッチ操作に`````afterCommit`````メソッドをチェーンします:
``````php
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
`
同様に、after_commit
設定オプションがtrue
に設定されている場合、特定のジョブが即座にディスパッチされるべきであることを示すことができます。オープンなデータベーストランザクションがコミットされるのを待たずに:
ProcessPodcast::dispatch($podcast)->beforeCommit();
Job Chaining
ジョブチェイニングを使用すると、プライマリジョブが正常に実行された後に実行されるべきキューされたジョブのリストを指定できます。シーケンス内の1つのジョブが失敗すると、残りのジョブは実行されません。キューされたジョブチェーンを実行するには、chain
ファサードによって提供されるメソッドを使用できます。Laravelのコマンドバスは、キューされたジョブディスパッチングの上に構築された低レベルのコンポーネントです:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
ジョブクラスインスタンスをチェーンするだけでなく、クロージャもチェーンできます:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
ジョブ内で$this->delete()
メソッドを使用してジョブを削除しても、チェーン内のジョブが処理されるのを防ぐことはありません。チェーンは、チェーン内のジョブが失敗した場合にのみ実行を停止します。
Chain Connection and Queue
チェーンされたジョブに使用される接続とキューを指定したい場合は、onConnection
およびonQueue
メソッドを使用できます。これらのメソッドは、キューされたジョブが明示的に異なる接続/キューを割り当てられない限り、使用されるキュー接続とキュー名を指定します:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
Adding Jobs to the Chain
時には、別のジョブから既存のジョブチェーンにジョブを追加または前置きする必要があるかもしれません。これをprependToChain
およびappendToChain
メソッドを使用して実現できます:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}
チェーンの失敗
ジョブをチェーンする際、catch
メソッドを使用して、チェーン内のジョブが失敗した場合に呼び出されるクロージャを指定できます。指定されたコールバックは、ジョブの失敗を引き起こした Throwable
インスタンスを受け取ります:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
チェーンコールバックはシリアライズされ、Laravelキューによって後で実行されるため、チェーンコールバック内で $this
変数を使用しないでください。
キューと接続のカスタマイズ
特定のキューへのディスパッチ
異なるキューにジョブをプッシュすることで、キューに入れたジョブを「カテゴリ分け」し、さまざまなキューに割り当てるワーカーの数を優先順位付けすることができます。これは、キュー設定ファイルで定義された異なるキュー「接続」にはプッシュされず、単一の接続内の特定のキューにのみプッシュされることに注意してください。キューを指定するには、ジョブをディスパッチする際に onQueue
メソッドを使用します:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
また、ジョブのコンストラクタ内で onQueue
メソッドを呼び出すことで、ジョブのキューを指定することもできます:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
特定の接続へのディスパッチ
アプリケーションが複数のキュー接続と対話する場合、onConnection
メソッドを使用してジョブをプッシュする接続を指定できます:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
onConnection
と onQueue
メソッドを連結して、ジョブの接続とキューを指定することもできます:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
また、ジョブのコンストラクタ内で onConnection
メソッドを呼び出すことで、ジョブの接続を指定することもできます:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
最大ジョブ試行回数 / タイムアウト値の指定
最大試行回数
キューに入れたジョブの1つがエラーに遭遇している場合、無限に再試行させたくないでしょう。したがって、Laravelはジョブが試行される回数や期間を指定するさまざまな方法を提供します。
ジョブが試行される最大回数を指定する1つの方法は、Artisanコマンドラインの --tries
スイッチを使用することです。これは、処理されるジョブが試行される回数を指定しない限り、ワーカーによって処理されるすべてのジョブに適用されます:
php artisan queue:work --tries=3
ジョブが最大試行回数を超えると、それは「失敗した」ジョブと見なされます。失敗したジョブの処理に関する詳細は、失敗したジョードキュメントを参照してください。--tries=0
が queue:work
コマンドに提供されると、ジョブは無限に再試行されます。
ジョブクラス自体でジョブが試行される最大回数を定義することで、より詳細なアプローチを取ることができます。ジョブに最大試行回数が指定されている場合、それはコマンドラインで提供された --tries
値よりも優先されます:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
特定のジョブの最大試行回数を動的に制御する必要がある場合、ジョブに tries
メソッドを定義できます:
/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}
時間ベースの試行
ジョブが失敗する前に試行される回数を定義する代わりに、ジョブがもはや試行されない時間を定義することができます。これにより、特定の時間枠内でジョブを任意の回数試行できます。ジョブがもはや試行されない時間を定義するには、ジョブクラスに retryUntil
メソッドを追加します。このメソッドは DateTime
インスタンスを返す必要があります:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
また、キューイベントリスナーに tries
プロパティまたは retryUntil
メソッドを定義することもできます。
最大例外
時には、ジョブが多くの回数試行されることを指定したいが、特定の数の未処理の例外によって再試行がトリガーされた場合には失敗すべきであることを指定したい場合があります(release
メソッドによって直接解放されるのではなく)。これを実現するために、ジョブクラスに maxExceptions
プロパティを定義できます:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
この例では、アプリケーションがRedisロックを取得できない場合、ジョブは10秒間解放され、最大25回まで再試行されます。ただし、ジョブによって3つの未処理の例外がスローされた場合、ジョブは失敗します。
タイムアウト
通常、キューに入れたジョブがどのくらいの時間がかかるかを大まかに知っています。このため、Laravelは「タイムアウト」値を指定することを許可します。デフォルトでは、タイムアウト値は60秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理される場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーはサーバーに設定されたプロセスマネージャーによって自動的に再起動されます。
ジョブが実行できる最大秒数は、Artisanコマンドラインの --timeout
スイッチを使用して指定できます:
php artisan queue:work --timeout=30
ジョブがタイムアウトによって最大試行回数を超えると、失敗としてマークされます。
ジョブクラス自体でジョブが実行される最大秒数を定義することもできます。ジョブにタイムアウトが指定されている場合、それはコマンドラインで指定されたタイムアウトよりも優先されます:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
時には、ソケットや外部HTTP接続などのIOブロッキングプロセスが指定されたタイムアウトを尊重しない場合があります。したがって、これらの機能を使用する際は、常にAPIを使用してタイムアウトを指定するようにしてください。たとえば、Guzzleを使用する場合は、常に接続とリクエストのタイムアウト値を指定する必要があります。
ジョブタイムアウトを指定するには、pcntl
PHP拡張がインストールされている必要があります。また、ジョブの「タイムアウト」値は常にその「再試行後」の値よりも小さい必要があります。そうしないと、ジョブが実行を完了する前に再試行される可能性があります。
タイムアウト時の失敗
ジョブがタイムアウト時に失敗としてマークされるべきであることを示したい場合、ジョブクラスに $failOnTimeout
プロパティを定義できます:
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;
エラーハンドリング
ジョブが処理されている間に例外がスローされると、ジョブは自動的にキューに戻され、再試行されます。ジョブは、アプリケーションによって許可されている最大回数まで再試行され続けます。最大試行回数は、--tries
スイッチによって queue:work
Artisanコマンドで定義されます。あるいは、最大試行回数はジョブクラス自体で定義できます。キューのワーカーを実行することに関する詳細は、以下を参照。
ジョブを手動で解放する
時には、ジョブを手動でキューに戻して、後で再試行できるようにしたい場合があります。これを実現するには、release
メソッドを呼び出します:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->release();
}
デフォルトでは、release
メソッドはジョブを即座に処理のためにキューに戻します。ただし、release
メソッドに整数または日付インスタンスを渡すことで、指定された秒数が経過するまでジョブを処理可能にしないようにキューに指示することができます:
$this->release(10);
$this->release(now()->addSeconds(10));
ジョブを手動で失敗させる
時には、ジョブを「失敗」として手動でマークする必要があります。これを行うには、fail
メソッドを呼び出します:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->fail();
}
キャッチした例外のためにジョブを失敗としてマークしたい場合は、例外を fail
メソッドに渡すことができます。または、便利のために、文字列エラーメッセージを渡すことができ、これは例外に変換されます:
$this->fail($exception);
$this->fail('Something went wrong.');
失敗したジョブに関する詳細は、ジョブの失敗に関するドキュメントを確認してください。
ジョブバッチ処理
Laravelのジョブバッチ処理機能を使用すると、ジョブのバッチを簡単に実行し、ジョブのバッチが実行を完了したときに何らかのアクションを実行できます。始める前に、ジョブバッチに関するメタ情報を含むテーブルを構築するためのデータベースマイグレーションを作成する必要があります。たとえば、完了率などです。このマイグレーションは、make:queue-batches-table
Artisanコマンドを使用して生成できます:
php artisan make:queue-batches-table
php artisan migrate
バッチ可能なジョブの定義
バッチ可能なジョブを定義するには、通常通りキュー可能なジョブを作成する必要があります。ただし、ジョブクラスに Illuminate\Bus\Batchable
トレイトを追加する必要があります。このトレイトは、ジョブが実行されている現在のバッチを取得するために使用できる batch
メソッドへのアクセスを提供します:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// Import a portion of the CSV file...
}
}
バッチのディスパッチ
ジョブのバッチをディスパッチするには、batch
ファサードの Bus
メソッドを使用する必要があります。もちろん、バッチ処理は主に完了コールバックと組み合わせて使用されます。したがって、then
、catch
、および finally
メソッドを使用してバッチの完了コールバックを定義できます。これらのコールバックのそれぞれは、呼び出されたときに Illuminate\Bus\Batch
インスタンスを受け取ります。この例では、CSVファイルから指定された行数を処理するジョブのバッチをキューに入れていると想像します:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
return $batch->id;
バッチのIDは、$batch->id
プロパティを介してアクセスでき、ディスパッチされた後にバッチに関する情報をLaravelコマンドバスにクエリするために使用できます。
バッチコールバックはシリアライズされ、Laravelキューによって後で実行されるため、コールバック内で $this
変数を使用しないでください。また、バッチ処理されたジョブはデータベーストランザクション内にラップされているため、暗黙的なコミットをトリガーするデータベースステートメントはジョブ内で実行しないでください。
バッチの命名
Laravel HorizonやLaravel Telescopeなどのツールは、バッチに名前が付けられている場合、バッチのよりユーザーフレンドリーなデバッグ情報を提供する場合があります。バッチに任意の名前を付けるには、バッチを定義する際に name
メソッドを呼び出します:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();
バッチ接続とキュー
バッチ処理されたジョブに使用する接続とキューを指定したい場合は、onConnection
および onQueue
メソッドを使用できます。すべてのバッチ処理されたジョブは、同じ接続とキュー内で実行する必要があります:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();
チェーンとバッチ
バッチ内にチェーンジョブのセットを定義するには、チェーンジョブを配列内に配置します。たとえば、2つのジョブチェーンを並行して実行し、両方のジョブチェーンが処理を完了したときにコールバックを実行することができます:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
逆に、バッチ内にバッチを定義することで、チェーン内でジョブのバッチを実行できます。たとえば、最初に複数のポッドキャストを解放するバッチを実行し、その後リリース通知を送信するバッチを実行することができます:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
バッチにジョブを追加する
時には、バッチジョブ内からバッチに追加のジョブを追加することが有用な場合があります。このパターンは、Webリクエスト中にディスパッチするには時間がかかりすぎる可能性のある数千のジョブをバッチ処理する必要がある場合に便利です。したがって、代わりに、バッチをさらに多くのジョブで水和する「ローダー」ジョブの初期バッチをディスパッチしたい場合があります:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
この例では、LoadImportBatch
ジョブを使用してバッチに追加のジョブを水和します。これを実現するために、ジョブの batch
メソッドを介してアクセスできるバッチインスタンスの add
メソッドを使用できます:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
バッチにジョブを追加できるのは、同じバッチに属するジョブ内からのみです。
バッチの検査
バッチ完了コールバックに提供される Illuminate\Bus\Batch
インスタンスには、特定のジョブのバッチと対話し、検査するのに役立つさまざまなプロパティとメソッドがあります:
// The UUID of the batch...
$batch->id;
// The name of the batch (if applicable)...
$batch->name;
// The number of jobs assigned to the batch...
$batch->totalJobs;
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
// The number of jobs that have failed...
$batch->failedJobs;
// The number of jobs that have been processed thus far...
$batch->processedJobs();
// The completion percentage of the batch (0-100)...
$batch->progress();
// Indicates if the batch has finished executing...
$batch->finished();
// Cancel the execution of the batch...
$batch->cancel();
// Indicates if the batch has been cancelled...
$batch->cancelled();
ルートからのバッチの返却
すべての Illuminate\Bus\Batch
インスタンスはJSONシリアライズ可能であり、アプリケーションのルートの1つから直接返すことができ、バッチに関する情報を含むJSONペイロードを取得できます。これには、完了進捗が含まれます。これにより、アプリケーションのUIでバッチの完了進捗に関する情報を表示するのが便利です。
バッチIDでバッチを取得するには、Bus
ファサードの findBatch
メソッドを使用できます:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
バッチのキャンセル
時には、特定のバッチの実行をキャンセルする必要があります。これは、Illuminate\Bus\Batch
インスタンスの cancel
メソッドを呼び出すことで実現できます:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
前の例で気づいたように、バッチ処理されたジョブは、実行を続ける前に対応するバッチがキャンセルされたかどうかを判断する必要があります。ただし、便利のために、ジョブに SkipIfBatchCancelled
ミドルウェアを代わりに割り当てることができます。その名前が示すように、このミドルウェアは、対応するバッチがキャンセルされた場合、Laravelにジョブを処理しないように指示します:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
バッチの失敗
バッチ処理されたジョブが失敗した場合、catch
コールバック(割り当てられている場合)が呼び出されます。このコールバックは、バッチ内で最初に失敗したジョブに対してのみ呼び出されます。
失敗を許可する
バッチ内のジョブが失敗した場合、Laravelは自動的にバッチを「キャンセル」としてマークします。これを無効にしたい場合は、ジョブの失敗がバッチを自動的にキャンセルとしてマークしないようにすることができます。これは、バッチをディスパッチする際に allowFailures
メソッドを呼び出すことで実現できます:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();
失敗したバッチジョブの再試行
便利のために、Laravelは特定のバッチの失敗したすべてのジョブを簡単に再試行できる queue:retry-batch
Artisanコマンドを提供します。queue:retry-batch
コマンドは、失敗したジョブを再試行するバッチのUUIDを受け取ります:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
バッチのプルーニング
プルーニングを行わないと、job_batches
テーブルは非常に迅速にレコードが蓄積される可能性があります。これを軽減するために、queue:prune-batches
Artisanコマンドを毎日実行するようにスケジュールする必要があります:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
デフォルトでは、24時間以上前に完了したすべてのバッチがプルーニングされます。コマンドを呼び出す際に hours
オプションを使用して、バッチデータを保持する期間を決定できます。たとえば、次のコマンドは、48時間以上前に完了したすべてのバッチを削除します:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
時には、jobs_batches
テーブルが、成功裏に完了しなかったバッチのレコードを蓄積することがあります。たとえば、ジョブが失敗し、そのジョブが再試行されなかった場合などです。queue:prune-batches
コマンドに unfinished
オプションを使用して、これらの未完了のバッチレコードをプルーニングするように指示できます:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同様に、jobs_batches
テーブルもキャンセルされたバッチのレコードを蓄積する可能性があります。queue:prune-batches
コマンドに cancelled
オプションを使用して、これらのキャンセルされたバッチレコードをプルーニングするように指示できます:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
DynamoDBにバッチを保存する
Laravelは、リレーショナルデータベースの代わりにDynamoDBにバッチメタ情報を保存するサポートも提供します。ただし、すべてのバッチレコードを保存するためにDynamoDBテーブルを手動で作成する必要があります。
通常、このテーブルは job_batches
と名付けるべきですが、アプリケーションの queue
設定ファイル内の queue.batching.table
設定値に基づいてテーブルに名前を付けるべきです。
DynamoDBバッチテーブルの設定
job_batches
テーブルには、application
という名前の文字列プライマリパーティションキーと、id
という名前の文字列プライマリソートキーが必要です。キーの application
部分には、アプリケーションの name
設定値によって定義されたアプリケーション名が含まれます。アプリケーション名はDynamoDBテーブルのキーの一部であるため、複数のLaravelアプリケーションのジョブバッチを保存するために同じテーブルを使用できます。
さらに、自動バッチプルーニングを利用したい場合は、テーブルに ttl
属性を定義できます。
DynamoDBの設定
次に、AWS SDKをインストールして、LaravelアプリケーションがAmazon DynamoDBと通信できるようにします:
composer require aws/aws-sdk-php
次に、queue.batching.driver
設定オプションの値を dynamodb
に設定します。さらに、key
、secret
、および region
設定オプションを batching
設定配列内に定義する必要があります。これらのオプションは、AWSとの認証に使用されます。dynamodb
ドライバーを使用する場合、queue.batching.database
設定オプションは不要です:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
DynamoDBでのバッチのプルーニング
DynamoDBを使用してジョブバッチ情報を保存する場合、リレーショナルデータベースに保存されたバッチをプルーニングするために使用される通常のプルーニングコマンドは機能しません。代わりに、DynamoDBのネイティブTTL機能を利用して、古いバッチのレコードを自動的に削除できます。
DynamoDBテーブルに ttl
属性を定義した場合、Laravelにバッチレコードをプルーニングする方法を指示するための設定パラメータを定義できます。queue.batching.ttl_attribute
設定値はTTLを保持する属性の名前を定義し、queue.batching.ttl
設定値は、レコードが更新された最後の時間からの秒数を定義します:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
クロージャのキューイング
ジョブクラスをキューにディスパッチする代わりに、クロージャをディスパッチすることもできます。これは、現在のリクエストサイクルの外で実行する必要がある迅速で簡単なタスクに最適です。キューにクロージャをディスパッチする際、クロージャのコード内容は暗号的に署名され、転送中に変更できないようにされます:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
catch
メソッドを使用して、キューに入れたクロージャがすべてのキューの設定された再試行回数を使い果たした後に成功裏に完了しなかった場合に実行されるクロージャを提供できます:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
catch
コールバックはシリアライズされ、Laravelキューによって後で実行されるため、$this
変数を catch
コールバック内で使用しないでください。
キューワーカーの実行
queue:work コマンド
Laravelには、キューワーカーを起動し、新しいジョブがキューにプッシュされるとそれを処理するArtisanコマンドが含まれています。queue:work
Artisanコマンドを使用してワーカーを実行できます。queue:work
コマンドが開始されると、手動で停止するか、ターミナルを閉じるまで実行され続けることに注意してください:
php artisan queue:work
queue:work
プロセスをバックグラウンドで永続的に実行し続けるには、Supervisorのようなプロセスマネージャを使用して、キューのワーカーが停止しないようにする必要があります。
-v
フラグを指定して queue:work
コマンドを呼び出すと、処理されたジョブIDがコマンドの出力に含まれるようになります:
php artisan queue:work -v
キューのワーカーは長寿命のプロセスであり、起動されたアプリケーションの状態をメモリに保存します。そのため、起動された後はコードベースの変更に気づきません。したがって、デプロイプロセス中に、キューのワーカーを再起動することを確認してください。また、アプリケーションによって作成または変更された静的状態は、ジョブ間で自動的にリセットされません。
また、queue:listen
コマンドを実行することもできます。queue:listen
コマンドを使用すると、更新されたコードを再読み込みしたり、アプリケーションの状態をリセットしたりする際に、ワーカーを手動で再起動する必要はありません。ただし、このコマンドは queue:work
コマンドよりも効率が大幅に低下します:
php artisan queue:listen
複数のキューワーカーの実行
複数のワーカーをキューに割り当ててジョブを同時に処理するには、単に複数の queue:work
プロセスを開始する必要があります。これは、ターミナルの複数のタブを介してローカルで行うか、プロダクションでプロセスマネージャの設定を使用して行うことができます。Supervisorを使用する場合、numprocs
設定値を使用できます。
接続とキューの指定
ワーカーが使用するキュー接続を指定することもできます。work
コマンドに渡される接続名は、config/queue.php
設定ファイルで定義された接続の1つに対応する必要があります:
php artisan queue:work redis
デフォルトでは、queue:work
コマンドは、特定の接続のデフォルトキューのジョブのみを処理します。ただし、特定の接続の特定のキューのみを処理するようにキューのワーカーをさらにカスタマイズすることもできます。たとえば、すべてのメールが emails
キューで redis
キュー接続で処理される場合、次のコマンドを発行して、そのキューのみを処理するワーカーを開始できます:
php artisan queue:work redis --queue=emails
指定された数のジョブを処理する
--once
オプションを使用して、ワーカーにキューから単一のジョブのみを処理するように指示できます:
php artisan queue:work --once
--max-jobs
オプションを使用して、ワーカーに指定された数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用すると便利で、指定された数のジョブを処理した後にワーカーが自動的に再起動し、蓄積されたメモリを解放します:
php artisan queue:work --max-jobs=1000
すべてのキューに入れたジョブを処理してから終了する
--stop-when-empty
オプションを使用して、ワーカーにすべてのジョブを処理してから優雅に終了するように指示できます。このオプションは、Dockerコンテナ内でLaravelキューを処理する際に、キューが空になった後にコンテナをシャットダウンしたい場合に便利です:
php artisan queue:work --stop-when-empty
指定された秒数のジョブを処理する
--max-time
オプションを使用して、ワーカーに指定された秒数のジョブを処理してから終了するように指示できます。このオプションは、Supervisorと組み合わせて使用すると便利で、指定された時間の間にジョブを処理した後にワーカーが自動的に再起動し、蓄積されたメモリを解放します:
# 1時間のジョブを処理してから終了...
php artisan queue:work --max-time=3600
ワーカーのスリープ時間
ジョブがキューにある場合、ワーカーはジョブの間に遅延なく処理を続けます。ただし、sleep
オプションは、ジョブが利用可能でない場合にワーカーが「スリープ」する秒数を決定します。もちろん、スリープ中はワーカーは新しいジョブを処理しません:
php artisan queue:work --sleep=3
メンテナンスモードとキュー
アプリケーションがメンテナンスモードにある間、キューに入れたジョブは処理されません。アプリケーションがメンテナンスモードから出ると、ジョブは通常通り処理され続けます。
キューのワーカーにメンテナンスモードが有効になっていてもジョブを処理させたい場合は、--force
オプションを使用できます:
php artisan queue:work --force
リソースの考慮事項
デーモンキューのワーカーは、各ジョブを処理する前にフレームワークを「再起動」しません。したがって、各ジョブが完了した後に重いリソースを解放する必要があります。たとえば、GDライブラリを使用して画像操作を行っている場合、画像の処理が完了したら imagedestroy
でメモリを解放する必要があります。
キューの優先順位
時には、キューの処理の優先順位を付けたい場合があります。たとえば、config/queue.php
設定ファイルで、queue
を redis
接続のデフォルトに設定できます。ただし、時折、次のように high
優先度の高いキューにジョブをプッシュしたい場合があります:
dispatch((new Job)->onQueue('high'));
すべての high
キューのジョブが処理されることを確認してから、low
キューのジョブに続くようにワーカーを開始するには、キュー名のカンマ区切りリストを work
コマンドに渡します:
php artisan queue:work --queue=high,low
キューのワーカーとデプロイメント
キューのワーカーは長寿命のプロセスであるため、再起動しない限り、コードの変更に気づきません。したがって、キューのワーカーを使用するアプリケーションをデプロイする最も簡単な方法は、デプロイプロセス中にワーカーを再起動することです。queue:restart
コマンドを発行して、すべてのワーカーを優雅に再起動できます:
php artisan queue:restart
このコマンドは、現在のジョブの処理が完了した後にすべてのキューのワーカーに優雅に終了するように指示し、既存のジョブが失われないようにします。queue:restart
コマンドが実行されると、キューのワーカーは終了しますので、Supervisorのようなプロセスマネージャを実行して、キューのワーカーを自動的に再起動する必要があります。
キューはキャッシュを使用して再起動信号を保存するため、この機能を使用する前に、アプリケーションに適切にキャッシュドライバーが設定されていることを確認してください。
ジョブの有効期限とタイムアウト
ジョブの有効期限
config/queue.php
設定ファイルでは、各キュー接続が retry_after
オプションを定義します。このオプションは、処理中のジョブを再試行する前にキュー接続が待機する秒数を指定します。たとえば、retry_after
の値が 90
に設定されている場合、ジョブは90秒間処理され、解放または削除されない場合、キューに戻されます。通常、retry_after
値は、ジョブが処理を完了するのに合理的にかかる最大秒数に設定するべきです。
有効期限値を含まないキュー接続はAmazon SQSのみです。SQSは、AWSコンソール内で管理されるデフォルトの可視性タイムアウトに基づいてジョブを再試行します。
ワーカータイムアウト
queue:work
Artisan コマンドは --timeout
オプションを公開します。デフォルトでは、--timeout
値は 60 秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理されている場合、ジョブを処理しているワーカーはエラーで終了します。通常、ワーカーは サーバーに設定されたプロセスマネージャー によって自動的に再起動されます:
php artisan queue:work --timeout=60
retry_after
設定オプションと --timeout
CLI オプションは異なりますが、ジョブが失われず、ジョブが一度だけ正常に処理されることを保証するために連携して機能します。
--timeout
値は常に retry_after
設定値よりも数秒短くする必要があります。これにより、フリーズしたジョブを処理しているワーカーが、ジョブが再試行される前に常に終了されることが保証されます。--timeout
オプションが retry_after
設定値よりも長い場合、ジョブが二重に処理される可能性があります。
スーパーバイザー設定
本番環境では、queue:work
プロセスを実行し続ける方法が必要です。queue:work
プロセスは、ワーカータイムアウトの超過や queue:restart
コマンドの実行など、さまざまな理由で停止する可能性があります。
このため、queue:work
プロセスが終了したときに検出し、自動的に再起動するプロセスモニターを設定する必要があります。さらに、プロセスモニターを使用すると、同時に実行したい queue:work
プロセスの数を指定できます。スーパーバイザーは、Linux 環境で一般的に使用されるプロセスモニターであり、次のドキュメントでその設定方法について説明します。
スーパーバイザーのインストール
スーパーバイザーは Linux オペレーティングシステム用のプロセスモニターであり、失敗した場合は queue:work
プロセスを自動的に再起動します。Ubuntu にスーパーバイザーをインストールするには、次のコマンドを使用できます:
sudo apt-get install supervisor
スーパーバイザーを自分で設定および管理するのが大変に感じる場合は、Laravel Forge を使用することを検討してください。これにより、プロダクションの Laravel プロジェクト用にスーパーバイザーが自動的にインストールおよび設定されます。
スーパーバイザーの設定
スーパーバイザーの設定ファイルは通常、/etc/supervisor/conf.d
ディレクトリに保存されます。このディレクトリ内に、スーパーバイザーがプロセスをどのように監視するかを指示する任意の数の設定ファイルを作成できます。たとえば、laravel-worker.conf
ファイルを作成して queue:work
プロセスを開始および監視しましょう:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
この例では、numprocs
ディレクティブはスーパーバイザーに 8 つの queue:work
プロセスを実行し、すべてを監視し、失敗した場合は自動的に再起動するよう指示します。設定の command
ディレクティブを変更して、希望するキュー接続とワーカーオプションを反映させる必要があります。
stopwaitsecs
の値が、最も長く実行されるジョブによって消費される秒数よりも大きいことを確認してください。そうしないと、スーパーバイザーがジョブを処理が完了する前に終了させる可能性があります。
スーパーバイザーの起動
設定ファイルが作成されたら、次のコマンドを使用してスーパーバイザーの設定を更新し、プロセスを開始できます:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
スーパーバイザーに関する詳細は、スーパーバイザーのドキュメントを参照してください。
失敗したジョブの処理
時々、キューに入れたジョブが失敗することがあります。心配しないでください、物事は常に計画通りに進むわけではありません!Laravel には、ジョブが試行される最大回数を指定する便利な方法が含まれています。非同期ジョブがこの試行回数を超えた場合、failed_jobs
データベーステーブルに挿入されます。同期的にディスパッチされたジョブが失敗した場合、これらのテーブルには保存されず、その例外はアプリケーションによって即座に処理されます。
failed_jobs
テーブルを作成するためのマイグレーションは、新しい Laravel アプリケーションには通常すでに存在します。ただし、アプリケーションにこのテーブルのマイグレーションが含まれていない場合は、make:queue-failed-table
コマンドを使用してマイグレーションを作成できます:
php artisan make:queue-failed-table
php artisan migrate
キュー ワーカー プロセスを実行する際、--tries
スイッチを使用してジョブが試行される最大回数を指定できます。--tries
オプションの値を指定しない場合、ジョブは一度だけ試行されるか、ジョブクラスの $tries
プロパティで指定された回数だけ試行されます:
php artisan queue:work redis --tries=3
--backoff
オプションを使用すると、例外が発生したジョブを再試行する前に Laravel が待機する秒数を指定できます。デフォルトでは、ジョブは即座にキューに戻され、再試行される可能性があります:
php artisan queue:work redis --tries=3 --backoff=3
例外が発生したジョブを再試行する前に Laravel が待機する秒数をジョブごとに設定したい場合は、ジョブクラスに backoff
プロパティを定義することで実現できます:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
ジョブのバックオフ時間を決定するためにより複雑なロジックが必要な場合は、ジョブクラスに backoff
メソッドを定義できます:
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}
backoff
メソッドからバックオフ値の配列を返すことで、「指数的」バックオフを簡単に設定できます。この例では、最初の再試行の遅延は 1 秒、2 回目の再試行は 5 秒、3 回目の再試行は 10 秒、残りの再試行はすべて 10 秒になります:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
失敗したジョブの後処理
特定のジョブが失敗した場合、ユーザーにアラートを送信したり、ジョブによって部分的に完了したアクションを元に戻したりしたい場合があります。これを実現するには、ジョブクラスに failed
メソッドを定義できます。ジョブを失敗させた Throwable
インスタンスが failed
メソッドに渡されます:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
failed
メソッドを呼び出す前に新しいジョブのインスタンスが生成されるため、handle
メソッド内で発生した可能性のあるクラスプロパティの変更は失われます。
失敗したジョブの再試行
failed_jobs
データベーステーブルに挿入されたすべての失敗したジョブを表示するには、queue:failed
Artisan コマンドを使用できます:
php artisan queue:failed
queue:failed
コマンドは、ジョブ ID、接続、キュー、失敗時間、およびジョブに関するその他の情報をリストします。ジョブ ID は、失敗したジョブを再試行するために使用できます。たとえば、ID が ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
の失敗したジョブを再試行するには、次のコマンドを発行します:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
必要に応じて、コマンドに複数の ID を渡すことができます:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
特定のキューのすべての失敗したジョブを再試行することもできます:
php artisan queue:retry --queue=name
すべての失敗したジョブを再試行するには、queue:retry
コマンドを実行し、all
を ID として渡します:
php artisan queue:retry all
失敗したジョブを削除したい場合は、queue:forget
コマンドを使用できます:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
Horizon を使用している場合は、horizon:forget
コマンドを使用して失敗したジョブを削除する必要があります。queue:forget
コマンドの代わりに。
failed_jobs
テーブルからすべての失敗したジョブを削除するには、queue:flush
コマンドを使用できます:
php artisan queue:flush
欠落しているモデルの無視
Eloquent モデルをジョブに注入する際、モデルはキューに置かれる前に自動的にシリアル化され、ジョブが処理されるときにデータベースから再取得されます。ただし、ジョブがワーカーによって処理されるのを待っている間にモデルが削除された場合、ジョブは ModelNotFoundException
で失敗する可能性があります。
便利なことに、ジョブの deleteWhenMissingModels
プロパティを true
に設定することで、欠落しているモデルを持つジョブを自動的に削除することができます。このプロパティが true
に設定されている場合、Laravel は例外を発生させることなくジョブを静かに破棄します:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
失敗したジョブのプルーニング
アプリケーションの failed_jobs
テーブルのレコードをプルーニングするには、queue:prune-failed
Artisan コマンドを呼び出します:
php artisan queue:prune-failed
デフォルトでは、24 時間以上前のすべての失敗したジョブレコードがプルーニングされます。コマンドに --hours
オプションを提供すると、直近 N 時間内に挿入された失敗したジョブレコードのみが保持されます。たとえば、次のコマンドは、48 時間以上前に挿入されたすべての失敗したジョブレコードを削除します:
php artisan queue:prune-failed --hours=48
DynamoDB に失敗したジョブを保存
Laravel は、失敗したジョブレコードをリレーショナルデータベーステーブルの代わりに DynamoDB に保存するサポートも提供します。ただし、すべての失敗したジョブレコードを保存するために DynamoDB テーブルを手動で作成する必要があります。通常、このテーブルは failed_jobs
と名付けられますが、アプリケーションの queue
設定ファイル内の queue.failed.table
設定値に基づいてテーブルの名前を付ける必要があります。
failed_jobs
テーブルには、application
という名前の文字列のプライマリパーティションキーと、uuid
という名前の文字列のプライマリソートキーがあります。application
キーの部分には、アプリケーションの name
設定値によって定義されたアプリケーション名が含まれます。アプリケーション名は DynamoDB テーブルのキーの一部であるため、複数の Laravel アプリケーションの失敗したジョブを保存するために同じテーブルを使用できます。
さらに、Laravel アプリケーションが Amazon DynamoDB と通信できるように、AWS SDK をインストールすることを確認してください:
composer require aws/aws-sdk-php
次に、queue.failed.driver
設定オプションの値を dynamodb
に設定します。さらに、失敗したジョブ設定配列内に key
、secret
、region
設定オプションを定義する必要があります。これらのオプションは、AWS との認証に使用されます。dynamodb
ドライバーを使用する場合、queue.failed.database
設定オプションは不要です:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
失敗したジョブの保存を無効にする
Laravel に失敗したジョブを保存せずに破棄するよう指示するには、queue.failed.driver
設定オプションの値を null
に設定します。通常、これは QUEUE_FAILED_DRIVER
環境変数を介して実行できます:
QUEUE_FAILED_DRIVER=null
失敗したジョブイベント
ジョブが失敗したときに呼び出されるイベントリスナーを登録したい場合は、Queue
ファサードの failing
メソッドを使用できます。たとえば、Laravel に含まれる AppServiceProvider
メソッドからこのイベントにクロージャを添付できます:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
キューからのジョブのクリア
Horizon を使用している場合、horizon:clear
コマンドを使用してキューからジョブをクリアする必要があります。queue:clear
コマンドの代わりに。
デフォルト接続のデフォルトキューからすべてのジョブを削除したい場合は、queue:clear
Artisan コマンドを使用して実行できます:
php artisan queue:clear
connection
引数と queue
オプションを提供して、特定の接続とキューからジョブを削除することもできます:
php artisan queue:clear redis --queue=emails
キューからジョブをクリアする機能は、SQS、Redis、およびデータベースキュードライバーでのみ利用可能です。さらに、SQS メッセージの削除プロセスには最大 60 秒かかるため、キューをクリアした後に SQS キューに送信されたジョブも削除される可能性があります。
キューの監視
キューに突然のジョブの流入があると、圧倒されてジョブの完了までの待機時間が長くなる可能性があります。希望する場合、Laravel はキューのジョブ数が指定された閾値を超えたときに通知することができます。
始めるには、queue:monitor
コマンドを 毎分実行する ようにスケジュールする必要があります。コマンドは、監視したいキューの名前と希望するジョブ数の閾値を受け入れます:
php artisan queue:monitor redis:default,redis:deployments --max=100
このコマンドをスケジュールするだけでは、キューが圧倒されている状態を通知するアラートをトリガーするには不十分です。コマンドがジョブ数が閾値を超えるキューに遭遇すると、Illuminate\Queue\Events\QueueBusy
イベントが発生します。このイベントをアプリケーションの AppServiceProvider
でリッスンして、あなたや開発チームに通知を送信できます:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', '')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
テスト
ジョブをディスパッチするコードをテストする際、ジョブのコードは直接テストでき、ディスパッチするコードとは別にテストできるため、Laravel にジョブ自体を実行しないよう指示したい場合があります。もちろん、ジョブ自体をテストするには、ジョブインスタンスを生成し、テスト内で handle
メソッドを直接呼び出すことができます。
Queue
ファサードの fake
メソッドを使用して、キューに実際にプッシュされることのないキューに入れられたジョブを防ぐことができます。Queue
ファサードの fake
メソッドを呼び出した後、アプリケーションがジョブをキューにプッシュしようとしたことを確認できます:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
assertPushed
または assertNotPushed
メソッドにクロージャを渡して、特定の「真実テスト」を通過するジョブがプッシュされたことを確認できます。指定された真実テストを通過するジョブが少なくとも 1 つプッシュされた場合、アサーションは成功します:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
ジョブのサブセットをフェイクする
特定のジョブをフェイクし、他のジョブを通常通り実行させたい場合は、フェイクする必要のあるジョブのクラス名を fake
メソッドに渡すことができます:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
指定されたジョブのセットを除くすべてのジョブをフェイクするには、except
メソッドを使用します:
Queue::fake()->except([
ShipOrder::class,
]);
ジョブチェーンのテスト
ジョブチェーンをテストするには、Bus
ファサードのフェイキング機能を利用する必要があります。Bus
ファサードの assertChained
メソッドを使用して、ジョブのチェーンがディスパッチされたことを確認できます。assertChained
メソッドは、チェーンされたジョブの配列を最初の引数として受け取ります:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
上記の例のように、チェーンされたジョブの配列はジョブのクラス名の配列である場合があります。ただし、実際のジョブインスタンスの配列を提供することもできます。その場合、Laravel はジョブインスタンスが同じクラスであり、アプリケーションによってディスパッチされたチェーンされたジョブのプロパティ値が同じであることを確認します:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
assertDispatchedWithoutChain
メソッドを使用して、ジョブがチェーンなしでプッシュされたことを確認できます:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
チェーンの変更のテスト
チェーンされたジョブが 既存のチェーンにジョブを追加または挿入する 場合、ジョブの assertHasChain
メソッドを使用して、ジョブが期待される残りのジョブのチェーンを持っていることを確認できます:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
assertDoesntHaveChain
メソッドを使用して、ジョブの残りのチェーンが空であることを確認できます:
$job->assertDoesntHaveChain();
チェーンバッチのテスト
ジョブチェーンが ジョブのバッチを含む 場合、チェーンアサーション内に Bus::chainedBatch
定義を挿入することで、チェーンバッチが期待通りであることを確認できます:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
ジョブバッチのテスト
Bus
ファサードの assertBatched
メソッドを使用して、ジョブのバッチがディスパッチされたことを確認できます。assertBatched
メソッドに渡されるクロージャは、Illuminate\Bus\PendingBatch
のインスタンスを受け取り、バッチ内のジョブを検査するために使用できます:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
assertBatchCount
メソッドを使用して、指定された数のバッチがディスパッチされたことを確認できます:
Bus::assertBatchCount(3);
assertNothingBatched
を使用して、バッチがディスパッチされなかったことを確認できます:
Bus::assertNothingBatched();
ジョブ / バッチの相互作用のテスト
さらに、個々のジョブがその基盤となるバッチと相互作用することをテストする必要がある場合があります。たとえば、ジョブがバッチのさらなる処理をキャンセルしたかどうかをテストする必要がある場合があります。これを実現するには、withFakeBatch
メソッドを介してジョブにフェイクバッチを割り当てる必要があります。withFakeBatch
メソッドは、ジョブインスタンスとフェイクバッチを含むタプルを返します:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
ジョブ / キューの相互作用のテスト
時には、キューに入れられたジョブが 自分自身をキューに戻す ことをテストする必要があるかもしれません。また、ジョブが自分自身を削除したかどうかをテストする必要があるかもしれません。これらのキューの相互作用をテストするには、ジョブをインスタンス化し、withFakeQueueInteractions
メソッドを呼び出します。
ジョブのキューの相互作用がフェイクされた後、ジョブの handle
メソッドを呼び出すことができます。ジョブを呼び出した後、assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
、および assertNotFailed
メソッドを使用して、ジョブのキューの相互作用に対してアサーションを行うことができます:
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();
ジョブイベント
before
および after
メソッドを使用して、Queue
ファサード で、キューに入れられたジョブが処理される前後に実行されるコールバックを指定できます。これらのコールバックは、追加のロギングを行ったり、ダッシュボードの統計を増加させたりする絶好の機会です。通常、これらのメソッドは サービスプロバイダー の boot
メソッドから呼び出すべきです。たとえば、Laravel に含まれる AppServiceProvider
を使用できます:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
Queue
ファサード の looping
メソッドを使用して、ワーカーがキューからジョブを取得しようとする前に実行されるコールバックを指定できます。たとえば、以前の失敗したジョブによって開かれたトランザクションをロールバックするためにクロージャを登録することができます:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});