Merge pull request #1931 from influxdata/crepererum/str_arcs

refactor: convert some table name and part. key String to Arcs
kodiakhq[bot] 2021-07-08 13:28:04 +00:00 committed by GitHub
commit 3a329d3705
8 changed files with 120 additions and 96 deletions
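
This PR applies one pattern throughout: fields and parameters that previously held an owned String for a table name or partition key now hold an Arc<str>, so every place that shares those names clones a reference-counted pointer instead of copying string data. A minimal sketch of the pattern (the struct and variable names below are illustrative, not taken from the diff):

use std::sync::Arc;

// Hypothetical before/after illustrating the refactor in this PR.
#[allow(dead_code)]
struct BeforeChunk {
    partition_key: String, // every clone copies the bytes
}

#[allow(dead_code)]
struct AfterChunk {
    partition_key: Arc<str>, // a clone only bumps a reference count
}

fn main() {
    // The Arc<str> is created once from a &str (or String)...
    let partition_key: Arc<str> = Arc::from("part1");

    // ...and then shared cheaply wherever the key is needed.
    let for_metadata = Arc::clone(&partition_key);
    let for_chunk_addr = Arc::clone(&partition_key);

    assert_eq!(&*for_metadata, "part1");
    assert_eq!(&*for_chunk_addr, "part1");
}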

View File

@ -85,7 +85,7 @@ impl ChunkMetrics {
#[derive(Debug)]
pub struct ParquetChunk {
/// Partition this chunk belongs to
partition_key: String,
partition_key: Arc<str>,
/// Meta data of the table
table_summary: Arc<TableSummary>,
@ -146,7 +146,7 @@ impl ParquetChunk {
/// Creates a new chunk from given parts w/o parsing anything from the provided parquet metadata.
pub(crate) fn new_from_parts(
part_key: impl Into<String>,
partition_key: Arc<str>,
table_summary: Arc<TableSummary>,
schema: Arc<Schema>,
file_location: Path,
@ -157,7 +157,7 @@ impl ParquetChunk {
let timestamp_range = extract_range(&table_summary);
let mut chunk = Self {
partition_key: part_key.into(),
partition_key,
table_summary,
schema,
timestamp_range,

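With the field stored as Arc<str>, the constructor above no longer needs an impl Into<String> conversion; it takes the Arc by value and moves it straight into the struct. A reduced sketch of just that signature change (the real new_from_parts takes several more arguments, omitted here):

use std::sync::Arc;

struct ParquetChunk {
    partition_key: Arc<str>,
}

impl ParquetChunk {
    // before: fn new_from_parts(part_key: impl Into<String>, ...) -> Self
    fn new_from_parts(partition_key: Arc<str>) -> Self {
        // no .into() needed: the Arc is moved into the field
        Self { partition_key }
    }
}

fn main() {
    let chunk = ParquetChunk::new_from_parts(Arc::from("part1"));
    assert_eq!(&*chunk.partition_key, "part1");
}
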
View File

@ -256,10 +256,10 @@ pub struct IoxMetadata {
pub creation_timestamp: DateTime<Utc>,
/// Table that holds this parquet file.
pub table_name: String,
pub table_name: Arc<str>,
/// Partition key of the partition that holds this parquet file.
pub partition_key: String,
pub partition_key: Arc<str>,
/// Chunk ID.
pub chunk_id: u32,
@ -297,6 +297,10 @@ impl IoxMetadata {
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBroken)?;
// extract strings
let table_name = Arc::from(proto_msg.table_name.as_ref());
let partition_key = Arc::from(proto_msg.partition_key.as_ref());
// extract partition checkpoint
let proto_partition_checkpoint =
proto_msg
@ -324,8 +328,8 @@ impl IoxMetadata {
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBroken)?;
let partition_checkpoint = PartitionCheckpoint::new(
proto_msg.table_name.clone(),
proto_msg.partition_key.clone(),
Arc::clone(&table_name),
Arc::clone(&partition_key),
sequencer_numbers,
min_unpersisted_timestamp,
);
@ -345,8 +349,8 @@ impl IoxMetadata {
Ok(Self {
creation_timestamp,
table_name: proto_msg.table_name,
partition_key: proto_msg.partition_key,
table_name,
partition_key,
chunk_id: proto_msg.chunk_id,
partition_checkpoint,
database_checkpoint,
@ -384,8 +388,8 @@ impl IoxMetadata {
let proto_msg = proto::IoxMetadata {
version: METADATA_VERSION,
creation_timestamp: Some(self.creation_timestamp.into()),
table_name: self.table_name.clone(),
partition_key: self.partition_key.clone(),
table_name: self.table_name.to_string(),
partition_key: self.partition_key.to_string(),
chunk_id: self.chunk_id,
partition_checkpoint: Some(proto_partition_checkpoint),
database_checkpoint: Some(proto_database_checkpoint),
@ -913,14 +917,16 @@ mod tests {
#[test]
fn test_iox_metadata_from_protobuf_checks_version() {
let table_name = "table1";
let partition_key = "part1";
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
table_name,
partition_key,
chunk_id: 1337,
partition_checkpoint,
database_checkpoint,

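The protobuf representation keeps plain String fields, so the conversion happens once at the boundary: Arc::from(...) when decoding (one allocation per field, after which clones are cheap) and to_string() when encoding. A reduced sketch of that round trip, using a stand-in for the generated message rather than the real proto::IoxMetadata:

use std::sync::Arc;

// Stand-in for the generated protobuf type; the real message has many
// more fields (checkpoints, timestamps, chunk id, ...).
struct ProtoIoxMetadata {
    table_name: String,
    partition_key: String,
}

struct IoxMetadata {
    table_name: Arc<str>,
    partition_key: Arc<str>,
}

impl IoxMetadata {
    fn from_protobuf(proto_msg: &ProtoIoxMetadata) -> Self {
        // decode: allocate each Arc<str> once, then share it freely
        Self {
            table_name: Arc::from(proto_msg.table_name.as_str()),
            partition_key: Arc::from(proto_msg.partition_key.as_str()),
        }
    }

    fn to_protobuf(&self) -> ProtoIoxMetadata {
        // encode: the wire type still wants owned Strings
        ProtoIoxMetadata {
            table_name: self.table_name.to_string(),
            partition_key: self.partition_key.to_string(),
        }
    }
}

fn main() {
    let proto = ProtoIoxMetadata {
        table_name: "table1".to_string(),
        partition_key: "part1".to_string(),
    };
    let md = IoxMetadata::from_protobuf(&proto);
    assert_eq!(md.to_protobuf().table_name, "table1");
}
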
View File

@ -203,7 +203,7 @@ mod tests {
async fn test_rebuild_successfull() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let db_name = Arc::<str>::from("db1");
// build catalog with some data
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
@ -217,13 +217,15 @@ mod tests {
{
let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file(&object_store, server_id, db_name, 0).await;
let (path, md) =
create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 0).await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap();
let (path, md) = create_parquet_file(&object_store, server_id, db_name, 1).await;
let (path, md) =
create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 1).await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
@ -239,7 +241,8 @@ mod tests {
{
let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file(&object_store, server_id, db_name, 2).await;
let (path, md) =
create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 2).await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
@ -257,7 +260,7 @@ mod tests {
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
PreservedCatalog::wipe(&object_store, server_id, &db_name)
.await
.unwrap();
@ -459,20 +462,22 @@ mod tests {
pub async fn create_parquet_file(
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
db_name: Arc<str>,
chunk_id: u32,
) -> (DirsAndFileName, IoxParquetMetaData) {
let table_name = "table1";
let partition_key = "part1";
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
let (record_batches, _schema, _column_summaries, _num_rows) = make_record_batch("foo");
let storage = Storage::new(Arc::clone(object_store), server_id);
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id,
partition_checkpoint,
database_checkpoint,
@ -481,9 +486,9 @@ mod tests {
let (path, parquet_md) = storage
.write_to_object_store(
ChunkAddr {
db_name: Arc::from(db_name),
table_name: Arc::from(table_name),
partition_key: Arc::from(partition_key),
db_name,
table_name,
partition_key,
chunk_id,
},
stream,

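Because create_parquet_file now takes the database name by value, call sites that reuse the name pass Arc::clone(&db_name); and since Arc<str> derefs to str, &db_name still satisfies APIs such as PreservedCatalog::wipe that expect a &str. A small sketch of both call shapes, with simplified stand-ins for the real functions:

use std::sync::Arc;

// Simplified stand-in: the real helper is async and also builds metadata.
fn create_parquet_file(db_name: Arc<str>, chunk_id: u32) -> String {
    format!("{}/chunk-{}.parquet", db_name, chunk_id)
}

// Simplified stand-in for an API that still takes &str.
fn wipe(db_name: &str) {
    println!("wiping catalog for {}", db_name);
}

fn main() {
    let db_name: Arc<str> = Arc::from("db1");

    // the helper takes ownership, so clone the Arc to keep using the name
    let p0 = create_parquet_file(Arc::clone(&db_name), 0);
    let p1 = create_parquet_file(Arc::clone(&db_name), 1);

    // &Arc<str> deref-coerces to &str, so &str APIs keep working unchanged
    wipe(&db_name);

    assert_eq!(p0, "db1/chunk-0.parquet");
    assert_eq!(p1, "db1/chunk-1.parquet");
}
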
View File

@ -453,14 +453,16 @@ mod tests {
#[tokio::test]
async fn test_parquet_contains_key_value_metadata() {
let table_name = "table1";
let partition_key = "part1";
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
table_name,
partition_key,
chunk_id: 1337,
partition_checkpoint,
database_checkpoint,
@ -513,9 +515,9 @@ mod tests {
// create Storage
let server_id = ServerId::try_from(1).unwrap();
let db_name = "my_db";
let table_name = "my_table";
let partition_key = "my_partition";
let db_name = Arc::from("my_db");
let table_name = Arc::from("my_table");
let partition_key = Arc::from("my_partition");
let chunk_id = 33;
let storage = Storage::new(make_object_store(), server_id);
@ -525,12 +527,14 @@ mod tests {
batch.schema(),
vec![Arc::new(batch)],
));
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id,
partition_checkpoint,
database_checkpoint,
@ -539,9 +543,9 @@ mod tests {
let (path, _) = storage
.write_to_object_store(
ChunkAddr {
db_name: Arc::from(db_name),
table_name: Arc::from(table_name),
partition_key: Arc::from(partition_key),
db_name,
table_name,
partition_key,
chunk_id,
},
input_stream,

View File

@ -150,12 +150,14 @@ pub async fn make_chunk_given_record_batch(
} else {
Box::pin(MemoryStream::new(record_batches))
};
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&addr.table_name),
Arc::clone(&addr.partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc.timestamp(10, 20),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: Arc::clone(&addr.table_name),
partition_key: Arc::clone(&addr.partition_key),
chunk_id: addr.chunk_id,
partition_checkpoint,
database_checkpoint,
@ -166,7 +168,7 @@ pub async fn make_chunk_given_record_batch(
.unwrap();
ParquetChunk::new_from_parts(
addr.partition_key.to_string(),
addr.partition_key,
Arc::new(table_summary),
Arc::new(schema),
path,
@ -750,8 +752,8 @@ pub async fn make_metadata(
/// Create [`PartitionCheckpoint`] and [`DatabaseCheckpoint`] for testing.
pub fn create_partition_and_database_checkpoint(
table_name: &str,
partition_key: &str,
table_name: Arc<str>,
partition_key: Arc<str>,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create partition checkpoint
let mut sequencer_numbers = BTreeMap::new();
@ -759,8 +761,8 @@ pub fn create_partition_and_database_checkpoint(
sequencer_numbers.insert(2, MinMaxSequence::new(25, 28));
let min_unpersisted_timestamp = Utc.timestamp(10, 20);
let partition_checkpoint = PartitionCheckpoint::new(
table_name.to_string(),
partition_key.to_string(),
table_name,
partition_key,
sequencer_numbers,
min_unpersisted_timestamp,
);

View File

@ -33,8 +33,8 @@
//! #
//! # fn get_checkpoint(&self) -> PartitionCheckpoint {
//! # PartitionCheckpoint::new(
//! # "table".to_string(),
//! # "part".to_string(),
//! # Arc::from("table"),
//! # Arc::from("part"),
//! # Default::default(),
//! # Utc::now(),
//! # )
@ -111,6 +111,7 @@
//! use persistence_windows::checkpoint::ReplayPlanner;
//!
//! # // mocking for the example below
//! # use std::sync::Arc;
//! # use chrono::Utc;
//! # use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder};
//! #
@ -119,8 +120,8 @@
//! # impl File {
//! # fn extract_partition_checkpoint(&self) -> PartitionCheckpoint {
//! # PartitionCheckpoint::new(
//! # "table".to_string(),
//! # "part".to_string(),
//! # Arc::from("table"),
//! # Arc::from("part"),
//! # Default::default(),
//! # Utc::now(),
//! # )
@ -188,9 +189,12 @@
//!
//! // database is now ready for normal playback
//! ```
use std::collections::{
btree_map::Entry::{Occupied, Vacant},
BTreeMap,
use std::{
collections::{
btree_map::Entry::{Occupied, Vacant},
BTreeMap,
},
sync::Arc,
};
use chrono::{DateTime, Utc};
@ -207,8 +211,8 @@ pub enum Error {
PartitionCheckpointMinimumBeforeDatabase {
partition_checkpoint_sequence_number: u64,
database_checkpoint_sequence_number: u64,
table_name: String,
partition_key: String,
table_name: Arc<str>,
partition_key: Arc<str>,
},
}
@ -218,10 +222,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionCheckpoint {
/// Table of the partition.
table_name: String,
table_name: Arc<str>,
/// Partition key.
partition_key: String,
partition_key: Arc<str>,
/// Maps sequencer_id to the minimum and maximum sequence numbers seen.
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
@ -233,8 +237,8 @@ pub struct PartitionCheckpoint {
impl PartitionCheckpoint {
/// Create new checkpoint.
pub fn new(
table_name: String,
partition_key: String,
table_name: Arc<str>,
partition_key: Arc<str>,
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
min_unpersisted_timestamp: DateTime<Utc>,
) -> Self {
@ -247,12 +251,12 @@ impl PartitionCheckpoint {
}
/// Table of the partition.
pub fn table_name(&self) -> &str {
pub fn table_name(&self) -> &Arc<str> {
&self.table_name
}
/// Partition key.
pub fn partition_key(&self) -> &str {
pub fn partition_key(&self) -> &Arc<str> {
&self.partition_key
}
@ -402,7 +406,7 @@ pub struct ReplayPlanner {
replay_ranges: BTreeMap<u32, (Option<u64>, Option<u64>)>,
/// Last known partition checkpoint, mapped via table name and partition key.
last_partition_checkpoints: BTreeMap<(String, String), PartitionCheckpoint>,
last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
}
impl ReplayPlanner {
@ -435,8 +439,8 @@ impl ReplayPlanner {
}
match self.last_partition_checkpoints.entry((
partition_checkpoint.table_name().to_string(),
partition_checkpoint.partition_key().to_string(),
Arc::clone(partition_checkpoint.table_name()),
Arc::clone(partition_checkpoint.partition_key()),
)) {
Vacant(v) => {
// new partition => insert
@ -534,7 +538,7 @@ pub struct ReplayPlan {
replay_ranges: BTreeMap<u32, MinMaxSequence>,
/// Last known partition checkpoint, mapped via table name and partition key.
last_partition_checkpoints: BTreeMap<(String, String), PartitionCheckpoint>,
last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
}
impl ReplayPlan {
@ -555,7 +559,7 @@ impl ReplayPlan {
partition_key: &str,
) -> Option<&PartitionCheckpoint> {
self.last_partition_checkpoints
.get(&(table_name.to_string(), partition_key.to_string()))
.get(&(Arc::from(table_name), Arc::from(partition_key)))
}
/// Sorted list of sequencer IDs that have to be replayed.
@ -564,7 +568,7 @@ impl ReplayPlan {
}
/// Sorted list of partitions (by table name and partition key) that have to be replayed.
pub fn partitions(&self) -> Vec<(String, String)> {
pub fn partitions(&self) -> Vec<(Arc<str>, Arc<str>)> {
self.last_partition_checkpoints.keys().cloned().collect()
}
}
@ -584,7 +588,7 @@ mod tests {
let min_unpersisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
PartitionCheckpoint::new($table_name.to_string(), $partition_key.to_string(), sequencer_numbers, min_unpersisted_timestamp)
PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, min_unpersisted_timestamp)
}
};
}
@ -606,8 +610,8 @@ mod tests {
fn test_partition_checkpoint() {
let pckpt = part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15)});
assert_eq!(pckpt.table_name(), "table_1");
assert_eq!(pckpt.partition_key(), "partition_1");
assert_eq!(pckpt.table_name().as_ref(), "table_1");
assert_eq!(pckpt.partition_key().as_ref(), "partition_1");
assert_eq!(
pckpt.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(10, 20)
@ -703,8 +707,8 @@ mod tests {
assert_eq!(
plan.partitions(),
vec![
("table_1".to_string(), "partition_1".to_string()),
("table_1".to_string(), "partition_2".to_string())
(Arc::from("table_1"), Arc::from("partition_1")),
(Arc::from("table_1"), Arc::from("partition_2"))
]
);
assert_eq!(

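Two details of the checkpoint changes are worth calling out: the table_name()/partition_key() getters now return &Arc<str>, so callers can Arc::clone the names without reallocating, while last_partition_checkpoint() still accepts &str and therefore has to build fresh Arc<str> keys (one allocation each) just to query the BTreeMap, because the (Arc<str>, Arc<str>) key can only be borrowed as itself. A reduced sketch of the map side, assuming the same key shape:

use std::collections::BTreeMap;
use std::sync::Arc;

fn main() {
    // keys mirror last_partition_checkpoints: (table name, partition key)
    let mut checkpoints: BTreeMap<(Arc<str>, Arc<str>), u64> = BTreeMap::new();

    let table: Arc<str> = Arc::from("table_1");
    let partition: Arc<str> = Arc::from("partition_1");

    // inserting clones the Arcs: cheap reference-count bumps, no copies
    checkpoints.insert((Arc::clone(&table), Arc::clone(&partition)), 42);

    // a lookup by &str must materialize Arc<str> keys for the comparison,
    // since the tuple key has no Borrow impl for (&str, &str)
    let found = checkpoints.get(&(Arc::from("table_1"), Arc::from("partition_1")));
    assert_eq!(found, Some(&42));
}
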
View File

@ -1183,7 +1183,7 @@ mod tests {
.eq(1.0)
.unwrap();
let expected_parquet_size = 663;
let expected_parquet_size = 655;
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"read_buffer",
@ -1587,7 +1587,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2149.0)
.sample_sum_eq(2141.0)
.unwrap();
// it should be the same chunk!
@ -1695,7 +1695,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2149.0)
.sample_sum_eq(2141.0)
.unwrap();
// Unload RB chunk but keep it in OS
@ -1722,7 +1722,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(663.0)
.sample_sum_eq(655.0)
.unwrap();
// Verify data written to the parquet file in object store
@ -2108,7 +2108,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ReadBufferAndObjectStore,
2147, // size of RB and OS chunks
2139, // size of RB and OS chunks
1,
),
ChunkSummary::new_without_timestamps(
@ -2151,7 +2151,7 @@ mod tests {
db.catalog.metrics().memory().read_buffer().get_total(),
1484
);
assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 663);
assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 655);
}
#[tokio::test]

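The expected sizes in these metrics tests drop by 8 bytes (663 -> 655, 2149 -> 2141, 2147 -> 2139), which lines up with swapping one String field for an Arc<str> in the accounted chunk state: on a 64-bit target a String occupies 24 bytes inline (pointer, length, capacity) while an Arc<str> is a 16-byte fat pointer. A quick check of the inline sizes (assumes a 64-bit target):

use std::mem::size_of;
use std::sync::Arc;

fn main() {
    // On a 64-bit target:
    //   String   = ptr + len + cap         -> 24 bytes inline
    //   Arc<str> = ptr + len (fat pointer) -> 16 bytes inline
    println!("String:   {} bytes", size_of::<String>());
    println!("Arc<str>: {} bytes", size_of::<Arc<str>>());

    // Replacing one String field with Arc<str> shaves 8 bytes off the
    // struct itself, matching the updated expectations above.
    assert_eq!(size_of::<String>() - size_of::<Arc<str>>(), 8);
}
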
View File

@ -101,11 +101,14 @@ pub fn write_chunk_to_object_store(
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let (partition_checkpoint, database_checkpoint) =
fake_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
fake_partition_and_database_checkpoint(
Arc::clone(&addr.table_name),
Arc::clone(&addr.partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: Arc::clone(&addr.table_name),
partition_key: Arc::clone(&addr.partition_key),
chunk_id: addr.chunk_id,
partition_checkpoint,
database_checkpoint,
@ -184,15 +187,15 @@ pub fn write_chunk_to_object_store(
/// Fake until we have the split implementation in-place.
fn fake_partition_and_database_checkpoint(
table_name: &str,
partition_key: &str,
table_name: Arc<str>,
partition_key: Arc<str>,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create partition checkpoint
let sequencer_numbers = BTreeMap::new();
let min_unpersisted_timestamp = Utc::now();
let partition_checkpoint = PartitionCheckpoint::new(
table_name.to_string(),
partition_key.to_string(),
table_name,
partition_key,
sequencer_numbers,
min_unpersisted_timestamp,
);