- Update By Query API
- Request
- Prerequisites
- Description
- Query parameters
- Request body
- Response body
- Examples
- Update the document source
- Update documents using an ingest pipeline
- Python
- Ruby
- Js
- Console
- Get the status of update by query operations
- Php
- Python
- Ruby
- Go
- Js
- Console
- Console-Result
- Php
- Python
- Ruby
- Go
- Js
- Console
- Cancel an update by query operation
- Php
- Python
- Ruby
- Go
- Js
- Console
- Change throttling for a request
- Php
- Python
- Ruby
- Go
- Js
- Console
- Slice manually
- Python
- Ruby
- Js
- Console
- Python
- Ruby
- Js
- Console
- Console-Result
- Use automatic slicing
- Python
- Ruby
- Js
- Console
- Python
- Ruby
- Js
- Console
- Console-Result
- Python
- Ruby
- Go
- Js
- Console
- Php
- Python
- Ruby
- Go
- Js
- Console
- Console-Result
- Php
- Python
- Ruby
- Go
- Js
- Console
- Console-Result
Update By Query API
指定されたクエリに一致するドキュメントを更新します。クエリが指定されていない場合、データストリームまたはインデックス内のすべてのドキュメントに対してソースを変更せずに更新を実行します。これは、マッピングの変更を取得するのに便利です。
Python
resp = client.update_by_query(
index="my-index-000001",
conflicts="proceed",
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
conflicts: 'proceed'
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
conflicts: "proceed",
});
console.log(response);
Console
POST my-index-000001/_update_by_query?conflicts=proceed
Request
POST /<target>/_update_by_query
Prerequisites
- Elasticsearchのセキュリティ機能が有効になっている場合、ターゲットデータストリーム、インデックス、またはエイリアスに対して次のインデックス権限を持っている必要があります:
read
index
またはwrite
Description
リクエストURIまたはリクエストボディで、Search APIと同じ構文を使用してクエリ基準を指定できます。
クエリによる更新リクエストを送信すると、Elasticsearchはリクエストの処理を開始する際にデータストリームまたはインデックスのスナップショットを取得し、internal
バージョニングを使用して一致するドキュメントを更新します。バージョンが一致すると、ドキュメントが更新され、バージョン番号がインクリメントされます。スナップショットが取得されてから更新操作が処理されるまでの間にドキュメントが変更されると、バージョンの競合が発生し、操作は失敗します。バージョンの競合をカウントすることを選択することもでき、conflicts
をproceed
に設定することで、停止して返すのではなくカウントします。バージョンの競合をカウントすることを選択した場合、操作はmax_docs
からソースのドキュメントを更新しようとする可能性があり、max_docs
ドキュメントが正常に更新されるか、ソースクエリ内のすべてのドキュメントを通過するまで続きます。
バージョンが0のドキュメントは、internal
バージョニングが0を有効なバージョン番号としてサポートしていないため、クエリによる更新を使用して更新できません。
クエリによる更新リクエストを処理している間、Elasticsearchは一致するすべてのドキュメントを見つけるために複数の検索リクエストを順次実行します。一致するドキュメントの各バッチに対してバルク更新リクエストが実行されます。クエリまたは更新の失敗は、クエリによる更新リクエストを失敗させ、失敗は応答に表示されます。成功裏に完了した更新リクエストはそのまま残り、ロールバックされることはありません。
Refreshing shards
### Running update by query asynchronously
リクエストに`````wait_for_completion=false`````が含まれている場合、Elasticsearchは一部の事前チェックを実行し、リクエストを開始し、タスクのキャンセルまたはステータスを取得するために使用できる[`````task`````](/read/elasticsearch-8-15/4819d08b09fe89d8.md)を返します。Elasticsearchはこのタスクの記録を`````.tasks/task/${taskId}`````にドキュメントとして作成します。
### Waiting for active shards
`````wait_for_active_shards`````は、リクエストを進める前にアクティブでなければならないシャードのコピーの数を制御します。詳細については、[Active shards](9a3777dee7ec0d3c.md#index-wait-for-active-shards)を参照してください。`````timeout`````は、各書き込みリクエストが利用できないシャードが利用可能になるまで待機する時間を制御します。両方とも、[Bulk API](/read/elasticsearch-8-15/7cc0d7a47de34557.md)での動作とまったく同じです。クエリによる更新はスクロール検索を使用するため、`````scroll`````パラメータを指定して検索コンテキストを生存させる時間を制御することもできます。たとえば、`````?scroll=10m`````。デフォルトは5分です。
### Throttling update requests
クエリによる更新が更新操作のバッチを発行する速度を制御するには、`````requests_per_second`````を任意の正の小数に設定できます。これにより、各バッチに待機時間が追加され、速度が制御されます。`````requests_per_second`````を`````-1`````に設定すると、スロットリングが無効になります。
スロットリングは、バッチ間に待機時間を使用して、内部スクロールリクエストにリクエストパディングを考慮したタイムアウトを与えます。パディング時間は、バッチサイズを`````requests_per_second`````で割った値と、書き込みに費やした時間の差です。デフォルトのバッチサイズは`````1000`````であるため、`````requests_per_second`````が`````500`````に設定されている場合:
#### Txt
``````txt
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
`
バッチが単一の_bulk
リクエストとして発行されるため、大きなバッチサイズはElasticsearchが多くのリクエストを作成し、次のセットを開始する前に待機する原因となります。これは「バースト」ではなく「スムーズ」です。
Slicing
クエリによる更新は、更新プロセスを並列化するためにsliced scrollをサポートしています。これにより、効率が向上し、リクエストを小さな部分に分割する便利な方法が提供されます。
- クエリパフォーマンスは、`````slices`````の数がインデックスまたはバックインデックスのシャードの数と等しいときに最も効率的です。その数が大きい場合(たとえば、500)、パフォーマンスを損なうため、より低い数を選択してください。`````slices`````をシャードの数よりも高く設定することは、一般的に効率を向上させず、オーバーヘッドを追加します。
- 更新パフォーマンスは、スライスの数に応じて利用可能なリソース全体で線形にスケールします。
クエリまたは更新パフォーマンスがランタイムを支配するかどうかは、再インデックスされるドキュメントとクラスターリソースによって異なります。
## Path parameters
- `````<target>
- (オプション、文字列) 検索するデータストリーム、インデックス、およびエイリアスのカンマ区切りリスト。ワイルドカードをサポートしています(
*
)。すべてのデータストリームまたはインデックスを検索するには、このパラメータを省略するか、*
または_all
を使用します。
Query parameters
allow_no_indices
- (オプション、ブール値)
false
の場合、リクエストは、任意のワイルドカード式、インデックスエイリアスまたは_all
の値が欠落または閉じたインデックスのみをターゲットにしている場合にエラーを返します。この動作は、リクエストが他のオープンインデックスをターゲットにしている場合でも適用されます。たとえば、foo*,bar*
をターゲットにするリクエストは、foo
で始まるインデックスがあっても、bar
で始まるインデックスがない場合にエラーを返します。
デフォルトはtrue
です。 analyzer
- (オプション、文字列) クエリ文字列に使用するアナライザー。
このパラメータは、q
クエリ文字列パラメータが指定されている場合にのみ使用できます。 analyze_wildcard
- (オプション、ブール値)
true
の場合、ワイルドカードおよびプレフィックスクエリが分析されます。デフォルトはfalse
です。
このパラメータは、q
クエリ文字列パラメータが指定されている場合にのみ使用できます。 conflicts
- (オプション、文字列) クエリによる更新がバージョンの競合に遭遇した場合の処理方法:
abort
またはproceed
。デフォルトはabort
です。 default_operator
- (オプション、文字列) クエリ文字列クエリのデフォルト演算子:ANDまたはOR。デフォルトは
OR
です。
このパラメータは、q
クエリ文字列パラメータが指定されている場合にのみ使用できます。 df
- (オプション、文字列) クエリ文字列にフィールドプレフィックスが指定されていない場合に使用するデフォルトのフィールド。
このパラメータは、q
クエリ文字列パラメータが指定されている場合にのみ使用できます。 expand_wildcards
- (オプション、文字列) ワイルドカードパターンが一致できるインデックスのタイプ。リクエストがデータストリームをターゲットにできる場合、この引数はワイルドカード式が隠しデータストリームと一致するかどうかを決定します。カンマ区切りの値をサポートしています。
open,hidden
のように。 有効な値は:all
- すべてのデータストリームまたはインデックスに一致し、隠しも含まれます。
open
- オープンで非隠しのインデックスに一致します。また、非隠しデータストリームにも一致します。
closed
- クローズドで非隠しのインデックスに一致します。また、非隠しデータストリームにも一致します。データストリームはクローズできません。
hidden
- 隠しデータストリームと隠しインデックスに一致します。
open
、closed
、またはその両方と組み合わせる必要があります。 none
- ワイルドカードパターンは受け入れられません。
デフォルトはopen
です。
ignore_unavailable
- (オプション、ブール値)
false
の場合、リクエストは欠落または閉じたインデックスをターゲットにしている場合にエラーを返します。デフォルトはfalse
です。 lenient
- (オプション、ブール値)
true
の場合、クエリ文字列内の形式ベースのクエリ失敗(数値フィールドにテキストを提供するなど)は無視されます。デフォルトはfalse
です。
このパラメータは、q
クエリ文字列パラメータが指定されている場合にのみ使用できます。 max_docs
- (オプション、整数) 処理する最大ドキュメント数。デフォルトはすべてのドキュメントです。
scroll_size
以下の値に設定すると、操作の結果を取得するためにスクロールは使用されません。 pipeline
- (オプション、文字列) 受信ドキュメントを前処理するために使用するパイプラインのID。インデックスにデフォルトのインジェストパイプラインが指定されている場合、この値を
_none
に設定すると、このリクエストのデフォルトのインジェストパイプラインが無効になります。最終的なパイプラインが構成されている場合は、このパラメータの値に関係なく常に実行されます。 preference
- (オプション、文字列) 操作を実行するノードまたはシャードを指定します。デフォルトはランダムです。
q
- (オプション、文字列) Luceneクエリ文字列構文のクエリ。
request_cache
- (オプション、ブール値)
true
の場合、リクエストキャッシュがこのリクエストに使用されます。デフォルトはインデックスレベルの設定です。 refresh
- (オプション、ブール値)
true
の場合、Elasticsearchは影響を受けるシャードをリフレッシュして操作を検索可能にします。デフォルトはfalse
です。 requests_per_second
- (オプション、整数) このリクエストのスロットルをサブリクエスト毎秒で指定します。デフォルトは
-1
(スロットルなし)です。 routing
- (オプション、文字列) 操作を特定のシャードにルーティングするために使用されるカスタム値。
scroll
- (オプション、時間値) スクロールのための検索コンテキストを保持する期間。 Scroll search resultsを参照してください。
scroll_size
- (オプション、整数) 操作を支えるスクロールリクエストのサイズ。デフォルトは1000です。
search_type
- (オプション、文字列) 検索操作のタイプ。利用可能なオプション:
query_then_fetch
dfs_query_then_fetch
search_timeout
- (オプション、時間単位) 各検索リクエストの明示的なタイムアウト。デフォルトはタイムアウトなしです。
slices
- (オプション、整数) このタスクが分割されるべきスライスの数。デフォルトは1で、タスクはサブタスクに分割されません。
sort
- (オプション、文字列) カンマ区切りのリストの
: ペア。 stats
- (オプション、文字列) ロギングおよび統計目的のためのリクエストの特定の
tag
。 terminate_after
- (オプション、整数) 各シャードの収集する最大ドキュメント数。この制限に達した場合、Elasticsearchはクエリを早期に終了します。Elasticsearchはソートの前にドキュメントを収集します。
注意して使用してください。Elasticsearchはこのパラメータをリクエストを処理する各シャードに適用します。可能な場合は、Elasticsearchに自動的に早期終了を実行させてください。複数のデータティアにわたるバックインデックスを持つデータストリームをターゲットにするリクエストには、このパラメータを指定しないでください。 timeout
- (オプション、時間単位) 各更新リクエストが次の操作を待機する期間:
- 動的マッピングの更新
- アクティブシャードの待機
デフォルトは1m
(1分)です。これにより、Elasticsearchは失敗する前に少なくともタイムアウトまで待機します。実際の待機時間は、特に複数の待機が発生する場合、長くなる可能性があります。
version
- (オプション、ブール値)
true
の場合、ヒットの一部としてドキュメントバージョンを返します。 wait_for_active_shards
- (オプション、文字列) 操作を進める前にアクティブでなければならないシャードのコピーの数。
all
またはインデックス内のシャードの総数(number_of_replicas+1
)までの任意の正の整数に設定します。デフォルト:1、プライマリシャード。
Active shardsを参照してください。
Request body
query
- (オプション、query object) Query DSLを使用して更新するドキュメントを指定します。
Response body
took
- 操作全体の開始から終了までのミリ秒数。
timed_out
- 更新によるクエリ実行中に実行されたリクエストのいずれかがタイムアウトした場合、このフラグは
true
に設定されます。 total
- 成功裏に処理されたドキュメントの数。
updated
- 成功裏に更新されたドキュメントの数。
deleted
- 成功裏に削除されたドキュメントの数。
batches
- 更新によるクエリで引き戻されたスクロール応答の数。
version_conflicts
- 更新によるクエリが遭遇したバージョンの競合の数。
noops
- 更新によるクエリのために使用されたスクリプトが
noop
の値をctx.op
に返したために無視されたドキュメントの数。 retries
- 更新によるクエリが試みた再試行の数。
bulk
は再試行されたバルクアクションの数で、search
は再試行された検索アクションの数です。 throttled_millis
requests_per_second
に準拠するためにリクエストがスリープしたミリ秒数。requests_per_second
- 更新によるクエリ中に実行されたリクエストの数。
throttled_until_millis
- このフィールドは、
_update_by_query
応答で常にゼロと等しい必要があります。これは、Task APIを使用している場合にのみ意味を持ち、スロットルされたリクエストがrequests_per_second
に準拠するために再度実行される次の時間(エポックからのミリ秒)を示します。 failures
- プロセス中に回復不可能なエラーが発生した場合の失敗の配列。これが空でない場合、リクエストはこれらの失敗のために中止されました。クエリによる更新はバッチを使用して実装されています。いずれかの失敗が発生すると、全体のプロセスが中止されますが、現在のバッチ内のすべての失敗は配列に収集されます。
conflicts
オプションを使用して、バージョンの競合で中止されないようにすることができます。
Examples
選択したドキュメントを更新するには、リクエストボディにクエリを指定します:
#### Python
``````python
resp = client.update_by_query(
index="my-index-000001",
conflicts="proceed",
query={
"term": {
"user.id": "kimchy"
}
},
)
print(resp)
`
Ruby
response = client.update_by_query(
index: 'my-index-000001',
conflicts: 'proceed',
body: {
query: {
term: {
'user.id' => 'kimchy'
}
}
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
conflicts: "proceed",
query: {
term: {
"user.id": "kimchy",
},
},
});
console.log(response);
Console
POST my-index-000001/_update_by_query?conflicts=proceed
{
"query": {
"term": {
"user.id": "kimchy"
}
}
}
クエリは、Search APIと同じ方法でquery キーに値として渡す必要があります。q パラメータも、検索APIと同じ方法で使用できます。 |
複数のデータストリームまたはインデックス内のドキュメントを更新します:
Python
resp = client.update_by_query(
index="my-index-000001,my-index-000002",
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001,my-index-000002'
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001,my-index-000002",
});
console.log(response);
Console
POST my-index-000001,my-index-000002/_update_by_query
特定のルーティング値に対してクエリによる更新操作を制限します:
Python
resp = client.update_by_query(
index="my-index-000001",
routing="1",
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
routing: 1
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
routing: 1,
});
console.log(response);
Console
POST my-index-000001/_update_by_query?routing=1
デフォルトでは、クエリによる更新は1000のスクロールバッチを使用します。scroll_size
パラメータを使用してバッチサイズを変更できます:
Python
resp = client.update_by_query(
index="my-index-000001",
scroll_size="100",
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
scroll_size: 100
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
scroll_size: 100,
});
console.log(response);
Console
POST my-index-000001/_update_by_query?scroll_size=100
ユニークな属性を使用してドキュメントを更新します:
Python
resp = client.update_by_query(
index="my-index-000001",
query={
"term": {
"user.id": "kimchy"
}
},
max_docs=1,
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
body: {
query: {
term: {
'user.id' => 'kimchy'
}
},
max_docs: 1
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
query: {
term: {
"user.id": "kimchy",
},
},
max_docs: 1,
});
console.log(response);
Console
POST my-index-000001/_update_by_query
{
"query": {
"term": {
"user.id": "kimchy"
}
},
"max_docs": 1
}
Update the document source
クエリによる更新は、ドキュメントソースを更新するためにスクリプトをサポートしています。たとえば、次のリクエストは、count
フィールドをuser.id
がkimchy
のすべてのドキュメントに対してインクリメントします:
Python
resp = client.update_by_query(
index="my-index-000001",
script={
"source": "ctx._source.count++",
"lang": "painless"
},
query={
"term": {
"user.id": "kimchy"
}
},
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
body: {
script: {
source: 'ctx._source.count++',
lang: 'painless'
},
query: {
term: {
'user.id' => 'kimchy'
}
}
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
script: {
source: "ctx._source.count++",
lang: "painless",
},
query: {
term: {
"user.id": "kimchy",
},
},
});
console.log(response);
Console
POST my-index-000001/_update_by_query
{
"script": {
"source": "ctx._source.count++",
"lang": "painless"
},
"query": {
"term": {
"user.id": "kimchy"
}
}
}
この例ではconflicts=proceed
が指定されていないことに注意してください。この場合、バージョンの競合がプロセスを停止させる必要がありますので、失敗を処理できます。
Update APIと同様に、ctx.op
を設定して実行される操作を変更できます:
noop |
スクリプトが変更を行う必要がないと判断した場合はctx.op = "noop" を設定します。クエリによる更新操作はドキュメントの更新をスキップし、 noop カウンターをインクリメントします。 |
delete |
スクリプトがドキュメントを削除する必要があると判断した場合はctx.op = "delete" を設定します。クエリによる更新操作はドキュメントを削除し、 deleted カウンターをインクリメントします。 |
クエリによる更新はindex
、noop
、delete
のみをサポートします。ctx.op
を他の何かに設定することはエラーです。ctx
の他のフィールドを設定することもエラーです。このAPIは、一致するドキュメントのソースを変更することのみを可能にし、移動することはできません。
Update documents using an ingest pipeline
クエリによる更新は、pipeline
を指定することにより、Ingest pipelines機能を使用できます。
Python
resp = client.ingest.put_pipeline(
id="set-foo",
description="sets foo",
processors=[
{
"set": {
"field": "foo",
"value": "bar"
}
}
],
)
print(resp)
resp1 = client.update_by_query(
index="my-index-000001",
pipeline="set-foo",
)
print(resp1)
Ruby
response = client.ingest.put_pipeline(
id: 'set-foo',
body: {
description: 'sets foo',
processors: [
{
set: {
field: 'foo',
value: 'bar'
}
}
]
}
)
puts response
response = client.update_by_query(
index: 'my-index-000001',
pipeline: 'set-foo'
)
puts response
Js
const response = await client.ingest.putPipeline({
id: "set-foo",
description: "sets foo",
processors: [
{
set: {
field: "foo",
value: "bar",
},
},
],
});
console.log(response);
const response1 = await client.updateByQuery({
index: "my-index-000001",
pipeline: "set-foo",
});
console.log(response1);
Console
PUT _ingest/pipeline/set-foo
{
"description" : "sets foo",
"processors" : [ {
"set" : {
"field": "foo",
"value": "bar"
}
} ]
}
POST my-index-000001/_update_by_query?pipeline=set-foo
Get the status of update by query operations
すべての実行中のクエリによる更新リクエストのステータスをTask APIで取得できます:
Php
$response = $client->tasks()->list();
Python
resp = client.tasks.list(
detailed=True,
actions="*byquery",
)
print(resp)
Ruby
response = client.tasks.list(
detailed: true,
actions: '*byquery'
)
puts response
Go
res, err := es.Tasks.List(
es.Tasks.List.WithActions("*byquery"),
es.Tasks.List.WithDetailed(true),
)
fmt.Println(res, err)
Js
const response = await client.tasks.list({
detailed: "true",
actions: "*byquery",
});
console.log(response);
Console
GET _tasks?detailed=true&actions=*byquery
Console-Result
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/update/byquery",
"status" : {
"total" : 6154,
"updated" : 3500,
"created" : 0,
"deleted" : 0,
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0
},
"description" : ""
}
}
}
}
}
このオブジェクトには実際のステータスが含まれています。これは、重要なtotal フィールドの追加を伴う応答JSONと同じです。 total は、再インデックスが実行することを期待する操作の合計数です。updated 、created 、およびdeleted フィールドを追加することで進捗を推定できます。リクエストは、これらの合計がtotal フィールドと等しくなると終了します。 |
タスクIDを使用して、タスクを直接検索できます。次の例は、タスクr1A2WoRbTwKZ516z6NEs5A:36619
に関する情報を取得します:
Php
$params = [
'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->get($params);
Python
resp = client.tasks.get(
task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
Ruby
response = client.tasks.get(
task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
Go
res, err := es.Tasks.Get(
"r1A2WoRbTwKZ516z6NEs5A:36619",
)
fmt.Println(res, err)
Js
const response = await client.tasks.get({
task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
});
console.log(response);
Console
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
このAPIの利点は、wait_for_completion=false
と統合されて、完了したタスクのステータスを透過的に返すことです。タスクが完了し、wait_for_completion=false
が設定されている場合、results
またはerror
フィールドが返されます。この機能のコストは、wait_for_completion=false
が.tasks/task/${taskId}
で作成するドキュメントです。そのドキュメントを削除するかどうかはあなた次第です。
Cancel an update by query operation
任意のクエリによる更新は、Task Cancel APIを使用してキャンセルできます:
Php
$params = [
'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->cancel($params);
Python
resp = client.tasks.cancel(
task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
Ruby
response = client.tasks.cancel(
task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
Go
res, err := es.Tasks.Cancel(
es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"),
)
fmt.Println(res, err)
Js
const response = await client.tasks.cancel({
task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
});
console.log(response);
Console
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
タスクIDは、tasks APIを使用して見つけることができます。
キャンセルは迅速に行われるべきですが、数秒かかる場合があります。上記のタスクステータスAPIは、タスクがキャンセルされたことを確認し、自身を終了するまで、クエリによる更新タスクをリストし続けます。
Change throttling for a request
requests_per_second
の値は、_rethrottle
APIを使用して実行中のクエリによる更新で変更できます:
Php
$params = [
'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->updateByQueryRethrottle($params);
Python
resp = client.update_by_query_rethrottle(
task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
requests_per_second="-1",
)
print(resp)
Ruby
response = client.update_by_query_rethrottle(
task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
requests_per_second: -1
)
puts response
Go
res, err := es.UpdateByQueryRethrottle(
"r1A2WoRbTwKZ516z6NEs5A:36619",
esapi.IntPtr(-1),
)
fmt.Println(res, err)
Js
const response = await client.updateByQueryRethrottle({
task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
requests_per_second: "-1",
});
console.log(response);
Console
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1
タスクIDは、tasks APIを使用して見つけることができます。
_update_by_query
APIで設定する場合と同様に、requests_per_second
はスロットリングを無効にするための-1
または1.7
または12
のような任意の小数にすることができます。クエリを加速するための再スロットリングは即座に効果を発揮しますが、クエリを遅くするための再スロットリングは、現在のバッチが完了した後に効果を発揮します。これにより、スクロールタイムアウトが防止されます。
Slice manually
スライスIDと各リクエストのスライスの総数を指定して、クエリによる更新を手動でスライスします:
Python
resp = client.update_by_query(
index="my-index-000001",
slice={
"id": 0,
"max": 2
},
script={
"source": "ctx._source['extra'] = 'test'"
},
)
print(resp)
resp1 = client.update_by_query(
index="my-index-000001",
slice={
"id": 1,
"max": 2
},
script={
"source": "ctx._source['extra'] = 'test'"
},
)
print(resp1)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
body: {
slice: {
id: 0,
max: 2
},
script: {
source: "ctx._source['extra'] = 'test'"
}
}
)
puts response
response = client.update_by_query(
index: 'my-index-000001',
body: {
slice: {
id: 1,
max: 2
},
script: {
source: "ctx._source['extra'] = 'test'"
}
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
slice: {
id: 0,
max: 2,
},
script: {
source: "ctx._source['extra'] = 'test'",
},
});
console.log(response);
const response1 = await client.updateByQuery({
index: "my-index-000001",
slice: {
id: 1,
max: 2,
},
script: {
source: "ctx._source['extra'] = 'test'",
},
});
console.log(response1);
Console
POST my-index-000001/_update_by_query
{
"slice": {
"id": 0,
"max": 2
},
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
POST my-index-000001/_update_by_query
{
"slice": {
"id": 1,
"max": 2
},
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
これが機能することを確認できます:
Python
resp = client.indices.refresh()
print(resp)
resp1 = client.search(
index="my-index-000001",
size="0",
q="extra:test",
filter_path="hits.total",
)
print(resp1)
Ruby
response = client.indices.refresh
puts response
response = client.search(
index: 'my-index-000001',
size: 0,
q: 'extra:test',
filter_path: 'hits.total'
)
puts response
Js
const response = await client.indices.refresh();
console.log(response);
const response1 = await client.search({
index: "my-index-000001",
size: 0,
q: "extra:test",
filter_path: "hits.total",
});
console.log(response1);
Console
GET _refresh
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total
Console-Result
{
"hits": {
"total": {
"value": 120,
"relation": "eq"
}
}
}
Use automatic slicing
クエリによる更新を自動的に並列化するために、Sliced scrollを使用して_id
でスライスできます。slices
を使用して使用するスライスの数を指定します:
Python
resp = client.update_by_query(
index="my-index-000001",
refresh=True,
slices="5",
script={
"source": "ctx._source['extra'] = 'test'"
},
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-index-000001',
refresh: true,
slices: 5,
body: {
script: {
source: "ctx._source['extra'] = 'test'"
}
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-index-000001",
refresh: "true",
slices: 5,
script: {
source: "ctx._source['extra'] = 'test'",
},
});
console.log(response);
Console
POST my-index-000001/_update_by_query?refresh&slices=5
{
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
これが機能することを確認できます:
Python
resp = client.search(
index="my-index-000001",
size="0",
q="extra:test",
filter_path="hits.total",
)
print(resp)
Ruby
response = client.search(
index: 'my-index-000001',
size: 0,
q: 'extra:test',
filter_path: 'hits.total'
)
puts response
Js
const response = await client.search({
index: "my-index-000001",
size: 0,
q: "extra:test",
filter_path: "hits.total",
});
console.log(response);
Console
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total
Console-Result
{
"hits": {
"total": {
"value": 120,
"relation": "eq"
}
}
}
`````slices`````を`````_update_by_query`````に追加すると、上記のセクションで使用される手動プロセスが自動化され、サブリクエストが作成されます。これにはいくつかの特異性があります:
- これらのリクエストは、[Tasks APIs](2455b9c12b1f4c10.md#docs-update-by-query-task-api)で確認できます。これらのサブリクエストは、`````slices`````を持つリクエストのタスクの「子」タスクです。
- `````slices`````を持つリクエストのタスクのステータスを取得すると、完了したスライスのステータスのみが含まれます。
- これらのサブリクエストは、キャンセルや再スロットリングなどのために個別にアドレス指定できます。
- `````slices`````を持つリクエストを再スロットリングすると、未完了のサブリクエストが比例的に再スロットリングされます。
- `````slices`````を持つリクエストをキャンセルすると、各サブリクエストがキャンセルされます。
- `````slices`````の性質上、各サブリクエストはソースデータストリームまたはインデックスのスナップショットをわずかに異なるものとして取得しますが、これらはすべてほぼ同時に取得されます。
#### Pick up a new property
動的マッピングなしでインデックスを作成し、データで埋め、その後、データからさらにフィールドを取得するためにマッピング値を追加したとします:
#### Php
``````php
$params = [
'index' => 'test',
'body' => [
'mappings' => [
'dynamic' => false,
'properties' => [
'text' => [
'type' => 'text',
],
],
],
],
];
$response = $client->indices()->create($params);
$params = [
'index' => 'test',
'body' => [
'text' => 'words words',
'flag' => 'bar',
],
];
$response = $client->index($params);
$params = [
'index' => 'test',
'body' => [
'text' => 'words words',
'flag' => 'foo',
],
];
$response = $client->index($params);
$params = [
'index' => 'test',
'body' => [
'properties' => [
'text' => [
'type' => 'text',
],
'flag' => [
'type' => 'text',
'analyzer' => 'keyword',
],
],
],
];
$response = $client->indices()->putMapping($params);
`
Python
resp = client.indices.create(
index="test",
mappings={
"dynamic": False,
"properties": {
"text": {
"type": "text"
}
}
},
)
print(resp)
resp1 = client.index(
index="test",
refresh=True,
document={
"text": "words words",
"flag": "bar"
},
)
print(resp1)
resp2 = client.index(
index="test",
refresh=True,
document={
"text": "words words",
"flag": "foo"
},
)
print(resp2)
resp3 = client.indices.put_mapping(
index="test",
properties={
"text": {
"type": "text"
},
"flag": {
"type": "text",
"analyzer": "keyword"
}
},
)
print(resp3)
Ruby
response = client.indices.create(
index: 'test',
body: {
mappings: {
dynamic: false,
properties: {
text: {
type: 'text'
}
}
}
}
)
puts response
response = client.index(
index: 'test',
refresh: true,
body: {
text: 'words words',
flag: 'bar'
}
)
puts response
response = client.index(
index: 'test',
refresh: true,
body: {
text: 'words words',
flag: 'foo'
}
)
puts response
response = client.indices.put_mapping(
index: 'test',
body: {
properties: {
text: {
type: 'text'
},
flag: {
type: 'text',
analyzer: 'keyword'
}
}
}
)
puts response
Go
{
res, err := es.Indices.Create(
"test",
es.Indices.Create.WithBody(strings.NewReader(`{
"mappings": {
"dynamic": false,
"properties": {
"text": {
"type": "text"
}
}
}
}`)),
)
fmt.Println(res, err)
}
{
res, err := es.Index(
"test",
strings.NewReader(`{
"text": "words words",
"flag": "bar"
}`),
es.Index.WithRefresh("true"),
es.Index.WithPretty(),
)
fmt.Println(res, err)
}
{
res, err := es.Index(
"test",
strings.NewReader(`{
"text": "words words",
"flag": "foo"
}`),
es.Index.WithRefresh("true"),
es.Index.WithPretty(),
)
fmt.Println(res, err)
}
{
res, err := es.Indices.PutMapping(
[]string{"test"},
strings.NewReader(`{
"properties": {
"text": {
"type": "text"
},
"flag": {
"type": "text",
"analyzer": "keyword"
}
}
}`),
)
fmt.Println(res, err)
}
Js
const response = await client.indices.create({
index: "test",
mappings: {
dynamic: false,
properties: {
text: {
type: "text",
},
},
},
});
console.log(response);
const response1 = await client.index({
index: "test",
refresh: "true",
document: {
text: "words words",
flag: "bar",
},
});
console.log(response1);
const response2 = await client.index({
index: "test",
refresh: "true",
document: {
text: "words words",
flag: "foo",
},
});
console.log(response2);
const response3 = await client.indices.putMapping({
index: "test",
properties: {
text: {
type: "text",
},
flag: {
type: "text",
analyzer: "keyword",
},
},
});
console.log(response3);
Console
PUT test
{
"mappings": {
"dynamic": false,
"properties": {
"text": {"type": "text"}
}
}
}
POST test/_doc?refresh
{
"text": "words words",
"flag": "bar"
}
POST test/_doc?refresh
{
"text": "words words",
"flag": "foo"
}
PUT test/_mapping
{
"properties": {
"text": {"type": "text"},
"flag": {"type": "text", "analyzer": "keyword"}
}
}
これは新しいフィールドがインデックスされず、_source に保存されることを意味します。 |
|
これは新しいflag フィールドを追加するためにマッピングを更新します。新しいフィールドを取得するには、すべてのドキュメントを再インデックス化する必要があります。 |
データを検索しても何も見つかりません:
Php
$params = [
'index' => 'test',
'body' => [
'query' => [
'match' => [
'flag' => 'foo',
],
],
],
];
$response = $client->search($params);
Python
resp = client.search(
index="test",
filter_path="hits.total",
query={
"match": {
"flag": "foo"
}
},
)
print(resp)
Ruby
response = client.search(
index: 'test',
filter_path: 'hits.total',
body: {
query: {
match: {
flag: 'foo'
}
}
}
)
puts response
Go
res, err := es.Search(
es.Search.WithIndex("test"),
es.Search.WithBody(strings.NewReader(`{
"query": {
"match": {
"flag": "foo"
}
}
}`)),
es.Search.WithFilterPath("hits.total"),
es.Search.WithPretty(),
)
fmt.Println(res, err)
Js
const response = await client.search({
index: "test",
filter_path: "hits.total",
query: {
match: {
flag: "foo",
},
},
});
console.log(response);
Console
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
Console-Result
{
"hits" : {
"total": {
"value": 0,
"relation": "eq"
}
}
}
しかし、新しいマッピングを取得するために_update_by_query
リクエストを発行することができます:
Php
$params = [
'index' => 'test',
];
$response = $client->updateByQuery($params);
$params = [
'index' => 'test',
'body' => [
'query' => [
'match' => [
'flag' => 'foo',
],
],
],
];
$response = $client->search($params);
Python
resp = client.update_by_query(
index="test",
refresh=True,
conflicts="proceed",
)
print(resp)
resp1 = client.search(
index="test",
filter_path="hits.total",
query={
"match": {
"flag": "foo"
}
},
)
print(resp1)
Ruby
response = client.update_by_query(
index: 'test',
refresh: true,
conflicts: 'proceed'
)
puts response
response = client.search(
index: 'test',
filter_path: 'hits.total',
body: {
query: {
match: {
flag: 'foo'
}
}
}
)
puts response
Go
{
res, err := es.UpdateByQuery(
[]string{"test"},
es.UpdateByQuery.WithConflicts("proceed"),
es.UpdateByQuery.WithRefresh(true),
)
fmt.Println(res, err)
}
{
res, err := es.Search(
es.Search.WithIndex("test"),
es.Search.WithBody(strings.NewReader(`{
"query": {
"match": {
"flag": "foo"
}
}
}`)),
es.Search.WithFilterPath("hits.total"),
es.Search.WithPretty(),
)
fmt.Println(res, err)
}
Js
const response = await client.updateByQuery({
index: "test",
refresh: "true",
conflicts: "proceed",
});
console.log(response);
const response1 = await client.search({
index: "test",
filter_path: "hits.total",
query: {
match: {
flag: "foo",
},
},
});
console.log(response1);
Console
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
Console-Result
{
"hits" : {
"total": {
"value": 1,
"relation": "eq"
}
}
}
マルチフィールドにフィールドを追加する際にも、全く同じことができます。