データストリームの使用

データストリームを設定した後、次のことができます:

データストリームにドキュメントを追加する

個々のドキュメントを追加するには、インデックスAPIを使用します。インジェストパイプラインがサポートされています。

Python

  1. resp = client.index(
  2. index="my-data-stream",
  3. document={
  4. "@timestamp": "2099-03-08T11:06:07.000Z",
  5. "user": {
  6. "id": "8a4f500d"
  7. },
  8. "message": "Login successful"
  9. },
  10. )
  11. print(resp)

Ruby

  1. response = client.index(
  2. index: 'my-data-stream',
  3. body: {
  4. "@timestamp": '2099-03-08T11:06:07.000Z',
  5. user: {
  6. id: '8a4f500d'
  7. },
  8. message: 'Login successful'
  9. }
  10. )
  11. puts response

Js

  1. const response = await client.index({
  2. index: "my-data-stream",
  3. document: {
  4. "@timestamp": "2099-03-08T11:06:07.000Z",
  5. user: {
  6. id: "8a4f500d",
  7. },
  8. message: "Login successful",
  9. },
  10. });
  11. console.log(response);

コンソール

  1. POST /my-data-stream/_doc/
  2. {
  3. "@timestamp": "2099-03-08T11:06:07.000Z",
  4. "user": {
  5. "id": "8a4f500d"
  6. },
  7. "message": "Login successful"
  8. }

インデックスAPIのPUT /<target>/_doc/<_id>リクエスト形式を使用して、データストリームに新しいドキュメントを追加することはできません。ドキュメントIDを指定するには、PUT /<target>/_create/<_id>形式を使用してください。createop_typeのみがサポートされています。

単一のリクエストで複数のドキュメントを追加するには、バルクAPIを使用します。createアクションのみがサポートされています。

Python

  1. resp = client.bulk(
  2. index="my-data-stream",
  3. refresh=True,
  4. operations=[
  5. {
  6. "create": {}
  7. },
  8. {
  9. "@timestamp": "2099-03-08T11:04:05.000Z",
  10. "user": {
  11. "id": "vlb44hny"
  12. },
  13. "message": "Login attempt failed"
  14. },
  15. {
  16. "create": {}
  17. },
  18. {
  19. "@timestamp": "2099-03-08T11:06:07.000Z",
  20. "user": {
  21. "id": "8a4f500d"
  22. },
  23. "message": "Login successful"
  24. },
  25. {
  26. "create": {}
  27. },
  28. {
  29. "@timestamp": "2099-03-09T11:07:08.000Z",
  30. "user": {
  31. "id": "l7gk7f82"
  32. },
  33. "message": "Logout successful"
  34. }
  35. ],
  36. )
  37. print(resp)

Ruby

  1. response = client.bulk(
  2. index: 'my-data-stream',
  3. refresh: true,
  4. body: [
  5. {
  6. create: {}
  7. },
  8. {
  9. "@timestamp": '2099-03-08T11:04:05.000Z',
  10. user: {
  11. id: 'vlb44hny'
  12. },
  13. message: 'Login attempt failed'
  14. },
  15. {
  16. create: {}
  17. },
  18. {
  19. "@timestamp": '2099-03-08T11:06:07.000Z',
  20. user: {
  21. id: '8a4f500d'
  22. },
  23. message: 'Login successful'
  24. },
  25. {
  26. create: {}
  27. },
  28. {
  29. "@timestamp": '2099-03-09T11:07:08.000Z',
  30. user: {
  31. id: 'l7gk7f82'
  32. },
  33. message: 'Logout successful'
  34. }
  35. ]
  36. )
  37. puts response

Js

  1. const response = await client.bulk({
  2. index: "my-data-stream",
  3. refresh: "true",
  4. operations: [
  5. {
  6. create: {},
  7. },
  8. {
  9. "@timestamp": "2099-03-08T11:04:05.000Z",
  10. user: {
  11. id: "vlb44hny",
  12. },
  13. message: "Login attempt failed",
  14. },
  15. {
  16. create: {},
  17. },
  18. {
  19. "@timestamp": "2099-03-08T11:06:07.000Z",
  20. user: {
  21. id: "8a4f500d",
  22. },
  23. message: "Login successful",
  24. },
  25. {
  26. create: {},
  27. },
  28. {
  29. "@timestamp": "2099-03-09T11:07:08.000Z",
  30. user: {
  31. id: "l7gk7f82",
  32. },
  33. message: "Logout successful",
  34. },
  35. ],
  36. });
  37. console.log(response);

コンソール

  1. PUT /my-data-stream/_bulk?refresh
  2. {"create":{ }}
  3. { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
  4. {"create":{ }}
  5. { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
  6. {"create":{ }}
  7. { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

データストリームを検索する

次の検索APIはデータストリームをサポートしています:

データストリームの統計を取得する

1つ以上のデータストリームの統計を取得するには、データストリーム統計APIを使用します:

Python

  1. resp = client.indices.data_streams_stats(
  2. name="my-data-stream",
  3. human=True,
  4. )
  5. print(resp)

Ruby

  1. response = client.indices.data_streams_stats(
  2. name: 'my-data-stream',
  3. human: true
  4. )
  5. puts response

Js

  1. const response = await client.indices.dataStreamsStats({
  2. name: "my-data-stream",
  3. human: "true",
  4. });
  5. console.log(response);

コンソール

  1. GET /_data_stream/my-data-stream/_stats?human=true

データストリームを手動でロールオーバーする

ロールオーバーAPIを使用して、データストリームを手動でロールオーバーします。手動でロールオーバーする際には、2つのオプションがあります:

  • 1. すぐにロールオーバーをトリガーする:

Python

  1. resp = client.indices.rollover(
  2. alias="my-data-stream",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.rollover(
  2. alias: 'my-data-stream'
  3. )
  4. puts response

Js

  1. const response = await client.indices.rollover({
  2. alias: "my-data-stream",
  3. });
  4. console.log(response);

コンソール

  1. POST /my-data-stream/_rollover/
  • 2. または、次のインデックスイベントが発生するまでロールオーバーを延期する:

Python

  1. resp = client.indices.rollover(
  2. alias="my-data-stream",
  3. lazy=True,
  4. )
  5. print(resp)

Ruby

  1. response = client.indices.rollover(
  2. alias: 'my-data-stream',
  3. lazy: true
  4. )
  5. puts response

Js

  1. const response = await client.indices.rollover({
  2. alias: "my-data-stream",
  3. lazy: "true",
  4. });
  5. console.log(response);

コンソール

  1. POST /my-data-stream/_rollover?lazy

データストリームが頻繁に更新されない場合、空のバックインデックスを持たないようにするために、2番目のオプションを使用します。

閉じたバックインデックスを開く

閉じたバックインデックスを検索することはできません。データストリームを検索しても、バックインデックスを検索することはできません。また、閉じたインデックス内のドキュメントを更新または削除することもできません。

閉じたバックインデックスを再オープンするには、インデックスに直接オープンインデックスAPIリクエストを送信します:

Python

  1. resp = client.indices.open(
  2. index=".ds-my-data-stream-2099.03.07-000001",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.open(
  2. index: '.ds-my-data-stream-2099.03.07-000001'
  3. )
  4. puts response

Js

  1. const response = await client.indices.open({
  2. index: ".ds-my-data-stream-2099.03.07-000001",
  3. });
  4. console.log(response);

コンソール

  1. POST /.ds-my-data-stream-2099.03.07-000001/_open/

データストリームのすべての閉じたバックインデックスを再オープンするには、ストリームにオープンインデックスAPIリクエストを送信します:

Python

  1. resp = client.indices.open(
  2. index="my-data-stream",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.open(
  2. index: 'my-data-stream'
  3. )
  4. puts response

Js

  1. const response = await client.indices.open({
  2. index: "my-data-stream",
  3. });
  4. console.log(response);

コンソール

  1. POST /my-data-stream/_open/

データストリームで再インデックスする

再インデックスAPIを使用して、既存のインデックス、エイリアス、またはデータストリームからデータストリームにドキュメントをコピーします。データストリームは追加専用であるため、データストリームへの再インデックスはop_typecreateを使用する必要があります。再インデックスは、データストリーム内の既存のドキュメントを更新することはできません。

Python

  1. resp = client.reindex(
  2. source={
  3. "index": "archive"
  4. },
  5. dest={
  6. "index": "my-data-stream",
  7. "op_type": "create"
  8. },
  9. )
  10. print(resp)

Ruby

  1. response = client.reindex(
  2. body: {
  3. source: {
  4. index: 'archive'
  5. },
  6. dest: {
  7. index: 'my-data-stream',
  8. op_type: 'create'
  9. }
  10. }
  11. )
  12. puts response

Js

  1. const response = await client.reindex({
  2. source: {
  3. index: "archive",
  4. },
  5. dest: {
  6. index: "my-data-stream",
  7. op_type: "create",
  8. },
  9. });
  10. console.log(response);

コンソール

  1. POST /_reindex
  2. {
  3. "source": {
  4. "index": "archive"
  5. },
  6. "dest": {
  7. "index": "my-data-stream",
  8. "op_type": "create"
  9. }
  10. }

クエリによってデータストリーム内のドキュメントを更新する

提供されたクエリに一致するデータストリーム内のドキュメントを更新するには、クエリによる更新APIを使用します:

Python

  1. resp = client.update_by_query(
  2. index="my-data-stream",
  3. query={
  4. "match": {
  5. "user.id": "l7gk7f82"
  6. }
  7. },
  8. script={
  9. "source": "ctx._source.user.id = params.new_id",
  10. "params": {
  11. "new_id": "XgdX0NoX"
  12. }
  13. },
  14. )
  15. print(resp)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-data-stream',
  3. body: {
  4. query: {
  5. match: {
  6. 'user.id' => 'l7gk7f82'
  7. }
  8. },
  9. script: {
  10. source: 'ctx._source.user.id = params.new_id',
  11. params: {
  12. new_id: 'XgdX0NoX'
  13. }
  14. }
  15. }
  16. )
  17. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-data-stream",
  3. query: {
  4. match: {
  5. "user.id": "l7gk7f82",
  6. },
  7. },
  8. script: {
  9. source: "ctx._source.user.id = params.new_id",
  10. params: {
  11. new_id: "XgdX0NoX",
  12. },
  13. },
  14. });
  15. console.log(response);

コンソール

  1. POST /my-data-stream/_update_by_query
  2. {
  3. "query": {
  4. "match": {
  5. "user.id": "l7gk7f82"
  6. }
  7. },
  8. "script": {
  9. "source": "ctx._source.user.id = params.new_id",
  10. "params": {
  11. "new_id": "XgdX0NoX"
  12. }
  13. }
  14. }

クエリによってデータストリーム内のドキュメントを削除する

提供されたクエリに一致するデータストリーム内のドキュメントを削除するには、クエリによる削除APIを使用します:

Python

  1. resp = client.delete_by_query(
  2. index="my-data-stream",
  3. query={
  4. "match": {
  5. "user.id": "vlb44hny"
  6. }
  7. },
  8. )
  9. print(resp)

Ruby

  1. response = client.delete_by_query(
  2. index: 'my-data-stream',
  3. body: {
  4. query: {
  5. match: {
  6. 'user.id' => 'vlb44hny'
  7. }
  8. }
  9. }
  10. )
  11. puts response

Js

  1. const response = await client.deleteByQuery({
  2. index: "my-data-stream",
  3. query: {
  4. match: {
  5. "user.id": "vlb44hny",
  6. },
  7. },
  8. });
  9. console.log(response);

コンソール

  1. POST /my-data-stream/_delete_by_query
  2. {
  3. "query": {
  4. "match": {
  5. "user.id": "vlb44hny"
  6. }
  7. }
  8. }

バックインデックス内のドキュメントを更新または削除する

必要に応じて、ドキュメントを含むバックインデックスにリクエストを送信することで、データストリーム内のドキュメントを更新または削除できます。必要な情報は次のとおりです:

この情報を取得するには、検索リクエスト:

Python

  1. resp = client.search(
  2. index="my-data-stream",
  3. seq_no_primary_term=True,
  4. query={
  5. "match": {
  6. "user.id": "yWIumJd7"
  7. }
  8. },
  9. )
  10. print(resp)

Ruby

  1. response = client.search(
  2. index: 'my-data-stream',
  3. body: {
  4. seq_no_primary_term: true,
  5. query: {
  6. match: {
  7. 'user.id' => 'yWIumJd7'
  8. }
  9. }
  10. }
  11. )
  12. puts response

Js

  1. const response = await client.search({
  2. index: "my-data-stream",
  3. seq_no_primary_term: true,
  4. query: {
  5. match: {
  6. "user.id": "yWIumJd7",
  7. },
  8. },
  9. });
  10. console.log(response);

コンソール

  1. GET /my-data-stream/_search
  2. {
  3. "seq_no_primary_term": true,
  4. "query": {
  5. "match": {
  6. "user.id": "yWIumJd7"
  7. }
  8. }
  9. }

レスポンス:

コンソール-結果

  1. {
  2. "took": 20,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 3,
  6. "successful": 3,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 1,
  13. "relation": "eq"
  14. },
  15. "max_score": 0.2876821,
  16. "hits": [
  17. {
  18. "_index": ".ds-my-data-stream-2099.03.08-000003",
  19. "_id": "bfspvnIBr7VVZlfp2lqX",
  20. "_seq_no": 0,
  21. "_primary_term": 1,
  22. "_score": 0.2876821,
  23. "_source": {
  24. "@timestamp": "2099-03-08T11:06:07.000Z",
  25. "user": {
  26. "id": "yWIumJd7"
  27. },
  28. "message": "Login successful"
  29. }
  30. }
  31. ]
  32. }
  33. }
一致するドキュメントを含むバックインデックス
ドキュメントのID
ドキュメントの現在のシーケンス番号
ドキュメントのプライマリターム

ドキュメントを更新するには、有効なif_seq_noif_primary_term引数を持つインデックスAPIリクエストを使用します:

Python

  1. resp = client.index(
  2. index=".ds-my-data-stream-2099-03-08-000003",
  3. id="bfspvnIBr7VVZlfp2lqX",
  4. if_seq_no="0",
  5. if_primary_term="1",
  6. document={
  7. "@timestamp": "2099-03-08T11:06:07.000Z",
  8. "user": {
  9. "id": "8a4f500d"
  10. },
  11. "message": "Login successful"
  12. },
  13. )
  14. print(resp)

Js

  1. const response = await client.index({
  2. index: ".ds-my-data-stream-2099-03-08-000003",
  3. id: "bfspvnIBr7VVZlfp2lqX",
  4. if_seq_no: 0,
  5. if_primary_term: 1,
  6. document: {
  7. "@timestamp": "2099-03-08T11:06:07.000Z",
  8. user: {
  9. id: "8a4f500d",
  10. },
  11. message: "Login successful",
  12. },
  13. });
  14. console.log(response);

コンソール

  1. PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
  2. {
  3. "@timestamp": "2099-03-08T11:06:07.000Z",
  4. "user": {
  5. "id": "8a4f500d"
  6. },
  7. "message": "Login successful"
  8. }

ドキュメントを削除するには、削除APIを使用します:

Python

  1. resp = client.delete(
  2. index=".ds-my-data-stream-2099.03.08-000003",
  3. id="bfspvnIBr7VVZlfp2lqX",
  4. )
  5. print(resp)

Ruby

  1. response = client.delete(
  2. index: '.ds-my-data-stream-2099.03.08-000003',
  3. id: 'bfspvnIBr7VVZlfp2lqX'
  4. )
  5. puts response

Js

  1. const response = await client.delete({
  2. index: ".ds-my-data-stream-2099.03.08-000003",
  3. id: "bfspvnIBr7VVZlfp2lqX",
  4. });
  5. console.log(response);

コンソール

  1. DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

単一のリクエストで複数のドキュメントを削除または更新するには、バルクAPIdeleteindexupdateアクションを使用します。indexアクションの場合、有効なif_seq_noif_primary_term引数を含めてください。

Python

  1. resp = client.bulk(
  2. refresh=True,
  3. operations=[
  4. {
  5. "index": {
  6. "_index": ".ds-my-data-stream-2099.03.08-000003",
  7. "_id": "bfspvnIBr7VVZlfp2lqX",
  8. "if_seq_no": 0,
  9. "if_primary_term": 1
  10. }
  11. },
  12. {
  13. "@timestamp": "2099-03-08T11:06:07.000Z",
  14. "user": {
  15. "id": "8a4f500d"
  16. },
  17. "message": "Login successful"
  18. }
  19. ],
  20. )
  21. print(resp)

Ruby

  1. response = client.bulk(
  2. refresh: true,
  3. body: [
  4. {
  5. index: {
  6. _index: '.ds-my-data-stream-2099.03.08-000003',
  7. _id: 'bfspvnIBr7VVZlfp2lqX',
  8. if_seq_no: 0,
  9. if_primary_term: 1
  10. }
  11. },
  12. {
  13. "@timestamp": '2099-03-08T11:06:07.000Z',
  14. user: {
  15. id: '8a4f500d'
  16. },
  17. message: 'Login successful'
  18. }
  19. ]
  20. )
  21. puts response

Js

  1. const response = await client.bulk({
  2. refresh: "true",
  3. operations: [
  4. {
  5. index: {
  6. _index: ".ds-my-data-stream-2099.03.08-000003",
  7. _id: "bfspvnIBr7VVZlfp2lqX",
  8. if_seq_no: 0,
  9. if_primary_term: 1,
  10. },
  11. },
  12. {
  13. "@timestamp": "2099-03-08T11:06:07.000Z",
  14. user: {
  15. id: "8a4f500d",
  16. },
  17. message: "Login successful",
  18. },
  19. ],
  20. });
  21. console.log(response);

コンソール

  1. PUT /_bulk?refresh
  2. { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
  3. { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }