変換APIの作成

変換をインスタンス化します。

リクエスト

PUT _transform/<transform_id>

前提条件

次の権限が必要です:

  • クラスター: manage_transformtransform_admin ビルトインロールがこの権限を付与します)
  • ソースインデックス: read, view_index_metadata
  • 宛先インデックス: read, create_index, indexretention_policy が構成されている場合、delete 権限も必要です。

説明

このAPIは変換を定義し、ソースインデックスからデータをコピーし、それを変換してエンティティ中心の宛先インデックスに永続化します。変換にピボットメソッドを使用することを選択した場合、エンティティはgroup_byフィールドのセットによってpivotオブジェクト内で定義されます。最新メソッドを使用することを選択した場合、エンティティはunique_keyフィールドの値によってlatestオブジェクト内で定義されます。

宛先インデックスは二次元の表形式データ構造(データフレームとして知られています)と考えることもできます。データフレーム内の各ドキュメントのIDはエンティティのハッシュから生成されるため、エンティティごとに一意の行があります。詳細については、データの変換を参照してください。

変換が作成されると、一連の検証が行われて成功を確保します。たとえば、ソースインデックスの存在を確認し、宛先インデックスがソースインデックスパターンの一部でないことを確認します。defer_validationパラメータを使用してこれらのチェックをスキップできます。

遅延検証は、変換が開始されるときに常に実行されますが、権限チェックを除きます。

  • 変換は、作成時にユーザーが持っていたロールを記憶し、それらのロールを使用します。これらのロールがソースおよび宛先インデックスに必要な権限を持っていない場合、変換は不正な操作を試みると失敗します。二次認証ヘッダーを提供した場合、それらの資格情報が代わりに使用されます。
  • 変換を作成するにはKibanaまたはこのAPIを使用する必要があります。ElasticsearchインデックスAPIを使用して.transform-internal*インデックスに直接変換を追加しないでください。Elasticsearchのセキュリティ機能が有効になっている場合、.transform-internal*インデックスにユーザーに権限を与えないでください。7.5以前に変換を使用していた場合、.data-frame-internal*インデックスにユーザーに権限を与えないでください。

変換には最新またはピボットメソッドのいずれかを選択する必要があります。単一の変換で両方を使用することはできません。

パスパラメータ

  • <transform_id>
  • (必須、文字列)変換の識別子。この識別子は小文字の英数字(a-zおよび0-9)、ハイフン、アンダースコアを含むことができます。64文字の制限があり、英数字で始まり、英数字で終わる必要があります。

クエリパラメータ

  • defer_validation
  • (オプション、ブール値)trueの場合、遅延検証は実行されません。この動作は、ソースインデックスが変換が作成された後に存在しない場合に望ましい場合があります。
  • timeout
  • (オプション、時間)応答を待つ期間。タイムアウトが切れる前に応答が受信されない場合、リクエストは失敗し、エラーが返されます。デフォルトは30sです。

リクエストボディ

  • description
  • (オプション、文字列)変換の自由形式の説明。

  • dest

  • (必須、オブジェクト)変換の宛先。
    1. - `````index
    • (必須、文字列)変換の宛先インデックス。
      pivot変換の場合、宛先インデックスのマッピングは可能な限りソースフィールドに基づいて推測されます。代替マッピングが必要な場合は、変換を開始する前にインデックス作成APIを使用してください。
      latest変換の場合、マッピングは決して推測されません。宛先インデックスの動的マッピングが望ましくない場合は、変換を開始する前にインデックス作成APIを使用してください。
    • aliases
    • (オプション、オブジェクトの配列)変換の宛先インデックスが持つべきエイリアス。エイリアスは変換の保存された資格情報を使用して操作され、これは作成時に提供された二次資格情報(プライマリおよび二次資格情報が指定されている場合)を意味します。
      宛先インデックスは、変換によって作成されたか、ユーザーによって事前に作成されたかに関係なく、エイリアスに追加されます。
    1. 詳細
    2. - `````alias
    • (必須、文字列)エイリアスの名前。
    • move_on_creation
    • (オプション、ブール値)宛先インデックスがこのエイリアスの唯一のインデックスであるべきかどうか。trueの場合、宛先インデックスをこのエイリアスに追加する前に、他のすべてのインデックスがこのエイリアスから削除されます。デフォルトはfalseです。
    • pipeline
    • (オプション、文字列)インジェストパイプラインの一意の識別子。
  • frequency
  • (オプション、[時間単位](b10cb0563daae284.md#time-units))変換が継続的に実行されているときにソースインデックスの変更をチェックする間隔。最小値は1s、最大値は1hです。デフォルト値は1mです。

  • latest

  • (必須*、オブジェクト)latestメソッドは、各ユニークキーの最新のドキュメントを見つけることによってデータを変換します。

    1. - `````sort
    • (必須、文字列)最新のドキュメントを特定するために使用される日付フィールドを指定します。
    • unique_key
    • (必須、文字列の配列)データをグループ化するために使用される1つ以上のフィールドの配列を指定します。
  • _meta

  • (オプション、オブジェクト)オプションの変換メタデータを定義します。

  • pivot

  • (必須*、オブジェクト)pivotメソッドは、データを集約してグループ化することによって変換します。これらのオブジェクトはgroup byフィールドとデータを削減するための集約を定義します。

    1. - `````aggregations`````または`````aggs
  • retention_policy

  • (オプション、オブジェクト)変換の保持ポリシーを定義します。定義された基準を満たすデータは宛先インデックスから削除されます。

    1. - `````time
    • (必須、オブジェクト)変換が保持ポリシーを設定するために時間フィールドを使用することを指定します。保持ポリシーのtime.fieldが存在し、max.ageより古いデータを含む場合、データは削除されます。
      1. - `````field
      • (必須、文字列)ドキュメントの年齢を計算するために使用される日付フィールド。time.fieldを既存の日付フィールドに設定します。
      • max_age
      • (必須、[時間単位](b10cb0563daae284.md#time-units))宛先インデックス内のドキュメントの最大年齢を指定します。設定された値より古いドキュメントは宛先インデックスから削除されます。
  • settings

  • (オプション、オブジェクト)オプションの変換設定を定義します。

    1. - `````align_checkpoints
    • (オプション、ブール値)変換のチェックポイント範囲がパフォーマンスの最適化のために最適化されるべきかどうかを指定します。このような最適化は、変換設定で日付ヒストグラムがグループソースとして指定されている場合に、チェックポイント範囲を日付ヒストグラムの間隔に合わせることができます。その結果、宛先インデックス内のドキュメントの更新が少なくなり、全体的なパフォーマンスが向上します。デフォルト値はtrueで、これは可能な場合にチェックポイント範囲が最適化されることを意味します。
    • dates_as_epoch_millis
    • (オプション、ブール値)出力の日付がISO形式の文字列(デフォルト)またはエポックからのミリ秒として書き込まれるべきかどうかを定義します。epoch_millis7.11バージョン以前に作成された変換のデフォルトでした。互換性のある出力を得るには、これをtrueに設定します。デフォルト値はfalseです。
    • deduce_mappings
    • (オプション、ブール値)変換が宛先インデックスのマッピングを変換設定から推測するべきかどうかを指定します。デフォルト値はtrueで、これは可能な場合に宛先インデックスのマッピングが推測されることを意味します。
    • docs_per_second
    • (オプション、浮動小数点数)1秒あたりの入力ドキュメントの数に制限を指定します。この設定は、検索リクエストの間に待機時間を追加することによって変換を制限します。デフォルト値はnullで、これは制限を無効にします。
    • max_page_search_size
    • (オプション、整数)回復可能な失敗のリトライ回数を定義します。最小値は0、最大値は100です。-1は無限を示すために使用できます。この場合、変換は回復可能な失敗のリトライをあきらめません。デフォルト値はクラスターの設定num_transform_failure_retriesです。
    • unattended
    • (オプション、ブール値)trueの場合、変換は無人モードで実行されます。無人モードでは、変換はエラーが発生した場合に無限にリトライし、変換は決して失敗しません。無限以外のリトライ回数を設定すると、検証に失敗します。デフォルトはfalseです。
  • source

  • (必須、オブジェクト)変換のデータソース。

    1. - `````index
    • (必須、文字列または配列)変換のソースインデックス。単一のインデックス、インデックスパターン(例:"my-index-*")、インデックスの配列(例:["my-index-000001", "my-index-000002"])、またはインデックスパターンの配列(例:["my-index-*", "my-other-index-*"])であることができます。リモートインデックスの場合は、"remote_name:index_name"構文を使用します。
      リモートクラスターにインデックスがある場合、マスターノードと少なくとも1つの変換ノードはremote_cluster_clientノードロールを持っている必要があります。
    • query
    • (オプション、オブジェクト)ソースインデックスからデータのサブセットを取得するクエリ句。詳細はクエリDSLを参照してください。
    • runtime_mappings
    • (オプション、オブジェクト)変換で使用できる検索時のランタイムフィールドの定義。検索ランタイムフィールドの場合、すべてのデータノード(リモートノードを含む)は7.12以降である必要があります。
  • sync

  • (オプション、オブジェクト)変換が継続的に実行されるために必要なプロパティを定義します。
    1. - `````time
    • (必須、オブジェクト)変換がソースインデックスと宛先インデックスを同期させるために時間フィールドを使用することを指定します。
      1. - `````delay
      • (オプション、[時間単位](b10cb0563daae284.md#time-units))現在の時間と最新の入力データ時間の間の時間遅延。デフォルト値は60sです。
      • field
      • (必須、文字列)ソース内の新しいドキュメントを特定するために使用される日付フィールド。
        インジェストタイムスタンプを含むフィールドを使用することを強くお勧めします。異なるフィールドを使用する場合、delayを設定してデータ伝送の遅延を考慮する必要があるかもしれません。

次の変換はpivotメソッドを使用します:

Python

  1. resp = client.transform.put_transform(
  2. transform_id="ecommerce_transform1",
  3. source={
  4. "index": "kibana_sample_data_ecommerce",
  5. "query": {
  6. "term": {
  7. "geoip.continent_name": {
  8. "value": "Asia"
  9. }
  10. }
  11. }
  12. },
  13. pivot={
  14. "group_by": {
  15. "customer_id": {
  16. "terms": {
  17. "field": "customer_id",
  18. "missing_bucket": True
  19. }
  20. }
  21. },
  22. "aggregations": {
  23. "max_price": {
  24. "max": {
  25. "field": "taxful_total_price"
  26. }
  27. }
  28. }
  29. },
  30. description="Maximum priced ecommerce data by customer_id in Asia",
  31. dest={
  32. "index": "kibana_sample_data_ecommerce_transform1",
  33. "pipeline": "add_timestamp_pipeline"
  34. },
  35. frequency="5m",
  36. sync={
  37. "time": {
  38. "field": "order_date",
  39. "delay": "60s"
  40. }
  41. },
  42. retention_policy={
  43. "time": {
  44. "field": "order_date",
  45. "max_age": "30d"
  46. }
  47. },
  48. )
  49. print(resp)

Js

  1. const response = await client.transform.putTransform({
  2. transform_id: "ecommerce_transform1",
  3. source: {
  4. index: "kibana_sample_data_ecommerce",
  5. query: {
  6. term: {
  7. "geoip.continent_name": {
  8. value: "Asia",
  9. },
  10. },
  11. },
  12. },
  13. pivot: {
  14. group_by: {
  15. customer_id: {
  16. terms: {
  17. field: "customer_id",
  18. missing_bucket: true,
  19. },
  20. },
  21. },
  22. aggregations: {
  23. max_price: {
  24. max: {
  25. field: "taxful_total_price",
  26. },
  27. },
  28. },
  29. },
  30. description: "Maximum priced ecommerce data by customer_id in Asia",
  31. dest: {
  32. index: "kibana_sample_data_ecommerce_transform1",
  33. pipeline: "add_timestamp_pipeline",
  34. },
  35. frequency: "5m",
  36. sync: {
  37. time: {
  38. field: "order_date",
  39. delay: "60s",
  40. },
  41. },
  42. retention_policy: {
  43. time: {
  44. field: "order_date",
  45. max_age: "30d",
  46. },
  47. },
  48. });
  49. console.log(response);

コンソール

  1. PUT _transform/ecommerce_transform1
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_ecommerce",
  5. "query": {
  6. "term": {
  7. "geoip.continent_name": {
  8. "value": "Asia"
  9. }
  10. }
  11. }
  12. },
  13. "pivot": {
  14. "group_by": {
  15. "customer_id": {
  16. "terms": {
  17. "field": "customer_id",
  18. "missing_bucket": true
  19. }
  20. }
  21. },
  22. "aggregations": {
  23. "max_price": {
  24. "max": {
  25. "field": "taxful_total_price"
  26. }
  27. }
  28. }
  29. },
  30. "description": "Maximum priced ecommerce data by customer_id in Asia",
  31. "dest": {
  32. "index": "kibana_sample_data_ecommerce_transform1",
  33. "pipeline": "add_timestamp_pipeline"
  34. },
  35. "frequency": "5m",
  36. "sync": {
  37. "time": {
  38. "field": "order_date",
  39. "delay": "60s"
  40. }
  41. },
  42. "retention_policy": {
  43. "time": {
  44. "field": "order_date",
  45. "max_age": "30d"
  46. }
  47. }
  48. }

変換が作成されると、次の結果が得られます:

コンソール-結果

  1. {
  2. "acknowledged" : true
  3. }

次の変換はlatestメソッドを使用します:

Python

  1. resp = client.transform.put_transform(
  2. transform_id="ecommerce_transform2",
  3. source={
  4. "index": "kibana_sample_data_ecommerce"
  5. },
  6. latest={
  7. "unique_key": [
  8. "customer_id"
  9. ],
  10. "sort": "order_date"
  11. },
  12. description="Latest order for each customer",
  13. dest={
  14. "index": "kibana_sample_data_ecommerce_transform2"
  15. },
  16. frequency="5m",
  17. sync={
  18. "time": {
  19. "field": "order_date",
  20. "delay": "60s"
  21. }
  22. },
  23. )
  24. print(resp)

Js

  1. const response = await client.transform.putTransform({
  2. transform_id: "ecommerce_transform2",
  3. source: {
  4. index: "kibana_sample_data_ecommerce",
  5. },
  6. latest: {
  7. unique_key: ["customer_id"],
  8. sort: "order_date",
  9. },
  10. description: "Latest order for each customer",
  11. dest: {
  12. index: "kibana_sample_data_ecommerce_transform2",
  13. },
  14. frequency: "5m",
  15. sync: {
  16. time: {
  17. field: "order_date",
  18. delay: "60s",
  19. },
  20. },
  21. });
  22. console.log(response);

コンソール

  1. PUT _transform/ecommerce_transform2
  2. {
  3. "source": {
  4. "index": "kibana_sample_data_ecommerce"
  5. },
  6. "latest": {
  7. "unique_key": ["customer_id"],
  8. "sort": "order_date"
  9. },
  10. "description": "Latest order for each customer",
  11. "dest": {
  12. "index": "kibana_sample_data_ecommerce_transform2"
  13. },
  14. "frequency": "5m",
  15. "sync": {
  16. "time": {
  17. "field": "order_date",
  18. "delay": "60s"
  19. }
  20. }
  21. }