時系列データストリーム (TSDS) の再インデックス

はじめに

再インデックスを使用すると、古い 時系列データストリーム (TSDS) から新しいものにドキュメントをコピーできます。データストリームは一般的に再インデックスをサポートしていますが、いくつかの 制限 があります。それでも、時系列データストリームは、各バックインデックスに含まれる受け入れ可能なタイムスタンプ範囲に対する厳しい制御のため、追加の課題をもたらします。再インデックス API を直接使用すると、現在の受け入れウィンドウの外にあるタイムスタンプを持つドキュメントを挿入しようとするため、エラーが発生する可能性があります。

これらの制限を回避するために、以下に示すプロセスを使用してください:

  • 1. 再インデックスされたデータを含む宛先データストリームのインデックステンプレートを作成します。
  • 2. テンプレートを更新します。
    • 2.1. index.time_series.start_timeindex.time_series.end_time のインデックス設定を、古いデータストリームの最小および最大 @timestamp 値に一致させます。
    • 2.2. index.number_of_shards のインデックス設定を、古いデータストリームのすべてのバックインデックスのすべてのプライマリシャードの合計に設定します。
    • 2.3. index.number_of_replicas をゼロに設定し、index.lifecycle.name のインデックス設定を解除します。
  • 3. 再インデックス操作を完了するまで実行します。
  • 4. 宛先インデックステンプレートの上書きされたインデックス設定を元に戻します。
  • 5. 新しいドキュメントを受け入れることができる新しいバックインデックスを作成するために rollover API を呼び出します。

このプロセスは、ダウンサンプリング 設定のない時系列データストリームにのみ適用されます。ダウンサンプリングのあるデータストリームは、バックインデックスを個別に再インデックスし、それらを空の宛先データストリームに追加することによってのみ再インデックスできます。

以下では、プロセスの各ステップを例を挙げて詳しく説明します。

古いドキュメントを受け入れる TSDS テンプレートの作成

次のテンプレートを持つ TSDS を考えてみましょう:

Python

  1. resp = client.cluster.put_component_template(
  2. name="source_template",
  3. template={
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 2,
  7. "number_of_shards": 2,
  8. "mode": "time_series",
  9. "routing_path": [
  10. "metricset"
  11. ]
  12. }
  13. },
  14. "mappings": {
  15. "properties": {
  16. "@timestamp": {
  17. "type": "date"
  18. },
  19. "metricset": {
  20. "type": "keyword",
  21. "time_series_dimension": True
  22. },
  23. "k8s": {
  24. "properties": {
  25. "tx": {
  26. "type": "long"
  27. },
  28. "rx": {
  29. "type": "long"
  30. }
  31. }
  32. }
  33. }
  34. }
  35. },
  36. )
  37. print(resp)
  38. resp1 = client.indices.put_index_template(
  39. name="1",
  40. index_patterns=[
  41. "k8s*"
  42. ],
  43. composed_of=[
  44. "source_template"
  45. ],
  46. data_stream={},
  47. )
  48. print(resp1)

Ruby

  1. response = client.cluster.put_component_template(
  2. name: 'source_template',
  3. body: {
  4. template: {
  5. settings: {
  6. index: {
  7. number_of_replicas: 2,
  8. number_of_shards: 2,
  9. mode: 'time_series',
  10. routing_path: [
  11. 'metricset'
  12. ]
  13. }
  14. },
  15. mappings: {
  16. properties: {
  17. "@timestamp": {
  18. type: 'date'
  19. },
  20. metricset: {
  21. type: 'keyword',
  22. time_series_dimension: true
  23. },
  24. "k8s": {
  25. properties: {
  26. tx: {
  27. type: 'long'
  28. },
  29. rx: {
  30. type: 'long'
  31. }
  32. }
  33. }
  34. }
  35. }
  36. }
  37. }
  38. )
  39. puts response
  40. response = client.indices.put_index_template(
  41. name: 1,
  42. body: {
  43. index_patterns: [
  44. 'k8s*'
  45. ],
  46. composed_of: [
  47. 'source_template'
  48. ],
  49. data_stream: {}
  50. }
  51. )
  52. puts response

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "source_template",
  3. template: {
  4. settings: {
  5. index: {
  6. number_of_replicas: 2,
  7. number_of_shards: 2,
  8. mode: "time_series",
  9. routing_path: ["metricset"],
  10. },
  11. },
  12. mappings: {
  13. properties: {
  14. "@timestamp": {
  15. type: "date",
  16. },
  17. metricset: {
  18. type: "keyword",
  19. time_series_dimension: true,
  20. },
  21. k8s: {
  22. properties: {
  23. tx: {
  24. type: "long",
  25. },
  26. rx: {
  27. type: "long",
  28. },
  29. },
  30. },
  31. },
  32. },
  33. },
  34. });
  35. console.log(response);
  36. const response1 = await client.indices.putIndexTemplate({
  37. name: 1,
  38. index_patterns: ["k8s*"],
  39. composed_of: ["source_template"],
  40. data_stream: {},
  41. });
  42. console.log(response1);

コンソール

  1. POST /_component_template/source_template
  2. {
  3. "template": {
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 2,
  7. "number_of_shards": 2,
  8. "mode": "time_series",
  9. "routing_path": [ "metricset" ]
  10. }
  11. },
  12. "mappings": {
  13. "properties": {
  14. "@timestamp": { "type": "date" },
  15. "metricset": {
  16. "type": "keyword",
  17. "time_series_dimension": true
  18. },
  19. "k8s": {
  20. "properties": {
  21. "tx": { "type": "long" },
  22. "rx": { "type": "long" }
  23. }
  24. }
  25. }
  26. }
  27. }
  28. }
  29. POST /_index_template/1
  30. {
  31. "index_patterns": [
  32. "k8s*"
  33. ],
  34. "composed_of": [
  35. "source_template"
  36. ],
  37. "data_stream": {}
  38. }

/k8s/_settings の出力例は次のようになります:

コンソール-結果

  1. {
  2. ".ds-k8s-2023.09.01-000002": {
  3. "settings": {
  4. "index": {
  5. "mode": "time_series",
  6. "routing": {
  7. "allocation": {
  8. "include": {
  9. "_tier_preference": "data_hot"
  10. }
  11. }
  12. },
  13. "hidden": "true",
  14. "number_of_shards": "2",
  15. "time_series": {
  16. "end_time": "2023-09-01T14:00:00.000Z",
  17. "start_time": "2023-09-01T10:00:00.000Z"
  18. },
  19. "provided_name": ".ds-k9s-2023.09.01-000002",
  20. "creation_date": "1694439857608",
  21. "number_of_replicas": "2",
  22. "routing_path": [
  23. "metricset"
  24. ],
  25. ...
  26. }
  27. }
  28. },
  29. ".ds-k8s-2023.09.01-000001": {
  30. "settings": {
  31. "index": {
  32. "mode": "time_series",
  33. "routing": {
  34. "allocation": {
  35. "include": {
  36. "_tier_preference": "data_hot"
  37. }
  38. }
  39. },
  40. "hidden": "true",
  41. "number_of_shards": "2",
  42. "time_series": {
  43. "end_time": "2023-09-01T10:00:00.000Z",
  44. "start_time": "2023-09-01T06:00:00.000Z"
  45. },
  46. "provided_name": ".ds-k9s-2023.09.01-000001",
  47. "creation_date": "1694439837126",
  48. "number_of_replicas": "2",
  49. "routing_path": [
  50. "metricset"
  51. ],
  52. ...
  53. }
  54. }
  55. }
  56. }

この TSDS を再インデックスするには、宛先データストリームでそのインデックステンプレートを再利用しないでください。機能に影響を与えないようにするためです。代わりに、ソース TSDS のテンプレートをクローンし、次の変更を適用します:

  • index.time_series.start_timeindex.time_series.end_time のインデックス設定を明示的に設定します。これらの値は、再インデックスするデータストリームの最小および最大 @timestamp 値に基づくべきです。これにより、初期のバックインデックスはソースデータストリームに含まれるすべてのデータをロードできます。
  • index.number_of_shards のインデックス設定を、ソースデータストリームのすべてのバックインデックスのすべてのプライマリシャードの合計に設定します。これにより、各シャードが別々のスレッド(またはそれ以上)で処理されるため、同じレベルの検索並列性が維持されます。
  • index.lifecycle.name のインデックス設定がある場合は解除します。これにより、再インデックス中に ILM が宛先データストリームを変更するのを防ぎます。
  • (オプション)index.number_of_replicas をゼロに設定します。これにより、再インデックス操作が高速化されます。データがコピーされるため、レプリカが不足していることによるデータ損失のリスクは限られています。

上記の例をソース TSDS として使用すると、宛先 TSDS のテンプレートは次のようになります:

Python

  1. resp = client.cluster.put_component_template(
  2. name="destination_template",
  3. template={
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 0,
  7. "number_of_shards": 4,
  8. "mode": "time_series",
  9. "routing_path": [
  10. "metricset"
  11. ],
  12. "time_series": {
  13. "end_time": "2023-09-01T14:00:00.000Z",
  14. "start_time": "2023-09-01T06:00:00.000Z"
  15. }
  16. }
  17. },
  18. "mappings": {
  19. "properties": {
  20. "@timestamp": {
  21. "type": "date"
  22. },
  23. "metricset": {
  24. "type": "keyword",
  25. "time_series_dimension": True
  26. },
  27. "k8s": {
  28. "properties": {
  29. "tx": {
  30. "type": "long"
  31. },
  32. "rx": {
  33. "type": "long"
  34. }
  35. }
  36. }
  37. }
  38. }
  39. },
  40. )
  41. print(resp)
  42. resp1 = client.indices.put_index_template(
  43. name="2",
  44. index_patterns=[
  45. "k8s*"
  46. ],
  47. composed_of=[
  48. "destination_template"
  49. ],
  50. data_stream={},
  51. )
  52. print(resp1)

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "destination_template",
  3. template: {
  4. settings: {
  5. index: {
  6. number_of_replicas: 0,
  7. number_of_shards: 4,
  8. mode: "time_series",
  9. routing_path: ["metricset"],
  10. time_series: {
  11. end_time: "2023-09-01T14:00:00.000Z",
  12. start_time: "2023-09-01T06:00:00.000Z",
  13. },
  14. },
  15. },
  16. mappings: {
  17. properties: {
  18. "@timestamp": {
  19. type: "date",
  20. },
  21. metricset: {
  22. type: "keyword",
  23. time_series_dimension: true,
  24. },
  25. k8s: {
  26. properties: {
  27. tx: {
  28. type: "long",
  29. },
  30. rx: {
  31. type: "long",
  32. },
  33. },
  34. },
  35. },
  36. },
  37. },
  38. });
  39. console.log(response);
  40. const response1 = await client.indices.putIndexTemplate({
  41. name: 2,
  42. index_patterns: ["k8s*"],
  43. composed_of: ["destination_template"],
  44. data_stream: {},
  45. });
  46. console.log(response1);

コンソール

  1. POST /_component_template/destination_template
  2. {
  3. "template": {
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 0,
  7. "number_of_shards": 4,
  8. "mode": "time_series",
  9. "routing_path": [ "metricset" ],
  10. "time_series": {
  11. "end_time": "2023-09-01T14:00:00.000Z",
  12. "start_time": "2023-09-01T06:00:00.000Z"
  13. }
  14. }
  15. },
  16. "mappings": {
  17. "properties": {
  18. "@timestamp": { "type": "date" },
  19. "metricset": {
  20. "type": "keyword",
  21. "time_series_dimension": true
  22. },
  23. "k8s": {
  24. "properties": {
  25. "tx": { "type": "long" },
  26. "rx": { "type": "long" }
  27. }
  28. }
  29. }
  30. }
  31. }
  32. }
  33. POST /_index_template/2
  34. {
  35. "index_patterns": [
  36. "k8s*"
  37. ],
  38. "composed_of": [
  39. "destination_template"
  40. ],
  41. "data_stream": {}
  42. }

再インデックス

再インデックス API を呼び出します。例えば:

Python

  1. resp = client.reindex(
  2. source={
  3. "index": "k8s"
  4. },
  5. dest={
  6. "index": "k9s",
  7. "op_type": "create"
  8. },
  9. )
  10. print(resp)

Ruby

  1. response = client.reindex(
  2. body: {
  3. source: {
  4. index: 'k8s'
  5. },
  6. dest: {
  7. index: 'k9s',
  8. op_type: 'create'
  9. }
  10. }
  11. )
  12. puts response

Js

  1. const response = await client.reindex({
  2. source: {
  3. index: "k8s",
  4. },
  5. dest: {
  6. index: "k9s",
  7. op_type: "create",
  8. },
  9. });
  10. console.log(response);

コンソール

  1. POST /_reindex
  2. {
  3. "source": {
  4. "index": "k8s"
  5. },
  6. "dest": {
  7. "index": "k9s",
  8. "op_type": "create"
  9. }
  10. }

宛先インデックステンプレートの復元

再インデックス操作が完了したら、宛先 TSDS のインデックステンプレートを次のように復元します:

  • index.time_series.start_timeindex.time_series.end_time の上書きを削除します。
  • index.number_of_shardsindex.number_of_replicas、および index.lifecycle.name の値を適用可能な場合は復元します。

前の例を使用すると、宛先テンプレートは次のように変更されます:

Python

  1. resp = client.cluster.put_component_template(
  2. name="destination_template",
  3. template={
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 2,
  7. "number_of_shards": 2,
  8. "mode": "time_series",
  9. "routing_path": [
  10. "metricset"
  11. ]
  12. }
  13. },
  14. "mappings": {
  15. "properties": {
  16. "@timestamp": {
  17. "type": "date"
  18. },
  19. "metricset": {
  20. "type": "keyword",
  21. "time_series_dimension": True
  22. },
  23. "k8s": {
  24. "properties": {
  25. "tx": {
  26. "type": "long"
  27. },
  28. "rx": {
  29. "type": "long"
  30. }
  31. }
  32. }
  33. }
  34. }
  35. },
  36. )
  37. print(resp)

Ruby

  1. response = client.cluster.put_component_template(
  2. name: 'destination_template',
  3. body: {
  4. template: {
  5. settings: {
  6. index: {
  7. number_of_replicas: 2,
  8. number_of_shards: 2,
  9. mode: 'time_series',
  10. routing_path: [
  11. 'metricset'
  12. ]
  13. }
  14. },
  15. mappings: {
  16. properties: {
  17. "@timestamp": {
  18. type: 'date'
  19. },
  20. metricset: {
  21. type: 'keyword',
  22. time_series_dimension: true
  23. },
  24. "k8s": {
  25. properties: {
  26. tx: {
  27. type: 'long'
  28. },
  29. rx: {
  30. type: 'long'
  31. }
  32. }
  33. }
  34. }
  35. }
  36. }
  37. }
  38. )
  39. puts response

Js

  1. const response = await client.cluster.putComponentTemplate({
  2. name: "destination_template",
  3. template: {
  4. settings: {
  5. index: {
  6. number_of_replicas: 2,
  7. number_of_shards: 2,
  8. mode: "time_series",
  9. routing_path: ["metricset"],
  10. },
  11. },
  12. mappings: {
  13. properties: {
  14. "@timestamp": {
  15. type: "date",
  16. },
  17. metricset: {
  18. type: "keyword",
  19. time_series_dimension: true,
  20. },
  21. k8s: {
  22. properties: {
  23. tx: {
  24. type: "long",
  25. },
  26. rx: {
  27. type: "long",
  28. },
  29. },
  30. },
  31. },
  32. },
  33. },
  34. });
  35. console.log(response);

コンソール

  1. POST /_component_template/destination_template
  2. {
  3. "template": {
  4. "settings": {
  5. "index": {
  6. "number_of_replicas": 2,
  7. "number_of_shards": 2,
  8. "mode": "time_series",
  9. "routing_path": [ "metricset" ]
  10. }
  11. },
  12. "mappings": {
  13. "properties": {
  14. "@timestamp": { "type": "date" },
  15. "metricset": {
  16. "type": "keyword",
  17. "time_series_dimension": true
  18. },
  19. "k8s": {
  20. "properties": {
  21. "tx": { "type": "long" },
  22. "rx": { "type": "long" }
  23. }
  24. }
  25. }
  26. }
  27. }
  28. }

次に、条件を設定せずに宛先データストリームで rollover API を呼び出します。

Python

  1. resp = client.indices.rollover(
  2. alias="k9s",
  3. )
  4. print(resp)

Ruby

  1. response = client.indices.rollover(
  2. alias: 'k9s'
  3. )
  4. puts response

Js

  1. const response = await client.indices.rollover({
  2. alias: "k9s",
  3. });
  4. console.log(response);

コンソール

  1. POST /k9s/_rollover/

これにより、更新されたインデックス設定を持つ新しいバックインデックスが作成されます。宛先データストリームは新しいドキュメントを受け入れる準備が整いました。

初期のバックインデックスは、ソースデータストリームから派生したタイムスタンプの範囲内でドキュメントを受け入れることができます。これが望ましくない場合は、明示的に 読み取り専用 とマークしてください。