Elasticsearchを使用した時系列データ

Elasticsearchは、ログやメトリクスなどの時系列データを保存、管理、検索するための機能を提供します。一度Elasticsearchにデータを取り込むと、Kibanaや他のElastic Stackの機能を使用してデータを分析および視覚化できます。

データティアの設定

ElasticsearchのILM機能は、データティアを使用して、古いデータを年齢に応じてコストの低いハードウェアを持つノードに自動的に移動します。これにより、パフォーマンスが向上し、ストレージコストが削減されます。

ホットティアとコンテンツティアは必須です。ウォーム、コールド、フローズンティアはオプションです。

ホットティアとウォームティアでは、高性能のノードを使用して、最新のデータに対するインデックス作成と検索を高速化します。コールドティアとフローズンティアでは、コストを削減するために、遅いコストの低いノードを使用します。

コンテンツティアは通常、時系列データには使用されません。ただし、システムインデックスやデータストリームの一部でない他のインデックスを作成するには必要です。

データティアの設定手順は、デプロイメントの種類によって異なります:

  • 1. Elasticsearch Service Consoleにログインします。
  • 2. Elasticsearch Serviceのホームページまたはデプロイメントページから、デプロイメントを追加または選択します。
  • 3. デプロイメントメニューからデプロイメントの編集を選択します。
  • 4. データティアを有効にするには、容量を追加をクリックします。

オートスケーリングを有効にする

オートスケーリングは、ストレージニーズに応じてデプロイメントの容量を自動的に調整します。オートスケーリングを有効にするには、デプロイメントの編集ページでこのデプロイメントをオートスケールを選択します。オートスケーリングはElasticsearch Serviceでのみ利用可能です。

ノードをデータティアに割り当てるには、ノードのelasticsearch.ymlファイルにそれぞれのnode roleを追加します。既存のノードの役割を変更するには、ローリング再起動が必要です。

Yaml

  1. # コンテンツティア
  2. node.roles: [ data_content ]
  3. # ホットティア
  4. node.roles: [ data_hot ]
  5. # ウォームティア
  6. node.roles: [ data_warm ]
  7. # コールドティア
  8. node.roles: [ data_cold ]
  9. # フローズンティア
  10. node.roles: [ data_frozen ]

フローズンティアでは専用ノードを使用することをお勧めします。必要に応じて、他のノードを複数のティアに割り当てることができます。

Yaml

  1. node.roles: [ data_content, data_hot, data_warm ]

クラスターに必要な他の役割をノードに割り当てます。たとえば、小さなクラスターには複数の役割を持つノードがある場合があります。

Yaml

  1. node.roles: [ master, ingest, ml, data_hot, transform ]

スナップショットリポジトリの登録

コールドティアとフローズンティアは、検索可能なスナップショットを使用してローカルストレージコストを削減できます。

検索可能なスナップショットを使用するには、サポートされているスナップショットリポジトリを登録する必要があります。このリポジトリを登録する手順は、デプロイメントの種類とストレージプロバイダーによって異なります:

クラスターを作成すると、Elasticsearch Serviceはデフォルトのfound-snapshotsリポジトリを自動的に登録します。このリポジトリは検索可能なスナップショットをサポートしています。

  1. 次のカスタムリポジトリタイプのいずれかを検索可能なスナップショットとともに使用できます:
  2. - [Google Cloud Storage (GCS)](https://www.elastic.co/guide/en/cloud/current/ec-gcs-snapshotting.html)
  3. - [Azure Blob Storage](https://www.elastic.co/guide/en/cloud/current/ec-azure-snapshotting.html)
  4. - [Amazon Web Services (AWS)](https://www.elastic.co/guide/en/cloud/current/ec-aws-custom-repository.html)
  5. 次のリポジトリタイプのいずれかを検索可能なスナップショットとともに使用できます:
  6. - [AWS S3](/read/elasticsearch-8-15/094325de711979fa.md)
  7. - [Google Cloud Storage](/read/elasticsearch-8-15/dfd43ab9354680ac.md)
  8. - [Azure Blob Storage](/read/elasticsearch-8-15/d888d8b822a8a1ce.md)
  9. - [Hadoop Distributed File Store (HDFS)](https://www.elastic.co/guide/en/elasticsearch/plugins/8.15/repository-hdfs.html)
  10. - [共有ファイルシステム](/read/elasticsearch-8-15/42b9d9b87a4241d0.md)(NFSなど)
  11. - [読み取り専用HTTPおよびHTTPSリポジトリ](/read/elasticsearch-8-15/e140b4739f315eab.md)
  12. これらのリポジトリタイプの代替実装(たとえば、[MinIO](094325de711979fa.md#repository-s3-client))を使用することもできますが、完全に互換性がある必要があります。[リポジトリアナリシス](/read/elasticsearch-8-15/8c7630461481c6c0.md)APIを使用して、検索可能なスナップショットとの互換性を分析します。
  13. ## インデックスライフサイクルポリシーの作成または編集
  14. [データストリーム](/read/elasticsearch-8-15/4618071bf1c879cb.md)は、複数のバックインデックスにわたってデータを保存します。ILMは、これらのインデックスをデータティア間で自動的に移動するために[インデックスライフサイクルポリシー](/read/elasticsearch-8-15/4188a8e0ae3d047f.md)を使用します。
  15. FleetまたはElastic Agentを使用している場合は、Elasticsearchの組み込みライフサイクルポリシーの1つを編集します。カスタムアプリケーションを使用している場合は、自分のポリシーを作成します。いずれの場合も、ポリシーが次のことを確認してください:
  16. - 構成した各データティアのフェーズを含む。
  17. - ロールオーバーからのフェーズ遷移のためのしきい値、または`````min_age`````を計算する。
  18. - 必要に応じて、コールドおよびフローズンフェーズで検索可能なスナップショットを使用する。
  19. - 必要に応じて削除フェーズを含む。
  20. FleetElastic Agentは、次の組み込みライフサイクルポリシーを使用します:
  21. - `````logs
  • metrics
  • synthetics

パフォーマンス、耐障害性、保持要件に基づいて、これらのポリシーをカスタマイズできます。

Kibanaでポリシーを編集するには、メインメニューを開き、Stack Management >
Index Lifecycle Policiesに移動します。編集したいポリシーをクリックします。

また、ライフサイクルポリシーの更新APIを使用することもできます。

Python

  1. resp = client.ilm.put_lifecycle(
  2. name="logs",
  3. policy={
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. },
  47. )
  48. print(resp)

Js

  1. const response = await client.ilm.putLifecycle({
  2. name: "logs",
  3. policy: {
  4. phases: {
  5. hot: {
  6. actions: {
  7. rollover: {
  8. max_primary_shard_size: "50gb",
  9. },
  10. },
  11. },
  12. warm: {
  13. min_age: "30d",
  14. actions: {
  15. shrink: {
  16. number_of_shards: 1,
  17. },
  18. forcemerge: {
  19. max_num_segments: 1,
  20. },
  21. },
  22. },
  23. cold: {
  24. min_age: "60d",
  25. actions: {
  26. searchable_snapshot: {
  27. snapshot_repository: "found-snapshots",
  28. },
  29. },
  30. },
  31. frozen: {
  32. min_age: "90d",
  33. actions: {
  34. searchable_snapshot: {
  35. snapshot_repository: "found-snapshots",
  36. },
  37. },
  38. },
  39. delete: {
  40. min_age: "735d",
  41. actions: {
  42. delete: {},
  43. },
  44. },
  45. },
  46. },
  47. });
  48. console.log(response);

コンソール

  1. PUT _ilm/policy/logs
  2. {
  3. "policy": {
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. }
  47. }

Kibanaでポリシーを作成するには、メインメニューを開き、Stack Management >
Index Lifecycle Policiesに移動します。ポリシーを作成をクリックします。

また、ライフサイクルポリシーの更新APIを使用することもできます。

Python

  1. resp = client.ilm.put_lifecycle(
  2. name="my-lifecycle-policy",
  3. policy={
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. },
  47. )
  48. print(resp)

Js

  1. const response = await client.ilm.putLifecycle({
  2. name: "my-lifecycle-policy",
  3. policy: {
  4. phases: {
  5. hot: {
  6. actions: {
  7. rollover: {
  8. max_primary_shard_size: "50gb",
  9. },
  10. },
  11. },
  12. warm: {
  13. min_age: "30d",
  14. actions: {
  15. shrink: {
  16. number_of_shards: 1,
  17. },
  18. forcemerge: {
  19. max_num_segments: 1,
  20. },
  21. },
  22. },
  23. cold: {
  24. min_age: "60d",
  25. actions: {
  26. searchable_snapshot: {
  27. snapshot_repository: "found-snapshots",
  28. },
  29. },
  30. },
  31. frozen: {
  32. min_age: "90d",
  33. actions: {
  34. searchable_snapshot: {
  35. snapshot_repository: "found-snapshots",
  36. },
  37. },
  38. },
  39. delete: {
  40. min_age: "735d",
  41. actions: {
  42. delete: {},
  43. },
  44. },
  45. },
  46. },
  47. });
  48. console.log(response);

コンソール

  1. PUT _ilm/policy/my-lifecycle-policy
  2. {
  3. "policy": {
  4. "phases": {
  5. "hot": {
  6. "actions": {
  7. "rollover": {
  8. "max_primary_shard_size": "50gb"
  9. }
  10. }
  11. },
  12. "warm": {
  13. "min_age": "30d",
  14. "actions": {
  15. "shrink": {
  16. "number_of_shards": 1
  17. },
  18. "forcemerge": {
  19. "max_num_segments": 1
  20. }
  21. }
  22. },
  23. "cold": {
  24. "min_age": "60d",
  25. "actions": {
  26. "searchable_snapshot": {
  27. "snapshot_repository": "found-snapshots"
  28. }
  29. }
  30. },
  31. "frozen": {
  32. "min_age": "90d",
  33. "actions": {
  34. "searchable_snapshot": {
  35. "snapshot_repository": "found-snapshots"
  36. }
  37. }
  38. },
  39. "delete": {
  40. "min_age": "735d",
  41. "actions": {
  42. "delete": {}
  43. }
  44. }
  45. }
  46. }
  47. }

コンポーネントテンプレートの作成

FleetまたはElastic Agentを使用している場合は、データの検索と視覚化にスキップします。FleetとElastic Agentは、データストリームを作成するために組み込みテンプレートを使用します。

カスタムアプリケーションを使用している場合は、自分のデータストリームを設定する必要があります。データストリームには、一致するインデックステンプレートが必要です。ほとんどの場合、1つ以上のコンポーネントテンプレートを使用してこのインデックステンプレートを構成します。通常、マッピングとインデックス設定のために別々のコンポーネントテンプレートを使用します。これにより、複数のインデックステンプレートでコンポーネントテンプレートを再利用できます。

コンポーネントテンプレートを作成する際には、次のことを含めます:

  • date](/read/elasticsearch-8-15/9dfa1da42eb162ff.md)またはdate_nanosマッピングを@timestampフィールドに対して指定します。マッピングを指定しない場合、Elasticsearchは@timestampをデフォルトオプションのdateフィールドとしてマッピングします。
  • index.lifecycle.nameインデックス設定でのライフサイクルポリシー。

フィールドをマッピングする際には、Elastic Common Schema (ECS)を使用してください。ECSフィールドは、デフォルトでいくつかのElastic Stack機能と統合されます。

フィールドのマッピング方法が不明な場合は、ランタイムフィールドを使用して、検索時に非構造化コンテンツからフィールドを抽出できます。たとえば、ログメッセージをwildcardフィールドにインデックスし、後で検索中にこのフィールドからIPアドレスや他のデータを抽出できます。

Kibanaでコンポーネントテンプレートを作成するには、メインメニューを開き、Stack
Management > Index Managementに移動します。インデックステンプレートビューで、コンポーネントテンプレートを作成をクリックします。

また、コンポーネントテンプレート作成APIを使用することもできます。

Python

  1. resp = client.cluster.put_component_template(
  2. name="my-mappings",
  3. template={
  4. "mappings": {
  5. "properties": {
  6. "@timestamp": {
  7. "type": "date",
  8. "format": "date_optional_time||epoch_millis"
  9. },
  10. "message": {
  11. "type": "wildcard"
  12. }
  13. }
  14. }
  15. },
  16. meta={
  17. "description": "Mappings for @timestamp and message fields",
  18. "my-custom-meta-field": "More arbitrary metadata"
  19. },
  20. )
  21. print(resp)
  22. resp1 = client.cluster.put_component_template(
  23. name="my-settings",
  24. template={
  25. "settings": {
  26. "index.lifecycle.name": "my-lifecycle-policy"
  27. }
  28. },
  29. meta={
  30. "description": "Settings for ILM",
  31. "my-custom-meta-field": "More arbitrary metadata"
  32. },
  33. )
  34. print(resp1)

Ruby

  1. response = client.cluster.put_component_template(
  2. name: 'my-mappings',
  3. body: {
  4. template: {
  5. mappings: {
  6. properties: {
  7. "@timestamp": {
  8. type: 'date',
  9. format: 'date_optional_time||epoch_millis'
  10. },
  11. message: {
  12. type: 'wildcard'
  13. }
  14. }
  15. }
  16. },
  17. _meta: {
  18. description: 'Mappings for @timestamp and message fields',
  19. "my-custom-meta-field": 'More arbitrary metadata'
  20. }
  21. }
  22. )
  23. puts response
  24. response = client.cluster.put_component_template(
  25. name: 'my-settings',
  26. body: {
  27. template: {
  28. settings: {
  29. 'index.lifecycle.name' => 'my-lifecycle-policy'
  30. }
  31. },
  32. _meta: {
  33. description: 'Settings for ILM',
  34. "my-custom-meta-field": 'More arbitrary metadata'
  35. }
  36. }
  37. )
  38. puts response

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "my-mappings",
  3. template: {
  4. mappings: {
  5. properties: {
  6. "@timestamp": {
  7. type: "date",
  8. format: "date_optional_time||epoch_millis",
  9. },
  10. message: {
  11. type: "wildcard",
  12. },
  13. },
  14. },
  15. },
  16. _meta: {
  17. description: "Mappings for @timestamp and message fields",
  18. "my-custom-meta-field": "More arbitrary metadata",
  19. },
  20. });
  21. console.log(response);
  22. const response1 = await client.cluster.putComponentTemplate({
  23. name: "my-settings",
  24. template: {
  25. settings: {
  26. "index.lifecycle.name": "my-lifecycle-policy",
  27. },
  28. },
  29. _meta: {
  30. description: "Settings for ILM",
  31. "my-custom-meta-field": "More arbitrary metadata",
  32. },
  33. });
  34. console.log(response1);

コンソール

  1. # マッピング用のコンポーネントテンプレートを作成
  2. PUT _component_template/my-mappings
  3. {
  4. "template": {
  5. "mappings": {
  6. "properties": {
  7. "@timestamp": {
  8. "type": "date",
  9. "format": "date_optional_time||epoch_millis"
  10. },
  11. "message": {
  12. "type": "wildcard"
  13. }
  14. }
  15. }
  16. },
  17. "_meta": {
  18. "description": "@timestampとmessageフィールドのマッピング",
  19. "my-custom-meta-field": "より多くの任意のメタデータ"
  20. }
  21. }
  22. # インデックス設定用のコンポーネントテンプレートを作成
  23. PUT _component_template/my-settings
  24. {
  25. "template": {
  26. "settings": {
  27. "index.lifecycle.name": "my-lifecycle-policy"
  28. }
  29. },
  30. "_meta": {
  31. "description": "ILMの設定",
  32. "my-custom-meta-field": "より多くの任意のメタデータ"
  33. }
  34. }

インデックステンプレートの作成

コンポーネントテンプレートを使用してインデックステンプレートを作成します。次のことを指定します:

  • データストリームの名前に一致する1つ以上のインデックスパターン。私たちのデータストリーム命名スキームを使用することをお勧めします。
  • テンプレートがデータストリーム対応であること。
  • マッピングとインデックス設定を含む任意のコンポーネントテンプレート。
  • 組み込みテンプレートとの衝突を避けるために200よりも高い優先度。インデックスパターンの衝突を避けるには、インデックスパターンの衝突を避けるを参照してください。

Kibanaでインデックステンプレートを作成するには、メインメニューを開き、Stack
Management > Index Managementに移動します。インデックステンプレートビューで、テンプレートを作成をクリックします。

また、インデックステンプレート作成APIを使用することもできます。data_streamオブジェクトを含めてデータストリームを有効にします。

Python

  1. resp = client.indices.put_index_template(
  2. name="my-index-template",
  3. index_patterns=[
  4. "my-data-stream*"
  5. ],
  6. data_stream={},
  7. composed_of=[
  8. "my-mappings",
  9. "my-settings"
  10. ],
  11. priority=500,
  12. meta={
  13. "description": "Template for my time series data",
  14. "my-custom-meta-field": "More arbitrary metadata"
  15. },
  16. )
  17. print(resp)

Ruby

  1. response = client.indices.put_index_template(
  2. name: 'my-index-template',
  3. body: {
  4. index_patterns: [
  5. 'my-data-stream*'
  6. ],
  7. data_stream: {},
  8. composed_of: [
  9. 'my-mappings',
  10. 'my-settings'
  11. ],
  12. priority: 500,
  13. _meta: {
  14. description: 'Template for my time series data',
  15. "my-custom-meta-field": 'More arbitrary metadata'
  16. }
  17. }
  18. )
  19. puts response

Js

  1. const response = await client.indices.putIndexTemplate({
  2. name: "my-index-template",
  3. index_patterns: ["my-data-stream*"],
  4. data_stream: {},
  5. composed_of: ["my-mappings", "my-settings"],
  6. priority: 500,
  7. _meta: {
  8. description: "Template for my time series data",
  9. "my-custom-meta-field": "More arbitrary metadata",
  10. },
  11. });
  12. console.log(response);

コンソール

  1. PUT _index_template/my-index-template
  2. {
  3. "index_patterns": ["my-data-stream*"],
  4. "data_stream": { },
  5. "composed_of": [ "my-mappings", "my-settings" ],
  6. "priority": 500,
  7. "_meta": {
  8. "description": "Template for my time series data",
  9. "my-custom-meta-field": "More arbitrary metadata"
  10. }
  11. }

データストリームにデータを追加

インデックスリクエストは、データストリームにドキュメントを追加します。これらのリクエストは、op_typecreateを使用する必要があります。ドキュメントには@timestampフィールドを含める必要があります。

データストリームを自動的に作成するには、ストリームの名前をターゲットにしたインデックスリクエストを送信します。この名前は、インデックステンプレートのインデックスパターンの1つと一致する必要があります。

Python

  1. resp = client.bulk(
  2. index="my-data-stream",
  3. operations=[
  4. {
  5. "create": {}
  6. },
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  10. },
  11. {
  12. "create": {}
  13. },
  14. {
  15. "@timestamp": "2099-05-06T16:25:42.000Z",
  16. "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638"
  17. }
  18. ],
  19. )
  20. print(resp)
  21. resp1 = client.index(
  22. index="my-data-stream",
  23. document={
  24. "@timestamp": "2099-05-06T16:21:15.000Z",
  25. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  26. },
  27. )
  28. print(resp1)

Ruby

  1. response = client.bulk(
  2. index: 'my-data-stream',
  3. body: [
  4. {
  5. create: {}
  6. },
  7. {
  8. "@timestamp": '2099-05-06T16:21:15.000Z',
  9. message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  10. },
  11. {
  12. create: {}
  13. },
  14. {
  15. "@timestamp": '2099-05-06T16:25:42.000Z',
  16. message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638'
  17. }
  18. ]
  19. )
  20. puts response
  21. response = client.index(
  22. index: 'my-data-stream',
  23. body: {
  24. "@timestamp": '2099-05-06T16:21:15.000Z',
  25. message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  26. }
  27. )
  28. puts response

Js

  1. const response = await client.bulk({
  2. index: "my-data-stream",
  3. operations: [
  4. {
  5. create: {},
  6. },
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. message:
  10. '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  11. },
  12. {
  13. create: {},
  14. },
  15. {
  16. "@timestamp": "2099-05-06T16:25:42.000Z",
  17. message:
  18. '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638',
  19. },
  20. ],
  21. });
  22. console.log(response);
  23. const response1 = await client.index({
  24. index: "my-data-stream",
  25. document: {
  26. "@timestamp": "2099-05-06T16:21:15.000Z",
  27. message:
  28. '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  29. },
  30. });
  31. console.log(response1);

コンソール

  1. PUT my-data-stream/_bulk
  2. { "create":{ } }
  3. { "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
  4. { "create":{ } }
  5. { "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }
  6. POST my-data-stream/_doc
  7. {
  8. "@timestamp": "2099-05-06T16:21:15.000Z",
  9. "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  10. }

データを検索して視覚化する

Kibanaでデータを探索し、検索するには、メインメニューを開いてDiscoverを選択します。KibanaのDiscoverドキュメントを参照してください。

Kibanaのダッシュボード機能を使用して、チャート、テーブル、地図などでデータを視覚化します。Kibanaのダッシュボードドキュメントを参照してください。

また、検索APIを使用してデータを検索および集約できます。ランタイムフィールドgrokパターンを使用して、検索時にログメッセージや他の非構造化コンテンツからデータを動的に抽出します。

Python

  1. resp = client.search(
  2. index="my-data-stream",
  3. runtime_mappings={
  4. "source.ip": {
  5. "type": "ip",
  6. "script": "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n "
  7. }
  8. },
  9. query={
  10. "bool": {
  11. "filter": [
  12. {
  13. "range": {
  14. "@timestamp": {
  15. "gte": "now-1d/d",
  16. "lt": "now/d"
  17. }
  18. }
  19. },
  20. {
  21. "range": {
  22. "source.ip": {
  23. "gte": "192.0.2.0",
  24. "lte": "192.0.2.255"
  25. }
  26. }
  27. }
  28. ]
  29. }
  30. },
  31. fields=[
  32. "*"
  33. ],
  34. source=False,
  35. sort=[
  36. {
  37. "@timestamp": "desc"
  38. },
  39. {
  40. "source.ip": "desc"
  41. }
  42. ],
  43. )
  44. print(resp)

Js

  1. const response = await client.search({
  2. index: "my-data-stream",
  3. runtime_mappings: {
  4. "source.ip": {
  5. type: "ip",
  6. script:
  7. "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
  8. },
  9. },
  10. query: {
  11. bool: {
  12. filter: [
  13. {
  14. range: {
  15. "@timestamp": {
  16. gte: "now-1d/d",
  17. lt: "now/d",
  18. },
  19. },
  20. },
  21. {
  22. range: {
  23. "source.ip": {
  24. gte: "192.0.2.0",
  25. lte: "192.0.2.255",
  26. },
  27. },
  28. },
  29. ],
  30. },
  31. },
  32. fields: ["*"],
  33. _source: false,
  34. sort: [
  35. {
  36. "@timestamp": "desc",
  37. },
  38. {
  39. "source.ip": "desc",
  40. },
  41. ],
  42. });
  43. console.log(response);

コンソール

  1. GET my-data-stream/_search
  2. {
  3. "runtime_mappings": {
  4. "source.ip": {
  5. "type": "ip",
  6. "script": """
  7. String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
  8. if (sourceip != null) emit(sourceip);
  9. """
  10. }
  11. },
  12. "query": {
  13. "bool": {
  14. "filter": [
  15. {
  16. "range": {
  17. "@timestamp": {
  18. "gte": "now-1d/d",
  19. "lt": "now/d"
  20. }
  21. }
  22. },
  23. {
  24. "range": {
  25. "source.ip": {
  26. "gte": "192.0.2.0",
  27. "lte": "192.0.2.255"
  28. }
  29. }
  30. }
  31. ]
  32. }
  33. },
  34. "fields": [
  35. "*"
  36. ],
  37. "_source": false,
  38. "sort": [
  39. {
  40. "@timestamp": "desc"
  41. },
  42. {
  43. "source.ip": "desc"
  44. }
  45. ]
  46. }

Elasticsearchの検索はデフォルトで同期的です。フローズンデータ、長い時間範囲、大規模データセットに対する検索は時間がかかる場合があります。非同期検索APIを使用して、バックグラウンドで検索を実行します。その他の検索オプションについては、検索APIを参照してください。

Python

  1. resp = client.async_search.submit(
  2. index="my-data-stream",
  3. runtime_mappings={
  4. "source.ip": {
  5. "type": "ip",
  6. "script": "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n "
  7. }
  8. },
  9. query={
  10. "bool": {
  11. "filter": [
  12. {
  13. "range": {
  14. "@timestamp": {
  15. "gte": "now-2y/d",
  16. "lt": "now/d"
  17. }
  18. }
  19. },
  20. {
  21. "range": {
  22. "source.ip": {
  23. "gte": "192.0.2.0",
  24. "lte": "192.0.2.255"
  25. }
  26. }
  27. }
  28. ]
  29. }
  30. },
  31. fields=[
  32. "*"
  33. ],
  34. source=False,
  35. sort=[
  36. {
  37. "@timestamp": "desc"
  38. },
  39. {
  40. "source.ip": "desc"
  41. }
  42. ],
  43. )
  44. print(resp)

Js

  1. const response = await client.asyncSearch.submit({
  2. index: "my-data-stream",
  3. runtime_mappings: {
  4. "source.ip": {
  5. type: "ip",
  6. script:
  7. "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
  8. },
  9. },
  10. query: {
  11. bool: {
  12. filter: [
  13. {
  14. range: {
  15. "@timestamp": {
  16. gte: "now-2y/d",
  17. lt: "now/d",
  18. },
  19. },
  20. },
  21. {
  22. range: {
  23. "source.ip": {
  24. gte: "192.0.2.0",
  25. lte: "192.0.2.255",
  26. },
  27. },
  28. },
  29. ],
  30. },
  31. },
  32. fields: ["*"],
  33. _source: false,
  34. sort: [
  35. {
  36. "@timestamp": "desc",
  37. },
  38. {
  39. "source.ip": "desc",
  40. },
  41. ],
  42. });
  43. console.log(response);

コンソール

  1. POST my-data-stream/_async_search
  2. {
  3. "runtime_mappings": {
  4. "source.ip": {
  5. "type": "ip",
  6. "script": """
  7. String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
  8. if (sourceip != null) emit(sourceip);
  9. """
  10. }
  11. },
  12. "query": {
  13. "bool": {
  14. "filter": [
  15. {
  16. "range": {
  17. "@timestamp": {
  18. "gte": "now-2y/d",
  19. "lt": "now/d"
  20. }
  21. }
  22. },
  23. {
  24. "range": {
  25. "source.ip": {
  26. "gte": "192.0.2.0",
  27. "lte": "192.0.2.255"
  28. }
  29. }
  30. }
  31. ]
  32. }
  33. },
  34. "fields": [
  35. "*"
  36. ],
  37. "_source": false,
  38. "sort": [
  39. {
  40. "@timestamp": "desc"
  41. },
  42. {
  43. "source.ip": "desc"
  44. }
  45. ]
  46. }