パイプライン集約

パイプライン集約は、ドキュメントセットからではなく、他の集約から生成された出力に基づいて動作し、出力ツリーに情報を追加します。さまざまなタイプのパイプライン集約があり、それぞれが他の集約から異なる情報を計算しますが、これらのタイプは2つのファミリーに分けることができます:

  • 親集約の出力を提供され、新しいバケットや既存のバケットに追加する新しい集約を計算できるパイプライン集約のファミリー。
  • 兄弟
  • 兄弟集約の出力を提供され、兄弟集約と同じレベルにある新しい集約を計算できるパイプライン集約。

パイプライン集約は、buckets_pathパラメータを使用して、計算に必要な集約を参照できます。このパラメータは、必要なメトリックへのパスを示します。これらのパスを定義するための構文は、以下のbuckets_path 構文セクションにあります。

パイプライン集約はサブ集約を持つことはできませんが、タイプによってはbuckets_pathで別のパイプラインを参照でき、パイプライン集約を連鎖させることができます。たとえば、2つの導関数を連鎖させて2回目の導関数(すなわち、導関数の導関数)を計算できます。

パイプライン集約は出力にのみ追加するため、パイプライン集約を連鎖させると、各パイプライン集約の出力が最終出力に含まれます。

バケットパス構文

ほとんどのパイプライン集約は、他の集約を入力として必要とします。入力集約は、特定の形式に従うbuckets_pathパラメータを介して定義されます。

Ebnf

  1. AGG_SEPARATOR = `>` ;
  2. METRIC_SEPARATOR = `.` ;
  3. AGG_NAME = <the name of the aggregation> ;
  4. METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ;
  5. MULTIBUCKET_KEY = `[<KEY_NAME>]`
  6. PATH = <AGG_NAME><MULTIBUCKET_KEY>? (<AGG_SEPARATOR>, <AGG_NAME> )* ( <METRIC_SEPARATOR>, <METRIC> ) ;

たとえば、パス"my_bucket>my_stats.avg"は、"my_bucket"バケット集約に含まれる"my_stats"メトリックのavg値にパスします。

以下は他のいくつかの例です:

  • multi_bucket["foo"]>single_bucket>multi_metric.avgは、"multi_bucket"マルチバケット集約の"foo"バケット内の単一バケット"single_bucket"の下の"multi_metric"集約のavgメトリックに行きます。
  • agg1["foo"]._countは、マルチバケット集約"multi_bucket""foo"バケットの_countメトリックを取得します。

パスはパイプライン集約の位置から相対的であり、絶対パスではなく、パスは集約ツリーを「上」に戻ることはできません。たとえば、この導関数はdate_histogramの内部に埋め込まれており、「兄弟」メトリック"the_sum"を参照しています:

Python

  1. resp = client.search(
  2. aggs={
  3. "my_date_histo": {
  4. "date_histogram": {
  5. "field": "timestamp",
  6. "calendar_interval": "day"
  7. },
  8. "aggs": {
  9. "the_sum": {
  10. "sum": {
  11. "field": "lemmings"
  12. }
  13. },
  14. "the_deriv": {
  15. "derivative": {
  16. "buckets_path": "the_sum"
  17. }
  18. }
  19. }
  20. }
  21. },
  22. )
  23. print(resp)

Ruby

  1. response = client.search(
  2. body: {
  3. aggregations: {
  4. my_date_histo: {
  5. date_histogram: {
  6. field: 'timestamp',
  7. calendar_interval: 'day'
  8. },
  9. aggregations: {
  10. the_sum: {
  11. sum: {
  12. field: 'lemmings'
  13. }
  14. },
  15. the_deriv: {
  16. derivative: {
  17. buckets_path: 'the_sum'
  18. }
  19. }
  20. }
  21. }
  22. }
  23. }
  24. )
  25. puts response

Js

  1. const response = await client.search({
  2. aggs: {
  3. my_date_histo: {
  4. date_histogram: {
  5. field: "timestamp",
  6. calendar_interval: "day",
  7. },
  8. aggs: {
  9. the_sum: {
  10. sum: {
  11. field: "lemmings",
  12. },
  13. },
  14. the_deriv: {
  15. derivative: {
  16. buckets_path: "the_sum",
  17. },
  18. },
  19. },
  20. },
  21. },
  22. });
  23. console.log(response);

コンソール

  1. POST /_search
  2. {
  3. "aggs": {
  4. "my_date_histo": {
  5. "date_histogram": {
  6. "field": "timestamp",
  7. "calendar_interval": "day"
  8. },
  9. "aggs": {
  10. "the_sum": {
  11. "sum": { "field": "lemmings" }
  12. },
  13. "the_deriv": {
  14. "derivative": { "buckets_path": "the_sum" }
  15. }
  16. }
  17. }
  18. }
  19. }
メトリックは"the_sum"と呼ばれます
buckets_pathは相対パス"the_sum"を介してメトリックを参照します
  1. #### Python
  2. ``````python
  3. resp = client.search(
  4. aggs={
  5. "sales_per_month": {
  6. "date_histogram": {
  7. "field": "date",
  8. "calendar_interval": "month"
  9. },
  10. "aggs": {
  11. "sales": {
  12. "sum": {
  13. "field": "price"
  14. }
  15. }
  16. }
  17. },
  18. "max_monthly_sales": {
  19. "max_bucket": {
  20. "buckets_path": "sales_per_month>sales"
  21. }
  22. }
  23. },
  24. )
  25. print(resp)
  26. `

Ruby

  1. response = client.search(
  2. body: {
  3. aggregations: {
  4. sales_per_month: {
  5. date_histogram: {
  6. field: 'date',
  7. calendar_interval: 'month'
  8. },
  9. aggregations: {
  10. sales: {
  11. sum: {
  12. field: 'price'
  13. }
  14. }
  15. }
  16. },
  17. max_monthly_sales: {
  18. max_bucket: {
  19. buckets_path: 'sales_per_month>sales'
  20. }
  21. }
  22. }
  23. }
  24. )
  25. puts response

Js

  1. const response = await client.search({
  2. aggs: {
  3. sales_per_month: {
  4. date_histogram: {
  5. field: "date",
  6. calendar_interval: "month",
  7. },
  8. aggs: {
  9. sales: {
  10. sum: {
  11. field: "price",
  12. },
  13. },
  14. },
  15. },
  16. max_monthly_sales: {
  17. max_bucket: {
  18. buckets_path: "sales_per_month>sales",
  19. },
  20. },
  21. },
  22. });
  23. console.log(response);

コンソール

  1. POST /_search
  2. {
  3. "aggs": {
  4. "sales_per_month": {
  5. "date_histogram": {
  6. "field": "date",
  7. "calendar_interval": "month"
  8. },
  9. "aggs": {
  10. "sales": {
  11. "sum": {
  12. "field": "price"
  13. }
  14. }
  15. }
  16. },
  17. "max_monthly_sales": {
  18. "max_bucket": {
  19. "buckets_path": "sales_per_month>sales"
  20. }
  21. }
  22. }
  23. }
buckets_pathは、このmax_bucket集約に対して、sales_per_month日付ヒストグラム内のsales集約の最大値を取得したいことを指示します。

兄弟パイプラインaggがterms集約のようなマルチバケット集約を参照する場合、特定のキーを選択するオプションもあります。たとえば、bucket_scriptは、計算を実行するために2つの特定のバケット(それぞれのバケットキーを介して)を選択できます:

Python

  1. resp = client.search(
  2. aggs={
  3. "sales_per_month": {
  4. "date_histogram": {
  5. "field": "date",
  6. "calendar_interval": "month"
  7. },
  8. "aggs": {
  9. "sale_type": {
  10. "terms": {
  11. "field": "type"
  12. },
  13. "aggs": {
  14. "sales": {
  15. "sum": {
  16. "field": "price"
  17. }
  18. }
  19. }
  20. },
  21. "hat_vs_bag_ratio": {
  22. "bucket_script": {
  23. "buckets_path": {
  24. "hats": "sale_type['hat']>sales",
  25. "bags": "sale_type['bag']>sales"
  26. },
  27. "script": "params.hats / params.bags"
  28. }
  29. }
  30. }
  31. }
  32. },
  33. )
  34. print(resp)

Ruby

  1. response = client.search(
  2. body: {
  3. aggregations: {
  4. sales_per_month: {
  5. date_histogram: {
  6. field: 'date',
  7. calendar_interval: 'month'
  8. },
  9. aggregations: {
  10. sale_type: {
  11. terms: {
  12. field: 'type'
  13. },
  14. aggregations: {
  15. sales: {
  16. sum: {
  17. field: 'price'
  18. }
  19. }
  20. }
  21. },
  22. hat_vs_bag_ratio: {
  23. bucket_script: {
  24. buckets_path: {
  25. hats: "sale_type['hat']>sales",
  26. bags: "sale_type['bag']>sales"
  27. },
  28. script: 'params.hats / params.bags'
  29. }
  30. }
  31. }
  32. }
  33. }
  34. }
  35. )
  36. puts response

Js

  1. const response = await client.search({
  2. aggs: {
  3. sales_per_month: {
  4. date_histogram: {
  5. field: "date",
  6. calendar_interval: "month",
  7. },
  8. aggs: {
  9. sale_type: {
  10. terms: {
  11. field: "type",
  12. },
  13. aggs: {
  14. sales: {
  15. sum: {
  16. field: "price",
  17. },
  18. },
  19. },
  20. },
  21. hat_vs_bag_ratio: {
  22. bucket_script: {
  23. buckets_path: {
  24. hats: "sale_type['hat']>sales",
  25. bags: "sale_type['bag']>sales",
  26. },
  27. script: "params.hats / params.bags",
  28. },
  29. },
  30. },
  31. },
  32. },
  33. });
  34. console.log(response);

コンソール

  1. POST /_search
  2. {
  3. "aggs": {
  4. "sales_per_month": {
  5. "date_histogram": {
  6. "field": "date",
  7. "calendar_interval": "month"
  8. },
  9. "aggs": {
  10. "sale_type": {
  11. "terms": {
  12. "field": "type"
  13. },
  14. "aggs": {
  15. "sales": {
  16. "sum": {
  17. "field": "price"
  18. }
  19. }
  20. }
  21. },
  22. "hat_vs_bag_ratio": {
  23. "bucket_script": {
  24. "buckets_path": {
  25. "hats": "sale_type['hat']>sales",
  26. "bags": "sale_type['bag']>sales"
  27. },
  28. "script": "params.hats / params.bags"
  29. }
  30. }
  31. }
  32. }
  33. }
  34. }
buckets_pathは、スクリプトで特に使用するために帽子とバッグのバケット(['hat']/['bag']` を介して)を選択します、
代わりにsale_type集約からすべてのバケットを取得するのではなく

特別なパス

メトリックへのパスの代わりに、buckets_pathは特別な"_count"パスを使用できます。これは、パイプライン集約にドキュメント数を入力として使用するよう指示します。たとえば、導関数は特定のメトリックの代わりに各バケットのドキュメント数に基づいて計算できます:

Python

  1. resp = client.search(
  2. aggs={
  3. "my_date_histo": {
  4. "date_histogram": {
  5. "field": "timestamp",
  6. "calendar_interval": "day"
  7. },
  8. "aggs": {
  9. "the_deriv": {
  10. "derivative": {
  11. "buckets_path": "_count"
  12. }
  13. }
  14. }
  15. }
  16. },
  17. )
  18. print(resp)

Ruby

  1. response = client.search(
  2. body: {
  3. aggregations: {
  4. my_date_histo: {
  5. date_histogram: {
  6. field: 'timestamp',
  7. calendar_interval: 'day'
  8. },
  9. aggregations: {
  10. the_deriv: {
  11. derivative: {
  12. buckets_path: '_count'
  13. }
  14. }
  15. }
  16. }
  17. }
  18. }
  19. )
  20. puts response

Js

  1. const response = await client.search({
  2. aggs: {
  3. my_date_histo: {
  4. date_histogram: {
  5. field: "timestamp",
  6. calendar_interval: "day",
  7. },
  8. aggs: {
  9. the_deriv: {
  10. derivative: {
  11. buckets_path: "_count",
  12. },
  13. },
  14. },
  15. },
  16. },
  17. });
  18. console.log(response);

コンソール

  1. POST /_search
  2. {
  3. "aggs": {
  4. "my_date_histo": {
  5. "date_histogram": {
  6. "field": "timestamp",
  7. "calendar_interval": "day"
  8. },
  9. "aggs": {
  10. "the_deriv": {
  11. "derivative": { "buckets_path": "_count" }
  12. }
  13. }
  14. }
  15. }
  16. }
メトリック名の代わりに_countを使用することで、ヒストグラム内のドキュメント数の導関数を計算できます
  1. #### Python
  2. ``````python
  3. resp = client.search(
  4. index="sales",
  5. size=0,
  6. aggs={
  7. "histo": {
  8. "date_histogram": {
  9. "field": "date",
  10. "calendar_interval": "day"
  11. },
  12. "aggs": {
  13. "categories": {
  14. "terms": {
  15. "field": "category"
  16. }
  17. },
  18. "min_bucket_selector": {
  19. "bucket_selector": {
  20. "buckets_path": {
  21. "count": "categories._bucket_count"
  22. },
  23. "script": {
  24. "source": "params.count != 0"
  25. }
  26. }
  27. }
  28. }
  29. }
  30. },
  31. )
  32. print(resp)
  33. `

Ruby

  1. response = client.search(
  2. index: 'sales',
  3. body: {
  4. size: 0,
  5. aggregations: {
  6. histo: {
  7. date_histogram: {
  8. field: 'date',
  9. calendar_interval: 'day'
  10. },
  11. aggregations: {
  12. categories: {
  13. terms: {
  14. field: 'category'
  15. }
  16. },
  17. min_bucket_selector: {
  18. bucket_selector: {
  19. buckets_path: {
  20. count: 'categories._bucket_count'
  21. },
  22. script: {
  23. source: 'params.count != 0'
  24. }
  25. }
  26. }
  27. }
  28. }
  29. }
  30. }
  31. )
  32. puts response

Js

  1. const response = await client.search({
  2. index: "sales",
  3. size: 0,
  4. aggs: {
  5. histo: {
  6. date_histogram: {
  7. field: "date",
  8. calendar_interval: "day",
  9. },
  10. aggs: {
  11. categories: {
  12. terms: {
  13. field: "category",
  14. },
  15. },
  16. min_bucket_selector: {
  17. bucket_selector: {
  18. buckets_path: {
  19. count: "categories._bucket_count",
  20. },
  21. script: {
  22. source: "params.count != 0",
  23. },
  24. },
  25. },
  26. },
  27. },
  28. },
  29. });
  30. console.log(response);

コンソール

  1. POST /sales/_search
  2. {
  3. "size": 0,
  4. "aggs": {
  5. "histo": {
  6. "date_histogram": {
  7. "field": "date",
  8. "calendar_interval": "day"
  9. },
  10. "aggs": {
  11. "categories": {
  12. "terms": {
  13. "field": "category"
  14. }
  15. },
  16. "min_bucket_selector": {
  17. "bucket_selector": {
  18. "buckets_path": {
  19. "count": "categories._bucket_count"
  20. },
  21. "script": {
  22. "source": "params.count != 0"
  23. }
  24. }
  25. }
  26. }
  27. }
  28. }
  29. }
メトリック名の代わりに_bucket_countを使用することで、categories集約のためにバケットが含まれていないhistoバケットをフィルタリングできます

集約名のドットの取り扱い

ドットを含む集約やメトリックに対処するために、代替構文がサポートされています。たとえば、99.9th パーセンタイル。このメトリックは次のように参照される場合があります:

Js

  1. "buckets_path": "my_percentile[99.9]"

データのギャップの取り扱い

現実世界のデータはしばしばノイズが多く、時にはギャップ—データが単に存在しない場所を含むことがあります。これはさまざまな理由で発生する可能性があり、最も一般的な理由は次のとおりです:

  • バケットに入るドキュメントが必要なフィールドを含まない
  • 1つ以上のバケットに対してクエリに一致するドキュメントがない
  • 計算されているメトリックが値を生成できない、他の依存バケットが値を欠いているためです。一部のパイプライン集約には、満たすべき特定の要件があります(例:導関数は前の値がないため最初の値のメトリックを計算できない、HoltWinters移動平均は計算を開始するために「ウォームアップ」データが必要など)。

ギャップポリシーは、「ギャップ」または欠落データが発生したときにパイプライン集約に望ましい動作を通知するメカニズムです。すべてのパイプライン集約はgap_policyパラメータを受け入れます。現在、選択できるギャップポリシーは2つあります:

  • スキップ
  • このオプションは、欠落データをバケットが存在しないかのように扱います。バケットをスキップし、次に利用可能な値を使用して計算を続けます。
  • ゼロを挿入
  • このオプションは、欠落値をゼロ(0)で置き換え、パイプライン集約計算は通常通り進行します。
  • 値を保持
  • このオプションはスキップに似ていますが、メトリックが非null、非NaNの値を提供する場合はこの値が使用され、それ以外の場合は空のバケットがスキップされます。