チェックポイントの変換の仕組み

各変換がソースインデックスを調べ、宛先インデックスを作成または更新するたびに、チェックポイントが生成されます。

変換が一度だけ実行される場合、論理的にはチェックポイントは1つだけです。しかし、変換が継続的に実行される場合、新しいソースデータを取り込み、変換する際にチェックポイントが作成されます。変換のsyncプロパティは、時間フィールドを指定することによってチェックポイントを構成します。

チェックポイントを作成するために、継続的な変換は:

  • 1. ソースインデックスの変更を確認します。
    シンプルな定期タイマーを使用して、変換はソースインデックスの変更を確認します。このチェックは、変換のfrequencyプロパティで定義された間隔に基づいて行われます。
    ソースインデックスが変更されていない場合、またはチェックポイントがすでに進行中の場合は、次のタイマーを待ちます。
    変更が見つかった場合、チェックポイントが作成されます。
  • 2. どのエンティティおよび/または時間バケットが変更されたかを特定します。
    変換は、最後のチェックポイントと新しいチェックポイントの間でどのエンティティまたは時間バケットが変更されたかを確認します。変換は、完全な再実行よりも少ない操作でソースインデックスと宛先インデックスを同期するために値を使用します。
  • 3. 変更を宛先インデックス(データフレーム)に更新します。
    変換は、新しいまたは変更されたエンティティまたは時間バケットに関連する変更を宛先インデックスに適用します。変更のセットはページネート可能です。変換は、バッチ変換操作と同様に複合集約を実行しますが、作業量を減らすために前のステップに基づいてクエリフィルターを注入します。すべての変更が適用された後、チェックポイントは完了します。

このチェックポイントプロセスは、クラスター上での検索とインデックス作成の両方のアクティビティを含みます。私たちは、変換を開発する際にパフォーマンスよりも制御を優先するよう努めました。変換が迅速に完了するよりも、完了に時間がかかる方が望ましいと判断しましたが、クラスターは依然として複合集約検索とその結果のインデックス作成をサポートするために十分なリソースを必要とします。

変換によるパフォーマンスの低下が発生した場合は、変換を停止し、パフォーマンスの考慮事項を参照してください。

変換の同期にインジェストタイムスタンプを使用する

ほとんどの場合、変換の同期にはソースインデックスのインジェストタイムスタンプを使用することを強く推奨します。これは、変換が新しい変更を特定する最も最適な方法です。データソースがECS標準に従っている場合、すでにevent.ingestedフィールドを持っている可能性があります。この場合、event.ingestedsync.time.fieldプロパティとして使用します。

event.ingestedフィールドがない場合、またはそれが入力されていない場合は、インジェストパイプラインを使用して設定できます。インジェストパイプラインAPI(以下の例のように)を使用してインジェストパイプラインを作成するか、Kibanaのスタック管理 > インジェストパイプラインから作成します。setプロセッサを使用してフィールドを設定し、インジェストタイムスタンプの値に関連付けます。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="set_ingest_time",
  3. description="Set ingest timestamp.",
  4. processors=[
  5. {
  6. "set": {
  7. "field": "event.ingested",
  8. "value": "{{{_ingest.timestamp}}}"
  9. }
  10. }
  11. ],
  12. )
  13. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'set_ingest_time',
  3. body: {
  4. description: 'Set ingest timestamp.',
  5. processors: [
  6. {
  7. set: {
  8. field: 'event.ingested',
  9. value: '{{{_ingest.timestamp}}}'
  10. }
  11. }
  12. ]
  13. }
  14. )
  15. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "set_ingest_time",
  3. description: "Set ingest timestamp.",
  4. processors: [
  5. {
  6. set: {
  7. field: "event.ingested",
  8. value: "{{{_ingest.timestamp}}}",
  9. },
  10. },
  11. ],
  12. });
  13. console.log(response);

コンソール

  1. PUT _ingest/pipeline/set_ingest_time
  2. {
  3. "description": "Set ingest timestamp.",
  4. "processors": [
  5. {
  6. "set": {
  7. "field": "event.ingested",
  8. "value": "{{{_ingest.timestamp}}}"
  9. }
  10. }
  11. ]
  12. }

インジェストパイプラインを作成した後、それを変換のソースインデックスに適用します。パイプラインは、インジェストタイムスタンプの値を持つevent.ingestedフィールドをすべてのドキュメントに追加します。新しい変換の場合はcreate transform APIを使用して、既存の変換の場合はupdate transform APIを使用して、sync.time.fieldプロパティをフィールドを使用するように構成します。event.ingestedフィールドは、変換の同期に使用されます。

インデックスリクエストにパイプラインを追加するおよびインジェストパイプラインを参照して、インジェストパイプラインの使用方法について詳しく学んでください。

変更検出ヒューリスティック

変換が継続モードで実行されると、新しいデータが入ると宛先インデックスのドキュメントが更新されます。変換は、変更検出と呼ばれる一連のヒューリスティックを使用して、宛先インデックスをより少ない操作で更新します。

この例では、データはホスト名でグループ化されます。変更検出は、ホストACGなど、どのホスト名が変更されたかを検出し、これらのホストに関するドキュメントのみを更新しますが、ホストBD、または変更されていない他のホストに関するドキュメントは更新しません。

date_histogramを使用して時間バケットでグループ化する場合、別のヒューリスティックが適用されます。変更検出は、どの時間バケットが変更されたかを検出し、それらのみを更新します。

エラーハンドリング

変換の失敗は、検索またはインデックス作成に関連することが多いです。変換の耐障害性を高めるために、集約検索と変更されたエンティティ検索のカーソル位置がメモリ内で追跡され、定期的に永続化されます。

チェックポイントの失敗は次のように分類できます:

  • 一時的な失敗: チェックポイントが再試行されます。10回連続して失敗が発生した場合、変換は失敗状態になります。たとえば、この状況はシャードの失敗が発生し、クエリが部分的な結果のみを返す場合に発生する可能性があります。
  • 回復不可能な失敗: 変換は即座に失敗します。たとえば、この状況はソースインデックスが見つからない場合に発生します。
  • 調整失敗: 変換は調整された設定で再試行します。たとえば、複合集約中に親サーキットブレーカーのメモリエラーが発生した場合、変換は部分的な結果を受け取ります。集約検索は、より少ないバケット数で再試行されます。この再試行は、変換のfrequencyプロパティで定義された間隔で行われます。検索が最小のバケット数に達するまで再試行されると、回復不可能な失敗が発生します。

変換を実行しているノードが失敗した場合、変換は最新の永続化されたカーソル位置から再起動します。この回復プロセスは、変換がすでに行った作業の一部を繰り返す可能性がありますが、データの整合性を確保します。