diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 5f4193d9d2..5ab5ecef9b 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -115,7 +115,7 @@ pub enum Error { #[snafu(display("Could not find partition {:?}", partition_id))] PartitionNotFound { partition_id: PartitionId }, - #[snafu(display("Could not serialise and persist record batches {}", source))] + #[snafu(display("Could not serialize and persist record batches {}", source))] Persist { source: parquet_file::storage::UploadError, }, @@ -470,7 +470,7 @@ impl Compactor { // // This builds the StreamSplitExec plan & executes both partitions // concurrently, streaming the resulting record batches into the - // Parquet serialiser and directly uploads them to object store. + // Parquet serializer and directly uploads them to object store. // // If an non-object-store upload error occurs, all // executions/uploads are aborted and the error is returned to the @@ -486,20 +486,25 @@ impl Compactor { meta, tombstones, } = v; - debug!(?meta, "executing and uploading compaction StreamSplitExec"); + debug!( + ?partition_id, + ?meta, + "executing and uploading compaction StreamSplitExec" + ); let object_store_id = meta.object_store_id; - info!(%object_store_id, "streaming exec to object store"); + info!(?partition_id, %object_store_id, "streaming exec to object store"); - // Stream the record batches from the compaction exec, serialise + // Stream the record batches from the compaction exec, serialize // them, and directly upload the resulting Parquet files to // object storage. 
let (parquet_meta, file_size) = self.store.upload(data, &meta).await.context(PersistSnafu)?; - debug!(%object_store_id, "file uploaded to object store"); + debug!(?partition_id, %object_store_id, "file uploaded to object store"); Ok(CatalogUpdate::new( + partition_id, meta, file_size, parquet_meta, diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs index 2cf8e0a152..50f0b8d0c2 100644 --- a/compactor/src/utils.rs +++ b/compactor/src/utils.rs @@ -2,7 +2,8 @@ use crate::query::QueryableParquetChunk; use data_types::{ - ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Timestamp, Tombstone, TombstoneId, + ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, PartitionId, Timestamp, Tombstone, + TombstoneId, }; use datafusion::physical_plan::SendableRecordBatchStream; use observability_deps::tracing::*; @@ -165,12 +166,13 @@ pub struct CatalogUpdate { impl CatalogUpdate { /// Initialize with data received from a persist to object storage pub fn new( + partition_id: PartitionId, meta: IoxMetadata, file_size: usize, md: IoxParquetMetaData, tombstones: BTreeMap<TombstoneId, Tombstone>, ) -> Self { - let parquet_file = meta.to_parquet_file(file_size, &md); + let parquet_file = meta.to_parquet_file(partition_id, file_size, &md); Self { meta, tombstones, diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto index e9e4d85e75..4b4e415beb 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto @@ -10,7 +10,7 @@ message IoxMetadata { // Removed as the Parquet metadata itself contains the row count & min/max // timestamps, and specifying them here creates a dependency that prevents // streaming serialisation (needing to know the number rows before you can - // serialise your parquet file with this metadata structure within it) + // serialize your parquet file with this
metadata structure within it) reserved 10, 11, 14; reserved "row_count", "time_of_first_write", "time_of_last_write"; diff --git a/generated_types/src/google.rs b/generated_types/src/google.rs index 6c92b29da2..d81cfdd7e6 100644 --- a/generated_types/src/google.rs +++ b/generated_types/src/google.rs @@ -84,8 +84,8 @@ struct EncodeError(prost::EncodeError); impl From<EncodeError> for tonic::Status { fn from(error: EncodeError) -> Self { - error!(error=%error.0, "failed to serialise error response details"); - tonic::Status::unknown(format!("failed to serialise server error: {}", error.0)) + error!(error=%error.0, "failed to serialize error response details"); + tonic::Status::unknown(format!("failed to serialize server error: {}", error.0)) } } diff --git a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs index b3a0d42b8c..01661632a7 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs @@ -56,15 +56,16 @@ async fn basic_multi_ingesters() { async { let combined_response = get_multi_ingester_readable_combined_response(state).await; - // make sure data is spread across all kafka - // partitions by ensuring all partition is readable - // (and there is none that is unknown) + // make sure the data in all partitions is readable or + // persisted (and there is none that is unknown) assert!( - combined_response - .kafka_partition_infos - .iter() - .all(|info| info.status() == KafkaPartitionStatus::Readable), - "combined responses: {:?}", + combined_response.kafka_partition_infos.iter().all(|info| { + matches!( + info.status(), + KafkaPartitionStatus::Persisted | KafkaPartitionStatus::Readable + ) + }), + "Not all partitions were readable or persisted.
Combined responses: {:?}", combined_response ); } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 1cf6360908..2e0b4ecfea 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -310,7 +310,7 @@ impl Persister for IngesterData { if let Some((file_size, md)) = file_size_and_md { // Add the parquet file to the catalog until succeed - let parquet_file = iox_meta.to_parquet_file(file_size, &md); + let parquet_file = iox_meta.to_parquet_file(partition_id, file_size, &md); Backoff::new(&self.backoff_config) .retry_all_errors("add parquet file to catalog", || async { let mut repos = self.catalog.repositories().await; diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index ec40724526..bfc60e7dbe 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -11,7 +11,7 @@ use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] #[allow(missing_docs)] pub enum Error { - #[snafu(display("Could not serialise and persist record batches {}", source))] + #[snafu(display("Could not serialize and persist record batches {}", source))] Persist { source: parquet_file::storage::UploadError, }, diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 8942ddb61b..676fb5843f 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -272,7 +272,7 @@ where None } Some(Err(e)) if e.kind() == WriteBufferErrorKind::InvalidData => { - // The DmlOperation could not be de-serialised from the + // The DmlOperation could not be de-serialized from the // kafka message. 
// // This is almost certainly data loss as the write will not @@ -282,7 +282,7 @@ where kafka_topic=%self.kafka_topic_name, kafka_partition=%self.kafka_partition, potential_data_loss=true, - "unable to deserialise dml operation" + "unable to deserialize dml operation" ); self.seq_invalid_data_count.inc(1); diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 7cab6a73a6..10b52c18f6 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -37,7 +37,7 @@ pub mod postgres; /// it is created and an updated [`NamespaceSchema`] is returned. /// /// This function pushes schema additions through to the backend catalog, and -/// relies on the catalog to serialise concurrent additions of a given column, +/// relies on the catalog to serialize concurrent additions of a given column, /// ensuring only one type is ever accepted per column. pub async fn validate_or_insert_schema<'a, T, U, R>( tables: T, diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index fd2e885195..0def4acc1c 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -2230,7 +2230,7 @@ mod tests { sequence_number, min_timestamp, max_timestamp, - "some other serialised predicate which is different", + "some other serialized predicate which is different", ) .await .expect("should panic before result evaluated"); diff --git a/mutable_batch_pb/tests/encode.rs b/mutable_batch_pb/tests/encode.rs index 890eeb5d44..7a746fb792 100644 --- a/mutable_batch_pb/tests/encode.rs +++ b/mutable_batch_pb/tests/encode.rs @@ -39,7 +39,7 @@ fn test_encode_decode() { } // This test asserts columns containing no values do not prevent an encoded -// batch from being deserialised: +// batch from being deserialized: // // https://github.com/influxdata/influxdb_iox/issues/4272 // @@ -68,7 +68,7 @@ fn test_encode_decode() { // In both partitions, one column is composed entirely of NULL values.
// // Encoding each of these partitions succeeds, but decoding the partition fails -// due to the inability to infer a column type from the serialised format which +// due to the inability to infer a column type from the serialized format which // contains no values: // // ``` diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 3afcdb053f..bc50f30203 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -17,7 +17,7 @@ pub mod chunk; pub mod metadata; -pub mod serialise; +pub mod serialize; pub mod storage; use data_types::{NamespaceId, PartitionId, SequencerId, TableId}; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 330773fe9a..7a3e23301f 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -92,6 +92,7 @@ use data_types::{ }; use generated_types::influxdata::iox::ingester::v1 as proto; use iox_time::Time; +use observability_deps::tracing::debug; use parquet::{ arrow::parquet_to_arrow_schema, file::{ @@ -373,7 +374,7 @@ impl IoxMetadata { /// contain valid metadata bytes, has no readable schema, or has no field /// statistics. /// - /// A [`RecordBatch`] serialised without the embedded metadata found in the + /// A [`RecordBatch`] serialized without the embedded metadata found in the /// IOx [`Schema`] type will cause a statistic resolution failure due to /// lack of the IOx field type metadata for the time column. 
Batches /// produced from the through the IOx write path always include this @@ -382,11 +383,23 @@ impl IoxMetadata { /// [`RecordBatch`]: arrow::record_batch::RecordBatch pub fn to_parquet_file( &self, + partition_id: PartitionId, file_size_bytes: usize, metadata: &IoxParquetMetaData, ) -> ParquetFileParams { let decoded = metadata.decode().expect("invalid IOx metadata"); + debug!( + ?partition_id, + ?decoded, + "DecodedIoxParquetMetaData decoded from its IoxParquetMetaData" + ); let row_count = decoded.row_count(); + if decoded.md.row_groups().is_empty() { + debug!( + ?partition_id, + "Decoded IoxParquetMetaData has no row groups to provide useful statistics" + ); + } // Derive the min/max timestamp from the Parquet column statistics. let schema = decoded @@ -771,6 +784,8 @@ fn read_statistics_from_parquet_row_group( }), stats, }); + } else { + debug!(?field, "Provided schema of the field does not include IOx Column Type such as Tag, Field, Time"); + } } @@ -967,9 +982,9 @@ mod tests { let batch = RecordBatch::try_from_iter([("a", data)]).unwrap(); let stream = futures::stream::iter([Ok(batch.clone())]); - let (bytes, file_meta) = crate::serialise::to_parquet_bytes(stream, &meta) + let (bytes, file_meta) = crate::serialize::to_parquet_bytes(stream, &meta) .await - .expect("should serialise"); + .expect("should serialize"); // Verify if the parquet file meta data has values assert!(!file_meta.row_groups.is_empty()); @@ -988,8 +1003,8 @@ mod tests { .expect("failed to decode IoxParquetMetaData from file metadata"); assert_eq!(iox_from_file_meta, iox_parquet_meta); - // Reproducer of https://github.com/influxdata/influxdb_iox/issues/4695 - // Convert IOx meta data back to parquet meta data and verify it still the same + // Reproducer of https://github.com/influxdata/influxdb_iox/issues/4714 + // Convert IOx meta data back to parquet meta data and verify it is still the same let decoded = iox_from_file_meta.decode().unwrap(); let new_file_meta =
decoded.parquet_file_meta(); @@ -1002,15 +1017,12 @@ mod tests { let col_meta = new_row_group_meta[0].column(0); assert!(col_meta.statistics().is_some()); // There is statistics for column "a" - // Exactly used in 4695 let schema = decoded.read_schema().unwrap(); let (_, field) = schema.field(0); assert_eq!(field.name(), "a"); println!("schema: {:#?}", schema); - let col_summary = decoded - .read_statistics(&*schema) // // BUG: should not empty - .unwrap(); - assert!(col_summary.is_empty()); // TODO: must be NOT empty after the fix of 4695 + let col_summary = decoded.read_statistics(&*schema).unwrap(); + assert!(col_summary.is_empty()); // TODO: must be NOT empty after the fix of 4714 } } diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialize.rs similarity index 88% rename from parquet_file/src/serialise.rs rename to parquet_file/src/serialize.rs index 8d345270e5..bc07798c5f 100644 --- a/parquet_file/src/serialise.rs +++ b/parquet_file/src/serialize.rs @@ -36,7 +36,7 @@ pub enum CodecError { Arrow(#[from] ArrowError), /// Serialising the [`IoxMetadata`] to protobuf-encoded bytes failed. - #[error("failed to serialise iox metadata: {0}")] + #[error("failed to serialize iox metadata: {0}")] MetadataSerialisation(#[from] prost::EncodeError), /// Writing the parquet file failed with the specified error. @@ -56,10 +56,10 @@ pub enum CodecError { /// an error. /// /// IOx metadata is encoded into the parquet file's metadata under the key -/// [`METADATA_KEY`], with a base64-wrapped, protobuf serialised +/// [`METADATA_KEY`], with a base64-wrapped, protobuf serialized /// [`proto::IoxMetadata`] structure. /// -/// Returns the serialised [`FileMetaData`] for the encoded parquet file, from +/// Returns the serialized [`FileMetaData`] for the encoded parquet file, from /// which an [`IoxParquetMetaData`] can be derived. 
/// /// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1 @@ -96,10 +96,10 @@ where .map(|v| v.schema()) .ok_or(CodecError::SchemaPeek)?; - // Serialise the IoxMetadata to the protobuf bytes. + // Serialize the IoxMetadata to the protobuf bytes. let props = writer_props(meta)?; - // Construct the arrow serialiser with the metadata as part of the parquet + // Construct the arrow serializer with the metadata as part of the parquet // file properties. let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?; @@ -121,10 +121,21 @@ where { let mut w = InMemoryWriteableCursor::default(); - // Serialise the record batches into the in-memory buffer - let meta = to_parquet(batches, meta, &mut w).await?; + let partition_id = meta.partition_id; + debug!( + ?partition_id, + ?meta, + "IOxMetaData provided for serializing the data into the in-memory buffer" + ); - debug!(?meta, "Parquet Metadata"); + // Serialize the record batches into the in-memory buffer + let meta = to_parquet(batches, meta, &mut w).await?; + if meta.row_groups.is_empty() { + // panic here to avoid later consequence of reading it for statistics + panic!("partition_id={}. Created Parquet metadata has no column metadata. HINT a common reason of this is writing empty data to parquet file: {:#?}", partition_id, meta); + } + + debug!(?partition_id, ?meta, "Parquet Metadata"); let mut bytes = w .into_inner() @@ -190,7 +201,7 @@ mod tests { let (bytes, _file_meta) = to_parquet_bytes(stream, &meta) .await - .expect("should serialise"); + .expect("should serialize"); // Read the metadata from the file bytes. 
// diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index a7637c497a..a6caf94a05 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -3,7 +3,7 @@ use crate::{ metadata::{IoxMetadata, IoxParquetMetaData}, - serialise::{self, CodecError}, + serialize::{self, CodecError}, ParquetFilePath, }; use arrow::{ @@ -74,7 +74,7 @@ pub enum ReadError { /// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an /// underlying [`ObjectStore`]. /// -/// [`RecordBatch`] instances are serialised to Parquet files, with IOx specific +/// [`RecordBatch`] instances are serialized to Parquet files, with IOx specific /// metadata ([`IoxParquetMetaData`]) attached. /// /// Code that interacts with Parquet files in object storage should utilise this @@ -116,11 +116,23 @@ impl ParquetStorage { // // This is not a huge concern, as the resulting parquet files are // currently smallish on average. - let (data, parquet_file_meta) = serialise::to_parquet_bytes(batches, meta).await?; + let (data, parquet_file_meta) = serialize::to_parquet_bytes(batches, meta).await?; + // TODO: remove this if after verifying the panic is thrown + // correctly inside the serialize::to_parquet_bytes above + if parquet_file_meta.row_groups.is_empty() { + debug!( + ?meta.partition_id, ?parquet_file_meta, + "Created parquet_file_meta has no row groups which will introduce panic later when its statistics is read"); + } // Read the IOx-specific parquet metadata from the file metadata let parquet_meta = IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?; + debug!( + ?meta.partition_id, + ?parquet_meta, + "IoxParquetMetaData converted from Row Group Metadata (aka FileMetaData)" + ); // Derive the correct object store path from the metadata.
let path = ParquetFilePath::from(meta).object_store_path(); @@ -144,7 +156,7 @@ impl ParquetStorage { /// Pull the Parquet-encoded [`RecordBatch`] at the file path derived from /// the provided [`ParquetFilePath`]. /// - /// The `selection` projection is pushed down to the Parquet deserialiser. + /// The `selection` projection is pushed down to the Parquet deserializer. /// /// This impl fetches the associated Parquet file bytes from object storage, /// temporarily persisting them to a local temp file to feed to the arrow @@ -315,11 +327,11 @@ mod tests { let schema = batch.schema(); let stream = futures::stream::iter([Ok(batch.clone())]); - // Serialise & upload the record batches. + // Serialize & upload the record batches. let (file_meta, _file_size) = store .upload(stream, &meta) .await - .expect("should serialise and store sucessfully"); + .expect("should serialize and store successfully"); // Extract the various bits of metadata. let file_meta = file_meta.decode().expect("should decode parquet metadata"); diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs index 072dd695f3..4a17165d95 100644 --- a/parquet_file/tests/metadata.rs +++ b/parquet_file/tests/metadata.rs @@ -8,7 +8,7 @@ use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, use iox_time::Time; use object_store::DynObjectStore; use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; -use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME}; +use schema::{builder::SchemaBuilder, sort::SortKey, InfluxFieldType, TIME_COLUMN_NAME}; #[tokio::test] async fn test_decoded_iox_metadata() { @@ -28,6 +28,7 @@ async fn test_decoded_iox_metadata() { "some_field", to_string_array(&["bananas", "platanos", "manzana"]), ), + ("null_field", null_array(3)), ]; // And the metadata the batch would be encoded with if it came through the @@ -57,7 +58,7 @@ async fn test_decoded_iox_metadata() { let (iox_parquet_meta, file_size) = storage
.upload(stream, &meta) .await - .expect("failed to serialise & persist record batch"); + .expect("failed to serialize & persist record batch"); // Sanity check - can't assert the actual value. assert!(file_size > 0); @@ -74,21 +75,14 @@ async fn test_decoded_iox_metadata() { "row count statistics does not match input row count" ); - let got = decoded - .read_iox_metadata_new() - .expect("failed to deserialise embedded IOx metadata"); - assert_eq!( - got, meta, - "embedded metadata does not match original metadata" - ); - - // Repro of 4695 + // Repro of 4714 let row_group_meta = decoded.parquet_row_group_metadata(); - println!("row_group_meta: {:#?}", row_group_meta); + println!("row_group_meta: {:#?}", row_group_meta); assert_eq!(row_group_meta.len(), 1); - assert_eq!(row_group_meta[0].columns().len(), 2); // time and some_field + assert_eq!(row_group_meta[0].columns().len(), 3); // time and some_field assert!(row_group_meta[0].column(0).statistics().is_some()); // There is statistics for "time" assert!(row_group_meta[0].column(1).statistics().is_some()); // There is statistics for "some_field" + assert!(row_group_meta[0].column(2).statistics().is_some()); // There is statistics for "null_field" let schema = decoded.read_schema().unwrap(); let (_, field) = schema.field(0); @@ -96,9 +90,187 @@ println!("schema: {:#?}", schema); let col_summary = decoded - .read_statistics(&*schema) // BUG: should not empty - .unwrap(); - assert!(col_summary.is_empty()); // TODO: must be NOT empty after the fix of 4695 + .read_statistics(&*schema) + .expect("Invalid Statistics"); + assert!(col_summary.is_empty()); // TODO: must NOT be empty after the fix of 4714 + + let got = decoded + .read_iox_metadata_new() + .expect("failed to deserialize embedded IOx metadata"); + assert_eq!( + got, meta, + "embedded metadata does not match original metadata" + ); +} + +// Reproducer for "https://github.com/influxdata/influxdb_iox/issues/4695" +// TODO:
remove #[ignore] to turn the test on after the fix +#[ignore] +#[tokio::test] +async fn test_decoded_iox_metadata_without_data() { + // A representative IOx data sample (with a time column, an invariant upheld + // in the IOx write path) + let data = [ + ( + TIME_COLUMN_NAME, + to_timestamp_array(&[]), // No data on purpose to reproduce the panic bug + ), + ("some_field", to_string_array(&[])), + ]; + + // And the metadata the batch would be encoded with if it came through the + // IOx write path. + let meta = IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + compaction_level: 1, + sort_key: None, + }; + + let batch = RecordBatch::try_from_iter(data).unwrap(); + let stream = futures::stream::iter([Ok(batch.clone())]); + + let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default()); + let storage = ParquetStorage::new(object_store); + + let (iox_parquet_meta, _file_size) = storage + .upload(stream, &meta) + .await + .expect("failed to serialize & persist record batch"); + + // Decode the IOx metadata embedded in the parquet file metadata.
+ let decoded = iox_parquet_meta + .decode() + .expect("failed to decode parquet file metadata"); + + let schema = decoded.read_schema().unwrap(); + decoded + .read_statistics(&*schema) + .expect("Invalid Statistics"); // panic due to the bug +} + +#[tokio::test] +async fn test_decoded_many_columns_with_null_cols_iox_metadata() { + // Increase these values to have larger test + let num_cols = 10; + let num_rows = 20; + let num_repeats = 5; + + let mut data = Vec::with_capacity(num_cols); + + let t = 1646917692000000000; + let mut time_arr: Vec<i64> = Vec::with_capacity(num_rows); + let mut string_arr = Vec::with_capacity(num_rows); + + // Make long string data + fn make_long_str(len: usize) -> String { + "Long String Data".repeat(len) + } + let str = make_long_str(num_repeats); + + // Data of time and string columns + for i in 0..num_rows { + time_arr.push(t + i as i64); + string_arr.push(str.as_str()); + } + + // First column is time + data.push((TIME_COLUMN_NAME.to_string(), to_timestamp_array(&time_arr))); + // Second column contains all nulls + data.push(("column_name_1".to_string(), null_array(num_rows))); + // Names of other columns + fn make_col_name(i: usize) -> String { + "column_name_".to_string() + i.to_string().as_str() + } + // Data of the rest of the columns + for i in 2..num_cols { + let col = make_col_name(i); + let col_data = (col, to_string_array(&string_arr)); + data.push(col_data); + } + + // And the metadata the batch would be encoded with if it came through the + // IOx write path.
+ + // sort key includes all columns with time column last + let mut sort_key_data = Vec::with_capacity(num_cols); + for i in 1..num_cols { + let col = make_col_name(i); + sort_key_data.push(col); + } + sort_key_data.push(TIME_COLUMN_NAME.to_string()); + let sort_key = SortKey::from_columns(sort_key_data); + + let meta = IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + compaction_level: 1, + sort_key: Some(sort_key), + }; + //println!("IoxMetadata: {:#?}", meta); + + let batch = RecordBatch::try_from_iter(data).unwrap(); + let stream = futures::stream::iter([Ok(batch.clone())]); + + let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default()); + let storage = ParquetStorage::new(object_store); + + let (iox_parquet_meta, file_size) = storage + .upload(stream, &meta) + .await + .expect("failed to serialize & persist record batch"); + //println!("iox_parquet_meta: {:#?}", iox_parquet_meta); + + // Sanity check - can't assert the actual value. + assert!(file_size > 0); + + // Decode the IOx metadata embedded in the parquet file metadata. + let decoded = iox_parquet_meta + .decode() + .expect("failed to decode parquet file metadata"); + + // And verify the metadata matches the expected values.
+ assert_eq!( + decoded.row_count(), + num_rows, + "row count statistics does not match input row count" + ); + + let schema = decoded.read_schema().unwrap(); + let (_, field) = schema.field(0); + assert_eq!(field.name(), "time"); + //println!("schema: {:#?}", schema); + + let col_summary = decoded + .read_statistics(&*schema) + .expect("Invalid Statistics"); + assert!(col_summary.is_empty()); // TODO: must NOT be empty after the fix of 4714 + + let got = decoded + .read_iox_metadata_new() + .expect("failed to deserialize embedded IOx metadata"); + assert_eq!( + got, meta, + "embedded metadata does not match original metadata" + ); } #[tokio::test] @@ -117,6 +289,7 @@ async fn test_derive_parquet_file_params() { // And the metadata the batch would be encoded with if it came through the // IOx write path. + let partition_id = PartitionId::new(4); let meta = IoxMetadata { object_store_id: Default::default(), creation_timestamp: Time::from_timestamp_nanos(1234), @@ -125,7 +298,7 @@ async fn test_derive_parquet_file_params() { sequencer_id: SequencerId::new(2), table_id: TableId::new(3), table_name: "platanos".into(), - partition_id: PartitionId::new(4), + partition_id, partition_key: "potato".into(), min_sequence_number: SequenceNumber::new(10), max_sequence_number: SequenceNumber::new(11), @@ -151,15 +324,16 @@ async fn test_derive_parquet_file_params() { let (iox_parquet_meta, file_size) = storage .upload(stream, &meta) .await - .expect("failed to serialise & persist record batch"); + .expect("failed to serialize & persist record batch"); // Use the IoxParquetMetaData and original IoxMetadata to derive a // ParquetFileParams. - let catalog_data = meta.to_parquet_file(file_size, &iox_parquet_meta); + let catalog_data = meta.to_parquet_file(partition_id, file_size, &iox_parquet_meta); // And verify the resulting statistics used in the catalog. 
// // NOTE: thrift-encoded metadata not checked + // TODO: check thrift-encoded metadata which may be the issue of bug 4695 assert_eq!(catalog_data.sequencer_id, meta.sequencer_id); assert_eq!(catalog_data.namespace_id, meta.namespace_id); assert_eq!(catalog_data.table_id, meta.table_id); @@ -190,3 +364,11 @@ fn to_timestamp_array(timestamps: &[i64]) -> ArrayRef { .expect("failed to append timestamp values"); Arc::new(builder.finish()) } + +fn null_array(num: usize) -> ArrayRef { + let mut builder = StringBuilder::new(num); + for _i in 0..num { + builder.append_null().expect("failed to append null values"); + } + Arc::new(builder.finish()) +} diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index 871f98f489..34fad8eb15 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -74,7 +74,7 @@ pub enum SchemaError { /// (but uncached) columns, returning an error if the requested type does not /// match. /// -/// The catalog must serialise column creation to avoid `set(a=tag)` overwriting +/// The catalog must serialize column creation to avoid `set(a=tag)` overwriting /// a prior `set(a=int)` write to the catalog (returning an error that `a` is an /// `int` for the second request). /// diff --git a/router/src/server/http.rs b/router/src/server/http.rs index 9cada17386..6bf1d3d9de 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -135,7 +135,7 @@ pub enum OrgBucketError { NotSpecified, /// The request contains invalid parameters. - #[error("failed to deserialise org/bucket/precision in request: {0}")] + #[error("failed to deserialize org/bucket/precision in request: {0}")] DecodeFail(#[from] serde::de::value::Error), /// The provided org/bucket could not be converted into a database name.