diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 0246bb79ae..4b267a5416 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -85,7 +85,7 @@ impl ChunkMetrics {
 #[derive(Debug)]
 pub struct ParquetChunk {
     /// Partition this chunk belongs to
-    partition_key: String,
+    partition_key: Arc<str>,
 
     /// Meta data of the table
     table_summary: Arc<TableSummary>,
@@ -146,7 +146,7 @@ impl ParquetChunk {
     /// Creates a new chunk from given parts w/o parsing anything from the provided parquet metadata.
     pub(crate) fn new_from_parts(
-        part_key: impl Into<String>,
+        partition_key: Arc<str>,
         table_summary: Arc<TableSummary>,
         schema: Arc<Schema>,
         file_location: Path,
@@ -157,7 +157,7 @@
         let timestamp_range = extract_range(&table_summary);
 
         let mut chunk = Self {
-            partition_key: part_key.into(),
+            partition_key,
             table_summary,
             schema,
             timestamp_range,
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index bda14f420c..fe4356669e 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -256,10 +256,10 @@ pub struct IoxMetadata {
     pub creation_timestamp: DateTime<Utc>,
 
     /// Table that holds this parquet file.
-    pub table_name: String,
+    pub table_name: Arc<str>,
 
     /// Partition key of the partition that holds this parquet file.
-    pub partition_key: String,
+    pub partition_key: Arc<str>,
 
     /// Chunk ID.
     pub chunk_id: u32,
@@ -297,6 +297,10 @@ impl IoxMetadata {
             .map_err(|err| Box::new(err) as _)
             .context(IoxMetadataBroken)?;
 
+        // extract strings
+        let table_name = Arc::from(proto_msg.table_name.as_ref());
+        let partition_key = Arc::from(proto_msg.partition_key.as_ref());
+
         // extract partition checkpoint
         let proto_partition_checkpoint = proto_msg
@@ -324,8 +328,8 @@
             .map_err(|err| Box::new(err) as _)
             .context(IoxMetadataBroken)?;
         let partition_checkpoint = PartitionCheckpoint::new(
-            proto_msg.table_name.clone(),
-            proto_msg.partition_key.clone(),
+            Arc::clone(&table_name),
+            Arc::clone(&partition_key),
             sequencer_numbers,
             min_unpersisted_timestamp,
         );
@@ -345,8 +349,8 @@
 
         Ok(Self {
             creation_timestamp,
-            table_name: proto_msg.table_name,
-            partition_key: proto_msg.partition_key,
+            table_name,
+            partition_key,
             chunk_id: proto_msg.chunk_id,
             partition_checkpoint,
             database_checkpoint,
@@ -384,8 +388,8 @@
         let proto_msg = proto::IoxMetadata {
             version: METADATA_VERSION,
             creation_timestamp: Some(self.creation_timestamp.into()),
-            table_name: self.table_name.clone(),
-            partition_key: self.partition_key.clone(),
+            table_name: self.table_name.to_string(),
+            partition_key: self.partition_key.to_string(),
             chunk_id: self.chunk_id,
             partition_checkpoint: Some(proto_partition_checkpoint),
             database_checkpoint: Some(proto_database_checkpoint),
@@ -913,14 +917,16 @@ mod tests {
 
     #[test]
     fn test_iox_metadata_from_protobuf_checks_version() {
-        let table_name = "table1";
-        let partition_key = "part1";
-        let (partition_checkpoint, database_checkpoint) =
-            create_partition_and_database_checkpoint(table_name, partition_key);
+        let table_name = Arc::from("table1");
+        let partition_key = Arc::from("part1");
+        let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
+            Arc::clone(&table_name),
+            Arc::clone(&partition_key),
+        );
         let metadata = IoxMetadata {
             creation_timestamp: Utc::now(),
-            table_name: table_name.to_string(),
-            partition_key: partition_key.to_string(),
+            table_name,
+            partition_key,
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
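These two hunks are the core of the change: `ParquetChunk` and `IoxMetadata` now hold their names as `Arc<str>` and only materialize owned `String`s at the protobuf boundary (`to_string()` on encode, `Arc::from(...)` on decode). A minimal, self-contained sketch of that round trip, with illustrative names and values rather than code from this repository:

```rust
use std::sync::Arc;

fn main() {
    // An Arc<str> is an immutable string with shared ownership: cloning it
    // bumps a reference count instead of copying the bytes.
    let partition_key: Arc<str> = Arc::from("part1");

    // Cheap clone for a second owner, e.g. a checkpoint next to the metadata.
    let for_checkpoint = Arc::clone(&partition_key);

    // Protobuf messages use plain `String` fields, so encoding converts back
    // to an owned copy, mirroring `to_string()` in the hunk above.
    let wire_value: String = partition_key.to_string();

    // Decoding re-wraps the wire string, mirroring the `Arc::from(...as_ref())`
    // lines in the "extract strings" hunk.
    let decoded: Arc<str> = Arc::from(wire_value.as_ref());

    assert_eq!(for_checkpoint.as_ref(), "part1");
    assert_eq!(decoded.as_ref(), "part1");
}
```

Handing the same name to the checkpoint, the metadata, and the chunk is now a reference-count bump rather than three string copies.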
diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs
index f0ad0aafa0..06294c9157 100644
--- a/parquet_file/src/rebuild.rs
+++ b/parquet_file/src/rebuild.rs
@@ -203,7 +203,7 @@ mod tests {
     async fn test_rebuild_successfull() {
         let object_store = make_object_store();
         let server_id = make_server_id();
-        let db_name = "db1";
+        let db_name = Arc::<str>::from("db1");
 
         // build catalog with some data
         let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
@@ -217,13 +217,15 @@
         {
             let mut transaction = catalog.open_transaction().await;
 
-            let (path, md) = create_parquet_file(&object_store, server_id, db_name, 0).await;
+            let (path, md) =
+                create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 0).await;
             state
                 .parquet_files
                 .insert(path.clone(), Arc::new(md.clone()));
             transaction.add_parquet(&path, &md).unwrap();
 
-            let (path, md) = create_parquet_file(&object_store, server_id, db_name, 1).await;
+            let (path, md) =
+                create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 1).await;
             state
                 .parquet_files
                 .insert(path.clone(), Arc::new(md.clone()));
@@ -239,7 +241,8 @@
         {
             let mut transaction = catalog.open_transaction().await;
 
-            let (path, md) = create_parquet_file(&object_store, server_id, db_name, 2).await;
+            let (path, md) =
+                create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 2).await;
             state
                 .parquet_files
                 .insert(path.clone(), Arc::new(md.clone()));
@@ -257,7 +260,7 @@
 
         // wipe catalog
         drop(catalog);
-        PreservedCatalog::wipe(&object_store, server_id, db_name)
+        PreservedCatalog::wipe(&object_store, server_id, &db_name)
             .await
             .unwrap();
@@ -459,20 +462,22 @@
     pub async fn create_parquet_file(
         object_store: &Arc<ObjectStore>,
         server_id: ServerId,
-        db_name: &str,
+        db_name: Arc<str>,
         chunk_id: u32,
     ) -> (DirsAndFileName, IoxParquetMetaData) {
-        let table_name = "table1";
-        let partition_key = "part1";
+        let table_name = Arc::from("table1");
+        let partition_key = Arc::from("part1");
         let (record_batches, _schema, _column_summaries, _num_rows) = make_record_batch("foo");
 
         let storage = Storage::new(Arc::clone(object_store), server_id);
-        let (partition_checkpoint, database_checkpoint) =
-            create_partition_and_database_checkpoint(table_name, partition_key);
+        let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
+            Arc::clone(&table_name),
+            Arc::clone(&partition_key),
+        );
         let metadata = IoxMetadata {
             creation_timestamp: Utc::now(),
-            table_name: table_name.to_string(),
-            partition_key: partition_key.to_string(),
+            table_name: Arc::clone(&table_name),
+            partition_key: Arc::clone(&partition_key),
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
@@ -481,9 +486,9 @@
         let (path, parquet_md) = storage
             .write_to_object_store(
                 ChunkAddr {
-                    db_name: Arc::from(db_name),
-                    table_name: Arc::from(table_name),
-                    partition_key: Arc::from(partition_key),
+                    db_name,
+                    table_name,
+                    partition_key,
                     chunk_id,
                 },
                 stream,
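Two details of this test are worth noting: the binding uses a turbofish (`Arc::<str>::from("db1")`) so the pointee type does not have to be inferred from later uses, and `PreservedCatalog::wipe` can keep its `&str` parameter because `&Arc<str>` deref-coerces to `&str`. A small sketch of both, where `wipe` is a hypothetical stand-in for an API that still takes `&str`:

```rust
use std::sync::Arc;

// Hypothetical stand-in for a function that kept its &str parameter,
// like PreservedCatalog::wipe in the hunk above.
fn wipe(db_name: &str) {
    println!("wiping {}", db_name);
}

fn main() {
    // The turbofish pins the pointee to `str`; a bare `Arc::from("db1")`
    // would leave the element type to inference.
    let db_name = Arc::<str>::from("db1");

    // By-value Arc<str> parameters get an explicit, cheap clone per call...
    let owned: Arc<str> = Arc::clone(&db_name);
    assert_eq!(owned.as_ref(), "db1");

    // ...while &str parameters still work via deref coercion on &Arc<str>.
    wipe(&db_name);
}
```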
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index e3ae669a32..2ddd261f38 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -453,14 +453,16 @@ mod tests {
 
     #[tokio::test]
     async fn test_parquet_contains_key_value_metadata() {
-        let table_name = "table1";
-        let partition_key = "part1";
-        let (partition_checkpoint, database_checkpoint) =
-            create_partition_and_database_checkpoint(table_name, partition_key);
+        let table_name = Arc::from("table1");
+        let partition_key = Arc::from("part1");
+        let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
+            Arc::clone(&table_name),
+            Arc::clone(&partition_key),
+        );
         let metadata = IoxMetadata {
             creation_timestamp: Utc::now(),
-            table_name: table_name.to_string(),
-            partition_key: partition_key.to_string(),
+            table_name,
+            partition_key,
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
@@ -513,9 +515,9 @@
 
         // create Storage
         let server_id = ServerId::try_from(1).unwrap();
-        let db_name = "my_db";
-        let table_name = "my_table";
-        let partition_key = "my_partition";
+        let db_name = Arc::from("my_db");
+        let table_name = Arc::from("my_table");
+        let partition_key = Arc::from("my_partition");
         let chunk_id = 33;
         let storage = Storage::new(make_object_store(), server_id);
@@ -525,12 +527,14 @@
             batch.schema(),
             vec![Arc::new(batch)],
         ));
-        let (partition_checkpoint, database_checkpoint) =
-            create_partition_and_database_checkpoint(table_name, partition_key);
+        let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
+            Arc::clone(&table_name),
+            Arc::clone(&partition_key),
+        );
         let metadata = IoxMetadata {
             creation_timestamp: Utc::now(),
-            table_name: table_name.to_string(),
-            partition_key: partition_key.to_string(),
+            table_name: Arc::clone(&table_name),
+            partition_key: Arc::clone(&partition_key),
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
@@ -539,9 +543,9 @@
         let (path, _) = storage
             .write_to_object_store(
                 ChunkAddr {
-                    db_name: Arc::from(db_name),
-                    table_name: Arc::from(table_name),
-                    partition_key: Arc::from(partition_key),
+                    db_name,
+                    table_name,
+                    partition_key,
                     chunk_id,
                 },
                 input_stream,
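Since `ChunkAddr` already stores `Arc<str>` fields, the test can now move its bindings straight into the struct literal instead of re-wrapping borrowed strings with `Arc::from`. A sketch using a simplified stand-in for `ChunkAddr` (field set reduced for brevity):

```rust
use std::sync::Arc;

// Simplified stand-in for the ChunkAddr passed to write_to_object_store.
struct ChunkAddr {
    db_name: Arc<str>,
    table_name: Arc<str>,
    partition_key: Arc<str>,
    chunk_id: u32,
}

fn main() {
    let db_name: Arc<str> = Arc::from("my_db");
    let table_name: Arc<str> = Arc::from("my_table");
    let partition_key: Arc<str> = Arc::from("my_partition");

    // Names still needed afterwards are cloned (as the IoxMetadata literal
    // above does); the last user can simply take ownership.
    let addr = ChunkAddr {
        db_name,
        table_name: Arc::clone(&table_name),
        partition_key: Arc::clone(&partition_key),
        chunk_id: 33,
    };

    // table_name and partition_key remain usable; db_name moved into addr.
    assert_eq!(addr.db_name.as_ref(), "my_db");
    assert_eq!(table_name.as_ref(), "my_table");
    assert_eq!(partition_key.as_ref(), "my_partition");
}
```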
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index f83c8c9d10..b39c5698ad 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -150,12 +150,14 @@ pub async fn make_chunk_given_record_batch(
     } else {
         Box::pin(MemoryStream::new(record_batches))
     };
-    let (partition_checkpoint, database_checkpoint) =
-        create_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
+    let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
+        Arc::clone(&addr.table_name),
+        Arc::clone(&addr.partition_key),
+    );
     let metadata = IoxMetadata {
         creation_timestamp: Utc.timestamp(10, 20),
-        table_name: addr.table_name.to_string(),
-        partition_key: addr.partition_key.to_string(),
+        table_name: Arc::clone(&addr.table_name),
+        partition_key: Arc::clone(&addr.partition_key),
         chunk_id: addr.chunk_id,
         partition_checkpoint,
         database_checkpoint,
@@ -166,7 +168,7 @@
         .unwrap();
 
     ParquetChunk::new_from_parts(
-        addr.partition_key.to_string(),
+        addr.partition_key,
        Arc::new(table_summary),
         Arc::new(schema),
         path,
@@ -750,8 +752,8 @@ pub async fn make_metadata(
 
 /// Create [`PartitionCheckpoint`] and [`DatabaseCheckpoint`] for testing.
 pub fn create_partition_and_database_checkpoint(
-    table_name: &str,
-    partition_key: &str,
+    table_name: Arc<str>,
+    partition_key: Arc<str>,
 ) -> (PartitionCheckpoint, DatabaseCheckpoint) {
     // create partition checkpoint
     let mut sequencer_numbers = BTreeMap::new();
@@ -759,8 +761,8 @@ pub fn create_partition_and_database_checkpoint(
     sequencer_numbers.insert(2, MinMaxSequence::new(25, 28));
     let min_unpersisted_timestamp = Utc.timestamp(10, 20);
     let partition_checkpoint = PartitionCheckpoint::new(
-        table_name.to_string(),
-        partition_key.to_string(),
+        table_name,
+        partition_key,
         sequencer_numbers,
         min_unpersisted_timestamp,
     );
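With `create_partition_and_database_checkpoint` taking `Arc<str>` by value, each call site chooses between `Arc::clone` (when the name is used again afterwards) and a plain move (on the last use, as with `addr.partition_key` being handed to `ParquetChunk::new_from_parts` above). A reduced sketch; `Checkpoint` and `create_checkpoint` are hypothetical stand-ins for the real helper, which also fills in sequencer numbers and a timestamp:

```rust
use std::sync::Arc;

// Hypothetical, heavily reduced stand-in for PartitionCheckpoint.
struct Checkpoint {
    table_name: Arc<str>,
    partition_key: Arc<str>,
}

fn create_checkpoint(table_name: Arc<str>, partition_key: Arc<str>) -> Checkpoint {
    // The checkpoint stores the Arcs directly; no `.to_string()` copies.
    Checkpoint {
        table_name,
        partition_key,
    }
}

fn main() {
    let table_name: Arc<str> = Arc::from("table1");
    let partition_key: Arc<str> = Arc::from("part1");

    // Clone while the caller still needs the names...
    let ckpt = create_checkpoint(Arc::clone(&table_name), Arc::clone(&partition_key));
    assert_eq!(ckpt.table_name.as_ref(), "table1");

    // ...and move on the final use.
    let last = create_checkpoint(table_name, partition_key);
    assert_eq!(last.partition_key.as_ref(), "part1");
}
```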
diff --git a/persistence_windows/src/checkpoint.rs b/persistence_windows/src/checkpoint.rs
index 5fc9b2e2fb..ce843df72b 100644
--- a/persistence_windows/src/checkpoint.rs
+++ b/persistence_windows/src/checkpoint.rs
@@ -33,8 +33,8 @@
 //! #
 //! #     fn get_checkpoint(&self) -> PartitionCheckpoint {
 //! #         PartitionCheckpoint::new(
-//! #             "table".to_string(),
-//! #             "part".to_string(),
+//! #             Arc::from("table"),
+//! #             Arc::from("part"),
 //! #             Default::default(),
 //! #             Utc::now(),
 //! #         )
@@ -111,6 +111,7 @@
 //! use persistence_windows::checkpoint::ReplayPlanner;
 //!
 //! # // mocking for the example below
+//! # use std::sync::Arc;
 //! # use chrono::Utc;
 //! # use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder};
 //! #
@@ -119,8 +120,8 @@
 //! # impl File {
 //! #     fn extract_partition_checkpoint(&self) -> PartitionCheckpoint {
 //! #         PartitionCheckpoint::new(
-//! #             "table".to_string(),
-//! #             "part".to_string(),
+//! #             Arc::from("table"),
+//! #             Arc::from("part"),
 //! #             Default::default(),
 //! #             Utc::now(),
 //! #         )
@@ -188,9 +189,12 @@
 //!
 //! // database is now ready for normal playback
 //! ```
-use std::collections::{
-    btree_map::Entry::{Occupied, Vacant},
-    BTreeMap,
+use std::{
+    collections::{
+        btree_map::Entry::{Occupied, Vacant},
+        BTreeMap,
+    },
+    sync::Arc,
 };
 
 use chrono::{DateTime, Utc};
@@ -207,8 +211,8 @@ pub enum Error {
     PartitionCheckpointMinimumBeforeDatabase {
         partition_checkpoint_sequence_number: u64,
         database_checkpoint_sequence_number: u64,
-        table_name: String,
-        partition_key: String,
+        table_name: Arc<str>,
+        partition_key: Arc<str>,
     },
 }
@@ -218,10 +222,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PartitionCheckpoint {
     /// Table of the partition.
-    table_name: String,
+    table_name: Arc<str>,
 
     /// Partition key.
-    partition_key: String,
+    partition_key: Arc<str>,
 
     /// Maps sequencer_id to the minimum and maximum sequence numbers seen.
     sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
@@ -233,8 +237,8 @@ pub struct PartitionCheckpoint {
 impl PartitionCheckpoint {
     /// Create new checkpoint.
     pub fn new(
-        table_name: String,
-        partition_key: String,
+        table_name: Arc<str>,
+        partition_key: Arc<str>,
         sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
         min_unpersisted_timestamp: DateTime<Utc>,
     ) -> Self {
@@ -247,12 +251,12 @@ impl PartitionCheckpoint {
     }
 
     /// Table of the partition.
-    pub fn table_name(&self) -> &str {
+    pub fn table_name(&self) -> &Arc<str> {
         &self.table_name
     }
 
     /// Partition key.
-    pub fn partition_key(&self) -> &str {
+    pub fn partition_key(&self) -> &Arc<str> {
         &self.partition_key
     }
@@ -402,7 +406,7 @@ pub struct ReplayPlanner {
     replay_ranges: BTreeMap<u32, (Option<u64>, Option<u64>)>,
 
     /// Last known partition checkpoint, mapped via table name and partition key.
-    last_partition_checkpoints: BTreeMap<(String, String), PartitionCheckpoint>,
+    last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
 }
 
 impl ReplayPlanner {
@@ -435,8 +439,8 @@ impl ReplayPlanner {
         }
 
         match self.last_partition_checkpoints.entry((
-            partition_checkpoint.table_name().to_string(),
-            partition_checkpoint.partition_key().to_string(),
+            Arc::clone(partition_checkpoint.table_name()),
+            Arc::clone(partition_checkpoint.partition_key()),
         )) {
             Vacant(v) => {
                 // new partition => insert
@@ -534,7 +538,7 @@ pub struct ReplayPlan {
     replay_ranges: BTreeMap<u32, MinMaxSequence>,
 
     /// Last known partition checkpoint, mapped via table name and partition key.
-    last_partition_checkpoints: BTreeMap<(String, String), PartitionCheckpoint>,
+    last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
 }
 
 impl ReplayPlan {
@@ -555,7 +559,7 @@
         partition_key: &str,
     ) -> Option<&PartitionCheckpoint> {
         self.last_partition_checkpoints
-            .get(&(table_name.to_string(), partition_key.to_string()))
+            .get(&(Arc::from(table_name), Arc::from(partition_key)))
     }
 
     /// Sorted list of sequencer IDs that have to be replayed.
@@ -564,7 +568,7 @@
     }
 
     /// Sorted list of partitions (by table name and partition key) that have to be replayed.
-    pub fn partitions(&self) -> Vec<(String, String)> {
+    pub fn partitions(&self) -> Vec<(Arc<str>, Arc<str>)> {
         self.last_partition_checkpoints.keys().cloned().collect()
     }
 }
@@ -584,7 +588,7 @@ mod tests {
             let min_unpersisted_timestamp =
                 DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
 
-            PartitionCheckpoint::new($table_name.to_string(), $partition_key.to_string(), sequencer_numbers, min_unpersisted_timestamp)
+            PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, min_unpersisted_timestamp)
         }
     };
 }
@@ -606,8 +610,8 @@ fn test_partition_checkpoint() {
         let pckpt = part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15)});
 
-        assert_eq!(pckpt.table_name(), "table_1");
-        assert_eq!(pckpt.partition_key(), "partition_1");
+        assert_eq!(pckpt.table_name().as_ref(), "table_1");
+        assert_eq!(pckpt.partition_key().as_ref(), "partition_1");
         assert_eq!(
             pckpt.sequencer_numbers(1).unwrap(),
             MinMaxSequence::new(10, 20)
         );
@@ -703,8 +707,8 @@
         assert_eq!(
             plan.partitions(),
             vec![
-                ("table_1".to_string(), "partition_1".to_string()),
-                ("table_1".to_string(), "partition_2".to_string())
+                (Arc::from("table_1"), Arc::from("partition_1")),
+                (Arc::from("table_1"), Arc::from("partition_2"))
             ]
         );
         assert_eq!(
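One consequence of keying `last_partition_checkpoints` by `(Arc<str>, Arc<str>)`: `BTreeMap::get` only accepts types the key `Borrow`s to, and a tuple of `Arc<str>` has no borrowed `(&str, &str)` form, so `last_partition_checkpoint` has to build fresh `Arc`s for each lookup (two small allocations per call). A sketch of that lookup, with a hypothetical `u64` payload standing in for `PartitionCheckpoint`:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

fn main() {
    // Composite Arc<str> keys, as in ReplayPlan::last_partition_checkpoints;
    // the u64 payload is a placeholder for PartitionCheckpoint.
    let mut checkpoints: BTreeMap<(Arc<str>, Arc<str>), u64> = BTreeMap::new();
    checkpoints.insert((Arc::from("table_1"), Arc::from("partition_1")), 42);

    // There is no Borrow impl from (Arc<str>, Arc<str>) to (&str, &str),
    // so the lookup key must be materialized as owned Arcs.
    let table_name = "table_1";
    let partition_key = "partition_1";
    let hit = checkpoints.get(&(Arc::from(table_name), Arc::from(partition_key)));

    assert_eq!(hit.copied(), Some(42));
}
```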
diff --git a/server/src/db.rs b/server/src/db.rs
index 76516241cb..d29b3f5f40 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1183,7 +1183,7 @@ mod tests {
             .eq(1.0)
             .unwrap();
 
-        let expected_parquet_size = 663;
+        let expected_parquet_size = 655;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -1587,7 +1587,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2149.0)
+            .sample_sum_eq(2141.0)
             .unwrap();
 
         // it should be the same chunk!
@@ -1695,7 +1695,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2149.0)
+            .sample_sum_eq(2141.0)
             .unwrap();
 
         // Unload RB chunk but keep it in OS
@@ -1722,7 +1722,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(663.0)
+            .sample_sum_eq(655.0)
             .unwrap();
 
         // Verify data written to the parquet file in object store
@@ -2108,7 +2108,7 @@
                 Arc::from("cpu"),
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
-                2147, // size of RB and OS chunks
+                2139, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2151,7 +2151,7 @@
             db.catalog.metrics().memory().read_buffer().get_total(),
             1484
         );
-        assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 663);
+        assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 655);
     }
 
     #[tokio::test]
diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs
index bdb028f3c5..e8ac6a0a93 100644
--- a/server/src/db/lifecycle/write.rs
+++ b/server/src/db/lifecycle/write.rs
@@ -101,11 +101,14 @@ pub fn write_chunk_to_object_store(
             // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
             //            between creation and the transaction commit.
             let (partition_checkpoint, database_checkpoint) =
-                fake_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
+                fake_partition_and_database_checkpoint(
+                    Arc::clone(&addr.table_name),
+                    Arc::clone(&addr.partition_key),
+                );
             let metadata = IoxMetadata {
                 creation_timestamp: Utc::now(),
-                table_name: addr.table_name.to_string(),
-                partition_key: addr.partition_key.to_string(),
+                table_name: Arc::clone(&addr.table_name),
+                partition_key: Arc::clone(&addr.partition_key),
                 chunk_id: addr.chunk_id,
                 partition_checkpoint,
                 database_checkpoint,
@@ -184,15 +187,15 @@
 
 /// Fake until we have the split implementation in-place.
 fn fake_partition_and_database_checkpoint(
-    table_name: &str,
-    partition_key: &str,
+    table_name: Arc<str>,
+    partition_key: Arc<str>,
 ) -> (PartitionCheckpoint, DatabaseCheckpoint) {
     // create partition checkpoint
     let sequencer_numbers = BTreeMap::new();
     let min_unpersisted_timestamp = Utc::now();
     let partition_checkpoint = PartitionCheckpoint::new(
-        table_name.to_string(),
-        partition_key.to_string(),
+        table_name,
+        partition_key,
         sequencer_numbers,
         min_unpersisted_timestamp,
     );
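The adjusted expectations in the `db.rs` tests above all shrink by exactly 8 bytes per chunk (663 to 655, 2149.0 to 2141.0, 2147 to 2139), consistent with swapping a single `String` field for an `Arc<str>`: on a typical 64-bit target a `String` is three words (pointer, capacity, length) while an `Arc<str>` is a two-word fat pointer. A quick check of that arithmetic:

```rust
use std::mem::size_of;
use std::sync::Arc;

fn main() {
    // Sizes on a 64-bit target: String = ptr + cap + len, Arc<str> = ptr + len.
    assert_eq!(size_of::<String>(), 24);
    assert_eq!(size_of::<Arc<str>>(), 16);

    // One swapped field accounts for the 8-byte drop seen in the metrics.
    assert_eq!(size_of::<String>() - size_of::<Arc<str>>(), 8);
}
```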