データストリームの設定

データストリームを設定するには、次の手順に従います:

また、インデックスエイリアスをデータストリームに変換することもできます。

Fleet、Elastic Agent、またはLogstashを使用している場合は、このチュートリアルをスキップしてください。これらはすべて、データストリームを自動的に設定します。

FleetおよびElastic Agentについては、このデータストリームのドキュメントを確認してください。Logstashについては、elasticsearch outputプラグインのデータストリーム設定を確認してください。

インデックスライフサイクルポリシーの作成

オプションですが、ILMを使用してデータストリームのバックインデックスの管理を自動化することをお勧めします。ILMにはインデックスライフサイクルポリシーが必要です。

Kibanaでインデックスライフサイクルポリシーを作成するには、メインメニューを開き、**スタック管理

インデックスライフサイクルポリシーに移動します。ポリシーを作成**をクリックします。

また、ライフサイクルポリシー作成APIを使用することもできます。

Python

  1. resp = client.ilm.put_lifecycle(
  2. name="my-lifecycle-policy",
  3. policy={
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. },
  47. )
  48. print(resp)

Js

  1. const response = await client.ilm.putLifecycle({
  2. name: "my-lifecycle-policy",
  3. policy: {
  4. phases: {
  5. hot: {
  6. actions: {
  7. rollover: {
  8. max_primary_shard_size: "50gb",
  9. },
  10. },
  11. },
  12. warm: {
  13. min_age: "30d",
  14. actions: {
  15. shrink: {
  16. number_of_shards: 1,
  17. },
  18. forcemerge: {
  19. max_num_segments: 1,
  20. },
  21. },
  22. },
  23. cold: {
  24. min_age: "60d",
  25. actions: {
  26. searchable_snapshot: {
  27. snapshot_repository: "found-snapshots",
  28. },
  29. },
  30. },
  31. frozen: {
  32. min_age: "90d",
  33. actions: {
  34. searchable_snapshot: {
  35. snapshot_repository: "found-snapshots",
  36. },
  37. },
  38. },
  39. delete: {
  40. min_age: "735d",
  41. actions: {
  42. delete: {},
  43. },
  44. },
  45. },
  46. },
  47. });
  48. console.log(response);

コンソール

  1. PUT _ilm/policy/my-lifecycle-policy
  2. {
  3. "policy": {
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. }
  47. }

コンポーネントテンプレートの作成

データストリームには、一致するインデックステンプレートが必要です。ほとんどの場合、1つ以上のコンポーネントテンプレートを使用してこのインデックステンプレートを構成します。通常、マッピングとインデックス設定には別々のコンポーネントテンプレートを使用します。これにより、複数のインデックステンプレートでコンポーネントテンプレートを再利用できます。

コンポーネントテンプレートを作成する際には、次を含めます:

  • date](/read/elasticsearch-8-15/9dfa1da42eb162ff.md)またはdate_nanosマッピングを@timestampフィールドに対して指定します。マッピングを指定しない場合、Elasticsearchは@timestampをデフォルトオプションのdateフィールドとしてマッピングします。
  • index.lifecycle.nameインデックス設定におけるライフサイクルポリシー。

フィールドをマッピングする際には、Elastic Common Schema (ECS)を使用してください。ECSフィールドは、デフォルトでいくつかのElastic Stack機能と統合されます。

フィールドのマッピング方法が不明な場合は、ランタイムフィールドを使用して、検索時に非構造化コンテンツからフィールドを抽出します。たとえば、ログメッセージをwildcardフィールドにインデックスし、後で検索中にこのフィールドからIPアドレスやその他のデータを抽出できます。

Kibanaでコンポーネントテンプレートを作成するには、メインメニューを開き、スタック
**管理

インデックス管理に移動します。インデックステンプレートビューで、コンポーネントテンプレートを作成**をクリックします。

また、コンポーネントテンプレート作成APIを使用することもできます。

Python

  1. resp = client.cluster.put_component_template(
  2. name="my-mappings",
  3. template={
  4. "mappings": {
  5. "properties": {
  6. "@timestamp": {
  7. "type": "date",
  8. "format": "date_optional_time||epoch_millis"
  9. },
  10. "message": {
  11. "type": "wildcard"
  12. }
  13. }
  14. }
  15. },
  16. meta={
  17. "description": "Mappings for @timestamp and message fields",
  18. "my-custom-meta-field": "More arbitrary metadata"
  19. },
  20. )
  21. print(resp)
  22. resp1 = client.cluster.put_component_template(
  23. name="my-settings",
  24. template={
  25. "settings": {
  26. "index.lifecycle.name": "my-lifecycle-policy"
  27. }
  28. },
  29. meta={
  30. "description": "Settings for ILM",
  31. "my-custom-meta-field": "More arbitrary metadata"
  32. },
  33. )
  34. print(resp1)

Ruby

  1. response = client.cluster.put_component_template(
  2. name: 'my-mappings',
  3. body: {
  4. template: {
  5. mappings: {
  6. properties: {
  7. "@timestamp": {
  8. type: 'date',
  9. format: 'date_optional_time||epoch_millis'
  10. },
  11. message: {
  12. type: 'wildcard'
  13. }
  14. }
  15. }
  16. },
  17. _meta: {
  18. description: 'Mappings for @timestamp and message fields',
  19. "my-custom-meta-field": 'More arbitrary metadata'
  20. }
  21. }
  22. )
  23. puts response
  24. response = client.cluster.put_component_template(
  25. name: 'my-settings',
  26. body: {
  27. template: {
  28. settings: {
  29. 'index.lifecycle.name' => 'my-lifecycle-policy'
  30. }
  31. },
  32. _meta: {
  33. description: 'Settings for ILM',
  34. "my-custom-meta-field": 'More arbitrary metadata'
  35. }
  36. }
  37. )
  38. puts response

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "my-mappings",
  3. template: {
  4. mappings: {
  5. properties: {
  6. "@timestamp": {
  7. type: "date",
  8. format: "date_optional_time||epoch_millis",
  9. },
  10. message: {
  11. type: "wildcard",
  12. },
  13. },
  14. },
  15. },
  16. _meta: {
  17. description: "Mappings for @timestamp and message fields",
  18. "my-custom-meta-field": "More arbitrary metadata",
  19. },
  20. });
  21. console.log(response);
  22. const response1 = await client.cluster.putComponentTemplate({
  23. name: "my-settings",
  24. template: {
  25. settings: {
  26. "index.lifecycle.name": "my-lifecycle-policy",
  27. },
  28. },
  29. _meta: {
  30. description: "Settings for ILM",
  31. "my-custom-meta-field": "More arbitrary metadata",
  32. },
  33. });
  34. console.log(response1);

コンソール

  1. # マッピング用のコンポーネントテンプレートを作成
  2. PUT _component_template/my-mappings
  3. {
  4. "template": {
  5. "mappings": {
  6. "properties": {
  7. "@timestamp": {
  8. "type": "date",
  9. "format": "date_optional_time||epoch_millis"
  10. },
  11. "message": {
  12. "type": "wildcard"
  13. }
  14. }
  15. }
  16. },
  17. "_meta": {
  18. "description": "@timestampおよびmessageフィールドのマッピング",
  19. "my-custom-meta-field": "より多くの任意のメタデータ"
  20. }
  21. }
  22. # インデックス設定用のコンポーネントテンプレートを作成
  23. PUT _component_template/my-settings
  24. {
  25. "template": {
  26. "settings": {
  27. "index.lifecycle.name": "my-lifecycle-policy"
  28. }
  29. },
  30. "_meta": {
  31. "description": "ILMの設定",
  32. "my-custom-meta-field": "より多くの任意のメタデータ"
  33. }
  34. }

インデックステンプレートの作成

コンポーネントテンプレートを使用してインデックステンプレートを作成します。次を指定します:

  • データストリームの名前に一致する1つ以上のインデックスパターン。私たちのデータストリーム命名スキームを使用することをお勧めします。
  • テンプレートがデータストリーム対応であること。
  • マッピングとインデックス設定を含む任意のコンポーネントテンプレート。
  • 組み込みテンプレートとの衝突を避けるために、200よりも高い優先度。 インデックスパターンの衝突を避けるを参照してください。

Kibanaでインデックステンプレートを作成するには、メインメニューを開き、スタック
**管理

インデックス管理に移動します。インデックステンプレートビューで、テンプレートを作成**をクリックします。

また、インデックステンプレート作成APIを使用することもできます。data_streamオブジェクトを含めてデータストリームを有効にします。

Python

  1. resp = client.indices.put_index_template(
  2. name="my-index-template",
  3. index_patterns=[
  4. "my-data-stream*"
  5. ],
  6. data_stream={},
  7. composed_of=[
  8. "my-mappings",
  9. "my-settings"
  10. ],
  11. priority=500,
  12. meta={
  13. "description": "Template for my time series data",
  14. "my-custom-meta-field": "More arbitrary metadata"
  15. },
  16. )
  17. print(resp)

Ruby

  1. response = client.indices.put_index_template(
  2. name: 'my-index-template',
  3. body: {
  4. index_patterns: [
  5. 'my-data-stream*'
  6. ],
  7. data_stream: {},
  8. composed_of: [
  9. 'my-mappings',
  10. 'my-settings'
  11. ],
  12. priority: 500,
  13. _meta: {
  14. description: 'Template for my time series data',
  15. "my-custom-meta-field": 'More arbitrary metadata'
  16. }
  17. }
  18. )
  19. puts response

Js

  1. const response = await client.indices.putIndexTemplate({
  2. name: "my-index-template",
  3. index_patterns: ["my-data-stream*"],
  4. data_stream: {},
  5. composed_of: ["my-mappings", "my-settings"],
  6. priority: 500,
  7. _meta: {
  8. description: "Template for my time series data",
  9. "my-custom-meta-field": "More arbitrary metadata",
  10. },
  11. });
  12. console.log(response);

コンソール

  1. PUT _index_template/my-index-template
  2. {
  3. "index_patterns": ["my-data-stream*"],
  4. "data_stream": { },
  5. "composed_of": [ "my-mappings", "my-settings" ],
  6. "priority": 500,
  7. "_meta": {
  8. "description": "Template for my time series data",
  9. "my-custom-meta-field": "More arbitrary metadata"
  10. }
  11. }

データストリームの作成

インデックスリクエストは、データストリームにドキュメントを追加します。これらのリクエストは、op_typecreateを使用する必要があります。ドキュメントには@timestampフィールドが含まれている必要があります。

データストリームを自動的に作成するには、ストリームの名前をターゲットにしたインデックスリクエストを送信します。この名前は、インデックステンプレートのインデックスパターンの1つと一致する必要があります。

Python

  1. resp = client.bulk(
  2. index="my-data-stream",
  3. operations=[
  4. {
  5. "create": {}
  6. },
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  10. },
  11. {
  12. "create": {}
  13. },
  14. {
  15. "@timestamp": "2099-05-06T16:25:42.000Z",
  16. "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638"
  17. }
  18. ],
  19. )
  20. print(resp)
  21. resp1 = client.index(
  22. index="my-data-stream",
  23. document={
  24. "@timestamp": "2099-05-06T16:21:15.000Z",
  25. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  26. },
  27. )
  28. print(resp1)

Ruby

  1. response = client.bulk(
  2. index: 'my-data-stream',
  3. body: [
  4. {
  5. create: {}
  6. },
  7. {
  8. "@timestamp": '2099-05-06T16:21:15.000Z',
  9. message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  10. },
  11. {
  12. create: {}
  13. },
  14. {
  15. "@timestamp": '2099-05-06T16:25:42.000Z',
  16. message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638'
  17. }
  18. ]
  19. )
  20. puts response
  21. response = client.index(
  22. index: 'my-data-stream',
  23. body: {
  24. "@timestamp": '2099-05-06T16:21:15.000Z',
  25. message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  26. }
  27. )
  28. puts response

Js

  1. const response = await client.bulk({
  2. index: "my-data-stream",
  3. operations: [
  4. {
  5. create: {},
  6. },
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. message:
  10. '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  11. },
  12. {
  13. create: {},
  14. },
  15. {
  16. "@timestamp": "2099-05-06T16:25:42.000Z",
  17. message:
  18. '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638',
  19. },
  20. ],
  21. });
  22. console.log(response);
  23. const response1 = await client.index({
  24. index: "my-data-stream",
  25. document: {
  26. "@timestamp": "2099-05-06T16:21:15.000Z",
  27. message:
  28. '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  29. },
  30. });
  31. console.log(response1);

コンソール

  1. PUT my-data-stream/_bulk
  2. { "create":{ } }
  3. { "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
  4. { "create":{ } }
  5. { "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }
  6. POST my-data-stream/_doc
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  10. }

また、データストリーム作成APIを使用して手動でストリームを作成することもできます。ストリームの名前は、テンプレートのインデックスパターンの1つと一致する必要があります。

Python

  1. resp = client.indices.create_data_stream(
  2. name="my-data-stream",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.create_data_stream(
  2. name: 'my-data-stream'
  3. )
  4. puts response

Js

  1. const response = await client.indices.createDataStream({
  2. name: "my-data-stream",
  3. });
  4. console.log(response);

コンソール

  1. PUT _data_stream/my-data-stream

データストリームの保護

インデックス権限を使用してデータストリームへのアクセスを制御します。データストリームに対する権限を付与すると、そのバックインデックスに対しても同じ権限が付与されます。

例については、データストリーム権限を参照してください。

インデックスエイリアスをデータストリームに変換

Elasticsearch 7.9以前では、書き込みインデックスを持つインデックスエイリアスを使用して時系列データを管理していました。データストリームはこの機能を置き換え、メンテナンスが少なく、データティアと自動的に統合されます。

書き込みインデックスを持つインデックスエイリアスを同じ名前のデータストリームに変換するには、データストリームへの移行APIを使用します。変換中に、エイリアスのインデックスはストリームの隠れたバックインデックスになります。エイリアスの書き込みインデックスはストリームの書き込みインデックスになります。ストリームには、データストリームが有効な一致するインデックステンプレートが必要です。

Python

  1. resp = client.indices.migrate_to_data_stream(
  2. name="my-time-series-data",
  3. )
  4. print(resp)

Js

  1. const response = await client.indices.migrateToDataStream({
  2. name: "my-time-series-data",
  3. });
  4. console.log(response);

コンソール

  1. POST _data_stream/_migrate/my-time-series-data

データストリームに関する情報を取得

Kibanaでデータストリームに関する情報を取得するには、メインメニューを開き、**スタック管理

インデックス管理に移動します。データストリーム**ビューで、データストリームの名前をクリックします。

また、データストリーム取得APIを使用することもできます。

Python

  1. resp = client.indices.get_data_stream(
  2. name="my-data-stream",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.get_data_stream(
  2. name: 'my-data-stream'
  3. )
  4. puts response

Js

  1. const response = await client.indices.getDataStream({
  2. name: "my-data-stream",
  3. });
  4. console.log(response);

コンソール

  1. GET _data_stream/my-data-stream

データストリームの削除

Kibanaでデータストリームとそのバックインデックスを削除するには、メインメニューを開き、**スタック管理

インデックス管理に移動します。データストリーム**ビューで、ゴミ箱アイコンをクリックします。このアイコンは、データストリームに対するdelete_index セキュリティ権限を持っている場合にのみ表示されます。

また、データストリーム削除APIを使用することもできます。

Python

  1. resp = client.indices.delete_data_stream(
  2. name="my-data-stream",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.delete_data_stream(
  2. name: 'my-data-stream'
  3. )
  4. puts response

Js

  1. const response = await client.indices.deleteDataStream({
  2. name: "my-data-stream",
  3. });
  4. console.log(response);

コンソール

  1. DELETE _data_stream/my-data-stream