From 7e804db0a32aa07990c45048bb9fe1f12daa40eb Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 13 Sep 2021 12:06:20 +0200
Subject: [PATCH 1/6] fix: use btree map for some protobuf messages for deterministic outputs

---
 generated_types/build.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/generated_types/build.rs b/generated_types/build.rs
index ff248f19b0..f1c03e5816 100644
--- a/generated_types/build.rs
+++ b/generated_types/build.rs
@@ -65,7 +65,11 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
         .compile_well_known_types()
         .disable_comments(&[".google"])
         .extern_path(".google.protobuf", "::google_types::protobuf")
-        .bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"]);
+        .bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"])
+        .btree_map(&[
+            ".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers",
+            ".influxdata.iox.catalog.v1.PartitionCheckpoint.sequencer_numbers",
+        ]);
 
     let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
     tonic_build::configure()

From afc507ae14b5dec56f5ae6b0321069f9a83a410f Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 13 Sep 2021 12:12:34 +0200
Subject: [PATCH 2/6] feat: compress encoded parquet metadata

Depending on the number of columns, this should save between 60% and 75%.
---
 Cargo.lock | 1 +
 .../influxdata/iox/catalog/v1/catalog.proto | 3 +-
 parquet_file/Cargo.toml | 1 +
 parquet_file/src/catalog/core.rs | 2 +-
 parquet_file/src/catalog/dump.rs | 12 +++----
 parquet_file/src/metadata.rs | 35 ++++++++++++++-----
 6 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6aa6c6622d..cdfb11929c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2833,6 +2833,7 @@ dependencies = [
  "tokio-stream",
  "tracker",
  "uuid",
+ "zstd",
 ]
 
 [[package]]
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
index eb5cc192ac..dfe07d5046 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
@@ -30,11 +30,12 @@ message AddParquet {
   // [Apache Parquet] metadata encoded using [Apache Thrift].
   //
-  // The metadata is encoded using the [Thrift Compact Protocol].
+  // The metadata is encoded using the [Thrift Compact Protocol] and compressed using [Zstandard].
   //
   // [Apache Parquet]: https://parquet.apache.org/
   // [Apache Thrift]: https://thrift.apache.org/
   // [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
+  // [Zstandard]: http://facebook.github.io/zstd/
   bytes metadata = 2;
 }
 
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index 67585ba5d6..6dd48bc334 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -32,6 +32,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"
 tokio-stream = "0.1"
 tracker = { path = "../tracker" }
 uuid = { version = "0.8", features = ["serde", "v4"] }
+zstd = "0.9"
 
 [dev-dependencies]
 arrow_util = { path = "../arrow_util" }
diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs
index 5550d6fdf6..d813a6e41f 100644
--- a/parquet_file/src/catalog/core.rs
+++ b/parquet_file/src/catalog/core.rs
@@ -37,7 +37,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
 /// Current version for serialized transactions.
 ///
 /// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 13; +pub const TRANSACTION_VERSION: u32 = 14; #[derive(Debug, Snafu)] pub enum Error { diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index 55d83ffb20..e7f7a07c95 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -272,7 +272,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 13, + version: 14, actions: [], revision_counter: 0, uuid: "00000000-0000-0000-0000-000000000000", @@ -297,7 +297,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 13, + version: 14, actions: [ Action { action: Some( @@ -313,7 +313,7 @@ File { }, ), file_size_bytes: 33, - metadata: b"metadata omitted (1742 bytes)", + metadata: b"metadata omitted (944 bytes)", }, ), ), @@ -396,7 +396,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 13, + version: 14, actions: [], revision_counter: 0, uuid: "00000000-0000-0000-0000-000000000000", @@ -421,7 +421,7 @@ File { is_checkpoint: false, proto: Ok( Transaction { - version: 13, + version: 14, actions: [ Action { action: Some( @@ -437,7 +437,7 @@ File { }, ), file_size_bytes: 33, - metadata: b"metadata omitted (1742 bytes)", + metadata: b"metadata omitted (944 bytes)", }, ), ), diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 450958bd55..8000656905 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro /// /// **Important: When changing this structure, consider bumping the /// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!** -pub const METADATA_VERSION: u32 = 6; +pub const METADATA_VERSION: u32 = 7; /// File-level metadata key to store the IOx-specific data. /// @@ -231,6 +231,12 @@ pub enum Error { expected ))] IoxMetadataVersionMismatch { actual: u32, expected: Vec }, + + #[snafu(display("Cannot encode ZSTD message: {}", source))] + ZstdEncodeFailure { source: std::io::Error }, + + #[snafu(display("Cannot decode ZSTD message: {}", source))] + ZstdDecodeFailure { source: std::io::Error }, } pub type Result = std::result::Result; @@ -462,7 +468,7 @@ fn decode_timestamp_from_field( /// Parquet metadata with IOx-specific wrapper. #[derive(Debug, Clone)] pub struct IoxParquetMetaData { - /// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes. + /// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes. /// /// This can be used to store metadata separate from the related payload data. The usage of [Apache Thrift] allows the /// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes @@ -471,6 +477,7 @@ pub struct IoxParquetMetaData { /// [Apache Parquet]: https://parquet.apache.org/ /// [Apache Thrift]: https://thrift.apache.org/ /// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md + /// [Zstandard]: http://facebook.github.io/zstd/ data: Vec, } @@ -490,7 +497,7 @@ impl IoxParquetMetaData { Self { data } } - /// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes. + /// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes. /// /// This can be used to store metadata separate from the related payload data. 
The usage of [Apache Thrift] allows the
     /// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes
@@ -499,11 +506,12 @@ impl IoxParquetMetaData {
     /// [Apache Parquet]: https://parquet.apache.org/
     /// [Apache Thrift]: https://thrift.apache.org/
     /// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
+    /// [Zstandard]: http://facebook.github.io/zstd/
     pub fn thrift_bytes(&self) -> &[u8] {
         self.data.as_ref()
     }
 
-    /// Encode [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes.
+    /// Encode [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
     ///
     /// This can be used to store metadata separate from the related payload data. The usage of [Apache Thrift] allows the
     /// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes
@@ -512,6 +520,7 @@ impl IoxParquetMetaData {
     /// [Apache Parquet]: https://parquet.apache.org/
     /// [Apache Thrift]: https://thrift.apache.org/
     /// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
+    /// [Zstandard]: http://facebook.github.io/zstd/
     fn parquet_md_to_thrift(parquet_md: ParquetMetaData) -> Result> {
         // step 1: assemble a thrift-compatible struct
         use parquet::schema::types::to_thrift as schema_to_thrift;
@@ -548,22 +557,30 @@ impl IoxParquetMetaData {
             protocol.flush().context(ThriftWriteFailure {})?;
         }
 
+        // step 3: compress data
+        // Note: level 0 is the zstd-provided default
+        let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailure)?;
+
         Ok(buffer)
     }
 
-    /// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded bytes.
+    /// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
     ///
     /// [Apache Parquet]: https://parquet.apache.org/
     /// [Apache Thrift]: https://thrift.apache.org/
+    /// [Zstandard]: http://facebook.github.io/zstd/
     pub fn decode(&self) -> Result {
-        // step 1: load thrift data from byte stream
+        // step 1: decompress
+        let data = zstd::decode_all(&self.data[..]).context(ZstdDecodeFailure)?;
+
+        // step 2: load thrift data from byte stream
         let thrift_file_metadata = {
-            let mut protocol = TCompactInputProtocol::new(&self.data[..]);
+            let mut protocol = TCompactInputProtocol::new(&data[..]);
             parquet_format::FileMetaData::read_from_in_protocol(&mut protocol)
                 .context(ThriftReadFailure {})?
         };
 
-        // step 2: convert thrift to in-mem structs
+        // step 3: convert thrift to in-mem structs
         use parquet::schema::types::from_thrift as schema_from_thrift;
 
         let schema =
@@ -1146,6 +1163,6 @@ mod tests {
             .await
             .unwrap();
         let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
-        assert_eq!(parquet_metadata.size(), 11939);
+        assert_eq!(parquet_metadata.size(), 3733);
     }
 }

From 9c80d32af5b4ea5fe80726f2ef83cbfd38fb7342 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 20 Sep 2021 09:34:03 +0200
Subject: [PATCH 3/6] refactor: use normal google timestamps in parquet metadata again

We changed from Google timestamps (which use variable-sized integers) to our
own fixed-sized integer timestamps so that the size of the parquet metadata
does not depend on the timestamp. However, with the introduction of
compression this is the case anyway (since slightly different timestamps lead
to different compression results), and we now need deterministic timestamps
for tests.

So there is no point in using our own timestamp type. Switching back to the
variable-sized type also shrinks the post-compression results a bit.
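For reference, the `.into()` / `try_into()` calls in this patch go through the `From`/`TryFrom` conversions that the `google_types` timestamp type provides. A minimal standalone sketch of the equivalent logic (the `Timestamp` struct here is only a stand-in for the generated protobuf type, and error handling is elided) mirrors the `encode_timestamp`/`decode_timestamp` helpers removed below:

```rust
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};

/// Stand-in for the generated `google.protobuf.Timestamp` message (illustrative only).
struct Timestamp {
    seconds: i64,
    nanos: i32,
}

fn encode_timestamp(ts: DateTime<Utc>) -> Timestamp {
    Timestamp {
        seconds: ts.timestamp(),
        nanos: ts.timestamp_subsec_nanos() as i32,
    }
}

fn decode_timestamp(ts: Timestamp) -> DateTime<Utc> {
    // Error handling for out-of-range nanos is elided in this sketch.
    let dt = NaiveDateTime::from_timestamp(ts.seconds, ts.nanos as u32);
    DateTime::<Utc>::from_utc(dt, Utc)
}

fn main() {
    let now = Utc.timestamp(11, 22);
    let proto = encode_timestamp(now);
    assert_eq!(decode_timestamp(proto), now);
}
```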
---
 Cargo.lock | 1 +
 .../iox/catalog/v1/parquet_metadata.proto | 16 ++--
 parquet_file/Cargo.toml | 1 +
 parquet_file/src/catalog/dump.rs | 4 +-
 parquet_file/src/metadata.rs | 79 ++++---------------
 5 files changed, 23 insertions(+), 78 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cdfb11929c..f4adc25959 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2814,6 +2814,7 @@ dependencies = [
  "datafusion_util",
  "futures",
  "generated_types",
+ "google_types",
  "internal_types",
  "iox_object_store",
  "metric",
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
index 6b25fdccff..3991d0d1dc 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto
@@ -1,13 +1,7 @@
 syntax = "proto3";
 package influxdata.iox.catalog.v1;
 
-// Timestamp similar to the Google version but w/ fixed-sized integers.
-//
-// This is helpful for deterministic outputs.
-message FixedSizeTimestamp {
-  sfixed64 seconds = 1;
-  sfixed32 nanos = 2;
-}
+import "google/protobuf/timestamp.proto";
 
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata under a single key.
 message IoxMetadata {
@@ -15,7 +9,7 @@ message IoxMetadata {
   uint32 version = 1;
 
   // Timestamp when this file was created.
-  FixedSizeTimestamp creation_timestamp = 2;
+  google.protobuf.Timestamp creation_timestamp = 2;
 
   // Table that holds this parquet file.
   string table_name = 3;
@@ -33,10 +27,10 @@ message IoxMetadata {
   DatabaseCheckpoint database_checkpoint = 7;
 
   // Wallclock timestamp of when the first data in this file was received by IOx.
-  FixedSizeTimestamp time_of_first_write = 8;
+  google.protobuf.Timestamp time_of_first_write = 8;
 
   // Wallclock timestamp of when the last data in this file was received by IOx.
-  FixedSizeTimestamp time_of_last_write = 9;
+  google.protobuf.Timestamp time_of_last_write = 9;
 
   // Order of this chunk relative to other overlapping chunks.
   uint32 chunk_order = 10;
@@ -52,7 +46,7 @@ message PartitionCheckpoint {
   map sequencer_numbers = 1;
 
   // Minimum unpersisted timestamp.
-  FixedSizeTimestamp min_unpersisted_timestamp = 2;
+  google.protobuf.Timestamp min_unpersisted_timestamp = 2;
 }
 
 // Record of the playback state for the whole database.
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index 6dd48bc334..4c0a22e783 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -14,6 +14,7 @@ datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } futures = "0.3.7" generated_types = { path = "../generated_types" } +google_types = { path = "../google_types" } internal_types = { path = "../internal_types" } iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index e7f7a07c95..c1bb223347 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -313,7 +313,7 @@ File { }, ), file_size_bytes: 33, - metadata: b"metadata omitted (944 bytes)", + metadata: b"metadata omitted (930 bytes)", }, ), ), @@ -437,7 +437,7 @@ File { }, ), file_size_bytes: 33, - metadata: b"metadata omitted (944 bytes)", + metadata: b"metadata omitted (930 bytes)", }, ), ), diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 8000656905..3737eaf31e 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -86,7 +86,7 @@ //! [Apache Parquet]: https://parquet.apache.org/ //! [Apache Thrift]: https://thrift.apache.org/ //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use data_types::{ chunk_metadata::ChunkOrder, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, @@ -397,9 +397,9 @@ impl IoxMetadata { ) }) .collect(), - min_unpersisted_timestamp: Some(encode_timestamp( - self.partition_checkpoint.min_unpersisted_timestamp(), - )), + min_unpersisted_timestamp: Some( + self.partition_checkpoint.min_unpersisted_timestamp().into(), + ), }; let proto_database_checkpoint = proto::DatabaseCheckpoint { @@ -422,9 +422,9 @@ impl IoxMetadata { let proto_msg = proto::IoxMetadata { version: METADATA_VERSION, - creation_timestamp: Some(encode_timestamp(self.creation_timestamp)), - time_of_first_write: Some(encode_timestamp(self.time_of_first_write)), - time_of_last_write: Some(encode_timestamp(self.time_of_last_write)), + creation_timestamp: Some(self.creation_timestamp.into()), + time_of_first_write: Some(self.time_of_first_write.into()), + time_of_last_write: Some(self.time_of_last_write.into()), table_name: self.table_name.to_string(), partition_key: self.partition_key.to_string(), chunk_id: self.chunk_id, @@ -440,29 +440,15 @@ impl IoxMetadata { } } -fn encode_timestamp(ts: DateTime) -> proto::FixedSizeTimestamp { - proto::FixedSizeTimestamp { - seconds: ts.timestamp(), - nanos: ts.timestamp_subsec_nanos() as i32, - } -} - -fn decode_timestamp(ts: proto::FixedSizeTimestamp) -> Result> { - let dt = NaiveDateTime::from_timestamp( - ts.seconds, - ts.nanos - .try_into() - .map_err(|e| Box::new(e) as _) - .context(IoxMetadataBroken)?, - ); - Ok(chrono::DateTime::::from_utc(dt, Utc)) -} - fn decode_timestamp_from_field( - value: Option, + value: Option, field: &'static str, ) -> Result> { - decode_timestamp(value.context(IoxMetadataFieldMissing { field })?) + value + .context(IoxMetadataFieldMissing { field })? + .try_into() + .map_err(|e| Box::new(e) as _) + .context(IoxMetadataBroken) } /// Parquet metadata with IOx-specific wrapper. 
@@ -870,7 +856,6 @@ mod tests {
     use super::*;
 
     use internal_types::schema::TIME_COLUMN_NAME;
-    use persistence_windows::checkpoint::PersistCheckpointBuilder;
 
     use crate::test_utils::{
         chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,
@@ -1112,42 +1097,6 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_iox_metadata_to_protobuf_deterministic_size() {
-        // checks that different timestamps do NOT alter the size of the serialized metadata
-        let table_name = Arc::from("table1");
-        let partition_key = Arc::from("part1");
-
-        // try multiple time to provoke an error
-        for _ in 0..100 {
-            // build checkpoints
-            let min_unpersisted_timestamp = Utc::now();
-            let partition_checkpoint = PartitionCheckpoint::new(
-                Arc::clone(&table_name),
-                Arc::clone(&partition_key),
-                Default::default(),
-                min_unpersisted_timestamp,
-            );
-            let builder = PersistCheckpointBuilder::new(partition_checkpoint);
-            let (partition_checkpoint, database_checkpoint) = builder.build();
-
-            let metadata = IoxMetadata {
-                creation_timestamp: Utc::now(),
-                table_name: Arc::clone(&table_name),
-                partition_key: Arc::clone(&partition_key),
-                chunk_id: 1337,
-                partition_checkpoint,
-                database_checkpoint,
-                time_of_first_write: Utc::now(),
-                time_of_last_write: Utc::now(),
-                chunk_order: ChunkOrder::new(5),
-            };
-
-            let proto_bytes = metadata.to_protobuf().unwrap();
-            assert_eq!(proto_bytes.len(), 90);
-        }
-    }
-
     #[tokio::test]
     async fn test_parquet_metadata_size() {
         // setup: preserve chunk to object store
@@ -1163,6 +1112,6 @@ mod tests {
             .await
             .unwrap();
         let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
-        assert_eq!(parquet_metadata.size(), 3733);
+        assert_eq!(parquet_metadata.size(), 3716);
     }
 }

From e15631002e289ca4eb4492b544564a2e85d2fda6 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 20 Sep 2021 09:38:04 +0200
Subject: [PATCH 4/6] test: allow test code to specify exact parquet creation timestamp

This is required for deterministic sizes since different timestamps lead to
different compression ratios.
---
 server/src/db.rs | 24 ++++++++++++++++++++++--
 server/src/db/lifecycle.rs | 2 +-
 server/src/db/lifecycle/persist.rs | 18 ++++++++++++++----
 server/src/db/lifecycle/write.rs | 12 ++++++++----
 4 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 22e65d36dd..28d39434d6 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -612,6 +612,21 @@ impl Db {
         partition_key: &str,
         now: Instant,
     ) -> Result> {
+        self.persist_partition_with_timestamp(table_name, partition_key, now, Utc::now)
+            .await
+    }
+
+    /// Internal use only for testing.
+ async fn persist_partition_with_timestamp( + self: &Arc, + table_name: &str, + partition_key: &str, + now: Instant, + f_parquet_creation_timestamp: F, + ) -> Result> + where + F: Fn() -> DateTime + Send, + { // Use explicit scope to ensure the async generator doesn't // assume the locks have to possibly live across the `await` let fut = { @@ -664,8 +679,13 @@ impl Db { } ); - let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle) - .context(LifecycleError)?; + let (_, fut) = lifecycle::persist_chunks( + partition, + chunks, + flush_handle, + f_parquet_creation_timestamp, + ) + .context(LifecycleError)?; fut }; diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 2acc0a2c9f..f43b92926d 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -225,7 +225,7 @@ impl LockablePartition for LockableCatalogPartition { handle: Self::PersistHandle, ) -> Result, Self::Error> { info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks"); - let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?; + let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0, Utc::now)?; let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") }); Ok(tracker) } diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 7fa38818f7..a1ecfb6c36 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -19,14 +19,18 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; /// Split and then persist the provided chunks /// /// TODO: Replace low-level locks with transaction object -pub fn persist_chunks( +pub fn persist_chunks( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, chunks: Vec>, flush_handle: FlushHandle, + f_parquet_creation_timestamp: F, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, -)> { +)> +where + F: Fn() -> DateTime + Send, +{ assert!( !chunks.is_empty(), "must provide at least 1 chunk to persist" @@ -164,7 +168,13 @@ pub fn persist_chunks( }; let to_persist = to_persist.write(); - write_chunk_to_object_store(partition_write, to_persist, flush_handle)?.1 + write_chunk_to_object_store( + partition_write, + to_persist, + flush_handle, + f_parquet_creation_timestamp, + )? 
+ .1 }; // Wait for write operation to complete @@ -239,7 +249,7 @@ mod tests { assert_eq!(handle.timestamp(), Utc.timestamp_nanos(10)); let chunks: Vec<_> = chunks.map(|x| x.upgrade()).collect(); - persist_chunks(partition, chunks, handle) + persist_chunks(partition, chunks, handle, Utc::now) .unwrap() .1 .await diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index bfd7b7df71..287ff6ab63 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -12,7 +12,7 @@ use crate::db::{ use ::lifecycle::LifecycleWriteGuard; -use chrono::Utc; +use chrono::{DateTime, Utc}; use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job}; use internal_types::selection::Selection; use observability_deps::tracing::{debug, warn}; @@ -40,14 +40,18 @@ use super::{ /// /// Returns a future registered with the tracker registry, and the corresponding tracker /// The caller can either spawn this future to tokio, or block directly on it -pub(super) fn write_chunk_to_object_store( +pub(super) fn write_chunk_to_object_store( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>, flush_handle: FlushHandle, + f_parquet_creation_timestamp: F, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, -)> { +)> +where + F: Fn() -> DateTime + Send, +{ let db = Arc::clone(&chunk.data().db); let addr = chunk.addr().clone(); let table_name = Arc::clone(&addr.table_name); @@ -119,7 +123,7 @@ pub(super) fn write_chunk_to_object_store( // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted // between creation and the transaction commit. let metadata = IoxMetadata { - creation_timestamp: Utc::now(), + creation_timestamp: f_parquet_creation_timestamp(), table_name: Arc::clone(&table_name), partition_key: Arc::clone(&partition_key), chunk_id: addr.chunk_id, From 0f5198c88db7411cb6ab3397ddf7f42cfb5a1c67 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 20 Sep 2021 09:39:15 +0200 Subject: [PATCH 5/6] test: fix tests dealing w/ parquet metadata sizes Sizes now depend on the actual content and therefore we need deterministic timestamps. --- server/src/db.rs | 79 +++++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index 28d39434d6..5348861015 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1597,7 +1597,8 @@ mod tests { let db = Arc::clone(&test_db.db); - write_lp(db.as_ref(), "cpu bar=1 10").await; + let t1_write = Utc.timestamp(11, 22); + write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await; let registry = test_db.metric_registry.as_ref(); @@ -1613,10 +1614,14 @@ mod tests { catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 700); // write into same chunk again. 
- write_lp(db.as_ref(), "cpu bar=2 20").await; - write_lp(db.as_ref(), "cpu bar=3 30").await; - write_lp(db.as_ref(), "cpu bar=4 40").await; - write_lp(db.as_ref(), "cpu bar=5 50").await; + let t2_write = t1_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await; + let t3_write = t2_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=3 30", t3_write).await; + let t4_write = t3_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=4 40", t4_write).await; + let t5_write = t4_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=5 50", t5_write).await; // verify chunk size updated catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 764); @@ -1658,16 +1663,18 @@ mod tests { let expected_read_buffer_size = 1706; catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size); - db.persist_partition( + let t6_write = t5_write + chrono::Duration::seconds(1); + db.persist_partition_with_timestamp( "cpu", "1970-01-01T00", Instant::now() + Duration::from_secs(1), + || t6_write, ) .await .unwrap(); // A chunk is now in the object store and still in read buffer - let expected_parquet_size = 1551; + let expected_parquet_size = 1243; catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size); // now also in OS catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size); @@ -2061,8 +2068,10 @@ mod tests { let db = test_db.db; // Write some line protocols in Mutable buffer of the DB - write_lp(db.as_ref(), "cpu bar=1 10").await; - write_lp(db.as_ref(), "cpu bar=2 20").await; + let t1_write = Utc.timestamp(11, 22); + write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await; + let t2_write = t1_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await; //Now mark the MB chunk close let partition_key = "1970-01-01T00"; @@ -2077,11 +2086,13 @@ mod tests { .await .unwrap(); // Write the RB chunk to Object Store but keep it in RB + let t3_persist = t2_write + chrono::Duration::seconds(1); let pq_chunk = db - .persist_partition( + .persist_partition_with_timestamp( "cpu", partition_key, Instant::now() + Duration::from_secs(1), + || t3_persist, ) .await .unwrap(); @@ -2091,7 +2102,7 @@ mod tests { // Read buffer + Parquet chunk size catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); - catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551); + catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242); // while MB and RB chunk are identical, the PQ chunk is a new one (split off) assert_eq!(mb_chunk.id(), rb_chunk.id()); @@ -2157,8 +2168,10 @@ mod tests { let db = test_db.db; // Write some line protocols in Mutable buffer of the DB - write_lp(db.as_ref(), "cpu bar=1 10").await; - write_lp(db.as_ref(), "cpu bar=2 20").await; + let t1_write = Utc.timestamp(11, 22); + write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await; + let t2_write = t1_write + chrono::Duration::seconds(1); + write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await; // Now mark the MB chunk close let partition_key = "1970-01-01T00"; @@ -2173,11 +2186,13 @@ mod tests { .await .unwrap(); // Write the RB chunk to Object Store but keep it in RB + let t3_persist = t2_write + chrono::Duration::seconds(1); let pq_chunk = db - .persist_partition( + 
.persist_partition_with_timestamp( "cpu", partition_key, Instant::now() + Duration::from_secs(1), + || t3_persist, ) .await .unwrap(); @@ -2200,7 +2215,7 @@ mod tests { // Read buffer + Parquet chunk size catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); - catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551); + catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242); // Unload RB chunk but keep it in OS let pq_chunk = db @@ -2222,7 +2237,7 @@ mod tests { // Parquet chunk size only catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 0); - catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551); + catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242); // Verify data written to the parquet file in object store // @@ -2562,16 +2577,16 @@ mod tests { // get three chunks: one open, one closed in mb and one close in rb // In open chunk, will end up in rb/os - let t_write1 = Utc::now(); - write_lp_with_time(&db, "cpu bar=1 1", t_write1).await; + let t1_write = Utc.timestamp(11, 22); + write_lp_with_time(&db, "cpu bar=1 1", t1_write).await; // Move open chunk to closed db.rollover_partition("cpu", "1970-01-01T00").await.unwrap(); // New open chunk in mb // This point will end up in rb/os - let t_write2 = Utc::now(); - write_lp_with_time(&db, "cpu bar=1,baz=2 2", t_write2).await; + let t2_write = t1_write + chrono::Duration::seconds(1); + write_lp_with_time(&db, "cpu bar=1,baz=2 2", t2_write).await; // Check first/last write times on the chunks at this point let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return"); @@ -2580,15 +2595,15 @@ mod tests { // Each chunk has one write, so both chunks should have first write == last write let closed_mb_t3 = chunk_summaries[0].clone(); assert_eq!(closed_mb_t3.storage, ChunkStorage::ClosedMutableBuffer); - assert_first_last_times_eq(&closed_mb_t3, t_write1); + assert_first_last_times_eq(&closed_mb_t3, t1_write); let open_mb_t3 = chunk_summaries[1].clone(); assert_eq!(open_mb_t3.storage, ChunkStorage::OpenMutableBuffer); - assert_first_last_times_eq(&open_mb_t3, t_write2); + assert_first_last_times_eq(&open_mb_t3, t2_write); assert_chunks_times_ordered(&closed_mb_t3, &open_mb_t3); // This point makes a new open mb chunk and will end up in the closed mb chunk - let t_write3 = Utc::now(); - write_lp_with_time(&db, "cpu bar=1,baz=2,frob=3 400000000000000", t_write3).await; + let t3_write = t2_write + chrono::Duration::seconds(1); + write_lp_with_time(&db, "cpu bar=1,baz=2,frob=3 400000000000000", t3_write).await; // Check first/last write times on the chunks at this point let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return"); @@ -2630,10 +2645,12 @@ mod tests { assert_chunks_times_eq(&other_open_mb_t5, &other_open_mb_t4); // Persist rb to parquet os - db.persist_partition( + let t4_persist = t3_write + chrono::Duration::seconds(1); + db.persist_partition_with_timestamp( "cpu", "1970-01-01T00", Instant::now() + Duration::from_secs(1), + || t4_persist, ) .await .unwrap(); @@ -2674,8 +2691,8 @@ mod tests { // New open chunk in mb // This point will stay in this open mb chunk - let t_write4 = Utc::now(); - write_lp_with_time(&db, "cpu bar=1,baz=3,blargh=3 400000000000000", t_write4).await; + let t5_write = t4_persist + chrono::Duration::seconds(1); + write_lp_with_time(&db, "cpu bar=1,baz=3,blargh=3 
400000000000000", t5_write).await; // Check first/last write times on the chunks at this point let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return"); @@ -2693,7 +2710,7 @@ mod tests { // times should be the same let open_mb_t8 = chunk_summaries[2].clone(); assert_eq!(open_mb_t8.storage, ChunkStorage::OpenMutableBuffer); - assert_first_last_times_eq(&open_mb_t8, t_write4); + assert_first_last_times_eq(&open_mb_t8, t5_write); let lifecycle_action = None; @@ -2704,8 +2721,8 @@ mod tests { id: 2, storage: ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, - memory_bytes: 4557, // size of RB and OS chunks - object_store_bytes: 1577, // size of parquet file + memory_bytes: 4085, // size of RB and OS chunks + object_store_bytes: 1533, // size of parquet file row_count: 2, time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), @@ -2759,7 +2776,7 @@ mod tests { assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303); assert_eq!(db.catalog.metrics().memory().read_buffer(), 2550); - assert_eq!(db.catalog.metrics().memory().object_store(), 2007); + assert_eq!(db.catalog.metrics().memory().object_store(), 1535); } #[tokio::test] From 831e55d79ea534fbfaebfd315590894bb5fd6677 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 20 Sep 2021 09:42:21 +0200 Subject: [PATCH 6/6] refactor: make error messages more precise --- parquet_file/src/metadata.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 3737eaf31e..5d7d559c4f 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -232,10 +232,10 @@ pub enum Error { ))] IoxMetadataVersionMismatch { actual: u32, expected: Vec }, - #[snafu(display("Cannot encode ZSTD message: {}", source))] + #[snafu(display("Cannot encode ZSTD message for parquet metadata: {}", source))] ZstdEncodeFailure { source: std::io::Error }, - #[snafu(display("Cannot decode ZSTD message: {}", source))] + #[snafu(display("Cannot decode ZSTD message for parquet metadata: {}", source))] ZstdDecodeFailure { source: std::io::Error }, } pub type Result = std::result::Result;