diff --git a/Cargo.lock b/Cargo.lock index f67f31f689..97f4cc2bcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3224,6 +3224,7 @@ name = "mutable_batch_pb" version = "0.1.0" dependencies = [ "arrow_util", + "data_types", "dml", "generated_types", "hashbrown 0.12.1", diff --git a/mutable_batch_pb/Cargo.toml b/mutable_batch_pb/Cargo.toml index ea2b42c854..370fa39df4 100644 --- a/mutable_batch_pb/Cargo.toml +++ b/mutable_batch_pb/Cargo.toml @@ -16,3 +16,4 @@ workspace-hack = { path = "../workspace-hack"} [dev-dependencies] mutable_batch_lp = { path = "../mutable_batch_lp" } +data_types = { path = "../data_types" } diff --git a/mutable_batch_pb/src/encode.rs b/mutable_batch_pb/src/encode.rs index 1bf798c314..f913801ffc 100644 --- a/mutable_batch_pb/src/encode.rs +++ b/mutable_batch_pb/src/encode.rs @@ -28,7 +28,20 @@ pub fn encode_batch(table_name: &str, batch: &MutableBatch) -> TableBatch { table_name: table_name.to_string(), columns: batch .columns() - .map(|(column_name, column)| encode_column(column_name, column)) + .filter_map(|(column_name, column)| { + // Skip encoding any entirely NULL columns. + // + // This prevents a type-inference error during deserialisation + // of the proto wire message. + // + // https://github.com/influxdata/influxdb_iox/issues/4272 + // + if column.valid_mask().is_all_unset() { + return None; + } + + Some(encode_column(column_name, column)) + }) .collect(), row_count: batch.rows() as u32, } diff --git a/mutable_batch_pb/tests/encode.rs b/mutable_batch_pb/tests/encode.rs index 5f6cdb10b5..890eeb5d44 100644 --- a/mutable_batch_pb/tests/encode.rs +++ b/mutable_batch_pb/tests/encode.rs @@ -1,5 +1,6 @@ use arrow_util::assert_batches_eq; -use mutable_batch::MutableBatch; +use data_types::{PartitionTemplate, TemplatePart}; +use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload}; use mutable_batch_pb::{decode::write_table_batch, encode::encode_batch}; use schema::selection::Selection; @@ -36,3 +37,141 @@ fn test_encode_decode() { assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]); } + +// This test asserts columns containing no values do not prevent an encoded +// batch from being deserialised: +// +// https://github.com/influxdata/influxdb_iox/issues/4272 +// +// The test constructs a table such that after partitioning, one entire column +// is NULL within a partition. In this test case, the following table is +// partitioned by YMD: +// +// +// | time | A | B | +// | ---------- | ---- | ---- | +// | 1970-01-01 | 1 | NULL | +// | 1970-07-05 | NULL | 1 | +// +// Yielding two partitions: +// +// | time | A | B | +// | ---------- | ---- | ---- | +// | 1970-01-01 | 1 | NULL | +// +// and: +// +// | time | A | B | +// | ---------- | ---- | ---- | +// | 1970-07-05 | NULL | 1 | +// +// In both partitions, one column is composed entirely of NULL values. +// +// Encoding each of these partitions succeeds, but decoding the partition fails +// due to the inability to infer a column type from the serialised format which +// contains no values: +// +// ``` +// Column { +// column_name: "B", +// semantic_type: Field, +// values: Some( +// Values { +// i64_values: [], +// f64_values: [], +// u64_values: [], +// string_values: [], +// bool_values: [], +// bytes_values: [], +// packed_string_values: None, +// interned_string_values: None, +// }, +// ), +// null_mask: [ +// 1, +// ], +// }, +// ``` +// +// In a column that is not entirely NULL, one of the "Values" fields would be +// non-empty, and the decoder would use this to infer the type of the column. +// +// Because we have chosen to not differentiate between "NULL" and "empty" in our +// proto encoding, the decoder cannot infer which field within the "Values" +// struct the column belongs to - all are valid, but empty. This causes +// [`Error::EmptyColumn`] to be returned during deserialisation. +// +// This is fixed by skipping entirely-null columns when encoding the batch. +#[test] +fn test_encode_decode_null_columns_issue_4272() { + let mut batch = MutableBatch::new(); + let mut writer = Writer::new(&mut batch, 2); + + writer + // Yielding partition keys: ["1970-01-01", "1970-07-05"] + .write_time("time", [160, 16007561568756160].into_iter()) + .unwrap(); + writer + .write_i64("A", Some(&[0b00000001]), [1].into_iter()) + .unwrap(); + writer + .write_i64("B", Some(&[0b00000010]), [1].into_iter()) + .unwrap(); + writer.commit(); + + let mut partitions = PartitionWrite::partition( + "test", + &batch, + &PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }, + ); + + // There should be two partitions, one with for the timestamp 160, and + // one for the other timestamp. + assert_eq!(partitions.len(), 2); + + // Round-trip the "1970-01-01" partition + let mut got = MutableBatch::default(); + partitions + .remove("1970-01-01") + .expect("partition not found") + .write_to_batch(&mut got) + .expect("should write"); + + let encoded = encode_batch("bananas", &got); + let mut batch = MutableBatch::new(); + // Without the fix for #4272 this deserialisation call would fail. + write_table_batch(&mut batch, &encoded).unwrap(); + + let expected = &[ + "+---+--------------------------------+", + "| A | time |", + "+---+--------------------------------+", + "| 1 | 1970-01-01T00:00:00.000000160Z |", + "+---+--------------------------------+", + ]; + assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]); + + // And finally assert the "1970-07-05" round-trip + let mut got = MutableBatch::default(); + partitions + .remove("1970-07-05") + .expect("partition not found") + .write_to_batch(&mut got) + .expect("should write"); + + let encoded = encode_batch("bananas", &got); + let mut batch = MutableBatch::new(); + // Without the fix for #4272 this deserialisation call would fail. + write_table_batch(&mut batch, &encoded).unwrap(); + + let expected = &[ + "+---+--------------------------------+", + "| B | time |", + "+---+--------------------------------+", + "| 1 | 1970-07-05T06:32:41.568756160Z |", + "+---+--------------------------------+", + ]; + assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]); +}