インジェストパイプライン

インジェストパイプラインを使用すると、インデックス作成前にデータに対して一般的な変換を実行できます。たとえば、パイプラインを使用してフィールドを削除したり、テキストから値を抽出したり、データを強化したりできます。

パイプラインは、プロセッサと呼ばれる一連の構成可能なタスクで構成されています。各プロセッサは順次実行され、受信したドキュメントに特定の変更を加えます。プロセッサが実行された後、Elasticsearchは変換されたドキュメントをデータストリームまたはインデックスに追加します。

インジェストパイプラインの図

Kibanaのインジェストパイプライン機能またはインジェストAPIを使用して、インジェストパイプラインを作成および管理できます。Elasticsearchは、クラスタ状態にパイプラインを保存します。

前提条件

  • ingestノードロールを持つノードがパイプライン処理を処理します。インジェストパイプラインを使用するには、クラスターに少なくとも1つのingestロールを持つノードが必要です。重いインジェスト負荷の場合は、専用のインジェストノードを作成することをお勧めします。
  • Elasticsearchのセキュリティ機能が有効になっている場合、インジェストパイプラインを管理するには、manage_pipeline クラスタ特権を持っている必要があります。Kibanaのインジェストパイプライン機能を使用するには、cluster:monitor/nodes/infoクラスタ特権も必要です。
  • enrichプロセッサを含むパイプラインには追加の設定が必要です。詳細はデータを強化するを参照してください。

パイプラインの作成と管理

Kibanaで、メインメニューを開き、**スタック管理

インジェスト パイプライン**をクリックします。リストビューから、次のことができます:

  • パイプラインのリストを表示し、詳細を掘り下げる
  • 既存のパイプラインを編集または複製する
  • パイプラインを削除する

    Kibanaのインジェストパイプラインリストビュー

パイプラインを作成するには、**パイプラインを作成

新しいパイプライン*をクリックします。チュートリアルの例については、[例:ログの解析*](/read/elasticsearch-8-15/4ffc62b2cad80217.md)を参照してください。

CSVから新しいパイプラインオプションを使用すると、CSVを使用してカスタムデータをElastic Common Schema (ECS)にマッピングするインジェストパイプラインを作成できます。カスタムデータをECSにマッピングすると、データの検索が容易になり、他のデータセットからの視覚化を再利用できます。始めるには、カスタムデータをECSにマッピングするを確認してください。

また、インジェストAPIを使用してパイプラインを作成および管理することもできます。次のパイプライン作成APIリクエストは、2つのsetプロセッサとlowercaseプロセッサを含むパイプラインを作成します。プロセッサは指定された順序で順次実行されます。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. description="My optional pipeline description",
  4. processors=[
  5. {
  6. "set": {
  7. "description": "My optional processor description",
  8. "field": "my-long-field",
  9. "value": 10
  10. }
  11. },
  12. {
  13. "set": {
  14. "description": "Set 'my-boolean-field' to true",
  15. "field": "my-boolean-field",
  16. "value": True
  17. }
  18. },
  19. {
  20. "lowercase": {
  21. "field": "my-keyword-field"
  22. }
  23. }
  24. ],
  25. )
  26. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. description: 'My optional pipeline description',
  5. processors: [
  6. {
  7. set: {
  8. description: 'My optional processor description',
  9. field: 'my-long-field',
  10. value: 10
  11. }
  12. },
  13. {
  14. set: {
  15. description: "Set 'my-boolean-field' to true",
  16. field: 'my-boolean-field',
  17. value: true
  18. }
  19. },
  20. {
  21. lowercase: {
  22. field: 'my-keyword-field'
  23. }
  24. }
  25. ]
  26. }
  27. )
  28. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. description: "My optional pipeline description",
  4. processors: [
  5. {
  6. set: {
  7. description: "My optional processor description",
  8. field: "my-long-field",
  9. value: 10,
  10. },
  11. },
  12. {
  13. set: {
  14. description: "Set 'my-boolean-field' to true",
  15. field: "my-boolean-field",
  16. value: true,
  17. },
  18. },
  19. {
  20. lowercase: {
  21. field: "my-keyword-field",
  22. },
  23. },
  24. ],
  25. });
  26. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "description": "My optional pipeline description",
  4. "processors": [
  5. {
  6. "set": {
  7. "description": "My optional processor description",
  8. "field": "my-long-field",
  9. "value": 10
  10. }
  11. },
  12. {
  13. "set": {
  14. "description": "Set 'my-boolean-field' to true",
  15. "field": "my-boolean-field",
  16. "value": true
  17. }
  18. },
  19. {
  20. "lowercase": {
  21. "field": "my-keyword-field"
  22. }
  23. }
  24. ]
  25. }

パイプラインバージョンの管理

パイプラインを作成または更新する際に、オプションのversion整数を指定できます。このバージョン番号をif_versionパラメータと共に使用して、条件付きでパイプラインを更新できます。if_versionパラメータが指定されている場合、成功した更新によりパイプラインのバージョンが増加します。

Console

  1. PUT _ingest/pipeline/my-pipeline-id
  2. {
  3. "version": 1,
  4. "processors": [ ... ]
  5. }

APIを使用してversion番号を解除するには、versionパラメータを指定せずにパイプラインを置き換えるか更新します。

パイプラインのテスト

パイプラインを本番環境で使用する前に、サンプルドキュメントを使用してテストすることをお勧めします。Kibanaでパイプラインを作成または編集する際に、ドキュメントを追加をクリックします。ドキュメントタブで、サンプルドキュメントを提供し、パイプラインを実行をクリックします。

Kibanaでのパイプラインのテスト

また、シミュレートパイプラインAPIを使用してパイプラインをテストすることもできます。リクエストパスに構成されたパイプラインを指定できます。たとえば、次のリクエストはmy-pipelineをテストします。

Python

  1. resp = client.ingest.simulate(
  2. id="my-pipeline",
  3. docs=[
  4. {
  5. "_source": {
  6. "my-keyword-field": "FOO"
  7. }
  8. },
  9. {
  10. "_source": {
  11. "my-keyword-field": "BAR"
  12. }
  13. }
  14. ],
  15. )
  16. print(resp)

Ruby

  1. response = client.ingest.simulate(
  2. id: 'my-pipeline',
  3. body: {
  4. docs: [
  5. {
  6. _source: {
  7. "my-keyword-field": 'FOO'
  8. }
  9. },
  10. {
  11. _source: {
  12. "my-keyword-field": 'BAR'
  13. }
  14. }
  15. ]
  16. }
  17. )
  18. puts response

Js

  1. const response = await client.ingest.simulate({
  2. id: "my-pipeline",
  3. docs: [
  4. {
  5. _source: {
  6. "my-keyword-field": "FOO",
  7. },
  8. },
  9. {
  10. _source: {
  11. "my-keyword-field": "BAR",
  12. },
  13. },
  14. ],
  15. });
  16. console.log(response);

Console

  1. POST _ingest/pipeline/my-pipeline/_simulate
  2. {
  3. "docs": [
  4. {
  5. "_source": {
  6. "my-keyword-field": "FOO"
  7. }
  8. },
  9. {
  10. "_source": {
  11. "my-keyword-field": "BAR"
  12. }
  13. }
  14. ]
  15. }

リクエストボディにパイプラインとそのプロセッサを指定することもできます。

Python

  1. resp = client.ingest.simulate(
  2. pipeline={
  3. "processors": [
  4. {
  5. "lowercase": {
  6. "field": "my-keyword-field"
  7. }
  8. }
  9. ]
  10. },
  11. docs=[
  12. {
  13. "_source": {
  14. "my-keyword-field": "FOO"
  15. }
  16. },
  17. {
  18. "_source": {
  19. "my-keyword-field": "BAR"
  20. }
  21. }
  22. ],
  23. )
  24. print(resp)

Ruby

  1. response = client.ingest.simulate(
  2. body: {
  3. pipeline: {
  4. processors: [
  5. {
  6. lowercase: {
  7. field: 'my-keyword-field'
  8. }
  9. }
  10. ]
  11. },
  12. docs: [
  13. {
  14. _source: {
  15. "my-keyword-field": 'FOO'
  16. }
  17. },
  18. {
  19. _source: {
  20. "my-keyword-field": 'BAR'
  21. }
  22. }
  23. ]
  24. }
  25. )
  26. puts response

Js

  1. const response = await client.ingest.simulate({
  2. pipeline: {
  3. processors: [
  4. {
  5. lowercase: {
  6. field: "my-keyword-field",
  7. },
  8. },
  9. ],
  10. },
  11. docs: [
  12. {
  13. _source: {
  14. "my-keyword-field": "FOO",
  15. },
  16. },
  17. {
  18. _source: {
  19. "my-keyword-field": "BAR",
  20. },
  21. },
  22. ],
  23. });
  24. console.log(response);

Console

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "processors": [
  5. {
  6. "lowercase": {
  7. "field": "my-keyword-field"
  8. }
  9. }
  10. ]
  11. },
  12. "docs": [
  13. {
  14. "_source": {
  15. "my-keyword-field": "FOO"
  16. }
  17. },
  18. {
  19. "_source": {
  20. "my-keyword-field": "BAR"
  21. }
  22. }
  23. ]
  24. }

APIは変換されたドキュメントを返します:

Console-Result

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "my-keyword-field": "foo"
  10. },
  11. "_ingest": {
  12. "timestamp": "2099-03-07T11:04:03.000Z"
  13. }
  14. }
  15. },
  16. {
  17. "doc": {
  18. "_index": "_index",
  19. "_id": "_id",
  20. "_version": "-3",
  21. "_source": {
  22. "my-keyword-field": "bar"
  23. },
  24. "_ingest": {
  25. "timestamp": "2099-03-07T11:04:04.000Z"
  26. }
  27. }
  28. }
  29. ]
  30. }

インデックスリクエストにパイプラインを追加

  1. #### Python
  2. ``````python
  3. resp = client.index(
  4. index="my-data-stream",
  5. pipeline="my-pipeline",
  6. document={
  7. "@timestamp": "2099-03-07T11:04:05.000Z",
  8. "my-keyword-field": "foo"
  9. },
  10. )
  11. print(resp)
  12. resp1 = client.bulk(
  13. index="my-data-stream",
  14. pipeline="my-pipeline",
  15. operations=[
  16. {
  17. "create": {}
  18. },
  19. {
  20. "@timestamp": "2099-03-07T11:04:06.000Z",
  21. "my-keyword-field": "foo"
  22. },
  23. {
  24. "create": {}
  25. },
  26. {
  27. "@timestamp": "2099-03-07T11:04:07.000Z",
  28. "my-keyword-field": "bar"
  29. }
  30. ],
  31. )
  32. print(resp1)
  33. `

Ruby

  1. response = client.index(
  2. index: 'my-data-stream',
  3. pipeline: 'my-pipeline',
  4. body: {
  5. "@timestamp": '2099-03-07T11:04:05.000Z',
  6. "my-keyword-field": 'foo'
  7. }
  8. )
  9. puts response
  10. response = client.bulk(
  11. index: 'my-data-stream',
  12. pipeline: 'my-pipeline',
  13. body: [
  14. {
  15. create: {}
  16. },
  17. {
  18. "@timestamp": '2099-03-07T11:04:06.000Z',
  19. "my-keyword-field": 'foo'
  20. },
  21. {
  22. create: {}
  23. },
  24. {
  25. "@timestamp": '2099-03-07T11:04:07.000Z',
  26. "my-keyword-field": 'bar'
  27. }
  28. ]
  29. )
  30. puts response

Js

  1. const response = await client.index({
  2. index: "my-data-stream",
  3. pipeline: "my-pipeline",
  4. document: {
  5. "@timestamp": "2099-03-07T11:04:05.000Z",
  6. "my-keyword-field": "foo",
  7. },
  8. });
  9. console.log(response);
  10. const response1 = await client.bulk({
  11. index: "my-data-stream",
  12. pipeline: "my-pipeline",
  13. operations: [
  14. {
  15. create: {},
  16. },
  17. {
  18. "@timestamp": "2099-03-07T11:04:06.000Z",
  19. "my-keyword-field": "foo",
  20. },
  21. {
  22. create: {},
  23. },
  24. {
  25. "@timestamp": "2099-03-07T11:04:07.000Z",
  26. "my-keyword-field": "bar",
  27. },
  28. ],
  29. });
  30. console.log(response1);

Console

  1. POST my-data-stream/_doc?pipeline=my-pipeline
  2. {
  3. "@timestamp": "2099-03-07T11:04:05.000Z",
  4. "my-keyword-field": "foo"
  5. }
  6. PUT my-data-stream/_bulk?pipeline=my-pipeline
  7. { "create":{ } }
  8. { "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
  9. { "create":{ } }
  10. { "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

また、update by queryまたはreindex APIでpipelineパラメータを使用することもできます。

Python

  1. resp = client.update_by_query(
  2. index="my-data-stream",
  3. pipeline="my-pipeline",
  4. )
  5. print(resp)
  6. resp1 = client.reindex(
  7. source={
  8. "index": "my-data-stream"
  9. },
  10. dest={
  11. "index": "my-new-data-stream",
  12. "op_type": "create",
  13. "pipeline": "my-pipeline"
  14. },
  15. )
  16. print(resp1)

Ruby

  1. response = client.update_by_query(
  2. index: 'my-data-stream',
  3. pipeline: 'my-pipeline'
  4. )
  5. puts response
  6. response = client.reindex(
  7. body: {
  8. source: {
  9. index: 'my-data-stream'
  10. },
  11. dest: {
  12. index: 'my-new-data-stream',
  13. op_type: 'create',
  14. pipeline: 'my-pipeline'
  15. }
  16. }
  17. )
  18. puts response

Js

  1. const response = await client.updateByQuery({
  2. index: "my-data-stream",
  3. pipeline: "my-pipeline",
  4. });
  5. console.log(response);
  6. const response1 = await client.reindex({
  7. source: {
  8. index: "my-data-stream",
  9. },
  10. dest: {
  11. index: "my-new-data-stream",
  12. op_type: "create",
  13. pipeline: "my-pipeline",
  14. },
  15. });
  16. console.log(response1);

Console

  1. POST my-data-stream/_update_by_query?pipeline=my-pipeline
  2. POST _reindex
  3. {
  4. "source": {
  5. "index": "my-data-stream"
  6. },
  7. "dest": {
  8. "index": "my-new-data-stream",
  9. "op_type": "create",
  10. "pipeline": "my-pipeline"
  11. }
  12. }

デフォルトパイプラインの設定

index.default_pipelineインデックス設定を使用してデフォルトパイプラインを設定します。pipelineパラメータが指定されていない場合、Elasticsearchはこのパイプラインをインデックスリクエストに適用します。

最終パイプラインの設定

index.final_pipelineインデックス設定を使用して最終パイプラインを設定します。Elasticsearchは、リクエストまたはデフォルトパイプラインの後にこのパイプラインを適用します。指定されていない場合でも適用されます。

Beats用のパイプライン

Elastic Beatにインジェストパイプラインを追加するには、pipelineパラメータをoutput.elasticsearchの下に<BEAT_NAME>.ymlで指定します。たとえば、Filebeatの場合、pipelinefilebeat.ymlで指定します。

Yaml

  1. output.elasticsearch:
  2. hosts: ["localhost:9200"]
  3. pipeline: my-pipeline

FleetおよびElastic Agent用のパイプライン

Elastic Agentの統合には、データをインデックス作成前に前処理および強化するデフォルトのインジェストパイプラインが付属しています。Fleetは、パイプラインインデックス設定を含むインデックステンプレートを使用してこれらのパイプラインを適用します。Elasticsearchは、ストリームの命名スキームに基づいて、これらのテンプレートをFleetデータストリームに一致させます。

各デフォルト統合パイプラインは、存在しないバージョン管理されていない*@customインジェストパイプラインを呼び出します。変更されていない場合、このパイプライン呼び出しはデータに影響を与えません。ただし、この呼び出しを変更して、アップグレードを超えて持続する統合用のカスタムパイプラインを作成できます。チュートリアル:カスタムインジェストパイプラインでデータを変換するを参照して詳細を学んでください。

Fleetは、カスタムログ統合のデフォルトインジェストパイプラインを提供しませんが、インデックステンプレートまたはカスタム構成を使用してこの統合のためのパイプラインを指定できます。

オプション1:インデックステンプレート

  • 1. 作成し、テストしてインジェストパイプラインを作成します。パイプラインにlogs-<dataset-name>-defaultという名前を付けます。これにより、統合のためのパイプラインの追跡が容易になります。
    たとえば、次のリクエストはmy-appデータセットのためのパイプラインを作成します。パイプラインの名前はlogs-my_app-defaultです。

Console

  1. PUT _ingest/pipeline/logs-my_app-default
  2. {
  3. "description": "Pipeline for `my_app` dataset",
  4. "processors": [ ... ]
  5. }

Python

  1. resp = client.cluster.put_component_template(
  2. name="logs-my_app-settings",
  3. template={
  4. "settings": {
  5. "index.default_pipeline": "logs-my_app-default",
  6. "index.lifecycle.name": "logs"
  7. }
  8. },
  9. )
  10. print(resp)
  11. resp1 = client.indices.put_index_template(
  12. name="logs-my_app-template",
  13. index_patterns=[
  14. "logs-my_app-*"
  15. ],
  16. data_stream={},
  17. priority=500,
  18. composed_of=[
  19. "logs-my_app-settings",
  20. "logs-my_app-mappings"
  21. ],
  22. )
  23. print(resp1)

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "logs-my_app-settings",
  3. template: {
  4. settings: {
  5. "index.default_pipeline": "logs-my_app-default",
  6. "index.lifecycle.name": "logs",
  7. },
  8. },
  9. });
  10. console.log(response);
  11. const response1 = await client.indices.putIndexTemplate({
  12. name: "logs-my_app-template",
  13. index_patterns: ["logs-my_app-*"],
  14. data_stream: {},
  15. priority: 500,
  16. composed_of: ["logs-my_app-settings", "logs-my_app-mappings"],
  17. });
  18. console.log(response1);

Console

  1. # インデックス設定のためのコンポーネントテンプレートを作成
  2. PUT _component_template/logs-my_app-settings
  3. {
  4. "template": {
  5. "settings": {
  6. "index.default_pipeline": "logs-my_app-default",
  7. "index.lifecycle.name": "logs"
  8. }
  9. }
  10. }
  11. # `logs-my_app-*`に一致するインデックステンプレートを作成
  12. PUT _index_template/logs-my_app-template
  13. {
  14. "index_patterns": ["logs-my_app-*"],
  15. "data_stream": { },
  16. "priority": 500,
  17. "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
  18. }
  • 3. Fleetでカスタムログ統合を追加または編集する際に、**統合の構成

    カスタムログファイル
    高度なオプション**をクリックします。

  • 4. データセット名に、データセットの名前を指定します。Fleetは、結果のlogs-<dataset-name>-defaultデータストリームに統合のための新しいデータを追加します。
    たとえば、データセットの名前がmy_appの場合、Fleetはlogs-my_app-defaultデータストリームに新しいデータを追加します。
    Fleetでのカスタムログ統合の設定
  • 5. ロールオーバーAPIを使用してデータストリームをロールオーバーします。これにより、Elasticsearchはインデックステンプレートとそのパイプライン設定を統合の新しいデータに適用します。

Python

  1. resp = client.indices.rollover(
  2. alias="logs-my_app-default",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.rollover(
  2. alias: 'logs-my_app-default'
  3. )
  4. puts response

Js

  1. const response = await client.indices.rollover({
  2. alias: "logs-my_app-default",
  3. });
  4. console.log(response);

Console

  1. POST logs-my_app-default/_rollover/

オプション2:カスタム構成

  • 1. 作成し、テストしてインジェストパイプラインを作成します。パイプラインにlogs-<dataset-name>-defaultという名前を付けます。これにより、統合のためのパイプラインの追跡が容易になります。
    たとえば、次のリクエストはmy-appデータセットのためのパイプラインを作成します。パイプラインの名前はlogs-my_app-defaultです。

Console

  1. PUT _ingest/pipeline/logs-my_app-default
  2. {
  3. "description": "Pipeline for `my_app` dataset",
  4. "processors": [ ... ]
  5. }
  • 2. Fleetでカスタムログ統合を追加または編集する際に、**統合の構成

    カスタムログファイル
    高度なオプション**をクリックします。

  • 3. データセット名に、データセットの名前を指定します。Fleetは、結果のlogs-<dataset-name>-defaultデータストリームに統合のための新しいデータを追加します。
    たとえば、データセットの名前がmy_appの場合、Fleetはlogs-my_app-defaultデータストリームに新しいデータを追加します。
  • 4. カスタム構成で、pipelineポリシー設定にパイプラインを指定します。
    カスタムログ統合のためのカスタムパイプライン設定

Elastic Agentスタンドアロン

Elastic Agentをスタンドアロンで実行している場合、インデックステンプレートを使用して、index.default_pipelineまたはindex.final_pipelineインデックス設定を含めることができます。あるいは、pipelineポリシー設定をelastic-agent.yml構成に指定できます。スタンドアロンElastic Agentsのインストールを参照してください。

検索インデックス用のパイプライン

検索ユースケースのためにElasticsearchインデックスを作成する際、たとえば、ウェブクローラーコネクタを使用する場合、これらのインデックスは特定のインジェストパイプラインで自動的に設定されます。これらのプロセッサは、検索のためにコンテンツを最適化するのに役立ちます。詳細については検索におけるインジェストパイプラインを参照してください。

プロセッサ内でソースフィールドにアクセス

プロセッサは、受信ドキュメントのソースフィールドに対して読み取りおよび書き込みアクセスを持っています。プロセッサ内でフィールドキーにアクセスするには、そのフィールド名を使用します。次のsetプロセッサはmy-long-fieldにアクセスします。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "field": "my-long-field",
  7. "value": 10
  8. }
  9. }
  10. ],
  11. )
  12. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. field: 'my-long-field',
  8. value: 10
  9. }
  10. }
  11. ]
  12. }
  13. )
  14. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. field: "my-long-field",
  7. value: 10,
  8. },
  9. },
  10. ],
  11. });
  12. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "field": "my-long-field",
  7. "value": 10
  8. }
  9. }
  10. ]
  11. }
  1. #### Python
  2. ``````python
  3. resp = client.ingest.put_pipeline(
  4. id="my-pipeline",
  5. processors=[
  6. {
  7. "set": {
  8. "field": "_source.my-long-field",
  9. "value": 10
  10. }
  11. }
  12. ],
  13. )
  14. print(resp)
  15. `

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. field: '_source.my-long-field',
  8. value: 10
  9. }
  10. }
  11. ]
  12. }
  13. )
  14. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. field: "_source.my-long-field",
  7. value: 10,
  8. },
  9. },
  10. ],
  11. });
  12. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "field": "_source.my-long-field",
  7. "value": 10
  8. }
  9. }
  10. ]
  11. }

オブジェクトフィールドにアクセスするには、ドット表記を使用します。

ドキュメントにフラット化されたオブジェクトが含まれている場合は、最初にdot_expanderプロセッサを使用してそれらを展開してください。他のインジェストプロセッサはフラット化されたオブジェクトにアクセスできません。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "dot_expander": {
  6. "description": "Expand 'my-object-field.my-property'",
  7. "field": "my-object-field.my-property"
  8. }
  9. },
  10. {
  11. "set": {
  12. "description": "Set 'my-object-field.my-property' to 10",
  13. "field": "my-object-field.my-property",
  14. "value": 10
  15. }
  16. }
  17. ],
  18. )
  19. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. dot_expander: {
  7. description: "Expand 'my-object-field.my-property'",
  8. field: 'my-object-field.my-property'
  9. }
  10. },
  11. {
  12. set: {
  13. description: "Set 'my-object-field.my-property' to 10",
  14. field: 'my-object-field.my-property',
  15. value: 10
  16. }
  17. }
  18. ]
  19. }
  20. )
  21. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. dot_expander: {
  6. description: "Expand 'my-object-field.my-property'",
  7. field: "my-object-field.my-property",
  8. },
  9. },
  10. {
  11. set: {
  12. description: "Set 'my-object-field.my-property' to 10",
  13. field: "my-object-field.my-property",
  14. value: 10,
  15. },
  16. },
  17. ],
  18. });
  19. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "dot_expander": {
  6. "description": "Expand 'my-object-field.my-property'",
  7. "field": "my-object-field.my-property"
  8. }
  9. },
  10. {
  11. "set": {
  12. "description": "Set 'my-object-field.my-property' to 10",
  13. "field": "my-object-field.my-property",
  14. "value": 10
  15. }
  16. }
  17. ]
  18. }

いくつかのプロセッサパラメータは、Mustacheテンプレートスニペットをサポートしています。テンプレートスニペット内でフィールド値にアクセスするには、フィールド名を三重の波括弧で囲みます:{{{field-name}}}。テンプレートスニペットを使用して、フィールド名を動的に設定できます。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "description": "Set dynamic '<service>' field to 'code' value",
  7. "field": "{{{service}}}",
  8. "value": "{{{code}}}"
  9. }
  10. }
  11. ],
  12. )
  13. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. description: "Set dynamic '<service>' field to 'code' value",
  8. field: '{{{service}}}',
  9. value: '{{{code}}}'
  10. }
  11. }
  12. ]
  13. }
  14. )
  15. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. description: "Set dynamic '<service>' field to 'code' value",
  7. field: "{{{service}}}",
  8. value: "{{{code}}}",
  9. },
  10. },
  11. ],
  12. });
  13. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "description": "Set dynamic '<service>' field to 'code' value",
  7. "field": "{{{service}}}",
  8. "value": "{{{code}}}"
  9. }
  10. }
  11. ]
  12. }

プロセッサ内でメタデータフィールドにアクセス

プロセッサは、次のメタデータフィールドに名前でアクセスできます:

  • _index
  • _id
  • _routing
  • _dynamic_templates

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "description": "Set '_routing' to 'geoip.country_iso_code' value",
  7. "field": "_routing",
  8. "value": "{{{geoip.country_iso_code}}}"
  9. }
  10. }
  11. ],
  12. )
  13. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. description: "Set '_routing' to 'geoip.country_iso_code' value",
  8. field: '_routing',
  9. value: '{{{geoip.country_iso_code}}}'
  10. }
  11. }
  12. ]
  13. }
  14. )
  15. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. description: "Set '_routing' to 'geoip.country_iso_code' value",
  7. field: "_routing",
  8. value: "{{{geoip.country_iso_code}}}",
  9. },
  10. },
  11. ],
  12. });
  13. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "description": "Set '_routing' to 'geoip.country_iso_code' value",
  7. "field": "_routing",
  8. "value": "{{{geoip.country_iso_code}}}"
  9. }
  10. }
  11. ]
  12. }

Mustacheテンプレートスニペットを使用してメタデータフィールド値にアクセスします。たとえば、{{{_routing}}}はドキュメントのルーティング値を取得します。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "description": "Use geo_point dynamic template for address field",
  7. "field": "_dynamic_templates",
  8. "value": {
  9. "address": "geo_point"
  10. }
  11. }
  12. }
  13. ],
  14. )
  15. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. description: 'Use geo_point dynamic template for address field',
  8. field: '_dynamic_templates',
  9. value: {
  10. address: 'geo_point'
  11. }
  12. }
  13. }
  14. ]
  15. }
  16. )
  17. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. description: "Use geo_point dynamic template for address field",
  7. field: "_dynamic_templates",
  8. value: {
  9. address: "geo_point",
  10. },
  11. },
  12. },
  13. ],
  14. });
  15. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "description": "Use geo_point dynamic template for address field",
  7. "field": "_dynamic_templates",
  8. "value": {
  9. "address": "geo_point"
  10. }
  11. }
  12. }
  13. ]
  14. }

上記のセットプロセッサは、フィールドaddressがまだインデックスのマッピングで定義されていない場合、geo_pointという名前の動的テンプレートを使用するようにESに指示します。このプロセッサは、バルクリクエストで既に定義されているフィールドaddressに対して動的テンプレートをオーバーライドしますが、バルクリクエストで定義された他の動的テンプレートには影響を与えません。

ドキュメントIDを自動生成する場合、プロセッサ内で{{{_id}}}を使用することはできません。Elasticsearchは、インジェスト後に自動生成された_id値を割り当てます。

プロセッサ内でインジェストメタデータにアクセス

インジェストプロセッサは、_ingestキーを使用してインジェストメタデータを追加およびアクセスできます。

ソースおよびメタデータフィールドとは異なり、Elasticsearchはデフォルトでインジェストメタデータフィールドをインデックスしません。Elasticsearchは、_ingestキーで始まるソースフィールドも許可します。データにそのようなソースフィールドが含まれている場合は、_source._ingestを使用してアクセスします。

パイプラインはデフォルトで_ingest.timestampインジェストメタデータフィールドのみを作成します。このフィールドには、Elasticsearchがドキュメントのインデックス作成リクエストを受信した時刻のタイムスタンプが含まれます。_ingest.timestampまたは他のインジェストメタデータフィールドをインデックスするには、setプロセッサを使用します。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "description": "Index the ingest timestamp as 'event.ingested'",
  7. "field": "event.ingested",
  8. "value": "{{{_ingest.timestamp}}}"
  9. }
  10. }
  11. ],
  12. )
  13. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. description: "Index the ingest timestamp as 'event.ingested'",
  8. field: 'event.ingested',
  9. value: '{{{_ingest.timestamp}}}'
  10. }
  11. }
  12. ]
  13. }
  14. )
  15. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. description: "Index the ingest timestamp as 'event.ingested'",
  7. field: "event.ingested",
  8. value: "{{{_ingest.timestamp}}}",
  9. },
  10. },
  11. ],
  12. });
  13. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "description": "Index the ingest timestamp as 'event.ingested'",
  7. "field": "event.ingested",
  8. "value": "{{{_ingest.timestamp}}}"
  9. }
  10. }
  11. ]
  12. }

パイプラインの失敗の処理

パイプラインのプロセッサは順次実行されます。デフォルトでは、これらのプロセッサの1つが失敗したりエラーに遭遇した場合、パイプライン処理は停止します。

プロセッサの失敗を無視し、パイプラインの残りのプロセッサを実行するには、ignore_failuretrueに設定します。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "rename": {
  6. "description": "Rename 'provider' to 'cloud.provider'",
  7. "field": "provider",
  8. "target_field": "cloud.provider",
  9. "ignore_failure": True
  10. }
  11. }
  12. ],
  13. )
  14. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. rename: {
  7. description: "Rename 'provider' to 'cloud.provider'",
  8. field: 'provider',
  9. target_field: 'cloud.provider',
  10. ignore_failure: true
  11. }
  12. }
  13. ]
  14. }
  15. )
  16. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. rename: {
  6. description: "Rename 'provider' to 'cloud.provider'",
  7. field: "provider",
  8. target_field: "cloud.provider",
  9. ignore_failure: true,
  10. },
  11. },
  12. ],
  13. });
  14. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "rename": {
  6. "description": "Rename 'provider' to 'cloud.provider'",
  7. "field": "provider",
  8. "target_field": "cloud.provider",
  9. "ignore_failure": true
  10. }
  11. }
  12. ]
  13. }
  1. #### Python
  2. ``````python
  3. resp = client.ingest.put_pipeline(
  4. id="my-pipeline",
  5. processors=[
  6. {
  7. "rename": {
  8. "description": "Rename 'provider' to 'cloud.provider'",
  9. "field": "provider",
  10. "target_field": "cloud.provider",
  11. "on_failure": [
  12. {
  13. "set": {
  14. "description": "Set 'error.message'",
  15. "field": "error.message",
  16. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  17. "override": False
  18. }
  19. }
  20. ]
  21. }
  22. }
  23. ],
  24. )
  25. print(resp)
  26. `

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. rename: {
  7. description: "Rename 'provider' to 'cloud.provider'",
  8. field: 'provider',
  9. target_field: 'cloud.provider',
  10. on_failure: [
  11. {
  12. set: {
  13. description: "Set 'error.message'",
  14. field: 'error.message',
  15. value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  16. override: false
  17. }
  18. }
  19. ]
  20. }
  21. }
  22. ]
  23. }
  24. )
  25. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. rename: {
  6. description: "Rename 'provider' to 'cloud.provider'",
  7. field: "provider",
  8. target_field: "cloud.provider",
  9. on_failure: [
  10. {
  11. set: {
  12. description: "Set 'error.message'",
  13. field: "error.message",
  14. value:
  15. "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  16. override: false,
  17. },
  18. },
  19. ],
  20. },
  21. },
  22. ],
  23. });
  24. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "rename": {
  6. "description": "Rename 'provider' to 'cloud.provider'",
  7. "field": "provider",
  8. "target_field": "cloud.provider",
  9. "on_failure": [
  10. {
  11. "set": {
  12. "description": "Set 'error.message'",
  13. "field": "error.message",
  14. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  15. "override": false
  16. }
  17. }
  18. ]
  19. }
  20. }
  21. ]
  22. }

ネストされたエラーハンドリングのために、on_failureプロセッサのリストをネストします。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "rename": {
  6. "description": "Rename 'provider' to 'cloud.provider'",
  7. "field": "provider",
  8. "target_field": "cloud.provider",
  9. "on_failure": [
  10. {
  11. "set": {
  12. "description": "Set 'error.message'",
  13. "field": "error.message",
  14. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  15. "override": False,
  16. "on_failure": [
  17. {
  18. "set": {
  19. "description": "Set 'error.message.multi'",
  20. "field": "error.message.multi",
  21. "value": "Document encountered multiple ingest errors",
  22. "override": True
  23. }
  24. }
  25. ]
  26. }
  27. }
  28. ]
  29. }
  30. }
  31. ],
  32. )
  33. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. rename: {
  7. description: "Rename 'provider' to 'cloud.provider'",
  8. field: 'provider',
  9. target_field: 'cloud.provider',
  10. on_failure: [
  11. {
  12. set: {
  13. description: "Set 'error.message'",
  14. field: 'error.message',
  15. value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  16. override: false,
  17. on_failure: [
  18. {
  19. set: {
  20. description: "Set 'error.message.multi'",
  21. field: 'error.message.multi',
  22. value: 'Document encountered multiple ingest errors',
  23. override: true
  24. }
  25. }
  26. ]
  27. }
  28. }
  29. ]
  30. }
  31. }
  32. ]
  33. }
  34. )
  35. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. rename: {
  6. description: "Rename 'provider' to 'cloud.provider'",
  7. field: "provider",
  8. target_field: "cloud.provider",
  9. on_failure: [
  10. {
  11. set: {
  12. description: "Set 'error.message'",
  13. field: "error.message",
  14. value:
  15. "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  16. override: false,
  17. on_failure: [
  18. {
  19. set: {
  20. description: "Set 'error.message.multi'",
  21. field: "error.message.multi",
  22. value: "Document encountered multiple ingest errors",
  23. override: true,
  24. },
  25. },
  26. ],
  27. },
  28. },
  29. ],
  30. },
  31. },
  32. ],
  33. });
  34. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "rename": {
  6. "description": "Rename 'provider' to 'cloud.provider'",
  7. "field": "provider",
  8. "target_field": "cloud.provider",
  9. "on_failure": [
  10. {
  11. "set": {
  12. "description": "Set 'error.message'",
  13. "field": "error.message",
  14. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  15. "override": false,
  16. "on_failure": [
  17. {
  18. "set": {
  19. "description": "Set 'error.message.multi'",
  20. "field": "error.message.multi",
  21. "value": "Document encountered multiple ingest errors",
  22. "override": true
  23. }
  24. }
  25. ]
  26. }
  27. }
  28. ]
  29. }
  30. }
  31. ]
  32. }

パイプラインにon_failureを指定することもできます。on_failure値のないプロセッサが失敗した場合、Elasticsearchはこのパイプラインレベルのパラメータをフォールバックとして使用します。Elasticsearchは、パイプラインの残りのプロセッサを実行しようとはしません。

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [ ... ],
  4. "on_failure": [
  5. {
  6. "set": {
  7. "description": "Index document to 'failed-<index>'",
  8. "field": "_index",
  9. "value": "failed-{{{ _index }}}"
  10. }
  11. }
  12. ]
  13. }

パイプラインの失敗に関する追加情報は、ドキュメントメタデータフィールドon_failure_messageon_failure_processor_typeon_failure_processor_tag、およびon_failure_pipelineで利用できる場合があります。これらのフィールドは、on_failureブロック内からのみアクセス可能です。

次の例では、メタデータフィールドを使用して、ドキュメント内にパイプラインの失敗に関する情報を含めます。

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [ ... ],
  4. "on_failure": [
  5. {
  6. "set": {
  7. "description": "Record error information",
  8. "field": "error_information",
  9. "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
  10. }
  11. }
  12. ]
  13. }

プロセッサを条件付きで実行

各プロセッサは、Painlessスクリプトとして記述されたオプションのif条件をサポートしています。提供された場合、プロセッサはif条件がtrueのときのみ実行されます。

  1. #### Python
  2. ``````python
  3. resp = client.ingest.put_pipeline(
  4. id="my-pipeline",
  5. processors=[
  6. {
  7. "drop": {
  8. "description": "Drop documents with 'network.name' of 'Guest'",
  9. "if": "ctx?.network?.name == 'Guest'"
  10. }
  11. }
  12. ],
  13. )
  14. print(resp)
  15. `

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. drop: {
  7. description: "Drop documents with 'network.name' of 'Guest'",
  8. if: "ctx?.network?.name == 'Guest'"
  9. }
  10. }
  11. ]
  12. }
  13. )
  14. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. drop: {
  6. description: "Drop documents with 'network.name' of 'Guest'",
  7. if: "ctx?.network?.name == 'Guest'",
  8. },
  9. },
  10. ],
  11. });
  12. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "drop": {
  6. "description": "Drop documents with 'network.name' of 'Guest'",
  7. "if": "ctx?.network?.name == 'Guest'"
  8. }
  9. }
  10. ]
  11. }

script.painless.regex.enabledクラスタ設定が有効になっている場合、if条件スクリプトで正規表現を使用できます。サポートされている構文については、Painless正規表現を参照してください。

可能であれば、正規表現の使用を避けてください。高コストの正規表現はインデックス作成速度を遅くする可能性があります。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "set": {
  6. "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
  7. "if": "ctx.url?.scheme =~ /^http[^s]/",
  8. "field": "url.insecure",
  9. "value": True
  10. }
  11. }
  12. ],
  13. )
  14. print(resp)

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'my-pipeline',
  3. body: {
  4. processors: [
  5. {
  6. set: {
  7. description: "If 'url.scheme' is 'http', set 'url.insecure' to true",
  8. if: 'ctx.url?.scheme =~ /^http[^s]/',
  9. field: 'url.insecure',
  10. value: true
  11. }
  12. }
  13. ]
  14. }
  15. )
  16. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. set: {
  6. description: "If 'url.scheme' is 'http', set 'url.insecure' to true",
  7. if: "ctx.url?.scheme =~ /^http[^s]/",
  8. field: "url.insecure",
  9. value: true,
  10. },
  11. },
  12. ],
  13. });
  14. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "set": {
  6. "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
  7. "if": "ctx.url?.scheme =~ /^http[^s]/",
  8. "field": "url.insecure",
  9. "value": true
  10. }
  11. }
  12. ]
  13. }
  1. 可能であれば、複雑または高コストの`````if`````条件スクリプトの使用を避けてください。高コストの条件スクリプトはインデックス作成速度を遅くする可能性があります。
  2. #### Python
  3. ``````python
  4. resp = client.ingest.put_pipeline(
  5. id="my-pipeline",
  6. processors=[
  7. {
  8. "drop": {
  9. "description": "Drop documents that don't contain 'prod' tag",
  10. "if": "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n "
  11. }
  12. }
  13. ],
  14. )
  15. print(resp)
  16. `

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. drop: {
  6. description: "Drop documents that don't contain 'prod' tag",
  7. if: "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n ",
  8. },
  9. },
  10. ],
  11. });
  12. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "drop": {
  6. "description": "Drop documents that don't contain 'prod' tag",
  7. "if": """
  8. Collection tags = ctx.tags;
  9. if(tags != null){
  10. for (String tag : tags) {
  11. if (tag.toLowerCase().contains('prod')) {
  12. return false;
  13. }
  14. }
  15. }
  16. return true;
  17. """
  18. }
  19. }
  20. ]
  21. }

if条件としてストアドスクリプトを指定することもできます。

Python

  1. resp = client.put_script(
  2. id="my-prod-tag-script",
  3. script={
  4. "lang": "painless",
  5. "source": "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n "
  6. },
  7. )
  8. print(resp)
  9. resp1 = client.ingest.put_pipeline(
  10. id="my-pipeline",
  11. processors=[
  12. {
  13. "drop": {
  14. "description": "Drop documents that don't contain 'prod' tag",
  15. "if": {
  16. "id": "my-prod-tag-script"
  17. }
  18. }
  19. }
  20. ],
  21. )
  22. print(resp1)

Js

  1. const response = await client.putScript({
  2. id: "my-prod-tag-script",
  3. script: {
  4. lang: "painless",
  5. source:
  6. "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n ",
  7. },
  8. });
  9. console.log(response);
  10. const response1 = await client.ingest.putPipeline({
  11. id: "my-pipeline",
  12. processors: [
  13. {
  14. drop: {
  15. description: "Drop documents that don't contain 'prod' tag",
  16. if: {
  17. id: "my-prod-tag-script",
  18. },
  19. },
  20. },
  21. ],
  22. });
  23. console.log(response1);

Console

  1. PUT _scripts/my-prod-tag-script
  2. {
  3. "script": {
  4. "lang": "painless",
  5. "source": """
  6. Collection tags = ctx.tags;
  7. if(tags != null){
  8. for (String tag : tags) {
  9. if (tag.toLowerCase().contains('prod')) {
  10. return false;
  11. }
  12. }
  13. }
  14. return true;
  15. """
  16. }
  17. }
  18. PUT _ingest/pipeline/my-pipeline
  19. {
  20. "processors": [
  21. {
  22. "drop": {
  23. "description": "Drop documents that don't contain 'prod' tag",
  24. "if": { "id": "my-prod-tag-script" }
  25. }
  26. }
  27. ]
  28. }

受信ドキュメントにはオブジェクトフィールドが含まれていることがよくあります。プロセッサスクリプトが親オブジェクトが存在しないフィールドにアクセスしようとすると、ElasticsearchはNullPointerExceptionを返します。これらの例外を回避するには、https://www.elastic.co/guide/en/elasticsearch/painless/8.15/painless-operators-reference.html#null-safe-operatorのnull安全演算子(?.など)を使用し、スクリプトをnull安全に記述します。

たとえば、ctx.network?.name.equalsIgnoreCase('Guest')はnull安全ではありません。ctx.network?.nameはnullを返す可能性があります。スクリプトを'Guest'.equalsIgnoreCase(ctx.network?.name)として書き直します。これは、Guestが常に非nullであるため、null安全です。

スクリプトをnull安全に書き直せない場合は、明示的なnullチェックを含めてください。

Python

  1. resp = client.ingest.put_pipeline(
  2. id="my-pipeline",
  3. processors=[
  4. {
  5. "drop": {
  6. "description": "Drop documents that contain 'network.name' of 'Guest'",
  7. "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  8. }
  9. }
  10. ],
  11. )
  12. print(resp)

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "my-pipeline",
  3. processors: [
  4. {
  5. drop: {
  6. description: "Drop documents that contain 'network.name' of 'Guest'",
  7. if: "ctx.network?.name != null && ctx.network.name.contains('Guest')",
  8. },
  9. },
  10. ],
  11. });
  12. console.log(response);

Console

  1. PUT _ingest/pipeline/my-pipeline
  2. {
  3. "processors": [
  4. {
  5. "drop": {
  6. "description": "Drop documents that contain 'network.name' of 'Guest'",
  7. "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  8. }
  9. }
  10. ]
  11. }

条件付きでパイプラインを適用

  1. #### Python
  2. ``````python
  3. resp = client.ingest.put_pipeline(
  4. id="one-pipeline-to-rule-them-all",
  5. processors=[
  6. {
  7. "pipeline": {
  8. "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
  9. "if": "ctx.service?.name == 'apache_httpd'",
  10. "name": "httpd_pipeline"
  11. }
  12. },
  13. {
  14. "pipeline": {
  15. "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
  16. "if": "ctx.service?.name == 'syslog'",
  17. "name": "syslog_pipeline"
  18. }
  19. },
  20. {
  21. "fail": {
  22. "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
  23. "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  24. "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
  25. }
  26. }
  27. ],
  28. )
  29. print(resp)
  30. `

Ruby

  1. response = client.ingest.put_pipeline(
  2. id: 'one-pipeline-to-rule-them-all',
  3. body: {
  4. processors: [
  5. {
  6. pipeline: {
  7. description: "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
  8. if: "ctx.service?.name == 'apache_httpd'",
  9. name: 'httpd_pipeline'
  10. }
  11. },
  12. {
  13. pipeline: {
  14. description: "If 'service.name' is 'syslog', use 'syslog_pipeline'",
  15. if: "ctx.service?.name == 'syslog'",
  16. name: 'syslog_pipeline'
  17. }
  18. },
  19. {
  20. fail: {
  21. description: "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
  22. if: "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  23. message: 'This pipeline requires service.name to be either `syslog` or `apache_httpd`'
  24. }
  25. }
  26. ]
  27. }
  28. )
  29. puts response

Js

  1. const response = await client.ingest.putPipeline({
  2. id: "one-pipeline-to-rule-them-all",
  3. processors: [
  4. {
  5. pipeline: {
  6. description:
  7. "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
  8. if: "ctx.service?.name == 'apache_httpd'",
  9. name: "httpd_pipeline",
  10. },
  11. },
  12. {
  13. pipeline: {
  14. description: "If 'service.name' is 'syslog', use 'syslog_pipeline'",
  15. if: "ctx.service?.name == 'syslog'",
  16. name: "syslog_pipeline",
  17. },
  18. },
  19. {
  20. fail: {
  21. description:
  22. "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
  23. if: "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  24. message:
  25. "This pipeline requires service.name to be either `syslog` or `apache_httpd`",
  26. },
  27. },
  28. ],
  29. });
  30. console.log(response);

Console

  1. PUT _ingest/pipeline/one-pipeline-to-rule-them-all
  2. {
  3. "processors": [
  4. {
  5. "pipeline": {
  6. "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
  7. "if": "ctx.service?.name == 'apache_httpd'",
  8. "name": "httpd_pipeline"
  9. }
  10. },
  11. {
  12. "pipeline": {
  13. "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
  14. "if": "ctx.service?.name == 'syslog'",
  15. "name": "syslog_pipeline"
  16. }
  17. },
  18. {
  19. "fail": {
  20. "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
  21. "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  22. "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
  23. }
  24. }
  25. ]
  26. }

パイプライン使用統計の取得

node stats APIを使用して、グローバルおよびパイプラインごとのインジェスト統計を取得します。これらの統計を使用して、最も頻繁に実行されるパイプラインや、処理に最も時間がかかるパイプラインを特定します。

Python

  1. resp = client.nodes.stats(
  2. metric="ingest",
  3. filter_path="nodes.*.ingest",
  4. )
  5. print(resp)

Ruby

  1. response = client.nodes.stats(
  2. metric: 'ingest',
  3. filter_path: 'nodes.*.ingest'
  4. )
  5. puts response

Js

  1. const response = await client.nodes.stats({
  2. metric: "ingest",
  3. filter_path: "nodes.*.ingest",
  4. });
  5. console.log(response);

Console

  1. GET _nodes/stats/ingest?filter_path=nodes.*.ingest