fix: Return `_field` and `_measurement` tags correctly for read_group (#2736)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
f8fd61e2c3
commit
bf73bba1dc
|
@ -112,6 +112,16 @@ fn group_description_to_frames(group_description: GroupDescription) -> Vec<Frame
|
|||
.map(|(k, v)| (k.bytes().collect(), v.bytes().collect()))
|
||||
.unzip();
|
||||
|
||||
// Flux expects there to be `_field` and `_measurement` as the
|
||||
// first two "tags". Note this means the lengths of tag_keys and
|
||||
// partition_key_values is different.
|
||||
//
|
||||
// See https://github.com/influxdata/influxdb_iox/issues/2690 for gory details
|
||||
let tag_keys = vec![b"_field".to_vec(), b"_measurement".to_vec()]
|
||||
.into_iter()
|
||||
.chain(tag_keys.into_iter())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let group_frame = GroupFrame {
|
||||
tag_keys,
|
||||
partition_key_vals,
|
||||
|
@ -232,8 +242,8 @@ fn field_to_data(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// Convert the tag=value pairs from the series set to the correct gRPC
|
||||
// format, and add the _f and _m tags for the field name and measurement
|
||||
/// Convert the tag=value pairs from the series set to the correct gRPC
|
||||
/// format, and add the _f and _m tags for the field name and measurement
|
||||
fn convert_tags(table_name: &str, field_name: &str, tags: &[(Arc<str>, Arc<str>)]) -> Vec<Tag> {
|
||||
// Special case "measurement" name which is modeled as a tag of
|
||||
// "_measurement" and "field" which is modeled as a tag of "_field"
|
||||
|
@ -548,8 +558,9 @@ mod tests {
|
|||
.map(|f| dump_frame(f))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let expected_frames =
|
||||
vec!["GroupFrame, tag_keys: tag1,tag2, partition_key_vals: val1,val2"];
|
||||
let expected_frames = vec![
|
||||
"GroupFrame, tag_keys: _field,_measurement,tag1,tag2, partition_key_vals: val1,val2",
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
dumped_frames, expected_frames,
|
||||
|
|
|
@ -401,7 +401,7 @@ async fn test_read_group_none_agg(
|
|||
};
|
||||
|
||||
let expected_group_frames = vec![
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
|
@ -410,7 +410,7 @@ async fn test_read_group_none_agg(
|
|||
"FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"",
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
|
@ -424,11 +424,9 @@ async fn test_read_group_none_agg(
|
|||
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
|
||||
|
||||
assert_eq!(
|
||||
expected_group_frames,
|
||||
actual_group_frames,
|
||||
"Expected:\n{}\nActual:\n{}",
|
||||
expected_group_frames.join("\n"),
|
||||
actual_group_frames.join("\n")
|
||||
expected_group_frames, actual_group_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_group_frames, actual_group_frames
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -453,12 +451,12 @@ async fn test_read_group_none_agg_with_predicate(
|
|||
};
|
||||
|
||||
let expected_group_frames = vec![
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000], values: \"20\"",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000], values: \"10\"",
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000], values: \"40\"",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
|
||||
|
@ -468,11 +466,9 @@ async fn test_read_group_none_agg_with_predicate(
|
|||
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
|
||||
|
||||
assert_eq!(
|
||||
expected_group_frames,
|
||||
actual_group_frames,
|
||||
"Expected:\n{}\nActual:\n{}",
|
||||
expected_group_frames.join("\n"),
|
||||
actual_group_frames.join("\n")
|
||||
expected_group_frames, actual_group_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_group_frames, actual_group_frames
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -500,7 +496,7 @@ async fn test_read_group_sum_agg(
|
|||
};
|
||||
|
||||
let expected_group_frames = vec![
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
|
@ -509,7 +505,7 @@ async fn test_read_group_sum_agg(
|
|||
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"143\"",
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"81\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
|
@ -523,11 +519,9 @@ async fn test_read_group_sum_agg(
|
|||
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
|
||||
|
||||
assert_eq!(
|
||||
expected_group_frames,
|
||||
actual_group_frames,
|
||||
"Expected:\n{}\nActual:\n{}",
|
||||
expected_group_frames.join("\n"),
|
||||
actual_group_frames.join("\n")
|
||||
expected_group_frames, actual_group_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_group_frames, actual_group_frames,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -555,7 +549,7 @@ async fn test_read_group_last_agg(
|
|||
};
|
||||
|
||||
let expected_group_frames = vec![
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu1",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
|
||||
|
@ -564,7 +558,7 @@ async fn test_read_group_last_agg(
|
|||
"FloatPointsFrame, timestamps: [2000], values: \"11\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"72\"",
|
||||
"GroupFrame, tag_keys: cpu, partition_key_vals: cpu2",
|
||||
"GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
|
||||
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
|
||||
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
|
||||
|
@ -578,11 +572,9 @@ async fn test_read_group_last_agg(
|
|||
let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
|
||||
|
||||
assert_eq!(
|
||||
expected_group_frames,
|
||||
actual_group_frames,
|
||||
"Expected:\n{}\nActual:\n{}",
|
||||
expected_group_frames.join("\n"),
|
||||
actual_group_frames.join("\n")
|
||||
expected_group_frames, actual_group_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_group_frames, actual_group_frames,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue