時系列データストリーム (TSDS) の再インデックス
はじめに
再インデックスを使用すると、古い 時系列データストリーム (TSDS) から新しいものにドキュメントをコピーできます。データストリームは一般的に再インデックスをサポートしていますが、いくつかの 制限 があります。それでも、時系列データストリームは、各バックインデックスに含まれる受け入れ可能なタイムスタンプ範囲に対する厳しい制御のため、追加の課題をもたらします。再インデックス API を直接使用すると、現在の受け入れウィンドウの外にあるタイムスタンプを持つドキュメントを挿入しようとするため、エラーが発生する可能性があります。
これらの制限を回避するために、以下に示すプロセスを使用してください:
- 1. 再インデックスされたデータを含む宛先データストリームのインデックステンプレートを作成します。
- 2. テンプレートを更新します。
- 2.1.
index.time_series.start_time
とindex.time_series.end_time
のインデックス設定を、古いデータストリームの最小および最大@timestamp
値に一致させます。 - 2.2.
index.number_of_shards
のインデックス設定を、古いデータストリームのすべてのバックインデックスのすべてのプライマリシャードの合計に設定します。 - 2.3.
index.number_of_replicas
をゼロに設定し、index.lifecycle.name
のインデックス設定を解除します。
- 2.1.
- 3. 再インデックス操作を完了するまで実行します。
- 4. 宛先インデックステンプレートの上書きされたインデックス設定を元に戻します。
- 5. 新しいドキュメントを受け入れることができる新しいバックインデックスを作成するために
rollover
API を呼び出します。
このプロセスは、ダウンサンプリング 設定のない時系列データストリームにのみ適用されます。ダウンサンプリングのあるデータストリームは、バックインデックスを個別に再インデックスし、それらを空の宛先データストリームに追加することによってのみ再インデックスできます。
以下では、プロセスの各ステップを例を挙げて詳しく説明します。
古いドキュメントを受け入れる TSDS テンプレートの作成
次のテンプレートを持つ TSDS を考えてみましょう:
Python
resp = client.cluster.put_component_template(
name="source_template",
template={
"settings": {
"index": {
"number_of_replicas": 2,
"number_of_shards": 2,
"mode": "time_series",
"routing_path": [
"metricset"
]
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": True
},
"k8s": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
},
)
print(resp)
resp1 = client.indices.put_index_template(
name="1",
index_patterns=[
"k8s*"
],
composed_of=[
"source_template"
],
data_stream={},
)
print(resp1)
Ruby
response = client.cluster.put_component_template(
name: 'source_template',
body: {
template: {
settings: {
index: {
number_of_replicas: 2,
number_of_shards: 2,
mode: 'time_series',
routing_path: [
'metricset'
]
}
},
mappings: {
properties: {
"@timestamp": {
type: 'date'
},
metricset: {
type: 'keyword',
time_series_dimension: true
},
"k8s": {
properties: {
tx: {
type: 'long'
},
rx: {
type: 'long'
}
}
}
}
}
}
}
)
puts response
response = client.indices.put_index_template(
name: 1,
body: {
index_patterns: [
'k8s*'
],
composed_of: [
'source_template'
],
data_stream: {}
}
)
puts response
Js
const response = await client.cluster.putComponentTemplate({
name: "source_template",
template: {
settings: {
index: {
number_of_replicas: 2,
number_of_shards: 2,
mode: "time_series",
routing_path: ["metricset"],
},
},
mappings: {
properties: {
"@timestamp": {
type: "date",
},
metricset: {
type: "keyword",
time_series_dimension: true,
},
k8s: {
properties: {
tx: {
type: "long",
},
rx: {
type: "long",
},
},
},
},
},
},
});
console.log(response);
const response1 = await client.indices.putIndexTemplate({
name: 1,
index_patterns: ["k8s*"],
composed_of: ["source_template"],
data_stream: {},
});
console.log(response1);
コンソール
POST /_component_template/source_template
{
"template": {
"settings": {
"index": {
"number_of_replicas": 2,
"number_of_shards": 2,
"mode": "time_series",
"routing_path": [ "metricset" ]
}
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"k8s": {
"properties": {
"tx": { "type": "long" },
"rx": { "type": "long" }
}
}
}
}
}
}
POST /_index_template/1
{
"index_patterns": [
"k8s*"
],
"composed_of": [
"source_template"
],
"data_stream": {}
}
/k8s/_settings
の出力例は次のようになります:
コンソール-結果
{
".ds-k8s-2023.09.01-000002": {
"settings": {
"index": {
"mode": "time_series",
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_hot"
}
}
},
"hidden": "true",
"number_of_shards": "2",
"time_series": {
"end_time": "2023-09-01T14:00:00.000Z",
"start_time": "2023-09-01T10:00:00.000Z"
},
"provided_name": ".ds-k9s-2023.09.01-000002",
"creation_date": "1694439857608",
"number_of_replicas": "2",
"routing_path": [
"metricset"
],
...
}
}
},
".ds-k8s-2023.09.01-000001": {
"settings": {
"index": {
"mode": "time_series",
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_hot"
}
}
},
"hidden": "true",
"number_of_shards": "2",
"time_series": {
"end_time": "2023-09-01T10:00:00.000Z",
"start_time": "2023-09-01T06:00:00.000Z"
},
"provided_name": ".ds-k9s-2023.09.01-000001",
"creation_date": "1694439837126",
"number_of_replicas": "2",
"routing_path": [
"metricset"
],
...
}
}
}
}
この TSDS を再インデックスするには、宛先データストリームでそのインデックステンプレートを再利用しないでください。機能に影響を与えないようにするためです。代わりに、ソース TSDS のテンプレートをクローンし、次の変更を適用します:
index.time_series.start_time
とindex.time_series.end_time
のインデックス設定を明示的に設定します。これらの値は、再インデックスするデータストリームの最小および最大@timestamp
値に基づくべきです。これにより、初期のバックインデックスはソースデータストリームに含まれるすべてのデータをロードできます。index.number_of_shards
のインデックス設定を、ソースデータストリームのすべてのバックインデックスのすべてのプライマリシャードの合計に設定します。これにより、各シャードが別々のスレッド(またはそれ以上)で処理されるため、同じレベルの検索並列性が維持されます。index.lifecycle.name
のインデックス設定がある場合は解除します。これにより、再インデックス中に ILM が宛先データストリームを変更するのを防ぎます。- (オプション)
index.number_of_replicas
をゼロに設定します。これにより、再インデックス操作が高速化されます。データがコピーされるため、レプリカが不足していることによるデータ損失のリスクは限られています。
上記の例をソース TSDS として使用すると、宛先 TSDS のテンプレートは次のようになります:
Python
resp = client.cluster.put_component_template(
name="destination_template",
template={
"settings": {
"index": {
"number_of_replicas": 0,
"number_of_shards": 4,
"mode": "time_series",
"routing_path": [
"metricset"
],
"time_series": {
"end_time": "2023-09-01T14:00:00.000Z",
"start_time": "2023-09-01T06:00:00.000Z"
}
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": True
},
"k8s": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
},
)
print(resp)
resp1 = client.indices.put_index_template(
name="2",
index_patterns=[
"k8s*"
],
composed_of=[
"destination_template"
],
data_stream={},
)
print(resp1)
Js
const response = await client.cluster.putComponentTemplate({
name: "destination_template",
template: {
settings: {
index: {
number_of_replicas: 0,
number_of_shards: 4,
mode: "time_series",
routing_path: ["metricset"],
time_series: {
end_time: "2023-09-01T14:00:00.000Z",
start_time: "2023-09-01T06:00:00.000Z",
},
},
},
mappings: {
properties: {
"@timestamp": {
type: "date",
},
metricset: {
type: "keyword",
time_series_dimension: true,
},
k8s: {
properties: {
tx: {
type: "long",
},
rx: {
type: "long",
},
},
},
},
},
},
});
console.log(response);
const response1 = await client.indices.putIndexTemplate({
name: 2,
index_patterns: ["k8s*"],
composed_of: ["destination_template"],
data_stream: {},
});
console.log(response1);
コンソール
POST /_component_template/destination_template
{
"template": {
"settings": {
"index": {
"number_of_replicas": 0,
"number_of_shards": 4,
"mode": "time_series",
"routing_path": [ "metricset" ],
"time_series": {
"end_time": "2023-09-01T14:00:00.000Z",
"start_time": "2023-09-01T06:00:00.000Z"
}
}
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"k8s": {
"properties": {
"tx": { "type": "long" },
"rx": { "type": "long" }
}
}
}
}
}
}
POST /_index_template/2
{
"index_patterns": [
"k8s*"
],
"composed_of": [
"destination_template"
],
"data_stream": {}
}
再インデックス
再インデックス API を呼び出します。例えば:
Python
resp = client.reindex(
source={
"index": "k8s"
},
dest={
"index": "k9s",
"op_type": "create"
},
)
print(resp)
Ruby
response = client.reindex(
body: {
source: {
index: 'k8s'
},
dest: {
index: 'k9s',
op_type: 'create'
}
}
)
puts response
Js
const response = await client.reindex({
source: {
index: "k8s",
},
dest: {
index: "k9s",
op_type: "create",
},
});
console.log(response);
コンソール
POST /_reindex
{
"source": {
"index": "k8s"
},
"dest": {
"index": "k9s",
"op_type": "create"
}
}
宛先インデックステンプレートの復元
再インデックス操作が完了したら、宛先 TSDS のインデックステンプレートを次のように復元します:
index.time_series.start_time
とindex.time_series.end_time
の上書きを削除します。index.number_of_shards
、index.number_of_replicas
、およびindex.lifecycle.name
の値を適用可能な場合は復元します。
前の例を使用すると、宛先テンプレートは次のように変更されます:
Python
resp = client.cluster.put_component_template(
name="destination_template",
template={
"settings": {
"index": {
"number_of_replicas": 2,
"number_of_shards": 2,
"mode": "time_series",
"routing_path": [
"metricset"
]
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": True
},
"k8s": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
},
)
print(resp)
Ruby
response = client.cluster.put_component_template(
name: 'destination_template',
body: {
template: {
settings: {
index: {
number_of_replicas: 2,
number_of_shards: 2,
mode: 'time_series',
routing_path: [
'metricset'
]
}
},
mappings: {
properties: {
"@timestamp": {
type: 'date'
},
metricset: {
type: 'keyword',
time_series_dimension: true
},
"k8s": {
properties: {
tx: {
type: 'long'
},
rx: {
type: 'long'
}
}
}
}
}
}
}
)
puts response
Js
const response = await client.cluster.putComponentTemplate({
name: "destination_template",
template: {
settings: {
index: {
number_of_replicas: 2,
number_of_shards: 2,
mode: "time_series",
routing_path: ["metricset"],
},
},
mappings: {
properties: {
"@timestamp": {
type: "date",
},
metricset: {
type: "keyword",
time_series_dimension: true,
},
k8s: {
properties: {
tx: {
type: "long",
},
rx: {
type: "long",
},
},
},
},
},
},
});
console.log(response);
コンソール
POST /_component_template/destination_template
{
"template": {
"settings": {
"index": {
"number_of_replicas": 2,
"number_of_shards": 2,
"mode": "time_series",
"routing_path": [ "metricset" ]
}
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"metricset": {
"type": "keyword",
"time_series_dimension": true
},
"k8s": {
"properties": {
"tx": { "type": "long" },
"rx": { "type": "long" }
}
}
}
}
}
}
次に、条件を設定せずに宛先データストリームで rollover
API を呼び出します。
Python
resp = client.indices.rollover(
alias="k9s",
)
print(resp)
Ruby
response = client.indices.rollover(
alias: 'k9s'
)
puts response
Js
const response = await client.indices.rollover({
alias: "k9s",
});
console.log(response);
コンソール
POST /k9s/_rollover/
これにより、更新されたインデックス設定を持つ新しいバックインデックスが作成されます。宛先データストリームは新しいドキュメントを受け入れる準備が整いました。
初期のバックインデックスは、ソースデータストリームから派生したタイムスタンプの範囲内でドキュメントを受け入れることができます。これが望ましくない場合は、明示的に 読み取り専用 とマークしてください。