データストリームの使用
データストリームを設定した後、次のことができます:
- データストリームにドキュメントを追加する
- データストリームを検索する
- データストリームの統計を取得する
- データストリームを手動でロールオーバーする
- 閉じたバックインデックスを開く
- データストリームで再インデックスする
- クエリによってデータストリーム内のドキュメントを更新する
- クエリによってデータストリーム内のドキュメントを削除する
- バックインデックス内のドキュメントを更新または削除する
データストリームにドキュメントを追加する
個々のドキュメントを追加するには、インデックスAPIを使用します。インジェストパイプラインがサポートされています。
Python
resp = client.index(
index="my-data-stream",
document={
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
)
print(resp)
Ruby
response = client.index(
index: 'my-data-stream',
body: {
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
}
)
puts response
Js
const response = await client.index({
index: "my-data-stream",
document: {
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
});
console.log(response);
コンソール
POST /my-data-stream/_doc/
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
インデックスAPIのPUT
/<target>/_doc/<_id>
リクエスト形式を使用して、データストリームに新しいドキュメントを追加することはできません。ドキュメントIDを指定するには、PUT
/<target>/_create/<_id>
形式を使用してください。create
のop_type
のみがサポートされています。
単一のリクエストで複数のドキュメントを追加するには、バルクAPIを使用します。create
アクションのみがサポートされています。
Python
resp = client.bulk(
index="my-data-stream",
refresh=True,
operations=[
{
"create": {}
},
{
"@timestamp": "2099-03-08T11:04:05.000Z",
"user": {
"id": "vlb44hny"
},
"message": "Login attempt failed"
},
{
"create": {}
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
{
"create": {}
},
{
"@timestamp": "2099-03-09T11:07:08.000Z",
"user": {
"id": "l7gk7f82"
},
"message": "Logout successful"
}
],
)
print(resp)
Ruby
response = client.bulk(
index: 'my-data-stream',
refresh: true,
body: [
{
create: {}
},
{
"@timestamp": '2099-03-08T11:04:05.000Z',
user: {
id: 'vlb44hny'
},
message: 'Login attempt failed'
},
{
create: {}
},
{
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
},
{
create: {}
},
{
"@timestamp": '2099-03-09T11:07:08.000Z',
user: {
id: 'l7gk7f82'
},
message: 'Logout successful'
}
]
)
puts response
Js
const response = await client.bulk({
index: "my-data-stream",
refresh: "true",
operations: [
{
create: {},
},
{
"@timestamp": "2099-03-08T11:04:05.000Z",
user: {
id: "vlb44hny",
},
message: "Login attempt failed",
},
{
create: {},
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
{
create: {},
},
{
"@timestamp": "2099-03-09T11:07:08.000Z",
user: {
id: "l7gk7f82",
},
message: "Logout successful",
},
],
});
console.log(response);
コンソール
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
データストリームを検索する
次の検索APIはデータストリームをサポートしています:
データストリームの統計を取得する
1つ以上のデータストリームの統計を取得するには、データストリーム統計APIを使用します:
Python
resp = client.indices.data_streams_stats(
name="my-data-stream",
human=True,
)
print(resp)
Ruby
response = client.indices.data_streams_stats(
name: 'my-data-stream',
human: true
)
puts response
Js
const response = await client.indices.dataStreamsStats({
name: "my-data-stream",
human: "true",
});
console.log(response);
コンソール
GET /_data_stream/my-data-stream/_stats?human=true
データストリームを手動でロールオーバーする
ロールオーバーAPIを使用して、データストリームを手動でロールオーバーします。手動でロールオーバーする際には、2つのオプションがあります:
- 1. すぐにロールオーバーをトリガーする:
Python
resp = client.indices.rollover(
alias="my-data-stream",
)
print(resp)
Ruby
response = client.indices.rollover(
alias: 'my-data-stream'
)
puts response
Js
const response = await client.indices.rollover({
alias: "my-data-stream",
});
console.log(response);
コンソール
POST /my-data-stream/_rollover/
- 2. または、次のインデックスイベントが発生するまでロールオーバーを延期する:
Python
resp = client.indices.rollover(
alias="my-data-stream",
lazy=True,
)
print(resp)
Ruby
response = client.indices.rollover(
alias: 'my-data-stream',
lazy: true
)
puts response
Js
const response = await client.indices.rollover({
alias: "my-data-stream",
lazy: "true",
});
console.log(response);
コンソール
POST /my-data-stream/_rollover?lazy
データストリームが頻繁に更新されない場合、空のバックインデックスを持たないようにするために、2番目のオプションを使用します。
閉じたバックインデックスを開く
閉じたバックインデックスを検索することはできません。データストリームを検索しても、バックインデックスを検索することはできません。また、閉じたインデックス内のドキュメントを更新または削除することもできません。
閉じたバックインデックスを再オープンするには、インデックスに直接オープンインデックスAPIリクエストを送信します:
Python
resp = client.indices.open(
index=".ds-my-data-stream-2099.03.07-000001",
)
print(resp)
Ruby
response = client.indices.open(
index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response
Js
const response = await client.indices.open({
index: ".ds-my-data-stream-2099.03.07-000001",
});
console.log(response);
コンソール
POST /.ds-my-data-stream-2099.03.07-000001/_open/
データストリームのすべての閉じたバックインデックスを再オープンするには、ストリームにオープンインデックスAPIリクエストを送信します:
Python
resp = client.indices.open(
index="my-data-stream",
)
print(resp)
Ruby
response = client.indices.open(
index: 'my-data-stream'
)
puts response
Js
const response = await client.indices.open({
index: "my-data-stream",
});
console.log(response);
コンソール
POST /my-data-stream/_open/
データストリームで再インデックスする
再インデックスAPIを使用して、既存のインデックス、エイリアス、またはデータストリームからデータストリームにドキュメントをコピーします。データストリームは追加専用であるため、データストリームへの再インデックスはop_type
のcreate
を使用する必要があります。再インデックスは、データストリーム内の既存のドキュメントを更新することはできません。
Python
resp = client.reindex(
source={
"index": "archive"
},
dest={
"index": "my-data-stream",
"op_type": "create"
},
)
print(resp)
Ruby
response = client.reindex(
body: {
source: {
index: 'archive'
},
dest: {
index: 'my-data-stream',
op_type: 'create'
}
}
)
puts response
Js
const response = await client.reindex({
source: {
index: "archive",
},
dest: {
index: "my-data-stream",
op_type: "create",
},
});
console.log(response);
コンソール
POST /_reindex
{
"source": {
"index": "archive"
},
"dest": {
"index": "my-data-stream",
"op_type": "create"
}
}
クエリによってデータストリーム内のドキュメントを更新する
提供されたクエリに一致するデータストリーム内のドキュメントを更新するには、クエリによる更新APIを使用します:
Python
resp = client.update_by_query(
index="my-data-stream",
query={
"match": {
"user.id": "l7gk7f82"
}
},
script={
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
},
)
print(resp)
Ruby
response = client.update_by_query(
index: 'my-data-stream',
body: {
query: {
match: {
'user.id' => 'l7gk7f82'
}
},
script: {
source: 'ctx._source.user.id = params.new_id',
params: {
new_id: 'XgdX0NoX'
}
}
}
)
puts response
Js
const response = await client.updateByQuery({
index: "my-data-stream",
query: {
match: {
"user.id": "l7gk7f82",
},
},
script: {
source: "ctx._source.user.id = params.new_id",
params: {
new_id: "XgdX0NoX",
},
},
});
console.log(response);
コンソール
POST /my-data-stream/_update_by_query
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
},
"script": {
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
}
}
クエリによってデータストリーム内のドキュメントを削除する
提供されたクエリに一致するデータストリーム内のドキュメントを削除するには、クエリによる削除APIを使用します:
Python
resp = client.delete_by_query(
index="my-data-stream",
query={
"match": {
"user.id": "vlb44hny"
}
},
)
print(resp)
Ruby
response = client.delete_by_query(
index: 'my-data-stream',
body: {
query: {
match: {
'user.id' => 'vlb44hny'
}
}
}
)
puts response
Js
const response = await client.deleteByQuery({
index: "my-data-stream",
query: {
match: {
"user.id": "vlb44hny",
},
},
});
console.log(response);
コンソール
POST /my-data-stream/_delete_by_query
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}
バックインデックス内のドキュメントを更新または削除する
必要に応じて、ドキュメントを含むバックインデックスにリクエストを送信することで、データストリーム内のドキュメントを更新または削除できます。必要な情報は次のとおりです:
- ドキュメントID
- ドキュメントを含むバックインデックスの名前
- ドキュメントを更新する場合、そのシーケンス番号とプライマリターム
この情報を取得するには、検索リクエスト:
Python
resp = client.search(
index="my-data-stream",
seq_no_primary_term=True,
query={
"match": {
"user.id": "yWIumJd7"
}
},
)
print(resp)
Ruby
response = client.search(
index: 'my-data-stream',
body: {
seq_no_primary_term: true,
query: {
match: {
'user.id' => 'yWIumJd7'
}
}
}
)
puts response
Js
const response = await client.search({
index: "my-data-stream",
seq_no_primary_term: true,
query: {
match: {
"user.id": "yWIumJd7",
},
},
});
console.log(response);
コンソール
GET /my-data-stream/_search
{
"seq_no_primary_term": true,
"query": {
"match": {
"user.id": "yWIumJd7"
}
}
}
コンソール-結果
{
"took": 20,
"timed_out": false,
"_shards": {
"total": 3,
"successful": 3,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 0.2876821,
"hits": [
{
"_index": ".ds-my-data-stream-2099.03.08-000003",
"_id": "bfspvnIBr7VVZlfp2lqX",
"_seq_no": 0,
"_primary_term": 1,
"_score": 0.2876821,
"_source": {
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
}
]
}
}
一致するドキュメントを含むバックインデックス | |
ドキュメントのID | |
ドキュメントの現在のシーケンス番号 | |
ドキュメントのプライマリターム |
ドキュメントを更新するには、有効なif_seq_no
とif_primary_term
引数を持つインデックスAPIリクエストを使用します:
Python
resp = client.index(
index=".ds-my-data-stream-2099-03-08-000003",
id="bfspvnIBr7VVZlfp2lqX",
if_seq_no="0",
if_primary_term="1",
document={
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
)
print(resp)
Js
const response = await client.index({
index: ".ds-my-data-stream-2099-03-08-000003",
id: "bfspvnIBr7VVZlfp2lqX",
if_seq_no: 0,
if_primary_term: 1,
document: {
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
});
console.log(response);
コンソール
PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
ドキュメントを削除するには、削除APIを使用します:
Python
resp = client.delete(
index=".ds-my-data-stream-2099.03.08-000003",
id="bfspvnIBr7VVZlfp2lqX",
)
print(resp)
Ruby
response = client.delete(
index: '.ds-my-data-stream-2099.03.08-000003',
id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response
Js
const response = await client.delete({
index: ".ds-my-data-stream-2099.03.08-000003",
id: "bfspvnIBr7VVZlfp2lqX",
});
console.log(response);
コンソール
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
単一のリクエストで複数のドキュメントを削除または更新するには、バルクAPIのdelete
、index
、update
アクションを使用します。index
アクションの場合、有効なif_seq_no
とif_primary_term
引数を含めてください。
Python
resp = client.bulk(
refresh=True,
operations=[
{
"index": {
"_index": ".ds-my-data-stream-2099.03.08-000003",
"_id": "bfspvnIBr7VVZlfp2lqX",
"if_seq_no": 0,
"if_primary_term": 1
}
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
],
)
print(resp)
Ruby
response = client.bulk(
refresh: true,
body: [
{
index: {
_index: '.ds-my-data-stream-2099.03.08-000003',
_id: 'bfspvnIBr7VVZlfp2lqX',
if_seq_no: 0,
if_primary_term: 1
}
},
{
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
}
]
)
puts response
Js
const response = await client.bulk({
refresh: "true",
operations: [
{
index: {
_index: ".ds-my-data-stream-2099.03.08-000003",
_id: "bfspvnIBr7VVZlfp2lqX",
if_seq_no: 0,
if_primary_term: 1,
},
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
],
});
console.log(response);
コンソール
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }