Bulk API
単一のAPI呼び出しで複数のインデックス作成または削除操作を実行します。これによりオーバーヘッドが削減され、インデックス作成速度が大幅に向上する可能性があります。
Php
$params = [
'body' => [
[
'index' => [
'_index' => 'test',
'_id' => '1',
],
],
[
'field1' => 'value1',
],
[
'delete' => [
'_index' => 'test',
'_id' => '2',
],
],
[
'create' => [
'_index' => 'test',
'_id' => '3',
],
],
[
'field1' => 'value3',
],
[
'update' => [
'_id' => '1',
'_index' => 'test',
],
],
[
'doc' => [
'field2' => 'value2',
],
],
],
];
$response = $client->bulk($params);
Python
resp = client.bulk(
operations=[
{
"index": {
"_index": "test",
"_id": "1"
}
},
{
"field1": "value1"
},
{
"delete": {
"_index": "test",
"_id": "2"
}
},
{
"create": {
"_index": "test",
"_id": "3"
}
},
{
"field1": "value3"
},
{
"update": {
"_id": "1",
"_index": "test"
}
},
{
"doc": {
"field2": "value2"
}
}
],
)
print(resp)
Ruby
response = client.bulk(
body: [
{
index: {
_index: 'test',
_id: '1'
}
},
{
"field1": 'value1'
},
{
delete: {
_index: 'test',
_id: '2'
}
},
{
create: {
_index: 'test',
_id: '3'
}
},
{
"field1": 'value3'
},
{
update: {
_id: '1',
_index: 'test'
}
},
{
doc: {
"field2": 'value2'
}
}
]
)
puts response
Go
res, err := es.Bulk(
strings.NewReader(`
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
)
fmt.Println(res, err)
Js
const response = await client.bulk({
operations: [
{
index: {
_index: "test",
_id: "1",
},
},
{
field1: "value1",
},
{
delete: {
_index: "test",
_id: "2",
},
},
{
create: {
_index: "test",
_id: "3",
},
},
{
field1: "value3",
},
{
update: {
_id: "1",
_index: "test",
},
},
{
doc: {
field2: "value2",
},
},
],
});
console.log(response);
Console
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
Request
POST /_bulk
POST /<target>/_bulk
Prerequisites
- Elasticsearchのセキュリティ機能が有効になっている場合、ターゲットデータストリーム、インデックス、またはインデックスエイリアスに対して次のインデックス権限を持っている必要があります:
create
アクションを使用するには、create_doc
、create
、index
、またはwrite
のインデックス権限が必要です。データストリームはcreate
アクションのみをサポートします。index
アクションを使用するには、create
、index
、またはwrite
のインデックス権限が必要です。delete
アクションを使用するには、delete
またはwrite
のインデックス権限が必要です。update
アクションを使用するには、index
またはwrite
のインデックス権限が必要です。- バルクAPIリクエストでデータストリームまたはインデックスを自動的に作成するには、
auto_configure
、create_index
、またはmanage
のインデックス権限が必要です。 refresh
パラメータを使用してバルク操作の結果を検索可能にするには、maintenance
またはmanage
のインデックス権限が必要です。
- 自動データストリーム作成には、データストリームが有効な一致するインデックステンプレートが必要です。データストリームの設定を参照してください。
Description
単一のリクエストで複数のindex
、create
、delete
、およびupdate
アクションを実行する方法を提供します。
アクションは、改行区切りのJSON(NDJSON)構造を使用してリクエストボディに指定されます:
Js
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
[データストリーム](/read/elasticsearch-8-15/4618071bf1c879cb.md)は`````create`````アクションのみをサポートします。データストリーム内のドキュメントを更新または削除するには、ドキュメントを含むバックインデックスをターゲットにする必要があります。[バックインデックス内のドキュメントを更新または削除する](491eefa734aefc6e.md#update-delete-docs-in-a-backing-index)を参照してください。
`````update`````は、部分ドキュメント、アップサート、およびスクリプトとそのオプションが次の行に指定されることを期待します。
`````delete`````は、次の行にソースを期待せず、標準削除APIと同じ意味を持ちます。
データの最終行は、改行文字`````\n`````で終わる必要があります。各改行文字の前にはキャリッジリターン`````\r`````が付く場合があります。NDJSONデータを`````_bulk`````エンドポイントに送信する際は、`````Content-Type`````ヘッダーに`````application/json`````または`````application/x-ndjson`````を使用してください。
この形式はリテラル`````\n`````'sを区切り文字として使用するため、JSONアクションとソースがきれいに印刷されていないことを確認してください。
リクエストパスに`````<target>`````を提供すると、`````_index`````引数を明示的に指定しないアクションに使用されます。
形式に関する注意:ここでのアイデアは、これをできるだけ速く処理することです。一部のアクションが他のノードの他のシャードにリダイレクトされるため、受信ノード側では`````action_meta_data`````のみが解析されます。
このプロトコルを使用するクライアントライブラリは、クライアント側で同様のことを試み、バッファリングをできるだけ減らすように努めるべきです。
単一のバルクリクエストで実行するアクションの「正しい」数はありません。特定のワークロードに最適なサイズを見つけるために、さまざまな設定で実験してください。ElasticsearchはデフォルトでHTTPリクエストの最大サイズを`````100mb`````に制限しているため、クライアントはリクエストがこのサイズを超えないことを確認する必要があります。サイズ制限を超える単一のドキュメントをインデックスすることはできないため、そのようなドキュメントはElasticsearchに送信する前に小さな部分に前処理する必要があります。たとえば、ドキュメントをページや章に分割してからインデックスするか、生のバイナリデータをElasticsearchの外部のシステムに保存し、Elasticsearchに送信するドキュメント内の生データを外部システムへのリンクに置き換えます。
### Client support for bulk requests
公式にサポートされているクライアントのいくつかは、バルクリクエストや再インデックス作成を支援するヘルパーを提供します:
- Go
- [esutil.BulkIndexer](https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk#indexergo)を参照
- Perl
- [Search::Elasticsearch::Client::5_0::Bulk](https://metacpan.org/pod/Search::Elasticsearch::Client::5_0::Bulk)および[Search::Elasticsearch::Client::5_0::Scroll](https://metacpan.org/pod/Search::Elasticsearch::Client::5_0::Scroll)を参照
- Python
- [elasticsearch.helpers.*](https://elasticsearch-py.readthedocs.io/en/latest/helpers.html)を参照
- JavaScript
- [client.helpers.*](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/client-helpers.html)を参照
- .NET
- [`````BulkAllObservable`````](https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/indexing-documents.html)を参照
- PHP
- [Bulk indexing](https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/indexing_documents.html#_bulk_indexing)を参照
### Submitting bulk requests with cURL
`````curl`````にテキストファイル入力を提供する場合、プレーン`````--data-binary`````の代わりに`````-d`````フラグを**必ず**使用する必要があります。後者は改行を保持しません。例:
#### Js
``````js
$ cat requests
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
`
Optimistic concurrency control
バルクAPI呼び出し内の各index
およびdelete
アクションには、それぞれのアクションおよびメタデータ行にif_seq_no
およびif_primary_term
パラメータを含めることができます。if_seq_no
およびif_primary_term
パラメータは、既存のドキュメントへの最後の変更に基づいて操作がどのように実行されるかを制御します。楽観的同時実行制御の詳細については、こちらを参照してください。
Versioning
各バルクアイテムは、version
フィールドを使用してバージョン値を含めることができます。これは、_version
マッピングに基づいてインデックス/削除操作の動作を自動的に追従します。また、version_type
もサポートしています(バージョン管理を参照)。
Routing
各バルクアイテムは、routing
フィールドを使用してルーティング値を含めることができます。これは、_routing
マッピングに基づいてインデックス/削除操作の動作を自動的に追従します。
データストリームは、テンプレートでallow_custom_routing
設定が有効になっている場合を除き、カスタムルーティングをサポートしていません。
Wait for active shards
バルク呼び出しを行う際、wait_for_active_shards
パラメータを設定して、バルクリクエストの処理を開始する前にアクティブなシャードコピーの最小数を要求できます。詳細と使用例についてはこちらを参照してください。
Refresh
このリクエストによって行われた変更が検索可能になるタイミングを制御します。refreshを参照してください。
バルクリクエストを受け取ったシャードのみがrefresh
の影響を受けます。たとえば、3つのドキュメントを含む_bulk?refresh=wait_for
リクエストがあり、それが5つのシャードを持つインデックスの異なるシャードにルーティングされる場合、リクエストはその3つのシャードがリフレッシュされるのを待つだけです。インデックスを構成する他の2つのシャードは、_bulk
リクエストにはまったく参加しません。
Security
URLベースのアクセス制御を参照してください。
Path parameters
<target>
- (オプション、文字列) バルクアクションを実行するデータストリーム、インデックス、またはインデックスエイリアスの名前。
Query parameters
list_executed_pipelines
- (オプション、ブール値)
true
の場合、レスポンスには各index
またはcreate
のために実行されたインジェストパイプラインが含まれます。デフォルトはfalse
です。 pipeline
- (オプション、文字列) 受信ドキュメントを前処理するために使用するパイプラインのID。インデックスにデフォルトのインジェストパイプラインが指定されている場合、
_none
に値を設定すると、このリクエストのデフォルトのインジェストパイプラインが無効になります。最終パイプラインが構成されている場合は、このパラメータの値に関係なく常に実行されます。 refresh
- (オプション、列挙型)
true
の場合、Elasticsearchは影響を受けるシャードをリフレッシュしてこの操作を検索可能にし、wait_for
の場合はリフレッシュを待ってこの操作を検索可能にし、false
の場合はリフレッシュに対して何もしません。有効な値:true
、false
、wait_for
。デフォルト:false
。 require_alias
- (オプション、ブール値)
true
の場合、リクエストのアクションはインデックスエイリアスをターゲットにする必要があります。デフォルトはfalse
です。 routing
- (オプション、文字列) 操作を特定のシャードにルーティングするために使用されるカスタム値。
_source
- (オプション、文字列)
_source
フィールドを返すかどうか、または返すフィールドのリスト。 _source_excludes
- (オプション、文字列) レスポンスから除外するソースフィールドのカンマ区切りリスト。
このパラメータを使用して、_source_includes
クエリパラメータで指定されたサブセットからフィールドを除外することもできます。_source
パラメータがfalse
の場合、このパラメータは無視されます。 _source_includes
- (オプション、文字列) レスポンスに含めるソースフィールドのカンマ区切りリスト。
このパラメータが指定されている場合、これらのソースフィールドのみが返されます。このサブセットからフィールドを除外するには、_source_excludes
クエリパラメータを使用できます。_source
パラメータがfalse
の場合、このパラメータは無視されます。 timeout
- (オプション、時間単位) 各アクションが次の操作を待機する期間:
- 自動インデックス作成
- 動的マッピングの更新
- アクティブシャードの待機
デフォルトは1m
(1分)です。これにより、Elasticsearchは失敗する前に少なくともタイムアウトを待機します。実際の待機時間は、特に複数の待機が発生する場合、長くなる可能性があります。
wait_for_active_shards
- (オプション、文字列) 操作を進める前にアクティブでなければならないシャードコピーの数。
all
またはインデックス内のシャードの総数(number_of_replicas+1
)までの任意の正の整数に設定します。デフォルト:1、プライマリシャード。
アクティブシャードを参照してください。
Request body
リクエストボディには、create
、delete
、index
、およびupdate
アクションとそれに関連するソースデータの改行区切りリストが含まれています。
create
- (オプション、文字列) 指定されたドキュメントが存在しない場合にインデックスします。次の行にはインデックスするソースデータが含まれている必要があります。
_index
- (オプション、文字列) アクションを実行するデータストリーム、インデックス、またはインデックスエイリアスの名前。このパラメータは、リクエストパスに
<target>
が指定されていない場合は必須です。 _id
- (オプション、文字列) ドキュメントID。IDが指定されていない場合、ドキュメントIDが自動的に生成されます。
list_executed_pipelines
- (オプション、ブール値)
true
の場合、レスポンスには実行されたインジェストパイプラインが含まれます。デフォルトはfalse
です。 require_alias
- (オプション、ブール値)
true
の場合、アクションはindex aliasをターゲットにする必要があります。デフォルトはfalse
です。 dynamic_templates
- (オプション、マップ) フィールドのフルネームから動的テンプレートの名前へのマップ。デフォルトは空のマップです。名前が動的テンプレートに一致する場合、そのテンプレートが適用され、テンプレートで定義された他の一致条件に関係なく適用されます。フィールドがすでにマッピングで定義されている場合、このパラメータは使用されません。
delete
- (オプション、文字列) 指定されたドキュメントをインデックスから削除します。
_index
- (オプション、文字列) アクションを実行するインデックスまたはインデックスエイリアスの名前。このパラメータは、リクエストパスに
<target>
が指定されていない場合は必須です。 _id
- (必須、文字列) ドキュメントID。
require_alias
- (オプション、ブール値)
true
の場合、アクションはindex aliasをターゲットにする必要があります。デフォルトはfalse
です。
index
- (オプション、文字列) 指定されたドキュメントをインデックスします。ドキュメントが存在する場合、ドキュメントを置き換え、バージョンをインクリメントします。次の行にはインデックスするソースデータが含まれている必要があります。
_index
- (オプション、文字列) アクションを実行するインデックスまたはインデックスエイリアスの名前。このパラメータは、リクエストパスに
<target>
が指定されていない場合は必須です。 _id
- (オプション、文字列) ドキュメントID。IDが指定されていない場合、ドキュメントIDが自動的に生成されます。
list_executed_pipelines
- (オプション、ブール値)
true
の場合、レスポンスには実行されたインジェストパイプラインが含まれます。デフォルトはfalse
です。 require_alias
- (オプション、ブール値)
true
の場合、アクションはindex aliasをターゲットにする必要があります。デフォルトはfalse
です。 dynamic_templates
- (オプション、マップ) フィールドのフルネームから動的テンプレートの名前へのマップ。デフォルトは空のマップです。名前が動的テンプレートに一致する場合、そのテンプレートが適用され、テンプレートで定義された他の一致条件に関係なく適用されます。フィールドがすでにマッピングで定義されている場合、このパラメータは使用されません。
update
- (オプション、文字列) 部分的なドキュメント更新を実行します。次の行には部分ドキュメントと更新オプションが含まれている必要があります。
_index
- (オプション、文字列) アクションを実行するインデックスまたはインデックスエイリアスの名前。このパラメータは、リクエストパスに
<target>
が指定されていない場合は必須です。 _id
- (必須、文字列) ドキュメントID。
require_alias
- (オプション、ブール値)
true
の場合、アクションはindex aliasをターゲットにする必要があります。デフォルトはfalse
です。
doc
- (オプション、オブジェクト) インデックスする部分ドキュメント。
update
操作に必要です。 <fields>
- (オプション、オブジェクト) インデックスするドキュメントソース。
create
およびindex
操作に必要です。
Response body
バルクAPIのレスポンスには、リクエスト内の各操作の個別の結果が含まれ、提出された順序で返されます。個々の操作の成功または失敗は、リクエスト内の他の操作に影響を与えません。
took
- (整数) バルクリクエストの処理にかかった時間(ミリ秒)。
errors
- (ブール値)
true
の場合、バルクリクエスト内の1つ以上の操作が正常に完了しませんでした。 items
- (オブジェクトの配列) バルクリクエスト内の各操作の結果を、提出された順序で含みます。
- \<action\>
- (オブジェクト) パラメータ名は、操作に関連付けられたアクションです。可能な値は`````create`````、`````delete`````、`````index`````、および`````update`````です。
パラメータ値は、関連する操作に関する情報を含むオブジェクトです。
`````<action>`````のプロパティ
- `````_index
- (文字列) 操作に関連付けられたインデックスの名前。操作がデータストリームをターゲットにした場合、これはドキュメントが書き込まれたバックインデックスです。
_id
- (整数) 操作に関連付けられたドキュメントID。
_version
- (整数) 操作に関連付けられたドキュメントバージョン。ドキュメントバージョンは、ドキュメントが更新されるたびにインクリメントされます。
このパラメータは、成功したアクションに対してのみ返されます。 result
- (文字列) 操作の結果。成功した値は
created
、deleted
、およびupdated
です。他の有効な値はnoop
およびnot_found
です。 _shards
- (オブジェクト) 操作のシャード情報を含みます。
このパラメータは、成功した操作に対してのみ返されます。- `````total
- (整数) 操作が実行されることを試みたシャードの数。
successful
- (整数) 操作が成功したシャードの数。
failed
- (整数) 操作が実行されることを試みたが失敗したシャードの数。
_seq_no
- (整数) 操作に対してドキュメントに割り当てられたシーケンス番号。シーケンス番号は、古いバージョンのドキュメントが新しいバージョンを上書きしないようにするために使用されます。楽観的同時実行制御を参照してください。
このパラメータは、成功した操作に対してのみ返されます。 _primary_term
- (整数) 操作に対してドキュメントに割り当てられたプライマリターム。楽観的同時実行制御を参照してください。
このパラメータは、成功した操作に対してのみ返されます。 status
- (整数) 操作に対して返されたHTTPステータスコード。
error
- (オブジェクト) 失敗した操作に関する追加情報を含みます。
このパラメータは、失敗した操作に対してのみ返されます。- `````type
- (文字列) 操作のエラータイプ。
reason
- (文字列) 失敗した操作の理由。
index_uuid
- (文字列) 失敗した操作に関連付けられたインデックスのユニバーサルユニーク識別子(UUID)。
shard
- (文字列) 失敗した操作に関連付けられたシャードのID。
index
- (文字列) 失敗した操作に関連付けられたインデックスの名前。操作がデータストリームをターゲットにした場合、これはドキュメントが書き込まれようとしたバックインデックスです。
\u003c/action
Examples
Php
$params = [
'body' => [
[
'index' => [
'_index' => 'test',
'_id' => '1',
],
],
[
'field1' => 'value1',
],
[
'delete' => [
'_index' => 'test',
'_id' => '2',
],
],
[
'create' => [
'_index' => 'test',
'_id' => '3',
],
],
[
'field1' => 'value3',
],
[
'update' => [
'_id' => '1',
'_index' => 'test',
],
],
[
'doc' => [
'field2' => 'value2',
],
],
],
];
$response = $client->bulk($params);
Python
resp = client.bulk(
operations=[
{
"index": {
"_index": "test",
"_id": "1"
}
},
{
"field1": "value1"
},
{
"delete": {
"_index": "test",
"_id": "2"
}
},
{
"create": {
"_index": "test",
"_id": "3"
}
},
{
"field1": "value3"
},
{
"update": {
"_id": "1",
"_index": "test"
}
},
{
"doc": {
"field2": "value2"
}
}
],
)
print(resp)
Ruby
response = client.bulk(
body: [
{
index: {
_index: 'test',
_id: '1'
}
},
{
"field1": 'value1'
},
{
delete: {
_index: 'test',
_id: '2'
}
},
{
create: {
_index: 'test',
_id: '3'
}
},
{
"field1": 'value3'
},
{
update: {
_id: '1',
_index: 'test'
}
},
{
doc: {
"field2": 'value2'
}
}
]
)
puts response
Go
res, err := es.Bulk(
strings.NewReader(`
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
)
fmt.Println(res, err)
Js
const response = await client.bulk({
operations: [
{
index: {
_index: "test",
_id: "1",
},
},
{
field1: "value1",
},
{
delete: {
_index: "test",
_id: "2",
},
},
{
create: {
_index: "test",
_id: "3",
},
},
{
field1: "value3",
},
{
update: {
_id: "1",
_index: "test",
},
},
{
doc: {
field2: "value2",
},
},
],
});
console.log(response);
Console
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
Console-Result
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "test",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 0,
"_primary_term": 1
}
},
{
"delete": {
"_index": "test",
"_id": "2",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 404,
"_seq_no" : 1,
"_primary_term" : 2
}
},
{
"create": {
"_index": "test",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 2,
"_primary_term" : 3
}
},
{
"update": {
"_index": "test",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200,
"_seq_no" : 3,
"_primary_term" : 4
}
}
]
}
Bulk update example
`````update`````アクションペイロードは、次のオプションをサポートします:`````doc`````(部分ドキュメント)、`````upsert`````、`````doc_as_upsert`````、`````script`````、`````params`````(スクリプト用)、`````lang`````(スクリプト用)、および`````_source`````。オプションの詳細については、更新ドキュメントを参照してください。更新アクションの例:
#### Php
``````php
$params = [
'body' => [
[
'update' => [
'_id' => '1',
'_index' => 'index1',
'retry_on_conflict' => 3,
],
],
[
'doc' => [
'field' => 'value',
],
],
[
'update' => [
'_id' => '0',
'_index' => 'index1',
'retry_on_conflict' => 3,
],
],
[
'script' => [
'source' => 'ctx._source.counter += params.param1',
'lang' => 'painless',
'params' => [
'param1' => 1,
],
],
'upsert' => [
'counter' => 1,
],
],
[
'update' => [
'_id' => '2',
'_index' => 'index1',
'retry_on_conflict' => 3,
],
],
[
'doc' => [
'field' => 'value',
],
'doc_as_upsert' => true,
],
[
'update' => [
'_id' => '3',
'_index' => 'index1',
'_source' => true,
],
],
[
'doc' => [
'field' => 'value',
],
],
[
'update' => [
'_id' => '4',
'_index' => 'index1',
],
],
[
'doc' => [
'field' => 'value',
],
'_source' => true,
],
],
];
$response = $client->bulk($params);
`
Python
resp = client.bulk(
operations=[
{
"update": {
"_id": "1",
"_index": "index1",
"retry_on_conflict": 3
}
},
{
"doc": {
"field": "value"
}
},
{
"update": {
"_id": "0",
"_index": "index1",
"retry_on_conflict": 3
}
},
{
"script": {
"source": "ctx._source.counter += params.param1",
"lang": "painless",
"params": {
"param1": 1
}
},
"upsert": {
"counter": 1
}
},
{
"update": {
"_id": "2",
"_index": "index1",
"retry_on_conflict": 3
}
},
{
"doc": {
"field": "value"
},
"doc_as_upsert": True
},
{
"update": {
"_id": "3",
"_index": "index1",
"_source": True
}
},
{
"doc": {
"field": "value"
}
},
{
"update": {
"_id": "4",
"_index": "index1"
}
},
{
"doc": {
"field": "value"
},
"_source": True
}
],
)
print(resp)
Ruby
response = client.bulk(
body: [
{
update: {
_id: '1',
_index: 'index1',
retry_on_conflict: 3
}
},
{
doc: {
field: 'value'
}
},
{
update: {
_id: '0',
_index: 'index1',
retry_on_conflict: 3
}
},
{
script: {
source: 'ctx._source.counter += params.param1',
lang: 'painless',
params: {
"param1": 1
}
},
upsert: {
counter: 1
}
},
{
update: {
_id: '2',
_index: 'index1',
retry_on_conflict: 3
}
},
{
doc: {
field: 'value'
},
doc_as_upsert: true
},
{
update: {
_id: '3',
_index: 'index1',
_source: true
}
},
{
doc: {
field: 'value'
}
},
{
update: {
_id: '4',
_index: 'index1'
}
},
{
doc: {
field: 'value'
},
_source: true
}
]
)
puts response
Go
res, err := es.Bulk(
strings.NewReader(`
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}
)
fmt.Println(res, err)
Js
const response = await client.bulk({
operations: [
{
update: {
_id: "1",
_index: "index1",
retry_on_conflict: 3,
},
},
{
doc: {
field: "value",
},
},
{
update: {
_id: "0",
_index: "index1",
retry_on_conflict: 3,
},
},
{
script: {
source: "ctx._source.counter += params.param1",
lang: "painless",
params: {
param1: 1,
},
},
upsert: {
counter: 1,
},
},
{
update: {
_id: "2",
_index: "index1",
retry_on_conflict: 3,
},
},
{
doc: {
field: "value",
},
doc_as_upsert: true,
},
{
update: {
_id: "3",
_index: "index1",
_source: true,
},
},
{
doc: {
field: "value",
},
},
{
update: {
_id: "4",
_index: "index1",
},
},
{
doc: {
field: "value",
},
_source: true,
},
],
});
console.log(response);
Console
POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}
Example with failed actions
次のバルクAPIリクエストには、存在しないドキュメントを更新する操作が含まれています。
Php
$params = [
'body' => [
[
'update' => [
'_id' => '5',
'_index' => 'index1',
],
],
[
'doc' => [
'my_field' => 'foo',
],
],
[
'update' => [
'_id' => '6',
'_index' => 'index1',
],
],
[
'doc' => [
'my_field' => 'foo',
],
],
[
'create' => [
'_id' => '7',
'_index' => 'index1',
],
],
[
'my_field' => 'foo',
],
],
];
$response = $client->bulk($params);
Python
resp = client.bulk(
operations=[
{
"update": {
"_id": "5",
"_index": "index1"
}
},
{
"doc": {
"my_field": "foo"
}
},
{
"update": {
"_id": "6",
"_index": "index1"
}
},
{
"doc": {
"my_field": "foo"
}
},
{
"create": {
"_id": "7",
"_index": "index1"
}
},
{
"my_field": "foo"
}
],
)
print(resp)
Ruby
response = client.bulk(
body: [
{
update: {
_id: '5',
_index: 'index1'
}
},
{
doc: {
my_field: 'foo'
}
},
{
update: {
_id: '6',
_index: 'index1'
}
},
{
doc: {
my_field: 'foo'
}
},
{
create: {
_id: '7',
_index: 'index1'
}
},
{
my_field: 'foo'
}
]
)
puts response
Go
res, err := es.Bulk(
strings.NewReader(`
{ "update": {"_id": "5", "_index": "index1"} }
{ "doc": {"my_field": "foo"} }
{ "update": {"_id": "6", "_index": "index1"} }
{ "doc": {"my_field": "foo"} }
{ "create": {"_id": "7", "_index": "index1"} }
{ "my_field": "foo" }
)
fmt.Println(res, err)
Js
const response = await client.bulk({
operations: [
{
update: {
_id: "5",
_index: "index1",
},
},
{
doc: {
my_field: "foo",
},
},
{
update: {
_id: "6",
_index: "index1",
},
},
{
doc: {
my_field: "foo",
},
},
{
create: {
_id: "7",
_index: "index1",
},
},
{
my_field: "foo",
},
],
});
console.log(response);
Console
POST /_bulk
{ "update": {"_id": "5", "_index": "index1"} }
{ "doc": {"my_field": "foo"} }
{ "update": {"_id": "6", "_index": "index1"} }
{ "doc": {"my_field": "foo"} }
{ "create": {"_id": "7", "_index": "index1"} }
{ "my_field": "foo" }
これらの操作は正常に完了できないため、APIはerrors
フラグがtrue
のレスポンスを返します。
レスポンスには、失敗した操作に関するerror
オブジェクトも含まれています。error
オブジェクトには、エラータイプや理由など、失敗に関する追加情報が含まれています。
Console-Result
{
"took": 486,
"errors": true,
"items": [
{
"update": {
"_index": "index1",
"_id": "5",
"status": 404,
"error": {
"type": "document_missing_exception",
"reason": "[5]: document missing",
"index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
"shard": "0",
"index": "index1"
}
}
},
{
"update": {
"_index": "index1",
"_id": "6",
"status": 404,
"error": {
"type": "document_missing_exception",
"reason": "[6]: document missing",
"index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
"shard": "0",
"index": "index1"
}
}
},
{
"create": {
"_index": "index1",
"_id": "7",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"status": 201
}
}
]
}
失敗した操作に関する情報のみを返すには、filter_path
クエリパラメータをitems.*.error
の引数で使用します。
Php
$params = [
'body' => [
[
'update' => [
'_id' => '5',
'_index' => 'index1',
],
],
[
'doc' => [
'my_field' => 'baz',
],
],
[
'update' => [
'_id' => '6',
'_index' => 'index1',
],
],
[
'doc' => [
'my_field' => 'baz',
],
],
[
'update' => [
'_id' => '7',
'_index' => 'index1',
],
],
[
'doc' => [
'my_field' => 'baz',
],
],
],
];
$response = $client->bulk($params);
Python
resp = client.bulk(
filter_path="items.*.error",
operations=[
{
"update": {
"_id": "5",
"_index": "index1"
}
},
{
"doc": {
"my_field": "baz"
}
},
{
"update": {
"_id": "6",
"_index": "index1"
}
},
{
"doc": {
"my_field": "baz"
}
},
{
"update": {
"_id": "7",
"_index": "index1"
}
},
{
"doc": {
"my_field": "baz"
}
}
],
)
print(resp)
Ruby
response = client.bulk(
filter_path: 'items.*.error',
body: [
{
update: {
_id: '5',
_index: 'index1'
}
},
{
doc: {
my_field: 'baz'
}
},
{
update: {
_id: '6',
_index: 'index1'
}
},
{
doc: {
my_field: 'baz'
}
},
{
update: {
_id: '7',
_index: 'index1'
}
},
{
doc: {
my_field: 'baz'
}
}
]
)
puts response
Go
res, err := es.Bulk(
strings.NewReader(`
{ "update": {"_id": "5", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
{ "update": {"_id": "6", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
{ "update": {"_id": "7", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
es.Bulk.WithFilterPath("items.*.error"),
)
fmt.Println(res, err)
Js
const response = await client.bulk({
filter_path: "items.*.error",
operations: [
{
update: {
_id: "5",
_index: "index1",
},
},
{
doc: {
my_field: "baz",
},
},
{
update: {
_id: "6",
_index: "index1",
},
},
{
doc: {
my_field: "baz",
},
},
{
update: {
_id: "7",
_index: "index1",
},
},
{
doc: {
my_field: "baz",
},
},
],
});
console.log(response);
Console
POST /_bulk?filter_path=items.*.error
{ "update": {"_id": "5", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
{ "update": {"_id": "6", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
{ "update": {"_id": "7", "_index": "index1"} }
{ "doc": {"my_field": "baz"} }
Console-Result
{
"items": [
{
"update": {
"error": {
"type": "document_missing_exception",
"reason": "[5]: document missing",
"index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
"shard": "0",
"index": "index1"
}
}
},
{
"update": {
"error": {
"type": "document_missing_exception",
"reason": "[6]: document missing",
"index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
"shard": "0",
"index": "index1"
}
}
}
]
}
Example with dynamic templates parameter
以下の例では、動的テンプレートを作成し、dynamic_templates
パラメータを持つインデックス/作成リクエストで構成されるバルクリクエストを実行します。
Python
resp = client.indices.create(
index="my-index",
mappings={
"dynamic_templates": [
{
"geo_point": {
"mapping": {
"type": "geo_point"
}
}
}
]
},
)
print(resp)
resp1 = client.bulk(
operations=[
{
"index": {
"_index": "my_index",
"_id": "1",
"dynamic_templates": {
"work_location": "geo_point"
}
}
},
{
"field": "value1",
"work_location": "41.12,-71.34",
"raw_location": "41.12,-71.34"
},
{
"create": {
"_index": "my_index",
"_id": "2",
"dynamic_templates": {
"home_location": "geo_point"
}
}
},
{
"field": "value2",
"home_location": "41.12,-71.34"
}
],
)
print(resp1)
Ruby
response = client.indices.create(
index: 'my-index',
body: {
mappings: {
dynamic_templates: [
{
geo_point: {
mapping: {
type: 'geo_point'
}
}
}
]
}
}
)
puts response
response = client.bulk(
body: [
{
index: {
_index: 'my_index',
_id: '1',
dynamic_templates: {
work_location: 'geo_point'
}
}
},
{
field: 'value1',
work_location: '41.12,-71.34',
raw_location: '41.12,-71.34'
},
{
create: {
_index: 'my_index',
_id: '2',
dynamic_templates: {
home_location: 'geo_point'
}
}
},
{
field: 'value2',
home_location: '41.12,-71.34'
}
]
)
puts response
Js
const response = await client.indices.create({
index: "my-index",
mappings: {
dynamic_templates: [
{
geo_point: {
mapping: {
type: "geo_point",
},
},
},
],
},
});
console.log(response);
const response1 = await client.bulk({
operations: [
{
index: {
_index: "my_index",
_id: "1",
dynamic_templates: {
work_location: "geo_point",
},
},
},
{
field: "value1",
work_location: "41.12,-71.34",
raw_location: "41.12,-71.34",
},
{
create: {
_index: "my_index",
_id: "2",
dynamic_templates: {
home_location: "geo_point",
},
},
},
{
field: "value2",
home_location: "41.12,-71.34",
},
],
});
console.log(response1);
Console
PUT my-index/
{
"mappings": {
"dynamic_templates": [
{
"geo_point": {
"mapping": {
"type" : "geo_point"
}
}
}
]
}
}
POST /_bulk
{ "index" : { "_index" : "my_index", "_id" : "1", "dynamic_templates": {"work_location": "geo_point"}} }
{ "field" : "value1", "work_location": "41.12,-71.34", "raw_location": "41.12,-71.34"}
{ "create" : { "_index" : "my_index", "_id" : "2", "dynamic_templates": {"home_location": "geo_point"}} }
{ "field" : "value2", "home_location": "41.12,-71.34"}
バルクリクエストは、work_location
およびhome_location
という2つの新しいフィールドをgeo_point
型で作成します。dynamic_templates
パラメータに従って、ただし、raw_location
フィールドはデフォルトの動的マッピングルールを使用して作成されます。この場合、JSONドキュメント内で文字列として提供されるため、text
フィールドとして作成されます。