Update By Query API

指定されたクエリに一致するドキュメントを更新します。クエリが指定されていない場合、データストリームまたはインデックス内のすべてのドキュメントに対してソースを変更せずに更新を実行します。これは、マッピングの変更を取得するのに便利です。

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. conflicts="proceed",
  4. )
  5. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. conflicts: 'proceed'
  4. )
  5. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. conflicts: "proceed",
  4. });
  5. console.log(response);

Console

  1. POST my-index-000001/_update_by_query?conflicts=proceed

Request

POST /<target>/_update_by_query

Prerequisites

  • Elasticsearchのセキュリティ機能が有効になっている場合、ターゲットデータストリーム、インデックス、またはエイリアスに対して次のインデックス権限を持っている必要があります:
    • read
    • index または write

Description

リクエストURIまたはリクエストボディで、Search APIと同じ構文を使用してクエリ基準を指定できます。

クエリによる更新リクエストを送信すると、Elasticsearchはリクエストの処理を開始する際にデータストリームまたはインデックスのスナップショットを取得し、internalバージョニングを使用して一致するドキュメントを更新します。バージョンが一致すると、ドキュメントが更新され、バージョン番号がインクリメントされます。スナップショットが取得されてから更新操作が処理されるまでの間にドキュメントが変更されると、バージョンの競合が発生し、操作は失敗します。バージョンの競合をカウントすることを選択することもでき、conflictsproceedに設定することで、停止して返すのではなくカウントします。バージョンの競合をカウントすることを選択した場合、操作はmax_docsからソースのドキュメントを更新しようとする可能性があり、max_docsドキュメントが正常に更新されるか、ソースクエリ内のすべてのドキュメントを通過するまで続きます。

バージョンが0のドキュメントは、internalバージョニングが0を有効なバージョン番号としてサポートしていないため、クエリによる更新を使用して更新できません。

クエリによる更新リクエストを処理している間、Elasticsearchは一致するすべてのドキュメントを見つけるために複数の検索リクエストを順次実行します。一致するドキュメントの各バッチに対してバルク更新リクエストが実行されます。クエリまたは更新の失敗は、クエリによる更新リクエストを失敗させ、失敗は応答に表示されます。成功裏に完了した更新リクエストはそのまま残り、ロールバックされることはありません。

Refreshing shards

  1. ### Running update by query asynchronously
  2. リクエストに`````wait_for_completion=false`````が含まれている場合、Elasticsearchは一部の事前チェックを実行し、リクエストを開始し、タスクのキャンセルまたはステータスを取得するために使用できる[`````task`````](/read/elasticsearch-8-15/4819d08b09fe89d8.md)を返します。Elasticsearchはこのタスクの記録を`````.tasks/task/${taskId}`````にドキュメントとして作成します。
  3. ### Waiting for active shards
  4. `````wait_for_active_shards`````は、リクエストを進める前にアクティブでなければならないシャードのコピーの数を制御します。詳細については、[Active shards](9a3777dee7ec0d3c.md#index-wait-for-active-shards)を参照してください。`````timeout`````は、各書き込みリクエストが利用できないシャードが利用可能になるまで待機する時間を制御します。両方とも、[Bulk API](/read/elasticsearch-8-15/7cc0d7a47de34557.md)での動作とまったく同じです。クエリによる更新はスクロール検索を使用するため、`````scroll`````パラメータを指定して検索コンテキストを生存させる時間を制御することもできます。たとえば、`````?scroll=10m`````。デフォルトは5分です。
  5. ### Throttling update requests
  6. クエリによる更新が更新操作のバッチを発行する速度を制御するには、`````requests_per_second`````を任意の正の小数に設定できます。これにより、各バッチに待機時間が追加され、速度が制御されます。`````requests_per_second``````````-1`````に設定すると、スロットリングが無効になります。
  7. スロットリングは、バッチ間に待機時間を使用して、内部スクロールリクエストにリクエストパディングを考慮したタイムアウトを与えます。パディング時間は、バッチサイズを`````requests_per_second`````で割った値と、書き込みに費やした時間の差です。デフォルトのバッチサイズは`````1000`````であるため、`````requests_per_second``````````500`````に設定されている場合:
  8. #### Txt
  9. ``````txt
  10. target_time = 1000 / 500 per second = 2 seconds
  11. wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
  12. `

バッチが単一の_bulkリクエストとして発行されるため、大きなバッチサイズはElasticsearchが多くのリクエストを作成し、次のセットを開始する前に待機する原因となります。これは「バースト」ではなく「スムーズ」です。

Slicing

クエリによる更新は、更新プロセスを並列化するためにsliced scrollをサポートしています。これにより、効率が向上し、リクエストを小さな部分に分割する便利な方法が提供されます。

  1. - クエリパフォーマンスは、`````slices`````の数がインデックスまたはバックインデックスのシャードの数と等しいときに最も効率的です。その数が大きい場合(たとえば、500)、パフォーマンスを損なうため、より低い数を選択してください。`````slices`````をシャードの数よりも高く設定することは、一般的に効率を向上させず、オーバーヘッドを追加します。
  2. - 更新パフォーマンスは、スライスの数に応じて利用可能なリソース全体で線形にスケールします。
  3. クエリまたは更新パフォーマンスがランタイムを支配するかどうかは、再インデックスされるドキュメントとクラスターリソースによって異なります。
  4. ## Path parameters
  5. - `````<target>
  • (オプション、文字列) 検索するデータストリーム、インデックス、およびエイリアスのカンマ区切りリスト。ワイルドカードをサポートしています(*)。すべてのデータストリームまたはインデックスを検索するには、このパラメータを省略するか、*または_allを使用します。

Query parameters

  • allow_no_indices
  • (オプション、ブール値) falseの場合、リクエストは、任意のワイルドカード式、インデックスエイリアスまたは_allの値が欠落または閉じたインデックスのみをターゲットにしている場合にエラーを返します。この動作は、リクエストが他のオープンインデックスをターゲットにしている場合でも適用されます。たとえば、foo*,bar*をターゲットにするリクエストは、fooで始まるインデックスがあっても、barで始まるインデックスがない場合にエラーを返します。
    デフォルトはtrueです。
  • analyzer
  • (オプション、文字列) クエリ文字列に使用するアナライザー。
    このパラメータは、qクエリ文字列パラメータが指定されている場合にのみ使用できます。
  • analyze_wildcard
  • (オプション、ブール値) trueの場合、ワイルドカードおよびプレフィックスクエリが分析されます。デフォルトはfalseです。
    このパラメータは、qクエリ文字列パラメータが指定されている場合にのみ使用できます。
  • conflicts
  • (オプション、文字列) クエリによる更新がバージョンの競合に遭遇した場合の処理方法:abortまたはproceed。デフォルトはabortです。
  • default_operator
  • (オプション、文字列) クエリ文字列クエリのデフォルト演算子:ANDまたはOR。デフォルトはORです。
    このパラメータは、qクエリ文字列パラメータが指定されている場合にのみ使用できます。
  • df
  • (オプション、文字列) クエリ文字列にフィールドプレフィックスが指定されていない場合に使用するデフォルトのフィールド。
    このパラメータは、qクエリ文字列パラメータが指定されている場合にのみ使用できます。
  • expand_wildcards
  • (オプション、文字列) ワイルドカードパターンが一致できるインデックスのタイプ。リクエストがデータストリームをターゲットにできる場合、この引数はワイルドカード式が隠しデータストリームと一致するかどうかを決定します。カンマ区切りの値をサポートしています。open,hiddenのように。 有効な値は:
    • all
    • すべてのデータストリームまたはインデックスに一致し、隠しも含まれます。
    • open
    • オープンで非隠しのインデックスに一致します。また、非隠しデータストリームにも一致します。
    • closed
    • クローズドで非隠しのインデックスに一致します。また、非隠しデータストリームにも一致します。データストリームはクローズできません。
    • hidden
    • 隠しデータストリームと隠しインデックスに一致します。openclosed、またはその両方と組み合わせる必要があります。
    • none
    • ワイルドカードパターンは受け入れられません。
      デフォルトはopenです。
  • ignore_unavailable
  • (オプション、ブール値) falseの場合、リクエストは欠落または閉じたインデックスをターゲットにしている場合にエラーを返します。デフォルトはfalseです。
  • lenient
  • (オプション、ブール値) trueの場合、クエリ文字列内の形式ベースのクエリ失敗(数値フィールドにテキストを提供するなど)は無視されます。デフォルトはfalseです。
    このパラメータは、qクエリ文字列パラメータが指定されている場合にのみ使用できます。
  • max_docs
  • (オプション、整数) 処理する最大ドキュメント数。デフォルトはすべてのドキュメントです。scroll_size以下の値に設定すると、操作の結果を取得するためにスクロールは使用されません。
  • pipeline
  • (オプション、文字列) 受信ドキュメントを前処理するために使用するパイプラインのID。インデックスにデフォルトのインジェストパイプラインが指定されている場合、この値を_noneに設定すると、このリクエストのデフォルトのインジェストパイプラインが無効になります。最終的なパイプラインが構成されている場合は、このパラメータの値に関係なく常に実行されます。
  • preference
  • (オプション、文字列) 操作を実行するノードまたはシャードを指定します。デフォルトはランダムです。
  • q
  • (オプション、文字列) Luceneクエリ文字列構文のクエリ。
  • request_cache
  • (オプション、ブール値) trueの場合、リクエストキャッシュがこのリクエストに使用されます。デフォルトはインデックスレベルの設定です。
  • refresh
  • (オプション、ブール値) trueの場合、Elasticsearchは影響を受けるシャードをリフレッシュして操作を検索可能にします。デフォルトはfalseです。
  • requests_per_second
  • (オプション、整数) このリクエストのスロットルをサブリクエスト毎秒で指定します。デフォルトは-1(スロットルなし)です。
  • routing
  • (オプション、文字列) 操作を特定のシャードにルーティングするために使用されるカスタム値。
  • scroll
  • (オプション、時間値) スクロールのための検索コンテキストを保持する期間。 Scroll search resultsを参照してください。
  • scroll_size
  • (オプション、整数) 操作を支えるスクロールリクエストのサイズ。デフォルトは1000です。
  • search_type
  • (オプション、文字列) 検索操作のタイプ。利用可能なオプション:
    • query_then_fetch
    • dfs_query_then_fetch
  • search_timeout
  • (オプション、時間単位) 各検索リクエストの明示的なタイムアウト。デフォルトはタイムアウトなしです。
  • slices
  • (オプション、整数) このタスクが分割されるべきスライスの数。デフォルトは1で、タスクはサブタスクに分割されません。
  • sort
  • (オプション、文字列) カンマ区切りのリストの:ペア。
  • stats
  • (オプション、文字列) ロギングおよび統計目的のためのリクエストの特定のtag
  • terminate_after
  • (オプション、整数) 各シャードの収集する最大ドキュメント数。この制限に達した場合、Elasticsearchはクエリを早期に終了します。Elasticsearchはソートの前にドキュメントを収集します。
    注意して使用してください。Elasticsearchはこのパラメータをリクエストを処理する各シャードに適用します。可能な場合は、Elasticsearchに自動的に早期終了を実行させてください。複数のデータティアにわたるバックインデックスを持つデータストリームをターゲットにするリクエストには、このパラメータを指定しないでください。
  • timeout
  • (オプション、時間単位) 各更新リクエストが次の操作を待機する期間:
    • 動的マッピングの更新
    • アクティブシャードの待機
      デフォルトは1m(1分)です。これにより、Elasticsearchは失敗する前に少なくともタイムアウトまで待機します。実際の待機時間は、特に複数の待機が発生する場合、長くなる可能性があります。
  • version
  • (オプション、ブール値) trueの場合、ヒットの一部としてドキュメントバージョンを返します。
  • wait_for_active_shards
  • (オプション、文字列) 操作を進める前にアクティブでなければならないシャードのコピーの数。allまたはインデックス内のシャードの総数(number_of_replicas+1)までの任意の正の整数に設定します。デフォルト:1、プライマリシャード。
    Active shardsを参照してください。

Request body

  • query
  • (オプション、query object) Query DSLを使用して更新するドキュメントを指定します。

Response body

  • took
  • 操作全体の開始から終了までのミリ秒数。
  • timed_out
  • 更新によるクエリ実行中に実行されたリクエストのいずれかがタイムアウトした場合、このフラグはtrueに設定されます。
  • total
  • 成功裏に処理されたドキュメントの数。
  • updated
  • 成功裏に更新されたドキュメントの数。
  • deleted
  • 成功裏に削除されたドキュメントの数。
  • batches
  • 更新によるクエリで引き戻されたスクロール応答の数。
  • version_conflicts
  • 更新によるクエリが遭遇したバージョンの競合の数。
  • noops
  • 更新によるクエリのために使用されたスクリプトがnoopの値をctx.opに返したために無視されたドキュメントの数。
  • retries
  • 更新によるクエリが試みた再試行の数。bulkは再試行されたバルクアクションの数で、searchは再試行された検索アクションの数です。
  • throttled_millis
  • requests_per_secondに準拠するためにリクエストがスリープしたミリ秒数。
  • requests_per_second
  • 更新によるクエリ中に実行されたリクエストの数。
  • throttled_until_millis
  • このフィールドは、_update_by_query応答で常にゼロと等しい必要があります。これは、Task APIを使用している場合にのみ意味を持ち、スロットルされたリクエストがrequests_per_secondに準拠するために再度実行される次の時間(エポックからのミリ秒)を示します。
  • failures
  • プロセス中に回復不可能なエラーが発生した場合の失敗の配列。これが空でない場合、リクエストはこれらの失敗のために中止されました。クエリによる更新はバッチを使用して実装されています。いずれかの失敗が発生すると、全体のプロセスが中止されますが、現在のバッチ内のすべての失敗は配列に収集されます。conflictsオプションを使用して、バージョンの競合で中止されないようにすることができます。

Examples

  1. 選択したドキュメントを更新するには、リクエストボディにクエリを指定します:
  2. #### Python
  3. ``````python
  4. resp = client.update_by_query(
  5. index="my-index-000001",
  6. conflicts="proceed",
  7. query={
  8. "term": {
  9. "user.id": "kimchy"
  10. }
  11. },
  12. )
  13. print(resp)
  14. `

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. conflicts: 'proceed',
  4. body: {
  5. query: {
  6. term: {
  7. 'user.id' => 'kimchy'
  8. }
  9. }
  10. }
  11. )
  12. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. conflicts: "proceed",
  4. query: {
  5. term: {
  6. "user.id": "kimchy",
  7. },
  8. },
  9. });
  10. console.log(response);

Console

  1. POST my-index-000001/_update_by_query?conflicts=proceed
  2. {
  3. "query": {
  4. "term": {
  5. "user.id": "kimchy"
  6. }
  7. }
  8. }
クエリは、Search APIと同じ方法でqueryキーに値として渡す必要があります。qパラメータも、検索APIと同じ方法で使用できます。

複数のデータストリームまたはインデックス内のドキュメントを更新します:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001,my-index-000002",
  3. )
  4. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001,my-index-000002'
  3. )
  4. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001,my-index-000002",
  3. });
  4. console.log(response);

Console

  1. POST my-index-000001,my-index-000002/_update_by_query

特定のルーティング値に対してクエリによる更新操作を制限します:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. routing="1",
  4. )
  5. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. routing: 1
  4. )
  5. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. routing: 1,
  4. });
  5. console.log(response);

Console

  1. POST my-index-000001/_update_by_query?routing=1

デフォルトでは、クエリによる更新は1000のスクロールバッチを使用します。scroll_sizeパラメータを使用してバッチサイズを変更できます:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. scroll_size="100",
  4. )
  5. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. scroll_size: 100
  4. )
  5. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. scroll_size: 100,
  4. });
  5. console.log(response);

Console

  1. POST my-index-000001/_update_by_query?scroll_size=100

ユニークな属性を使用してドキュメントを更新します:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. query={
  4. "term": {
  5. "user.id": "kimchy"
  6. }
  7. },
  8. max_docs=1,
  9. )
  10. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. body: {
  4. query: {
  5. term: {
  6. 'user.id' => 'kimchy'
  7. }
  8. },
  9. max_docs: 1
  10. }
  11. )
  12. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. query: {
  4. term: {
  5. "user.id": "kimchy",
  6. },
  7. },
  8. max_docs: 1,
  9. });
  10. console.log(response);

Console

  1. POST my-index-000001/_update_by_query
  2. {
  3. "query": {
  4. "term": {
  5. "user.id": "kimchy"
  6. }
  7. },
  8. "max_docs": 1
  9. }

Update the document source

クエリによる更新は、ドキュメントソースを更新するためにスクリプトをサポートしています。たとえば、次のリクエストは、countフィールドをuser.idkimchyのすべてのドキュメントに対してインクリメントします:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. script={
  4. "source": "ctx._source.count++",
  5. "lang": "painless"
  6. },
  7. query={
  8. "term": {
  9. "user.id": "kimchy"
  10. }
  11. },
  12. )
  13. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. body: {
  4. script: {
  5. source: 'ctx._source.count++',
  6. lang: 'painless'
  7. },
  8. query: {
  9. term: {
  10. 'user.id' => 'kimchy'
  11. }
  12. }
  13. }
  14. )
  15. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. script: {
  4. source: "ctx._source.count++",
  5. lang: "painless",
  6. },
  7. query: {
  8. term: {
  9. "user.id": "kimchy",
  10. },
  11. },
  12. });
  13. console.log(response);

Console

  1. POST my-index-000001/_update_by_query
  2. {
  3. "script": {
  4. "source": "ctx._source.count++",
  5. "lang": "painless"
  6. },
  7. "query": {
  8. "term": {
  9. "user.id": "kimchy"
  10. }
  11. }
  12. }

この例ではconflicts=proceedが指定されていないことに注意してください。この場合、バージョンの競合がプロセスを停止させる必要がありますので、失敗を処理できます。

Update APIと同様に、ctx.opを設定して実行される操作を変更できます:

noop スクリプトが変更を行う必要がないと判断した場合はctx.op = "noop"を設定します。
クエリによる更新操作はドキュメントの更新をスキップし、noopカウンターをインクリメントします。
delete スクリプトがドキュメントを削除する必要があると判断した場合はctx.op = "delete"を設定します。
クエリによる更新操作はドキュメントを削除し、deletedカウンターをインクリメントします。

クエリによる更新はindexnoopdeleteのみをサポートします。ctx.opを他の何かに設定することはエラーです。ctxの他のフィールドを設定することもエラーです。このAPIは、一致するドキュメントのソースを変更することのみを可能にし、移動することはできません。

Update documents using an ingest pipeline

クエリによる更新は、pipelineを指定することにより、Ingest pipelines機能を使用できます。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="set-foo",
  3. description="sets foo",
  4. processors=[
  5. {
  6. "set": {
  7. "field": "foo",
  8. "value": "bar"
  9. }
  10. }
  11. ],
  12. )
  13. print(resp)
  14. resp1 = client.update_by_query(
  15. index="my-index-000001",
  16. pipeline="set-foo",
  17. )
  18. print(resp1)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'set-foo',
  3. body: {
  4. description: 'sets foo',
  5. processors: [
  6. {
  7. set: {
  8. field: 'foo',
  9. value: 'bar'
  10. }
  11. }
  12. ]
  13. }
  14. )
  15. puts response
  16. response = client.update_by_query(
  17. index: 'my-index-000001',
  18. pipeline: 'set-foo'
  19. )
  20. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "set-foo",
  3. description: "sets foo",
  4. processors: [
  5. {
  6. set: {
  7. field: "foo",
  8. value: "bar",
  9. },
  10. },
  11. ],
  12. });
  13. console.log(response);
  14. const response1 = await client.updateByQuery({
  15. index: "my-index-000001",
  16. pipeline: "set-foo",
  17. });
  18. console.log(response1);

Console

  1. PUT _ingest/pipeline/set-foo
  2. {
  3. "description" : "sets foo",
  4. "processors" : [ {
  5. "set" : {
  6. "field": "foo",
  7. "value": "bar"
  8. }
  9. } ]
  10. }
  11. POST my-index-000001/_update_by_query?pipeline=set-foo

Get the status of update by query operations

すべての実行中のクエリによる更新リクエストのステータスをTask APIで取得できます:

Php

  1. $response = $client->tasks()->list();

Python

  1. resp = client.tasks.list(
  2. detailed=True,
  3. actions="*byquery",
  4. )
  5. print(resp)

Ruby

  1. response = client.tasks.list(
  2. detailed: true,
  3. actions: '*byquery'
  4. )
  5. puts response

Go

  1. res, err := es.Tasks.List(
  2. es.Tasks.List.WithActions("*byquery"),
  3. es.Tasks.List.WithDetailed(true),
  4. )
  5. fmt.Println(res, err)

Js

  1. const response = await client.tasks.list({
  2. detailed: "true",
  3. actions: "*byquery",
  4. });
  5. console.log(response);

Console

  1. GET _tasks?detailed=true&actions=*byquery

応答は次のようになります:

Console-Result

  1. {
  2. "nodes" : {
  3. "r1A2WoRbTwKZ516z6NEs5A" : {
  4. "name" : "r1A2WoR",
  5. "transport_address" : "127.0.0.1:9300",
  6. "host" : "127.0.0.1",
  7. "ip" : "127.0.0.1:9300",
  8. "attributes" : {
  9. "testattr" : "test",
  10. "portsfile" : "true"
  11. },
  12. "tasks" : {
  13. "r1A2WoRbTwKZ516z6NEs5A:36619" : {
  14. "node" : "r1A2WoRbTwKZ516z6NEs5A",
  15. "id" : 36619,
  16. "type" : "transport",
  17. "action" : "indices:data/write/update/byquery",
  18. "status" : {
  19. "total" : 6154,
  20. "updated" : 3500,
  21. "created" : 0,
  22. "deleted" : 0,
  23. "batches" : 4,
  24. "version_conflicts" : 0,
  25. "noops" : 0,
  26. "retries": {
  27. "bulk": 0,
  28. "search": 0
  29. },
  30. "throttled_millis": 0
  31. },
  32. "description" : ""
  33. }
  34. }
  35. }
  36. }
  37. }
このオブジェクトには実際のステータスが含まれています。これは、重要なtotalフィールドの追加を伴う応答JSONと同じです。 totalは、再インデックスが実行することを期待する操作の合計数です。updatedcreated、およびdeletedフィールドを追加することで進捗を推定できます。リクエストは、これらの合計がtotalフィールドと等しくなると終了します。

タスクIDを使用して、タスクを直接検索できます。次の例は、タスクr1A2WoRbTwKZ516z6NEs5A:36619に関する情報を取得します:

Php

  1. $params = [
  2. 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
  3. ];
  4. $response = $client->tasks()->get($params);

Python

  1. resp = client.tasks.get(
  2. task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
  3. )
  4. print(resp)

Ruby

  1. response = client.tasks.get(
  2. task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
  3. )
  4. puts response

Go

  1. res, err := es.Tasks.Get(
  2. "r1A2WoRbTwKZ516z6NEs5A:36619",
  3. )
  4. fmt.Println(res, err)

Js

  1. const response = await client.tasks.get({
  2. task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
  3. });
  4. console.log(response);

Console

  1. GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619

このAPIの利点は、wait_for_completion=falseと統合されて、完了したタスクのステータスを透過的に返すことです。タスクが完了し、wait_for_completion=falseが設定されている場合、resultsまたはerrorフィールドが返されます。この機能のコストは、wait_for_completion=false.tasks/task/${taskId}で作成するドキュメントです。そのドキュメントを削除するかどうかはあなた次第です。

Cancel an update by query operation

任意のクエリによる更新は、Task Cancel APIを使用してキャンセルできます:

Php

  1. $params = [
  2. 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
  3. ];
  4. $response = $client->tasks()->cancel($params);

Python

  1. resp = client.tasks.cancel(
  2. task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
  3. )
  4. print(resp)

Ruby

  1. response = client.tasks.cancel(
  2. task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
  3. )
  4. puts response

Go

  1. res, err := es.Tasks.Cancel(
  2. es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"),
  3. )
  4. fmt.Println(res, err)

Js

  1. const response = await client.tasks.cancel({
  2. task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
  3. });
  4. console.log(response);

Console

  1. POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel

タスクIDは、tasks APIを使用して見つけることができます。

キャンセルは迅速に行われるべきですが、数秒かかる場合があります。上記のタスクステータスAPIは、タスクがキャンセルされたことを確認し、自身を終了するまで、クエリによる更新タスクをリストし続けます。

Change throttling for a request

requests_per_secondの値は、_rethrottle APIを使用して実行中のクエリによる更新で変更できます:

Php

  1. $params = [
  2. 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
  3. ];
  4. $response = $client->updateByQueryRethrottle($params);

Python

  1. resp = client.update_by_query_rethrottle(
  2. task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
  3. requests_per_second="-1",
  4. )
  5. print(resp)

Ruby

  1. response = client.update_by_query_rethrottle(
  2. task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  3. requests_per_second: -1
  4. )
  5. puts response

Go

  1. res, err := es.UpdateByQueryRethrottle(
  2. "r1A2WoRbTwKZ516z6NEs5A:36619",
  3. esapi.IntPtr(-1),
  4. )
  5. fmt.Println(res, err)

Js

  1. const response = await client.updateByQueryRethrottle({
  2. task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
  3. requests_per_second: "-1",
  4. });
  5. console.log(response);

Console

  1. POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

タスクIDは、tasks APIを使用して見つけることができます。

_update_by_query APIで設定する場合と同様に、requests_per_secondはスロットリングを無効にするための-1または1.7または12のような任意の小数にすることができます。クエリを加速するための再スロットリングは即座に効果を発揮しますが、クエリを遅くするための再スロットリングは、現在のバッチが完了した後に効果を発揮します。これにより、スクロールタイムアウトが防止されます。

Slice manually

スライスIDと各リクエストのスライスの総数を指定して、クエリによる更新を手動でスライスします:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. slice={
  4. "id": 0,
  5. "max": 2
  6. },
  7. script={
  8. "source": "ctx._source['extra'] = 'test'"
  9. },
  10. )
  11. print(resp)
  12. resp1 = client.update_by_query(
  13. index="my-index-000001",
  14. slice={
  15. "id": 1,
  16. "max": 2
  17. },
  18. script={
  19. "source": "ctx._source['extra'] = 'test'"
  20. },
  21. )
  22. print(resp1)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. body: {
  4. slice: {
  5. id: 0,
  6. max: 2
  7. },
  8. script: {
  9. source: "ctx._source['extra'] = 'test'"
  10. }
  11. }
  12. )
  13. puts response
  14. response = client.update_by_query(
  15. index: 'my-index-000001',
  16. body: {
  17. slice: {
  18. id: 1,
  19. max: 2
  20. },
  21. script: {
  22. source: "ctx._source['extra'] = 'test'"
  23. }
  24. }
  25. )
  26. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. slice: {
  4. id: 0,
  5. max: 2,
  6. },
  7. script: {
  8. source: "ctx._source['extra'] = 'test'",
  9. },
  10. });
  11. console.log(response);
  12. const response1 = await client.updateByQuery({
  13. index: "my-index-000001",
  14. slice: {
  15. id: 1,
  16. max: 2,
  17. },
  18. script: {
  19. source: "ctx._source['extra'] = 'test'",
  20. },
  21. });
  22. console.log(response1);

Console

  1. POST my-index-000001/_update_by_query
  2. {
  3. "slice": {
  4. "id": 0,
  5. "max": 2
  6. },
  7. "script": {
  8. "source": "ctx._source['extra'] = 'test'"
  9. }
  10. }
  11. POST my-index-000001/_update_by_query
  12. {
  13. "slice": {
  14. "id": 1,
  15. "max": 2
  16. },
  17. "script": {
  18. "source": "ctx._source['extra'] = 'test'"
  19. }
  20. }

これが機能することを確認できます:

Python

  1. resp = client.indices.refresh()
  2. print(resp)
  3. resp1 = client.search(
  4. index="my-index-000001",
  5. size="0",
  6. q="extra:test",
  7. filter_path="hits.total",
  8. )
  9. print(resp1)

Ruby

  1. response = client.indices.refresh
  2. puts response
  3. response = client.search(
  4. index: 'my-index-000001',
  5. size: 0,
  6. q: 'extra:test',
  7. filter_path: 'hits.total'
  8. )
  9. puts response

Js

  1. const response = await client.indices.refresh();
  2. console.log(response);
  3. const response1 = await client.search({
  4. index: "my-index-000001",
  5. size: 0,
  6. q: "extra:test",
  7. filter_path: "hits.total",
  8. });
  9. console.log(response1);

Console

  1. GET _refresh
  2. POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

これにより、次のような合理的なtotalが得られます:

Console-Result

  1. {
  2. "hits": {
  3. "total": {
  4. "value": 120,
  5. "relation": "eq"
  6. }
  7. }
  8. }

Use automatic slicing

クエリによる更新を自動的に並列化するために、Sliced scrollを使用して_idでスライスできます。slicesを使用して使用するスライスの数を指定します:

Python

  1. resp = client.update_by_query(
  2. index="my-index-000001",
  3. refresh=True,
  4. slices="5",
  5. script={
  6. "source": "ctx._source['extra'] = 'test'"
  7. },
  8. )
  9. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-index-000001',
  3. refresh: true,
  4. slices: 5,
  5. body: {
  6. script: {
  7. source: "ctx._source['extra'] = 'test'"
  8. }
  9. }
  10. )
  11. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-index-000001",
  3. refresh: "true",
  4. slices: 5,
  5. script: {
  6. source: "ctx._source['extra'] = 'test'",
  7. },
  8. });
  9. console.log(response);

Console

  1. POST my-index-000001/_update_by_query?refresh&slices=5
  2. {
  3. "script": {
  4. "source": "ctx._source['extra'] = 'test'"
  5. }
  6. }

これが機能することを確認できます:

Python

  1. resp = client.search(
  2. index="my-index-000001",
  3. size="0",
  4. q="extra:test",
  5. filter_path="hits.total",
  6. )
  7. print(resp)

Ruby

  1. response = client.search(
  2. index: 'my-index-000001',
  3. size: 0,
  4. q: 'extra:test',
  5. filter_path: 'hits.total'
  6. )
  7. puts response

Js

  1. const response = await client.search({
  2. index: "my-index-000001",
  3. size: 0,
  4. q: "extra:test",
  5. filter_path: "hits.total",
  6. });
  7. console.log(response);

Console

  1. POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

これにより、次のような合理的なtotalが得られます:

Console-Result

  1. {
  2. "hits": {
  3. "total": {
  4. "value": 120,
  5. "relation": "eq"
  6. }
  7. }
  8. }
  1. `````slices``````````_update_by_query`````に追加すると、上記のセクションで使用される手動プロセスが自動化され、サブリクエストが作成されます。これにはいくつかの特異性があります:
  2. - これらのリクエストは、[Tasks APIs](2455b9c12b1f4c10.md#docs-update-by-query-task-api)で確認できます。これらのサブリクエストは、`````slices`````を持つリクエストのタスクの「子」タスクです。
  3. - `````slices`````を持つリクエストのタスクのステータスを取得すると、完了したスライスのステータスのみが含まれます。
  4. - これらのサブリクエストは、キャンセルや再スロットリングなどのために個別にアドレス指定できます。
  5. - `````slices`````を持つリクエストを再スロットリングすると、未完了のサブリクエストが比例的に再スロットリングされます。
  6. - `````slices`````を持つリクエストをキャンセルすると、各サブリクエストがキャンセルされます。
  7. - `````slices`````の性質上、各サブリクエストはソースデータストリームまたはインデックスのスナップショットをわずかに異なるものとして取得しますが、これらはすべてほぼ同時に取得されます。
  8. #### Pick up a new property
  9. 動的マッピングなしでインデックスを作成し、データで埋め、その後、データからさらにフィールドを取得するためにマッピング値を追加したとします:
  10. #### Php
  11. ``````php
  12. $params = [
  13. 'index' => 'test',
  14. 'body' => [
  15. 'mappings' => [
  16. 'dynamic' => false,
  17. 'properties' => [
  18. 'text' => [
  19. 'type' => 'text',
  20. ],
  21. ],
  22. ],
  23. ],
  24. ];
  25. $response = $client->indices()->create($params);
  26. $params = [
  27. 'index' => 'test',
  28. 'body' => [
  29. 'text' => 'words words',
  30. 'flag' => 'bar',
  31. ],
  32. ];
  33. $response = $client->index($params);
  34. $params = [
  35. 'index' => 'test',
  36. 'body' => [
  37. 'text' => 'words words',
  38. 'flag' => 'foo',
  39. ],
  40. ];
  41. $response = $client->index($params);
  42. $params = [
  43. 'index' => 'test',
  44. 'body' => [
  45. 'properties' => [
  46. 'text' => [
  47. 'type' => 'text',
  48. ],
  49. 'flag' => [
  50. 'type' => 'text',
  51. 'analyzer' => 'keyword',
  52. ],
  53. ],
  54. ],
  55. ];
  56. $response = $client->indices()->putMapping($params);
  57. `

Python

  1. resp = client.indices.create(
  2. index="test",
  3. mappings={
  4. "dynamic": False,
  5. "properties": {
  6. "text": {
  7. "type": "text"
  8. }
  9. }
  10. },
  11. )
  12. print(resp)
  13. resp1 = client.index(
  14. index="test",
  15. refresh=True,
  16. document={
  17. "text": "words words",
  18. "flag": "bar"
  19. },
  20. )
  21. print(resp1)
  22. resp2 = client.index(
  23. index="test",
  24. refresh=True,
  25. document={
  26. "text": "words words",
  27. "flag": "foo"
  28. },
  29. )
  30. print(resp2)
  31. resp3 = client.indices.put_mapping(
  32. index="test",
  33. properties={
  34. "text": {
  35. "type": "text"
  36. },
  37. "flag": {
  38. "type": "text",
  39. "analyzer": "keyword"
  40. }
  41. },
  42. )
  43. print(resp3)

Ruby

  1. response = client.indices.create(
  2. index: 'test',
  3. body: {
  4. mappings: {
  5. dynamic: false,
  6. properties: {
  7. text: {
  8. type: 'text'
  9. }
  10. }
  11. }
  12. }
  13. )
  14. puts response
  15. response = client.index(
  16. index: 'test',
  17. refresh: true,
  18. body: {
  19. text: 'words words',
  20. flag: 'bar'
  21. }
  22. )
  23. puts response
  24. response = client.index(
  25. index: 'test',
  26. refresh: true,
  27. body: {
  28. text: 'words words',
  29. flag: 'foo'
  30. }
  31. )
  32. puts response
  33. response = client.indices.put_mapping(
  34. index: 'test',
  35. body: {
  36. properties: {
  37. text: {
  38. type: 'text'
  39. },
  40. flag: {
  41. type: 'text',
  42. analyzer: 'keyword'
  43. }
  44. }
  45. }
  46. )
  47. puts response

Go

  1. {
  2. res, err := es.Indices.Create(
  3. "test",
  4. es.Indices.Create.WithBody(strings.NewReader(`{
  5. "mappings": {
  6. "dynamic": false,
  7. "properties": {
  8. "text": {
  9. "type": "text"
  10. }
  11. }
  12. }
  13. }`)),
  14. )
  15. fmt.Println(res, err)
  16. }
  17. {
  18. res, err := es.Index(
  19. "test",
  20. strings.NewReader(`{
  21. "text": "words words",
  22. "flag": "bar"
  23. }`),
  24. es.Index.WithRefresh("true"),
  25. es.Index.WithPretty(),
  26. )
  27. fmt.Println(res, err)
  28. }
  29. {
  30. res, err := es.Index(
  31. "test",
  32. strings.NewReader(`{
  33. "text": "words words",
  34. "flag": "foo"
  35. }`),
  36. es.Index.WithRefresh("true"),
  37. es.Index.WithPretty(),
  38. )
  39. fmt.Println(res, err)
  40. }
  41. {
  42. res, err := es.Indices.PutMapping(
  43. []string{"test"},
  44. strings.NewReader(`{
  45. "properties": {
  46. "text": {
  47. "type": "text"
  48. },
  49. "flag": {
  50. "type": "text",
  51. "analyzer": "keyword"
  52. }
  53. }
  54. }`),
  55. )
  56. fmt.Println(res, err)
  57. }

Js

  1. const response = await client.indices.create({
  2. index: "test",
  3. mappings: {
  4. dynamic: false,
  5. properties: {
  6. text: {
  7. type: "text",
  8. },
  9. },
  10. },
  11. });
  12. console.log(response);
  13. const response1 = await client.index({
  14. index: "test",
  15. refresh: "true",
  16. document: {
  17. text: "words words",
  18. flag: "bar",
  19. },
  20. });
  21. console.log(response1);
  22. const response2 = await client.index({
  23. index: "test",
  24. refresh: "true",
  25. document: {
  26. text: "words words",
  27. flag: "foo",
  28. },
  29. });
  30. console.log(response2);
  31. const response3 = await client.indices.putMapping({
  32. index: "test",
  33. properties: {
  34. text: {
  35. type: "text",
  36. },
  37. flag: {
  38. type: "text",
  39. analyzer: "keyword",
  40. },
  41. },
  42. });
  43. console.log(response3);

Console

  1. PUT test
  2. {
  3. "mappings": {
  4. "dynamic": false,
  5. "properties": {
  6. "text": {"type": "text"}
  7. }
  8. }
  9. }
  10. POST test/_doc?refresh
  11. {
  12. "text": "words words",
  13. "flag": "bar"
  14. }
  15. POST test/_doc?refresh
  16. {
  17. "text": "words words",
  18. "flag": "foo"
  19. }
  20. PUT test/_mapping
  21. {
  22. "properties": {
  23. "text": {"type": "text"},
  24. "flag": {"type": "text", "analyzer": "keyword"}
  25. }
  26. }
これは新しいフィールドがインデックスされず、_sourceに保存されることを意味します。
これは新しいflagフィールドを追加するためにマッピングを更新します。新しいフィールドを取得するには、すべてのドキュメントを再インデックス化する必要があります。

データを検索しても何も見つかりません:

Php

  1. $params = [
  2. 'index' => 'test',
  3. 'body' => [
  4. 'query' => [
  5. 'match' => [
  6. 'flag' => 'foo',
  7. ],
  8. ],
  9. ],
  10. ];
  11. $response = $client->search($params);

Python

  1. resp = client.search(
  2. index="test",
  3. filter_path="hits.total",
  4. query={
  5. "match": {
  6. "flag": "foo"
  7. }
  8. },
  9. )
  10. print(resp)

Ruby

  1. response = client.search(
  2. index: 'test',
  3. filter_path: 'hits.total',
  4. body: {
  5. query: {
  6. match: {
  7. flag: 'foo'
  8. }
  9. }
  10. }
  11. )
  12. puts response

Go

  1. res, err := es.Search(
  2. es.Search.WithIndex("test"),
  3. es.Search.WithBody(strings.NewReader(`{
  4. "query": {
  5. "match": {
  6. "flag": "foo"
  7. }
  8. }
  9. }`)),
  10. es.Search.WithFilterPath("hits.total"),
  11. es.Search.WithPretty(),
  12. )
  13. fmt.Println(res, err)

Js

  1. const response = await client.search({
  2. index: "test",
  3. filter_path: "hits.total",
  4. query: {
  5. match: {
  6. flag: "foo",
  7. },
  8. },
  9. });
  10. console.log(response);

Console

  1. POST test/_search?filter_path=hits.total
  2. {
  3. "query": {
  4. "match": {
  5. "flag": "foo"
  6. }
  7. }
  8. }

Console-Result

  1. {
  2. "hits" : {
  3. "total": {
  4. "value": 0,
  5. "relation": "eq"
  6. }
  7. }
  8. }

しかし、新しいマッピングを取得するために_update_by_queryリクエストを発行することができます:

Php

  1. $params = [
  2. 'index' => 'test',
  3. ];
  4. $response = $client->updateByQuery($params);
  5. $params = [
  6. 'index' => 'test',
  7. 'body' => [
  8. 'query' => [
  9. 'match' => [
  10. 'flag' => 'foo',
  11. ],
  12. ],
  13. ],
  14. ];
  15. $response = $client->search($params);

Python

  1. resp = client.update_by_query(
  2. index="test",
  3. refresh=True,
  4. conflicts="proceed",
  5. )
  6. print(resp)
  7. resp1 = client.search(
  8. index="test",
  9. filter_path="hits.total",
  10. query={
  11. "match": {
  12. "flag": "foo"
  13. }
  14. },
  15. )
  16. print(resp1)

Ruby

  1. response = client.update_by_query(
  2. index: 'test',
  3. refresh: true,
  4. conflicts: 'proceed'
  5. )
  6. puts response
  7. response = client.search(
  8. index: 'test',
  9. filter_path: 'hits.total',
  10. body: {
  11. query: {
  12. match: {
  13. flag: 'foo'
  14. }
  15. }
  16. }
  17. )
  18. puts response

Go

  1. {
  2. res, err := es.UpdateByQuery(
  3. []string{"test"},
  4. es.UpdateByQuery.WithConflicts("proceed"),
  5. es.UpdateByQuery.WithRefresh(true),
  6. )
  7. fmt.Println(res, err)
  8. }
  9. {
  10. res, err := es.Search(
  11. es.Search.WithIndex("test"),
  12. es.Search.WithBody(strings.NewReader(`{
  13. "query": {
  14. "match": {
  15. "flag": "foo"
  16. }
  17. }
  18. }`)),
  19. es.Search.WithFilterPath("hits.total"),
  20. es.Search.WithPretty(),
  21. )
  22. fmt.Println(res, err)
  23. }

Js

  1. const response = await client.updateByQuery({
  2. index: "test",
  3. refresh: "true",
  4. conflicts: "proceed",
  5. });
  6. console.log(response);
  7. const response1 = await client.search({
  8. index: "test",
  9. filter_path: "hits.total",
  10. query: {
  11. match: {
  12. flag: "foo",
  13. },
  14. },
  15. });
  16. console.log(response1);

Console

  1. POST test/_update_by_query?refresh&conflicts=proceed
  2. POST test/_search?filter_path=hits.total
  3. {
  4. "query": {
  5. "match": {
  6. "flag": "foo"
  7. }
  8. }
  9. }

Console-Result

  1. {
  2. "hits" : {
  3. "total": {
  4. "value": 1,
  5. "relation": "eq"
  6. }
  7. }
  8. }

マルチフィールドにフィールドを追加する際にも、全く同じことができます。