fix: do not produce gRPC series frames for fields that only contain null values (#558)
* test: add test for field columns with only nulls * fix: do not produce series for null fields, tests for same * fix: remove uneeded test printlnspull/24376/head
parent
d34e09dab1
commit
1740e26ec3
|
@ -458,7 +458,7 @@ mod tests {
|
|||
let input = parse_to_iterator(
|
||||
schema,
|
||||
"one,ten,10.0,1,1000\n\
|
||||
one,ten,10.1,2,2000\n",
|
||||
one,ten,10.1,2,2000\n",
|
||||
);
|
||||
|
||||
let table_name = "foo";
|
||||
|
@ -497,6 +497,59 @@ mod tests {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_single_series_no_tags_nulls() -> Result<()> {
|
||||
// single series
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("tag_a", DataType::Utf8, true),
|
||||
Field::new("tag_b", DataType::Utf8, true),
|
||||
Field::new("float_field", DataType::Float64, true),
|
||||
Field::new("int_field", DataType::Int64, true),
|
||||
Field::new("time", DataType::Int64, false),
|
||||
]));
|
||||
// send no values in the int_field colum
|
||||
let input = parse_to_iterator(
|
||||
schema,
|
||||
"one,ten,10.0,,1000\n\
|
||||
one,ten,10.1,,2000\n",
|
||||
);
|
||||
|
||||
let table_name = "foo";
|
||||
let tag_columns = [];
|
||||
let field_columns = ["float_field"];
|
||||
let results = convert(table_name, &tag_columns, &field_columns, input).await;
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
let series_set = results[0].as_ref().expect("Correctly converted");
|
||||
|
||||
assert_eq!(*series_set.table_name, "foo");
|
||||
assert!(series_set.tags.is_empty());
|
||||
assert_eq!(series_set.timestamp_index, 4);
|
||||
assert_eq!(series_set.field_indices, Arc::new(vec![2]));
|
||||
assert_eq!(series_set.start_row, 0);
|
||||
assert_eq!(series_set.num_rows, 2);
|
||||
|
||||
// Test that the record batch made it through
|
||||
let expected_data = vec![
|
||||
"+-------+-------+-------------+-----------+------+",
|
||||
"| tag_a | tag_b | float_field | int_field | time |",
|
||||
"+-------+-------+-------------+-----------+------+",
|
||||
"| one | ten | 10 | | 1000 |",
|
||||
"| one | ten | 10.1 | | 2000 |",
|
||||
"+-------+-------+-------------+-----------+------+",
|
||||
"",
|
||||
];
|
||||
|
||||
let actual_data = pretty_format_batches(&[series_set.batch.clone()])
|
||||
.expect("formatting batch")
|
||||
.split('\n')
|
||||
.map(|s| s.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
assert_eq!(expected_data, actual_data);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_single_series_one_tag() -> Result<()> {
|
||||
// single series
|
||||
|
|
|
@ -131,16 +131,30 @@ fn data_type(array: &ArrayRef) -> Result<DataType> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns true if the array is entirely null between start_row and
|
||||
/// start_row+num_rows
|
||||
fn is_all_null(arr: &ArrayRef, start_row: usize, num_rows: usize) -> bool {
|
||||
let end_row = start_row + num_rows;
|
||||
(start_row..end_row).all(|i| arr.is_null(i))
|
||||
}
|
||||
|
||||
// Convert and append a single field to a sequence of frames
|
||||
fn field_to_data(frames: &mut Vec<Data>, series_set: &SeriesSet, field_index: usize) -> Result<()> {
|
||||
let batch = &series_set.batch;
|
||||
let schema = batch.schema();
|
||||
|
||||
let field = schema.field(field_index);
|
||||
let array = batch.column(field_index);
|
||||
|
||||
let start_row = series_set.start_row;
|
||||
let num_rows = series_set.num_rows;
|
||||
|
||||
// No values for this field are in the array so it does not
|
||||
// contribute to a series.
|
||||
if field.is_nullable() && is_all_null(array, start_row, num_rows) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let series_frame = SeriesFrame {
|
||||
tags: convert_tags(
|
||||
series_set.table_name.as_ref(),
|
||||
|
@ -356,7 +370,6 @@ mod tests {
|
|||
|
||||
let response =
|
||||
series_set_to_read_response(series_set).expect("Correctly converted series set");
|
||||
println!("Response is: {:#?}", response);
|
||||
|
||||
assert_eq!(response.frames.len(), 8); // 2 per field x 4 fields = 8
|
||||
|
||||
|
@ -384,6 +397,67 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_series_set_conversion_with_null_field() {
|
||||
// single series
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
ArrowField::new("state", ArrowDataType::Utf8, true),
|
||||
ArrowField::new("int_field", ArrowDataType::Int64, true),
|
||||
ArrowField::new("float_field", ArrowDataType::Float64, true),
|
||||
ArrowField::new("time", ArrowDataType::Int64, false),
|
||||
]));
|
||||
|
||||
let tag_array: ArrayRef = Arc::new(StringArray::from(vec!["MA", "MA", "MA", "MA"]));
|
||||
let int_array: ArrayRef = Arc::new(Int64Array::from(vec![None, None, None, None]));
|
||||
let float_array: ArrayRef = Arc::new(Float64Array::from(vec![
|
||||
Some(10.1),
|
||||
Some(20.1),
|
||||
None,
|
||||
Some(40.1),
|
||||
]));
|
||||
|
||||
let timestamp_array: ArrayRef = Arc::new(Int64Array::from(vec![1000, 2000, 3000, 4000]));
|
||||
|
||||
let batch = RecordBatch::try_new(
|
||||
schema,
|
||||
vec![tag_array, int_array, float_array, timestamp_array],
|
||||
)
|
||||
.expect("created new record batch");
|
||||
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::new("the_table".into()),
|
||||
tags: vec![(Arc::new("state".into()), Arc::new("MA".into()))],
|
||||
timestamp_index: 3,
|
||||
field_indices: Arc::new(vec![1, 2]),
|
||||
start_row: 0,
|
||||
num_rows: batch.num_rows(),
|
||||
batch,
|
||||
};
|
||||
|
||||
// Expect only a single series (for the data in float_field, int_field is all
|
||||
// nulls)
|
||||
|
||||
let response =
|
||||
series_set_to_read_response(series_set).expect("Correctly converted series set");
|
||||
|
||||
let dumped_frames = response
|
||||
.frames
|
||||
.iter()
|
||||
.map(|f| dump_frame(f))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let expected_frames = vec![
|
||||
"SeriesFrame, tags: _field=float_field,_measurement=the_table,state=MA, type: 0",
|
||||
"FloatPointsFrame, timestamps: [1000, 2000, 3000, 4000], values: \"10.1,20.1,0,40.1\"",
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
dumped_frames, expected_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_frames, dumped_frames
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_group_group_conversion() {
|
||||
let group_description = GroupDescription {
|
||||
|
@ -397,7 +471,6 @@ mod tests {
|
|||
|
||||
let response = series_set_item_to_read_response(grouped_series_set_item)
|
||||
.expect("Correctly converted grouped_series_set_item");
|
||||
println!("Response is: {:#?}", response);
|
||||
|
||||
let dumped_frames = response
|
||||
.frames
|
||||
|
@ -443,8 +516,6 @@ mod tests {
|
|||
let response = series_set_item_to_read_response(series_set_item)
|
||||
.expect("Correctly converted series_set_item");
|
||||
|
||||
println!("Response is: {:#?}", response);
|
||||
|
||||
let dumped_frames = response
|
||||
.frames
|
||||
.iter()
|
||||
|
|
|
@ -261,8 +261,6 @@ async fn read_and_write_data() -> Result<()> {
|
|||
.flat_map(|f| f.data)
|
||||
.collect();
|
||||
|
||||
assert_eq!(frames.len(), 10);
|
||||
|
||||
let expected_frames = substitute_nanos(ns_since_epoch, &[
|
||||
"SeriesFrame, tags: _field=value,_measurement=cpu_load_short,host=server01,region=, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns1], values: \"27.99\"",
|
||||
|
|
Loading…
Reference in New Issue