ロールアップの始め方
8.11.0で非推奨。
ロールアップは将来のバージョンで削除されます。代わりにmigrateをダウンサンプリングに移行してください。
8.15.0以降、ロールアップを使用していないクラスターでput job APIを呼び出すと、ロールアップの非推奨と計画的な削除に関するメッセージと共に失敗します。クラスターは、put job APIを実行するためにロールアップジョブまたはロールアップインデックスを含む必要があります。
ロールアップ機能を使用するには、1つ以上の「ロールアップジョブ」を作成する必要があります。これらのジョブはバックグラウンドで継続的に実行され、指定したインデックスまたはインデックスをロールアップし、ロールアップされたドキュメントを選択した二次インデックスに配置します。
センサーデータを保持する一連の日次インデックス(sensor-2017-01-01
、sensor-2017-01-02
など)があると想像してください。サンプルドキュメントは次のようになります:
Js
{
"timestamp": 1516729294000,
"temperature": 200,
"voltage": 5.2,
"node": "a"
}
ロールアップジョブの作成
これらのドキュメントを時間ごとの要約にロールアップしたいと考えています。これにより、1時間以上の任意の時間間隔でレポートやダッシュボードを生成できます。ロールアップジョブは次のようになります:
Python
resp = client.rollup.put_job(
id="sensor",
index_pattern="sensor-*",
rollup_index="sensor_rollup",
cron="*/30 * * * * ?",
page_size=1000,
groups={
"date_histogram": {
"field": "timestamp",
"fixed_interval": "60m"
},
"terms": {
"fields": [
"node"
]
}
},
metrics=[
{
"field": "temperature",
"metrics": [
"min",
"max",
"sum"
]
},
{
"field": "voltage",
"metrics": [
"avg"
]
}
],
)
print(resp)
Js
const response = await client.rollup.putJob({
id: "sensor",
index_pattern: "sensor-*",
rollup_index: "sensor_rollup",
cron: "*/30 * * * * ?",
page_size: 1000,
groups: {
date_histogram: {
field: "timestamp",
fixed_interval: "60m",
},
terms: {
fields: ["node"],
},
},
metrics: [
{
field: "temperature",
metrics: ["min", "max", "sum"],
},
{
field: "voltage",
metrics: ["avg"],
},
],
});
console.log(response);
コンソール
PUT _rollup/job/sensor
{
"index_pattern": "sensor-*",
"rollup_index": "sensor_rollup",
"cron": "*/30 * * * * ?",
"page_size": 1000,
"groups": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "60m"
},
"terms": {
"fields": [ "node" ]
}
},
"metrics": [
{
"field": "temperature",
"metrics": [ "min", "max", "sum" ]
},
{
"field": "voltage",
"metrics": [ "avg" ]
}
]
}
ジョブに「sensor」というIDを与え(URL内:PUT _rollup/job/sensor
)、インデックスパターン"sensor-*"
をロールアップするように指示します。このジョブは、そのパターンに一致するインデックスを見つけてロールアップします。ロールアップの要約は"sensor_rollup"
インデックスに保存されます。
代わりに、cronが毎日真夜中に1回実行されるように設定されている場合、ジョブは過去24時間分のデータを処理します。この選択は、ロールアップの「リアルタイム」性や、継続的に処理するかオフピーク時間に移動するかに基づいて、主に好みによります。
次に、`````groups`````のセットを定義します。基本的に、データをクエリする際に後でピボットしたい次元を定義しています。このジョブのグループ化により、`````date_histogram`````集計を`````timestamp`````フィールドに対して、時間ごとの間隔でロールアップできます。また、`````node`````フィールドに対して用語集計を実行することもできます。
**日付ヒストグラムの間隔とcronスケジュール**
ジョブのcronが30秒ごとに実行されるように設定されている一方で、date_histogramは60分間隔でロールアップするように設定されていることに注意してください。これらはどのように関連していますか?
date_histogramは保存されたデータの粒度を制御します。データは時間ごとの間隔でロールアップされ、より細かい粒度でクエリすることはできません。cronは単にプロセスがロールアップする新しいデータを探すタイミングを制御します。30秒ごとに新しい1時間分のデータがあるかどうかを確認し、ロールアップします。なければ、ジョブは再びスリープに戻ります。
多くの場合、大きな間隔(1時間)に対して非常に小さなcron(30秒)を定義することは理にかなわないことが多いです。なぜなら、ほとんどのアクティベーションは単にスリープに戻るからです。しかし、それでも問題はありません。ジョブは正しいことを行います。
データのために生成されるべきグループを定義した後、次に収集すべきメトリックを構成します。デフォルトでは、各グループに対して`````doc_counts`````のみが収集されます。ロールアップを有用にするために、平均、最小、最大などのメトリックを追加することがよくあります。この例では、メトリックは非常に単純です:`````temperature`````フィールドの最小/最大/合計を保存し、`````voltage`````フィールドの平均を保存したいと考えています。
**平均は合成できない?!**
以前にロールアップを使用したことがある場合、平均に対して慎重になるかもしれません。10分間隔で保存された平均は、通常は大きな間隔には役に立ちません。6つの10分間の平均を平均して1時間の平均を求めることはできません。平均の平均は総平均に等しくありません。
このため、他のシステムは平均を省略するか、より柔軟なクエリをサポートするために複数の間隔で平均を保存する傾向があります。
代わりに、データロールアップ機能は、定義された時間間隔のために`````count`````と`````sum`````を保存します。これにより、定義された間隔以上の任意の間隔で平均を再構築できます。これにより、最小限のストレージコストで最大の柔軟性が得られます…そして、平均の正確性について心配する必要はありません(ここでは平均の平均はありません!)
ジョブ構文の詳細については、[ロールアップジョブの作成](/read/elasticsearch-8-15/72462e080b27c599.md)を参照してください。
上記のコマンドを実行してジョブを作成すると、次のような応答が得られます:
[](#bc5fcc40c29087a0df7b5405bb70de5c)
#### コンソール-結果
``````console-result
{
"acknowledged": true
}
`
ジョブの開始
ジョブが作成されると、非アクティブ状態になります。ジョブはデータの処理を開始する前に開始する必要があります(これにより、後で一時的に停止するために構成を削除せずに停止できます)。
ジョブを開始するには、このコマンドを実行します:
Python
resp = client.rollup.start_job(
id="sensor",
)
print(resp)
Ruby
response = client.rollup.start_job(
id: 'sensor'
)
puts response
Js
const response = await client.rollup.startJob({
id: "sensor",
});
console.log(response);
コンソール
POST _rollup/job/sensor/_start
ロールアップ結果の検索
ジョブが実行され、いくつかのデータが処理された後、ロールアップ検索エンドポイントを使用して検索を行うことができます。ロールアップ機能は、慣れ親しんだ同じQuery DSL構文を使用できるように設計されています…ただし、ロールアップされたデータで実行されます。
たとえば、このクエリを考えてみてください:
Python
resp = client.rollup.rollup_search(
index="sensor_rollup",
size=0,
aggregations={
"max_temperature": {
"max": {
"field": "temperature"
}
}
},
)
print(resp)
Ruby
response = client.rollup.rollup_search(
index: 'sensor_rollup',
body: {
size: 0,
aggregations: {
max_temperature: {
max: {
field: 'temperature'
}
}
}
}
)
puts response
Js
const response = await client.rollup.rollupSearch({
index: "sensor_rollup",
size: 0,
aggregations: {
max_temperature: {
max: {
field: "temperature",
},
},
},
});
console.log(response);
コンソール
GET /sensor_rollup/_rollup_search
{
"size": 0,
"aggregations": {
"max_temperature": {
"max": {
"field": "temperature"
}
}
}
}
これは、temperature
フィールドの最大値を計算する単純な集計です。しかし、sensor_rollup
インデックスに送信されていることに気付くでしょう。生のsensor-*
インデックスではなく、_rollup_search
エンドポイントを使用していることにも気付くでしょう。そうでなければ、構文は期待通りです。
そのクエリを実行すると、通常の集計応答のように見える結果が得られます:
コンソール-結果
{
"took" : 102,
"timed_out" : false,
"terminated_early" : false,
"_shards" : ... ,
"hits" : {
"total" : {
"value": 0,
"relation": "eq"
},
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"max_temperature" : {
"value" : 202.0
}
}
}
唯一の顕著な違いは、ロールアップ検索結果にはhits
がゼロであることです。なぜなら、もはや元のライブデータを検索していないからです。それ以外は、構文は同一です。
ここにはいくつかの興味深いポイントがあります。まず、データは時間ごとの間隔でロールアップされ、ノード名で分割されているにもかかわらず、実行したクエリはすべてのドキュメントの最大温度を計算しています。ジョブで構成されたgroups
はクエリの必須要素ではなく、単にパーティションを分けるための追加の次元です。第二に、リクエストと応答の構文は通常のDSLとほぼ同じであり、ダッシュボードやアプリケーションに統合しやすくなっています。
最後に、定義したグループフィールドを使用して、より複雑なクエリを構築できます:
Python
resp = client.rollup.rollup_search(
index="sensor_rollup",
size=0,
aggregations={
"timeline": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "7d"
},
"aggs": {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"max_temperature": {
"max": {
"field": "temperature"
}
},
"avg_voltage": {
"avg": {
"field": "voltage"
}
}
}
}
}
}
},
)
print(resp)
Ruby
response = client.rollup.rollup_search(
index: 'sensor_rollup',
body: {
size: 0,
aggregations: {
timeline: {
date_histogram: {
field: 'timestamp',
fixed_interval: '7d'
},
aggregations: {
nodes: {
terms: {
field: 'node'
},
aggregations: {
max_temperature: {
max: {
field: 'temperature'
}
},
avg_voltage: {
avg: {
field: 'voltage'
}
}
}
}
}
}
}
}
)
puts response
Js
const response = await client.rollup.rollupSearch({
index: "sensor_rollup",
size: 0,
aggregations: {
timeline: {
date_histogram: {
field: "timestamp",
fixed_interval: "7d",
},
aggs: {
nodes: {
terms: {
field: "node",
},
aggs: {
max_temperature: {
max: {
field: "temperature",
},
},
avg_voltage: {
avg: {
field: "voltage",
},
},
},
},
},
},
},
});
console.log(response);
コンソール
GET /sensor_rollup/_rollup_search
{
"size": 0,
"aggregations": {
"timeline": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "7d"
},
"aggs": {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"max_temperature": {
"max": {
"field": "temperature"
}
},
"avg_voltage": {
"avg": {
"field": "voltage"
}
}
}
}
}
}
}
}
コンソール-結果
{
"took" : 93,
"timed_out" : false,
"terminated_early" : false,
"_shards" : ... ,
"hits" : {
"total" : {
"value": 0,
"relation": "eq"
},
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"timeline" : {
"buckets" : [
{
"key_as_string" : "2018-01-18T00:00:00.000Z",
"key" : 1516233600000,
"doc_count" : 6,
"nodes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "a",
"doc_count" : 2,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 5.1499998569488525
}
},
{
"key" : "b",
"doc_count" : 2,
"max_temperature" : {
"value" : 201.0
},
"avg_voltage" : {
"value" : 5.700000047683716
}
},
{
"key" : "c",
"doc_count" : 2,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 4.099999904632568
}
}
]
}
}
]
}
}
}
より複雑(日時ヒストグラムと用語集計、さらに追加の平均メトリック)であることに加えて、date_histogramは7d
間隔を使用していることに気付くでしょう。60m
の代わりに。
結論
このクイックスタートは、ロールアップが提供するコア機能の簡潔な概要を提供したはずです。ロールアップを設定する際に考慮すべきヒントや事項がさらにあります。これらはこのセクションの残りの部分で見つけることができます。また、利用可能なものの概要についてはREST APIを探索することもできます。