fix(pb): encoding entirely NULL columns (#4272)
This commit changes the protobuf record batch encoding to skip entirely NULL columns when serialising. This prevents the deserialisation from erroring due to a column type inference failure. Prior to this commit, when the system was presented a record batch such as this: | time | A | B | | ---------- | ---- | ---- | | 1970-01-01 | 1 | NULL | | 1970-07-05 | NULL | 1 | Which would be partitioned by YMD into two separate partitions: | time | A | B | | ---------- | ---- | ---- | | 1970-01-01 | 1 | NULL | and: | time | A | B | | ---------- | ---- | ---- | | 1970-07-05 | NULL | 1 | Both partitions would contain an entirely NULL column. Both of these partitioned record batches would be successfully encoded, but decoding the partition fails due to the inability to infer a column type from the serialised format which contains no values, which on the wire, looks like: Column { column_name: "B", semantic_type: Field, values: Some( Values { i64_values: [], f64_values: [], u64_values: [], string_values: [], bool_values: [], bytes_values: [], packed_string_values: None, interned_string_values: None, }, ), null_mask: [ 1, ], }, In a column that is not entirely NULL, one of the "Values" fields would be non-empty, and the decoder would use this to infer the type of the column. Because we have chosen to not differentiate between "NULL" and "empty" in our proto encoding, the decoder cannot infer which field within the "Values" struct the column belongs to - all are valid, but empty. This commit prevents this type inference failure by skipping any columns that are entirely NULL during serialisation, preventing the deserialiser from having to process columns with ambiguous types.pull/24376/head
parent
41ee23463d
commit
43300878bc
|
@ -3224,6 +3224,7 @@ name = "mutable_batch_pb"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow_util",
|
||||
"data_types",
|
||||
"dml",
|
||||
"generated_types",
|
||||
"hashbrown 0.12.1",
|
||||
|
|
|
@ -16,3 +16,4 @@ workspace-hack = { path = "../workspace-hack"}
|
|||
|
||||
[dev-dependencies]
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
data_types = { path = "../data_types" }
|
||||
|
|
|
@ -28,7 +28,20 @@ pub fn encode_batch(table_name: &str, batch: &MutableBatch) -> TableBatch {
|
|||
table_name: table_name.to_string(),
|
||||
columns: batch
|
||||
.columns()
|
||||
.map(|(column_name, column)| encode_column(column_name, column))
|
||||
.filter_map(|(column_name, column)| {
|
||||
// Skip encoding any entirely NULL columns.
|
||||
//
|
||||
// This prevents a type-inference error during deserialisation
|
||||
// of the proto wire message.
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4272
|
||||
//
|
||||
if column.valid_mask().is_all_unset() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(encode_column(column_name, column))
|
||||
})
|
||||
.collect(),
|
||||
row_count: batch.rows() as u32,
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use arrow_util::assert_batches_eq;
|
||||
use mutable_batch::MutableBatch;
|
||||
use data_types::{PartitionTemplate, TemplatePart};
|
||||
use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload};
|
||||
use mutable_batch_pb::{decode::write_table_batch, encode::encode_batch};
|
||||
use schema::selection::Selection;
|
||||
|
||||
|
@ -36,3 +37,141 @@ fn test_encode_decode() {
|
|||
|
||||
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
|
||||
}
|
||||
|
||||
// This test asserts columns containing no values do not prevent an encoded
|
||||
// batch from being deserialised:
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4272
|
||||
//
|
||||
// The test constructs a table such that after partitioning, one entire column
|
||||
// is NULL within a partition. In this test case, the following table is
|
||||
// partitioned by YMD:
|
||||
//
|
||||
//
|
||||
// | time | A | B |
|
||||
// | ---------- | ---- | ---- |
|
||||
// | 1970-01-01 | 1 | NULL |
|
||||
// | 1970-07-05 | NULL | 1 |
|
||||
//
|
||||
// Yielding two partitions:
|
||||
//
|
||||
// | time | A | B |
|
||||
// | ---------- | ---- | ---- |
|
||||
// | 1970-01-01 | 1 | NULL |
|
||||
//
|
||||
// and:
|
||||
//
|
||||
// | time | A | B |
|
||||
// | ---------- | ---- | ---- |
|
||||
// | 1970-07-05 | NULL | 1 |
|
||||
//
|
||||
// In both partitions, one column is composed entirely of NULL values.
|
||||
//
|
||||
// Encoding each of these partitions succeeds, but decoding the partition fails
|
||||
// due to the inability to infer a column type from the serialised format which
|
||||
// contains no values:
|
||||
//
|
||||
// ```
|
||||
// Column {
|
||||
// column_name: "B",
|
||||
// semantic_type: Field,
|
||||
// values: Some(
|
||||
// Values {
|
||||
// i64_values: [],
|
||||
// f64_values: [],
|
||||
// u64_values: [],
|
||||
// string_values: [],
|
||||
// bool_values: [],
|
||||
// bytes_values: [],
|
||||
// packed_string_values: None,
|
||||
// interned_string_values: None,
|
||||
// },
|
||||
// ),
|
||||
// null_mask: [
|
||||
// 1,
|
||||
// ],
|
||||
// },
|
||||
// ```
|
||||
//
|
||||
// In a column that is not entirely NULL, one of the "Values" fields would be
|
||||
// non-empty, and the decoder would use this to infer the type of the column.
|
||||
//
|
||||
// Because we have chosen to not differentiate between "NULL" and "empty" in our
|
||||
// proto encoding, the decoder cannot infer which field within the "Values"
|
||||
// struct the column belongs to - all are valid, but empty. This causes
|
||||
// [`Error::EmptyColumn`] to be returned during deserialisation.
|
||||
//
|
||||
// This is fixed by skipping entirely-null columns when encoding the batch.
|
||||
#[test]
|
||||
fn test_encode_decode_null_columns_issue_4272() {
|
||||
let mut batch = MutableBatch::new();
|
||||
let mut writer = Writer::new(&mut batch, 2);
|
||||
|
||||
writer
|
||||
// Yielding partition keys: ["1970-01-01", "1970-07-05"]
|
||||
.write_time("time", [160, 16007561568756160].into_iter())
|
||||
.unwrap();
|
||||
writer
|
||||
.write_i64("A", Some(&[0b00000001]), [1].into_iter())
|
||||
.unwrap();
|
||||
writer
|
||||
.write_i64("B", Some(&[0b00000010]), [1].into_iter())
|
||||
.unwrap();
|
||||
writer.commit();
|
||||
|
||||
let mut partitions = PartitionWrite::partition(
|
||||
"test",
|
||||
&batch,
|
||||
&PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
|
||||
},
|
||||
);
|
||||
|
||||
// There should be two partitions, one with for the timestamp 160, and
|
||||
// one for the other timestamp.
|
||||
assert_eq!(partitions.len(), 2);
|
||||
|
||||
// Round-trip the "1970-01-01" partition
|
||||
let mut got = MutableBatch::default();
|
||||
partitions
|
||||
.remove("1970-01-01")
|
||||
.expect("partition not found")
|
||||
.write_to_batch(&mut got)
|
||||
.expect("should write");
|
||||
|
||||
let encoded = encode_batch("bananas", &got);
|
||||
let mut batch = MutableBatch::new();
|
||||
// Without the fix for #4272 this deserialisation call would fail.
|
||||
write_table_batch(&mut batch, &encoded).unwrap();
|
||||
|
||||
let expected = &[
|
||||
"+---+--------------------------------+",
|
||||
"| A | time |",
|
||||
"+---+--------------------------------+",
|
||||
"| 1 | 1970-01-01T00:00:00.000000160Z |",
|
||||
"+---+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
|
||||
|
||||
// And finally assert the "1970-07-05" round-trip
|
||||
let mut got = MutableBatch::default();
|
||||
partitions
|
||||
.remove("1970-07-05")
|
||||
.expect("partition not found")
|
||||
.write_to_batch(&mut got)
|
||||
.expect("should write");
|
||||
|
||||
let encoded = encode_batch("bananas", &got);
|
||||
let mut batch = MutableBatch::new();
|
||||
// Without the fix for #4272 this deserialisation call would fail.
|
||||
write_table_batch(&mut batch, &encoded).unwrap();
|
||||
|
||||
let expected = &[
|
||||
"+---+--------------------------------+",
|
||||
"| B | time |",
|
||||
"+---+--------------------------------+",
|
||||
"| 1 | 1970-07-05T06:32:41.568756160Z |",
|
||||
"+---+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue