変換の例

これらの例は、データから有用な洞察を導き出すために変換を使用する方法を示しています。すべての例は、Kibanaのサンプルデータセットのいずれかを使用しています。より詳細なステップバイステップの例については、チュートリアル:eCommerceサンプルデータの変換を参照してください。

最高の顧客を見つける

この例では、eCommerceの注文サンプルデータセットを使用して、仮想のウェブショップで最も多く支出した顧客を見つけます。pivotタイプの変換を使用して、宛先インデックスには、注文数、注文の合計金額、ユニークな商品の数、注文ごとの平均価格、および各顧客の注文された商品の合計が含まれるようにします。

Kibanaでの変換を使用して最高の顧客を見つける

また、プレビュートランスフォームトランスフォーム作成APIを使用することもできます。

APIの例

Python

  1. resp = client.transform.preview_transform(
  2. source={
  3. "index": "kibana_sample_data_ecommerce"
  4. },
  5. dest={
  6. "index": "sample_ecommerce_orders_by_customer"
  7. },
  8. pivot={
  9. "group_by": {
  10. "user": {
  11. "terms": {
  12. "field": "user"
  13. }
  14. },
  15. "customer_id": {
  16. "terms": {
  17. "field": "customer_id"
  18. }
  19. }
  20. },
  21. "aggregations": {
  22. "order_count": {
  23. "value_count": {
  24. "field": "order_id"
  25. }
  26. },
  27. "total_order_amt": {
  28. "sum": {
  29. "field": "taxful_total_price"
  30. }
  31. },
  32. "avg_amt_per_order": {
  33. "avg": {
  34. "field": "taxful_total_price"
  35. }
  36. },
  37. "avg_unique_products_per_order": {
  38. "avg": {
  39. "field": "total_unique_products"
  40. }
  41. },
  42. "total_unique_products": {
  43. "cardinality": {
  44. "field": "products.product_id"
  45. }
  46. }
  47. }
  48. },
  49. )
  50. print(resp)

Js

  1. const response = await client.transform.previewTransform({
  2. source: {
  3. index: "kibana_sample_data_ecommerce",
  4. },
  5. dest: {
  6. index: "sample_ecommerce_orders_by_customer",
  7. },
  8. pivot: {
  9. group_by: {
  10. user: {
  11. terms: {
  12. field: "user",
  13. },
  14. },
  15. customer_id: {
  16. terms: {
  17. field: "customer_id",
  18. },
  19. },
  20. },
  21. aggregations: {
  22. order_count: {
  23. value_count: {
  24. field: "order_id",
  25. },
  26. },
  27. total_order_amt: {
  28. sum: {
  29. field: "taxful_total_price",
  30. },
  31. },
  32. avg_amt_per_order: {
  33. avg: {
  34. field: "taxful_total_price",
  35. },
  36. },
  37. avg_unique_products_per_order: {
  38. avg: {
  39. field: "total_unique_products",
  40. },
  41. },
  42. total_unique_products: {
  43. cardinality: {
  44. field: "products.product_id",
  45. },
  46. },
  47. },
  48. },
  49. });
  50. console.log(response);

コンソール

  1. POST _transform/_preview
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_ecommerce"
  5. },
  6. "dest" : {
  7. "index" : "sample_ecommerce_orders_by_customer"
  8. },
  9. "pivot": {
  10. "group_by": {
  11. "user": { "terms": { "field": "user" }},
  12. "customer_id": { "terms": { "field": "customer_id" }}
  13. },
  14. "aggregations": {
  15. "order_count": { "value_count": { "field": "order_id" }},
  16. "total_order_amt": { "sum": { "field": "taxful_total_price" }},
  17. "avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
  18. "avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
  19. "total_unique_products": { "cardinality": { "field": "products.product_id" }}
  20. }
  21. }
  22. }
変換の宛先インデックス。_previewによって無視されます。
2つのgroup_byフィールドが選択されます。これは、変換がusercustomer_idの組み合わせごとにユニークな行を含むことを意味します。このデータセット内では、これらのフィールドはどちらもユニークです。両方を変換に含めることで、最終結果により多くのコンテキストが提供されます。

上記の例では、ピボットオブジェクトの可読性を高めるために、圧縮JSONフォーマットが使用されています。

プレビュートランスフォームAPIを使用すると、サンプル値で埋められた変換のレイアウトを事前に確認できます。たとえば:

Js

  1. {
  2. "preview" : [
  3. {
  4. "total_order_amt" : 3946.9765625,
  5. "order_count" : 59.0,
  6. "total_unique_products" : 116.0,
  7. "avg_unique_products_per_order" : 2.0,
  8. "customer_id" : "10",
  9. "user" : "recip",
  10. "avg_amt_per_order" : 66.89790783898304
  11. },
  12. ...
  13. ]
  14. }

この変換は、次のような質問に答えるのを容易にします:

  • どの顧客が最も多く支出していますか?
  • どの顧客が注文ごとに最も多く支出していますか?
  • どの顧客が最も頻繁に注文していますか?
  • どの顧客が最も少ない異なる商品を注文しましたか?

これらの質問には集計だけで答えることも可能ですが、変換を使用することで、顧客中心のインデックスとしてこのデータを永続化できます。これにより、スケールでデータを分析し、顧客中心の視点からデータを探索しナビゲートする柔軟性が向上します。場合によっては、視覚化の作成がはるかに簡単になることもあります。

最も遅延の多い航空会社を見つける

この例では、フライトのサンプルデータセットを使用して、どの航空会社が最も遅延が多かったかを調べます。まず、クエリフィルターを使用して、すべてのキャンセルされたフライトを除外するようにソースデータをフィルタリングします。次に、データを変換して、航空会社ごとのフライトの数、遅延時間の合計、およびフライト時間の合計を含むようにします。最後に、bucket_scriptを使用して、実際の遅延時間の割合を決定します。

Python

  1. resp = client.transform.preview_transform(
  2. source={
  3. "index": "kibana_sample_data_flights",
  4. "query": {
  5. "bool": {
  6. "filter": [
  7. {
  8. "term": {
  9. "Cancelled": False
  10. }
  11. }
  12. ]
  13. }
  14. }
  15. },
  16. dest={
  17. "index": "sample_flight_delays_by_carrier"
  18. },
  19. pivot={
  20. "group_by": {
  21. "carrier": {
  22. "terms": {
  23. "field": "Carrier"
  24. }
  25. }
  26. },
  27. "aggregations": {
  28. "flights_count": {
  29. "value_count": {
  30. "field": "FlightNum"
  31. }
  32. },
  33. "delay_mins_total": {
  34. "sum": {
  35. "field": "FlightDelayMin"
  36. }
  37. },
  38. "flight_mins_total": {
  39. "sum": {
  40. "field": "FlightTimeMin"
  41. }
  42. },
  43. "delay_time_percentage": {
  44. "bucket_script": {
  45. "buckets_path": {
  46. "delay_time": "delay_mins_total.value",
  47. "flight_time": "flight_mins_total.value"
  48. },
  49. "script": "(params.delay_time / params.flight_time) * 100"
  50. }
  51. }
  52. }
  53. },
  54. )
  55. print(resp)

Js

  1. const response = await client.transform.previewTransform({
  2. source: {
  3. index: "kibana_sample_data_flights",
  4. query: {
  5. bool: {
  6. filter: [
  7. {
  8. term: {
  9. Cancelled: false,
  10. },
  11. },
  12. ],
  13. },
  14. },
  15. },
  16. dest: {
  17. index: "sample_flight_delays_by_carrier",
  18. },
  19. pivot: {
  20. group_by: {
  21. carrier: {
  22. terms: {
  23. field: "Carrier",
  24. },
  25. },
  26. },
  27. aggregations: {
  28. flights_count: {
  29. value_count: {
  30. field: "FlightNum",
  31. },
  32. },
  33. delay_mins_total: {
  34. sum: {
  35. field: "FlightDelayMin",
  36. },
  37. },
  38. flight_mins_total: {
  39. sum: {
  40. field: "FlightTimeMin",
  41. },
  42. },
  43. delay_time_percentage: {
  44. bucket_script: {
  45. buckets_path: {
  46. delay_time: "delay_mins_total.value",
  47. flight_time: "flight_mins_total.value",
  48. },
  49. script: "(params.delay_time / params.flight_time) * 100",
  50. },
  51. },
  52. },
  53. },
  54. });
  55. console.log(response);

コンソール

  1. POST _transform/_preview
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_flights",
  5. "query": {
  6. "bool": {
  7. "filter": [
  8. { "term": { "Cancelled": false } }
  9. ]
  10. }
  11. }
  12. },
  13. "dest" : {
  14. "index" : "sample_flight_delays_by_carrier"
  15. },
  16. "pivot": {
  17. "group_by": {
  18. "carrier": { "terms": { "field": "Carrier" }}
  19. },
  20. "aggregations": {
  21. "flights_count": { "value_count": { "field": "FlightNum" }},
  22. "delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
  23. "flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
  24. "delay_time_percentage": {
  25. "bucket_script": {
  26. "buckets_path": {
  27. "delay_time": "delay_mins_total.value",
  28. "flight_time": "flight_mins_total.value"
  29. },
  30. "script": "(params.delay_time / params.flight_time) * 100"
  31. }
  32. }
  33. }
  34. }
  35. }
キャンセルされていないフライトのみを選択するようにソースデータをフィルタリングします。
変換の宛先インデックス。_previewによって無視されます。
データは航空会社名を含むCarrierフィールドでグループ化されます。
このbucket_scriptは、集計によって返される結果に対して計算を行います。この特定の例では、旅行時間のうちどのくらいが遅延に費やされたかを計算します。

プレビューでは、新しいインデックスが各キャリアに対してこのようなデータを含むことを示しています:

Js

  1. {
  2. "preview" : [
  3. {
  4. "carrier" : "ES-Air",
  5. "flights_count" : 2802.0,
  6. "flight_mins_total" : 1436927.5130677223,
  7. "delay_time_percentage" : 9.335543983955839,
  8. "delay_mins_total" : 134145.0
  9. },
  10. ...
  11. ]
  12. }

この変換は、次のような質問に答えるのを容易にします:

  • どの航空会社がフライト時間の割合として最も多くの遅延を持っていますか?

このデータは架空のものであり、特定の目的地または出発空港の実際の遅延やフライト統計を反映していません。

疑わしいクライアントIPを見つける

この例では、ウェブログのサンプルデータセットを使用して疑わしいクライアントIPを特定します。データを変換して、新しいインデックスには、バイトの合計と異なるURL、エージェント、場所ごとの受信リクエストの数、および各クライアントIPの地理的な宛先が含まれるようにします。また、各クライアントIPが受け取る特定のHTTPレスポンスの種類をカウントするためにフィルター集計を使用します。最終的に、以下の例は、ウェブログデータをclientipというエンティティ中心のインデックスに変換します。

Python

  1. resp = client.transform.put_transform(
  2. transform_id="suspicious_client_ips",
  3. source={
  4. "index": "kibana_sample_data_logs"
  5. },
  6. dest={
  7. "index": "sample_weblogs_by_clientip"
  8. },
  9. sync={
  10. "time": {
  11. "field": "timestamp",
  12. "delay": "60s"
  13. }
  14. },
  15. pivot={
  16. "group_by": {
  17. "clientip": {
  18. "terms": {
  19. "field": "clientip"
  20. }
  21. }
  22. },
  23. "aggregations": {
  24. "url_dc": {
  25. "cardinality": {
  26. "field": "url.keyword"
  27. }
  28. },
  29. "bytes_sum": {
  30. "sum": {
  31. "field": "bytes"
  32. }
  33. },
  34. "geo.src_dc": {
  35. "cardinality": {
  36. "field": "geo.src"
  37. }
  38. },
  39. "agent_dc": {
  40. "cardinality": {
  41. "field": "agent.keyword"
  42. }
  43. },
  44. "geo.dest_dc": {
  45. "cardinality": {
  46. "field": "geo.dest"
  47. }
  48. },
  49. "responses.total": {
  50. "value_count": {
  51. "field": "timestamp"
  52. }
  53. },
  54. "success": {
  55. "filter": {
  56. "term": {
  57. "response": "200"
  58. }
  59. }
  60. },
  61. "error404": {
  62. "filter": {
  63. "term": {
  64. "response": "404"
  65. }
  66. }
  67. },
  68. "error5xx": {
  69. "filter": {
  70. "range": {
  71. "response": {
  72. "gte": 500,
  73. "lt": 600
  74. }
  75. }
  76. }
  77. },
  78. "timestamp.min": {
  79. "min": {
  80. "field": "timestamp"
  81. }
  82. },
  83. "timestamp.max": {
  84. "max": {
  85. "field": "timestamp"
  86. }
  87. },
  88. "timestamp.duration_ms": {
  89. "bucket_script": {
  90. "buckets_path": {
  91. "min_time": "timestamp.min.value",
  92. "max_time": "timestamp.max.value"
  93. },
  94. "script": "(params.max_time - params.min_time)"
  95. }
  96. }
  97. }
  98. },
  99. )
  100. print(resp)

Js

  1. const response = await client.transform.putTransform({
  2. transform_id: "suspicious_client_ips",
  3. source: {
  4. index: "kibana_sample_data_logs",
  5. },
  6. dest: {
  7. index: "sample_weblogs_by_clientip",
  8. },
  9. sync: {
  10. time: {
  11. field: "timestamp",
  12. delay: "60s",
  13. },
  14. },
  15. pivot: {
  16. group_by: {
  17. clientip: {
  18. terms: {
  19. field: "clientip",
  20. },
  21. },
  22. },
  23. aggregations: {
  24. url_dc: {
  25. cardinality: {
  26. field: "url.keyword",
  27. },
  28. },
  29. bytes_sum: {
  30. sum: {
  31. field: "bytes",
  32. },
  33. },
  34. "geo.src_dc": {
  35. cardinality: {
  36. field: "geo.src",
  37. },
  38. },
  39. agent_dc: {
  40. cardinality: {
  41. field: "agent.keyword",
  42. },
  43. },
  44. "geo.dest_dc": {
  45. cardinality: {
  46. field: "geo.dest",
  47. },
  48. },
  49. "responses.total": {
  50. value_count: {
  51. field: "timestamp",
  52. },
  53. },
  54. success: {
  55. filter: {
  56. term: {
  57. response: "200",
  58. },
  59. },
  60. },
  61. error404: {
  62. filter: {
  63. term: {
  64. response: "404",
  65. },
  66. },
  67. },
  68. error5xx: {
  69. filter: {
  70. range: {
  71. response: {
  72. gte: 500,
  73. lt: 600,
  74. },
  75. },
  76. },
  77. },
  78. "timestamp.min": {
  79. min: {
  80. field: "timestamp",
  81. },
  82. },
  83. "timestamp.max": {
  84. max: {
  85. field: "timestamp",
  86. },
  87. },
  88. "timestamp.duration_ms": {
  89. bucket_script: {
  90. buckets_path: {
  91. min_time: "timestamp.min.value",
  92. max_time: "timestamp.max.value",
  93. },
  94. script: "(params.max_time - params.min_time)",
  95. },
  96. },
  97. },
  98. },
  99. });
  100. console.log(response);

コンソール

  1. PUT _transform/suspicious_client_ips
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_logs"
  5. },
  6. "dest" : {
  7. "index" : "sample_weblogs_by_clientip"
  8. },
  9. "sync" : {
  10. "time": {
  11. "field": "timestamp",
  12. "delay": "60s"
  13. }
  14. },
  15. "pivot": {
  16. "group_by": {
  17. "clientip": { "terms": { "field": "clientip" } }
  18. },
  19. "aggregations": {
  20. "url_dc": { "cardinality": { "field": "url.keyword" }},
  21. "bytes_sum": { "sum": { "field": "bytes" }},
  22. "geo.src_dc": { "cardinality": { "field": "geo.src" }},
  23. "agent_dc": { "cardinality": { "field": "agent.keyword" }},
  24. "geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
  25. "responses.total": { "value_count": { "field": "timestamp" }},
  26. "success" : {
  27. "filter": {
  28. "term": { "response" : "200"}}
  29. },
  30. "error404" : {
  31. "filter": {
  32. "term": { "response" : "404"}}
  33. },
  34. "error5xx" : {
  35. "filter": {
  36. "range": { "response" : { "gte": 500, "lt": 600}}}
  37. },
  38. "timestamp.min": { "min": { "field": "timestamp" }},
  39. "timestamp.max": { "max": { "field": "timestamp" }},
  40. "timestamp.duration_ms": {
  41. "bucket_script": {
  42. "buckets_path": {
  43. "min_time": "timestamp.min.value",
  44. "max_time": "timestamp.max.value"
  45. },
  46. "script": "(params.max_time - params.min_time)"
  47. }
  48. }
  49. }
  50. }
  51. }
変換の宛先インデックス。
変換を継続的に実行するように設定します。timestampフィールドを使用して、ソースインデックスと宛先インデックスを同期します。最悪の場合の取り込み遅延は60秒です。
データはclientipフィールドでグループ化されます。
フィルター集計は、200レスポンスの発生回数をカウントします。次の2つの集計(error404error5xx)は、エラーコードによるエラーレスポンスをカウントし、正確な値またはレスポンスコードの範囲に一致します。
このbucket_scriptは、集計の結果に基づいてclientipアクセスの期間を計算します。

変換を作成した後は、それを開始する必要があります:

Python

  1. resp = client.transform.start_transform(
  2. transform_id="suspicious_client_ips",
  3. )
  4. print(resp)

Ruby

  1. response = client.transform.start_transform(
  2. transform_id: 'suspicious_client_ips'
  3. )
  4. puts response

Js

  1. const response = await client.transform.startTransform({
  2. transform_id: "suspicious_client_ips",
  3. });
  4. console.log(response);

コンソール

  1. POST _transform/suspicious_client_ips/_start

短時間後、最初の結果が宛先インデックスに表示されるはずです:

Python

  1. resp = client.search(
  2. index="sample_weblogs_by_clientip",
  3. )
  4. print(resp)

Ruby

  1. response = client.search(
  2. index: 'sample_weblogs_by_clientip'
  3. )
  4. puts response

Js

  1. const response = await client.search({
  2. index: "sample_weblogs_by_clientip",
  3. });
  4. console.log(response);

コンソール

  1. GET sample_weblogs_by_clientip/_search

検索結果は、各クライアントIPに対してこのようなデータを示します:

Js

  1. "hits" : [
  2. {
  3. "_index" : "sample_weblogs_by_clientip",
  4. "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
  5. "_score" : 1.0,
  6. "_source" : {
  7. "geo" : {
  8. "src_dc" : 2.0,
  9. "dest_dc" : 2.0
  10. },
  11. "success" : 2,
  12. "error404" : 0,
  13. "error503" : 0,
  14. "clientip" : "0.72.176.46",
  15. "agent_dc" : 2.0,
  16. "bytes_sum" : 4422.0,
  17. "responses" : {
  18. "total" : 2.0
  19. },
  20. "url_dc" : 2.0,
  21. "timestamp" : {
  22. "duration_ms" : 5.2191698E8,
  23. "min" : "2020-03-16T07:51:57.333Z",
  24. "max" : "2020-03-22T08:50:34.313Z"
  25. }
  26. }
  27. }
  28. ]

他のKibanaサンプルデータセットと同様に、ウェブログのサンプルデータセットには、インストール時のタイムスタンプが含まれており、将来のタイムスタンプも含まれています。継続的な変換は、データポイントが過去に入るとそれを取得します。ウェブログのサンプルデータセットをしばらく前にインストールした場合は、アンインストールして再インストールすると、タイムスタンプが変更されます。

この変換は、次のような質問に答えるのを容易にします:

  • どのクライアントIPが最も多くのデータを転送していますか?
  • どのクライアントIPが多くの異なるURLと対話していますか?
  • どのクライアントIPが高いエラー率を持っていますか?
  • どのクライアントIPが多くの宛先国と対話していますか?

各IPアドレスの最後のログイベントを見つける

この例では、ウェブログのサンプルデータセットを使用して、IPアドレスからの最後のログを見つけます。latestタイプの変換を継続モードで使用します。これは、ソースインデックスから宛先インデックスに各ユニークキーの最新のドキュメントをコピーし、新しいデータがソースインデックスに入ると宛先インデックスを更新します。

  1. ![Kibanaでの各IPアドレスの最後のログイベントを見つける](https://cdn.hedaai.com/projects/elasticsearch-8-15/6d6dd5a2500cfa32ca48f4159a8a1140.jpeg_big1500.jpeg)
  2. 最近ログに表示されたIPアドレスのドキュメントのみを保持することに興味があると仮定しましょう。保持ポリシーを定義し、ドキュメントの年齢を計算するために使用される日付フィールドを指定できます。この例では、データをソートするために使用されるのと同じ日付フィールドを使用します。次に、ドキュメントの最大年齢を設定します。設定した値より古いドキュメントは宛先インデックスから削除されます。
  3. ![Kibanaでの変換の保持ポリシーの定義](https://cdn.hedaai.com/projects/elasticsearch-8-15/d1160c420c1efc2f263b3ab623e83299.jpeg_big1500.jpeg)
  4. この変換は、各クライアントIPの最新のログイン日を含む宛先インデックスを作成します。変換が継続モードで実行されるため、宛先インデックスはソースインデックスに新しいデータが入ると更新されます。最後に、30日以上古いすべてのドキュメントは、適用された保持ポリシーにより宛先インデックスから削除されます。
  5. APIの例
  6. #### Python
  7. ``````python
  8. resp = client.transform.put_transform(
  9. transform_id="last-log-from-clientip",
  10. source={
  11. "index": [
  12. "kibana_sample_data_logs"
  13. ]
  14. },
  15. latest={
  16. "unique_key": [
  17. "clientip"
  18. ],
  19. "sort": "timestamp"
  20. },
  21. frequency="1m",
  22. dest={
  23. "index": "last-log-from-clientip"
  24. },
  25. sync={
  26. "time": {
  27. "field": "timestamp",
  28. "delay": "60s"
  29. }
  30. },
  31. retention_policy={
  32. "time": {
  33. "field": "timestamp",
  34. "max_age": "30d"
  35. }
  36. },
  37. settings={
  38. "max_page_search_size": 500
  39. },
  40. )
  41. print(resp)
  42. `

Js

  1. const response = await client.transform.putTransform({
  2. transform_id: "last-log-from-clientip",
  3. source: {
  4. index: ["kibana_sample_data_logs"],
  5. },
  6. latest: {
  7. unique_key: ["clientip"],
  8. sort: "timestamp",
  9. },
  10. frequency: "1m",
  11. dest: {
  12. index: "last-log-from-clientip",
  13. },
  14. sync: {
  15. time: {
  16. field: "timestamp",
  17. delay: "60s",
  18. },
  19. },
  20. retention_policy: {
  21. time: {
  22. field: "timestamp",
  23. max_age: "30d",
  24. },
  25. },
  26. settings: {
  27. max_page_search_size: 500,
  28. },
  29. });
  30. console.log(response);

コンソール

  1. PUT _transform/last-log-from-clientip
  2. {
  3. "source": {
  4. "index": [
  5. "kibana_sample_data_logs"
  6. ]
  7. },
  8. "latest": {
  9. "unique_key": [
  10. "clientip"
  11. ],
  12. "sort": "timestamp"
  13. },
  14. "frequency": "1m",
  15. "dest": {
  16. "index": "last-log-from-clientip"
  17. },
  18. "sync": {
  19. "time": {
  20. "field": "timestamp",
  21. "delay": "60s"
  22. }
  23. },
  24. "retention_policy": {
  25. "time": {
  26. "field": "timestamp",
  27. "max_age": "30d"
  28. }
  29. },
  30. "settings": {
  31. "max_page_search_size": 500
  32. }
  33. }
データをグループ化するためのフィールドを指定します。
データをソートするために使用される日付フィールドを指定します。
ソースインデックスの変更をチェックするための変換の間隔を設定します。
ソースインデックスと宛先インデックスを同期するために使用される時間フィールドと遅延設定を含みます。
変換の保持ポリシーを指定します。設定された値より古いドキュメントは宛先インデックスから削除されます。

変換を作成した後は、それを開始します:

Python

  1. resp = client.transform.start_transform(
  2. transform_id="last-log-from-clientip",
  3. )
  4. print(resp)

Ruby

  1. response = client.transform.start_transform(
  2. transform_id: 'last-log-from-clientip'
  3. )
  4. puts response

Js

  1. const response = await client.transform.startTransform({
  2. transform_id: "last-log-from-clientip",
  3. });
  4. console.log(response);

コンソール

  1. POST _transform/last-log-from-clientip/_start

変換がデータを処理した後、宛先インデックスを検索します:

Python

  1. resp = client.search(
  2. index="last-log-from-clientip",
  3. )
  4. print(resp)

Ruby

  1. response = client.search(
  2. index: 'last-log-from-clientip'
  3. )
  4. puts response

Js

  1. const response = await client.search({
  2. index: "last-log-from-clientip",
  3. });
  4. console.log(response);

コンソール

  1. GET last-log-from-clientip/_search

検索結果は、各クライアントIPに対してこのようなデータを示します:

Js

  1. {
  2. "_index" : "last-log-from-clientip",
  3. "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
  4. "_score" : 1.0,
  5. "_source" : {
  6. "referer" : "http://twitter.com/error/don-lind",
  7. "request" : "/elasticsearch",
  8. "agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
  9. "extension" : "",
  10. "memory" : null,
  11. "ip" : "0.72.176.46",
  12. "index" : "kibana_sample_data_logs",
  13. "message" : "0.72.176.46 - - [2018-09-18T06:31:00.572Z] \"GET /elasticsearch HTTP/1.1\" 200 7065 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
  14. "url" : "https://www.elastic.co/downloads/elasticsearch",
  15. "tags" : [
  16. "success",
  17. "info"
  18. ],
  19. "geo" : {
  20. "srcdest" : "IN:PH",
  21. "src" : "IN",
  22. "coordinates" : {
  23. "lon" : -124.1127917,
  24. "lat" : 40.80338889
  25. },
  26. "dest" : "PH"
  27. },
  28. "utc_time" : "2021-05-04T06:31:00.572Z",
  29. "bytes" : 7065,
  30. "machine" : {
  31. "os" : "ios",
  32. "ram" : 12884901888
  33. },
  34. "response" : 200,
  35. "clientip" : "0.72.176.46",
  36. "host" : "www.elastic.co",
  37. "event" : {
  38. "dataset" : "sample_web_logs"
  39. },
  40. "phpmemory" : null,
  41. "timestamp" : "2021-05-04T06:31:00.572Z"
  42. }
  43. }

この変換は、次のような質問に答えるのを容易にします:

  • 特定のIPアドレスに関連付けられた最も最近のログイベントは何でしたか?

変換の例

これらの例は、データから有用な洞察を導き出すために変換を使用する方法を示しています。すべての例は、Kibanaのサンプルデータセットのいずれかを使用しています。より詳細なステップバイステップの例については、チュートリアル:eCommerceサンプルデータの変換を参照してください。

Python

  1. resp = client.transform.preview_transform(
  2. source={
  3. "index": "kibana_sample_data_logs"
  4. },
  5. pivot={
  6. "group_by": {
  7. "timestamp": {
  8. "date_histogram": {
  9. "field": "timestamp",
  10. "fixed_interval": "1h"
  11. }
  12. }
  13. },
  14. "aggregations": {
  15. "bytes.max": {
  16. "max": {
  17. "field": "bytes"
  18. }
  19. },
  20. "top": {
  21. "top_metrics": {
  22. "metrics": [
  23. {
  24. "field": "clientip"
  25. },
  26. {
  27. "field": "geo.src"
  28. }
  29. ],
  30. "sort": {
  31. "bytes": "desc"
  32. }
  33. }
  34. }
  35. }
  36. },
  37. )
  38. print(resp)

Js

  1. const response = await client.transform.previewTransform({
  2. source: {
  3. index: "kibana_sample_data_logs",
  4. },
  5. pivot: {
  6. group_by: {
  7. timestamp: {
  8. date_histogram: {
  9. field: "timestamp",
  10. fixed_interval: "1h",
  11. },
  12. },
  13. },
  14. aggregations: {
  15. "bytes.max": {
  16. max: {
  17. field: "bytes",
  18. },
  19. },
  20. top: {
  21. top_metrics: {
  22. metrics: [
  23. {
  24. field: "clientip",
  25. },
  26. {
  27. field: "geo.src",
  28. },
  29. ],
  30. sort: {
  31. bytes: "desc",
  32. },
  33. },
  34. },
  35. },
  36. },
  37. });
  38. console.log(response);

コンソール

  1. POST _transform/_preview
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_logs"
  5. },
  6. "pivot": {
  7. "group_by": {
  8. "timestamp": {
  9. "date_histogram": {
  10. "field": "timestamp",
  11. "fixed_interval": "1h"
  12. }
  13. }
  14. },
  15. "aggregations": {
  16. "bytes.max": {
  17. "max": {
  18. "field": "bytes"
  19. }
  20. },
  21. "top": {
  22. "top_metrics": {
  23. "metrics": [
  24. {
  25. "field": "clientip"
  26. },
  27. {
  28. "field": "geo.src"
  29. }
  30. ],
  31. "sort": {
  32. "bytes": "desc"
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
データは、1時間の間隔で時間フィールドの[bytes]のデータでグループ化されます。
bytesフィールドの最大値を計算します。
トップドキュメントを返すフィールド(clientipgeo.src)とソート方法(最高のbytes値を持つドキュメント)を指定します。

上記のAPI呼び出しは、次のようなレスポンスを返します:

Js

  1. {
  2. "preview" : [
  3. {
  4. "top" : {
  5. "clientip" : "223.87.60.27",
  6. "geo.src" : "IN"
  7. },
  8. "bytes" : {
  9. "max" : 6219
  10. },
  11. "timestamp" : "2021-04-25T00:00:00.000Z"
  12. },
  13. {
  14. "top" : {
  15. "clientip" : "99.74.118.237",
  16. "geo.src" : "LK"
  17. },
  18. "bytes" : {
  19. "max" : 14113
  20. },
  21. "timestamp" : "2021-04-25T03:00:00.000Z"
  22. },
  23. {
  24. "top" : {
  25. "clientip" : "218.148.135.12",
  26. "geo.src" : "BR"
  27. },
  28. "bytes" : {
  29. "max" : 4531
  30. },
  31. "timestamp" : "2021-04-25T04:00:00.000Z"
  32. },
  33. ...
  34. ]
  35. }

顧客IDによる顧客名とメールアドレスの取得

この例では、ecommerceサンプルデータセットを使用して、顧客IDに基づくエンティティ中心のインデックスを作成し、top_metrics集計を使用して顧客名とメールアドレスを取得します。

データをcustomer_idでグループ化し、top_metrics集計を追加します。ここで、metricsemailcustomer_first_name.keywordcustomer_last_name.keywordフィールドです。top_metricsorder_dateで降順にソートします。API呼び出しは次のようになります:

Python

  1. resp = client.transform.preview_transform(
  2. source={
  3. "index": "kibana_sample_data_ecommerce"
  4. },
  5. pivot={
  6. "group_by": {
  7. "customer_id": {
  8. "terms": {
  9. "field": "customer_id"
  10. }
  11. }
  12. },
  13. "aggregations": {
  14. "last": {
  15. "top_metrics": {
  16. "metrics": [
  17. {
  18. "field": "email"
  19. },
  20. {
  21. "field": "customer_first_name.keyword"
  22. },
  23. {
  24. "field": "customer_last_name.keyword"
  25. }
  26. ],
  27. "sort": {
  28. "order_date": "desc"
  29. }
  30. }
  31. }
  32. }
  33. },
  34. )
  35. print(resp)

Js

  1. const response = await client.transform.previewTransform({
  2. source: {
  3. index: "kibana_sample_data_ecommerce",
  4. },
  5. pivot: {
  6. group_by: {
  7. customer_id: {
  8. terms: {
  9. field: "customer_id",
  10. },
  11. },
  12. },
  13. aggregations: {
  14. last: {
  15. top_metrics: {
  16. metrics: [
  17. {
  18. field: "email",
  19. },
  20. {
  21. field: "customer_first_name.keyword",
  22. },
  23. {
  24. field: "customer_last_name.keyword",
  25. },
  26. ],
  27. sort: {
  28. order_date: "desc",
  29. },
  30. },
  31. },
  32. },
  33. },
  34. });
  35. console.log(response);

コンソール

  1. POST _transform/_preview
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_ecommerce"
  5. },
  6. "pivot": {
  7. "group_by": {
  8. "customer_id": {
  9. "terms": {
  10. "field": "customer_id"
  11. }
  12. }
  13. },
  14. "aggregations": {
  15. "last": {
  16. "top_metrics": {
  17. "metrics": [
  18. {
  19. "field": "email"
  20. },
  21. {
  22. "field": "customer_first_name.keyword"
  23. },
  24. {
  25. "field": "customer_last_name.keyword"
  26. }
  27. ],
  28. "sort": {
  29. "order_date": "desc"
  30. }
  31. }
  32. }
  33. }
  34. }
  35. }
データはterms集計でcustomer_idフィールドでグループ化されます。
返すフィールド(メールと名前フィールド)を指定し、注文日で降順にソートします。

APIは、次のようなレスポンスを返します:

Js

  1. {
  2. "preview" : [
  3. {
  4. "last" : {
  5. "customer_last_name.keyword" : "Long",
  6. "customer_first_name.keyword" : "Recip",
  7. "email" : "[email protected]"
  8. },
  9. "customer_id" : "10"
  10. },
  11. {
  12. "last" : {
  13. "customer_last_name.keyword" : "Jackson",
  14. "customer_first_name.keyword" : "Fitzgerald",
  15. "email" : "[email protected]"
  16. },
  17. "customer_id" : "11"
  18. },
  19. {
  20. "last" : {
  21. "customer_last_name.keyword" : "Cross",
  22. "customer_first_name.keyword" : "Brigitte",
  23. "email" : "[email protected]"
  24. },
  25. "customer_id" : "12"
  26. },
  27. ...
  28. ]
  29. }