diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 025628a408..4fdbc6a586 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -259,8 +259,7 @@ impl Compactor {
         // to prioritize partitions
         min_recent_ingested_files: usize,
     ) -> Result<Vec<PartitionParam>> {
-        let mut candidates =
-            Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
+        let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
         let mut repos = self.catalog.repositories().await;
 
         for shard_id in &self.shards {
@@ -322,8 +321,7 @@ impl Compactor {
         // Max number of cold partitions per shard we want to compact
         max_num_partitions_per_shard: usize,
     ) -> Result<Vec<PartitionParam>> {
-        let mut candidates =
-            Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
+        let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
         let mut repos = self.catalog.repositories().await;
 
         for shard_id in &self.shards {
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index efbb7200dd..8cd06d0c1b 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -242,10 +242,7 @@ mod tests {
         table.create_column("tag2", ColumnType::Tag).await;
         table.create_column("tag3", ColumnType::Tag).await;
         table.create_column("time", ColumnType::Time).await;
-        let partition = table
-            .with_shard(&sequencer)
-            .create_partition("part")
-            .await;
+        let partition = table.with_shard(&sequencer).create_partition("part").await;
         let time = Arc::new(SystemProvider::new());
         let config = make_compactor_config();
         let metrics = Arc::new(metric::Registry::new());
@@ -463,10 +460,7 @@ mod tests {
         table.create_column("tag2", ColumnType::Tag).await;
         table.create_column("tag3", ColumnType::Tag).await;
         table.create_column("time", ColumnType::Time).await;
-        let partition = table
-            .with_shard(&sequencer)
-            .create_partition("part")
-            .await;
+        let partition = table.with_shard(&sequencer).create_partition("part").await;
         let time = Arc::new(SystemProvider::new());
         let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
         let config = make_compactor_config();
@@ -650,10 +644,7 @@ mod tests {
         table.create_column("tag2", ColumnType::Tag).await;
         table.create_column("tag3", ColumnType::Tag).await;
         table.create_column("time", ColumnType::Time).await;
-        let partition = table
-            .with_shard(&sequencer)
-            .create_partition("part")
-            .await;
+        let partition = table.with_shard(&sequencer).create_partition("part").await;
         let time = Arc::new(SystemProvider::new());
         let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
         let config = make_compactor_config();
diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs
index 60d5c5187d..29b4a0eb29 100644
--- a/compactor/src/parquet_file_combining.rs
+++ b/compactor/src/parquet_file_combining.rs
@@ -319,10 +319,7 @@ pub(crate) async fn compact_parquet_files(
 
     info!(?partition_id, "compaction complete");
 
-    let attributes = Attributes::from([(
-        "shard_id",
-        format!("{}", partition.shard_id()).into(),
-    )]);
+    let attributes = Attributes::from([("shard_id", format!("{}", partition.shard_id()).into())]);
     let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes);
     for size in file_sizes {
         compaction_input_file_bytes.record(size as u64);
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 4bb9ec20a6..3ee4af08e3 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -476,13 +476,7 @@ impl ShardData {
         };
 
         namespace_data
-            .buffer_operation(
-                dml_operation,
-                shard_id,
-                catalog,
-                lifecycle_handle,
-                executor,
-            )
+            .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor)
             .await
     }
 
@@ -944,12 +938,8 @@ impl TableData {
             }
         }
 
-        let should_pause = lifecycle_handle.log_write(
-            partition_data.id,
-            shard_id,
-            sequence_number,
-            batch.size(),
-        );
+        let should_pause =
+            lifecycle_handle.log_write(partition_data.id, shard_id, sequence_number, batch.size());
         partition_data.buffer_write(sequence_number, batch)?;
 
         Ok(should_pause)
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 28d283de19..c5656ac16f 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -496,7 +496,8 @@ mod tests {
                     .await
                     .unwrap();
 
-                if has_measurement && shard.min_unpersisted_sequence_number == SequenceNumber::new(9)
+                if has_measurement
+                    && shard.min_unpersisted_sequence_number == SequenceNumber::new(9)
                 {
                     break;
                 }
diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 652d4071ed..9dfb0e9bee 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -616,12 +616,9 @@ mod tests {
         assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1));
 
         // log another write for different partition using a different handle
-        assert!(!m.handle().log_write(
-            PartitionId::new(2),
-            shard_id,
-            SequenceNumber::new(3),
-            3
-        ));
+        assert!(!m
+            .handle()
+            .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3));
 
         let stats = m.stats();
         assert_eq!(stats.total_bytes, 5);
@@ -890,12 +887,7 @@ mod tests {
         let partition_id = PartitionId::new(1);
         let persister = Arc::new(TestPersister::default());
         h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8);
-        h.log_write(
-            PartitionId::new(2),
-            shard_id,
-            SequenceNumber::new(2),
-            13,
-        );
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13);
 
         m.maybe_persist(&persister).await;
 
@@ -912,12 +904,7 @@ mod tests {
 
         // add that partition back in over size
        h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20);
-        h.log_write(
-            PartitionId::new(2),
-            shard_id,
-            SequenceNumber::new(4),
-            21,
-        );
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21);
 
         // both partitions should now need to be persisted to bring us below the mem threshold of 20.
         m.maybe_persist(&persister).await;
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index 9b54d08ec7..3d8081e8e7 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -860,13 +860,13 @@ pub(crate) async fn make_one_partition_with_tombstones(
     let mut seq_num = seq_num.get();
     seq_num += 1;
     let ts = create_tombstone(
-        2,                  // tombstone id
-        table_id.get(),     // table id
+        2,              // tombstone id
+        table_id.get(), // table id
         shard_id.get(), // sequencer id
-        seq_num,            // delete's seq_number
-        10,                 // min time of data to get deleted
-        50,                 // max time of data to get deleted
-        "city=Boston",      // delete predicate
+        seq_num,        // delete's seq_number
+        10,             // min time of data to get deleted
+        50,             // max time of data to get deleted
+        "city=Boston",  // delete predicate
     );
 
     p1.buffer_tombstone(exec, table_name, ts).await;
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 91c5cd7d0c..d84bda82a5 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -5,8 +5,8 @@ use data_types::{
     Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
     NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
     PartitionId, PartitionInfo, PartitionKey, PartitionParam, ProcessedTombstone, QueryPool,
-    QueryPoolId, SequenceNumber, Shard, ShardId, Table, TableId, TablePartition,
-    TableSchema, Timestamp, Tombstone, TombstoneId,
+    QueryPoolId, SequenceNumber, Shard, ShardId, Table, TableId, TablePartition, TableSchema,
+    Timestamp, Tombstone, TombstoneId,
 };
 use iox_time::TimeProvider;
 use snafu::{OptionExt, Snafu};
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index f50d6f1915..5a23f72d8c 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -13,8 +13,7 @@
 
 use crate::interface::{ColumnUpsertRequest, Error, RepoCollection, Result, Transaction};
 use data_types::{
-    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Shard, ShardId,
-    TableSchema,
+    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Shard, ShardId, TableSchema,
 };
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::BTreeMap};
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index ca179fb7b1..e550dc1bd0 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -691,22 +691,23 @@ impl PartitionRepo for MemTxn {
     ) -> Result<Partition> {
         let stage = self.stage();
 
-        let partition = match stage.partitions.iter().find(|p| {
-            p.partition_key == key && p.shard_id == shard_id && p.table_id == table_id
-        }) {
-            Some(p) => p,
-            None => {
-                let p = Partition {
-                    id: PartitionId::new(stage.partitions.len() as i64 + 1),
-                    shard_id,
-                    table_id,
-                    partition_key: key,
-                    sort_key: vec![],
-                };
-                stage.partitions.push(p);
-                stage.partitions.last().unwrap()
-            }
-        };
+        let partition =
+            match stage.partitions.iter().find(|p| {
+                p.partition_key == key && p.shard_id == shard_id && p.table_id == table_id
+            }) {
+                Some(p) => p,
+                None => {
+                    let p = Partition {
+                        id: PartitionId::new(stage.partitions.len() as i64 + 1),
+                        shard_id,
+                        table_id,
+                        partition_key: key,
+                        sort_key: vec![],
+                    };
+                    stage.partitions.push(p);
+                    stage.partitions.last().unwrap()
+                }
+            };
 
         Ok(partition.clone())
     }
@@ -829,9 +830,7 @@ impl TombstoneRepo for MemTxn {
         let stage = self.stage();
 
         let tombstone = match stage.tombstones.iter().find(|t| {
-            t.table_id == table_id
-                && t.shard_id == shard_id
-                && t.sequence_number == sequence_number
+            t.table_id == table_id && t.shard_id == shard_id && t.sequence_number == sequence_number
         }) {
             Some(t) => t,
             None => {
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 490042d032..3a30d6415c 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -7,8 +7,7 @@ use arrow::{
 use data_types::{
     Column, ColumnSet, ColumnType, CompactionLevel, KafkaPartition, KafkaTopic, Namespace,
     NamespaceSchema, ParquetFile, ParquetFileParams, Partition, PartitionId, QueryPool,
-    SequenceNumber, Shard, ShardId, Table, TableId, TableSchema, Timestamp, Tombstone,
-    TombstoneId,
+    SequenceNumber, Shard, ShardId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
 };
 use datafusion::physical_plan::metrics::Count;
 use iox_catalog::{
@@ -184,10 +183,7 @@ impl TestCatalog {
     }
 
     /// List level 0 files
-    pub async fn list_level_0_files(
-        self: &Arc<Self>,
-        shard_id: ShardId,
-    ) -> Vec<ParquetFile> {
+    pub async fn list_level_0_files(self: &Arc<Self>, shard_id: ShardId) -> Vec<ParquetFile> {
         self.catalog
             .repositories()
             .await
@@ -299,10 +295,7 @@ pub struct TestTable {
 
 impl TestTable {
     /// Attach a shard to the table
-    pub fn with_shard(
-        self: &Arc<Self>,
-        shard: &Arc<TestShard>,
-    ) -> Arc<TestTableBoundShard> {
+    pub fn with_shard(self: &Arc<Self>, shard: &Arc<TestShard>) -> Arc<TestTableBoundShard> {
         assert!(Arc::ptr_eq(&self.catalog, &shard.catalog));
         assert!(Arc::ptr_eq(&self.namespace, &shard.namespace));
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 93424122dc..5dfc94dd1d 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -89,8 +89,8 @@
 use bytes::Bytes;
 use data_types::{
     ColumnId, ColumnSet, ColumnSummary, CompactionLevel, InfluxDbType, NamespaceId,
-    ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, ShardId, StatValues,
-    Statistics, TableId, Timestamp,
+    ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, ShardId, StatValues, Statistics,
+    TableId, Timestamp,
 };
 use generated_types::influxdata::iox::ingester::v1 as proto;
 use iox_time::Time;
diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs
index 00eeb723d8..c41b7e1f79 100644
--- a/parquet_file/src/serialize.rs
+++ b/parquet_file/src/serialize.rs
@@ -166,9 +166,7 @@ mod tests {
     use crate::metadata::IoxParquetMetaData;
     use arrow::array::{ArrayRef, StringArray};
     use bytes::Bytes;
-    use data_types::{
-        CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId,
-    };
+    use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
     use iox_time::Time;
     use parquet::{
         arrow::{ArrowReader, ParquetFileArrowReader},
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index c864819875..57478eb037 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -405,9 +405,7 @@ fn project_for_parquet_reader(
 mod tests {
     use super::*;
     use arrow::array::{ArrayRef, Int64Array, StringArray};
-    use data_types::{
-        CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId,
-    };
+    use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
     use datafusion::common::DataFusionError;
     use iox_time::Time;
     use std::collections::HashMap;
diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
index c82a603f7b..d31558b79a 100644
--- a/querier/src/cache/parquet_file.rs
+++ b/querier/src/cache/parquet_file.rs
@@ -475,10 +475,7 @@ mod tests {
         table.create_column("time", ColumnType::Time).await;
         let sequencer1 = ns.create_shard(1).await;
 
-        let partition = table
-            .with_shard(&sequencer1)
-            .create_partition("k")
-            .await;
+        let partition = table.with_shard(&sequencer1).create_partition("k").await;
 
         (table, partition)
     }
diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs
index f58dbdff36..5e1c78e169 100644
--- a/querier/src/cache/read_buffer.rs
+++ b/querier/src/cache/read_buffer.rs
@@ -250,10 +250,7 @@ mod tests {
         table.create_column("time", ColumnType::Time).await;
         let sequencer1 = ns.create_shard(1).await;
 
-        let partition = table
-            .with_shard(&sequencer1)
-            .create_partition("k")
-            .await;
+        let partition = table.with_shard(&sequencer1).create_partition("k").await;
 
         (catalog, partition)
     }
@@ -328,10 +325,7 @@ mod tests {
         table.create_column("time", ColumnType::Time).await;
         let sequencer1 = ns.create_shard(1).await;
 
-        let partition = table
-            .with_shard(&sequencer1)
-            .create_partition("k")
-            .await;
+        let partition = table.with_shard(&sequencer1).create_partition("k").await;
 
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&format!("{table_name} foo=1 11"));
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 28a747fc88..cb89d74fcc 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -549,18 +549,9 @@ mod tests {
         let sequencer1 = ns.create_shard(1).await;
         let sequencer2 = ns.create_shard(2).await;
 
-        let partition11 = table1
-            .with_shard(&sequencer1)
-            .create_partition("k")
-            .await;
-        let partition12 = table1
-            .with_shard(&sequencer2)
-            .create_partition("k")
-            .await;
-        let partition21 = table2
-            .with_shard(&sequencer1)
-            .create_partition("k")
-            .await;
+        let partition11 = table1.with_shard(&sequencer1).create_partition("k").await;
+        let partition12 = table1.with_shard(&sequencer2).create_partition("k").await;
+        let partition21 = table2.with_shard(&sequencer1).create_partition("k").await;
 
         table1.create_column("time", ColumnType::Time).await;
         table1.create_column("foo", ColumnType::F64).await;
@@ -761,14 +752,8 @@ mod tests {
         let ns = catalog.create_namespace("ns").await;
         let table = ns.create_table("table").await;
         let sequencer = ns.create_shard(1).await;
-        let partition1 = table
-            .with_shard(&sequencer)
-            .create_partition("k1")
-            .await;
-        let partition2 = table
-            .with_shard(&sequencer)
-            .create_partition("k2")
-            .await;
+        let partition1 = table.with_shard(&sequencer).create_partition("k1").await;
+        let partition2 = table.with_shard(&sequencer).create_partition("k2").await;
 
         table.create_column("time", ColumnType::Time).await;
         table.create_column("foo", ColumnType::F64).await;
@@ -914,14 +899,8 @@ mod tests {
         let ns = catalog.create_namespace("ns").await;
         let table = ns.create_table("table").await;
         let sequencer = ns.create_shard(1).await;
-        let partition1 = table
-            .with_shard(&sequencer)
-            .create_partition("k1")
-            .await;
-        let partition2 = table
-            .with_shard(&sequencer)
-            .create_partition("k2")
-            .await;
+        let partition1 = table.with_shard(&sequencer).create_partition("k1").await;
+        let partition2 = table.with_shard(&sequencer).create_partition("k2").await;
 
         let schema = Arc::new(
             SchemaBuilder::new()
diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs
index b046fc3135..f5db155e33 100644
--- a/querier/src/table/state_reconciler.rs
+++ b/querier/src/table/state_reconciler.rs
@@ -104,8 +104,7 @@ impl Reconciler {
             tombstones.into_iter().map(QuerierTombstone::from).collect();
 
         // match chunks and tombstones
-        let mut tombstones_by_shard: HashMap<ShardId, Vec<QuerierTombstone>> =
-            HashMap::new();
+        let mut tombstones_by_shard: HashMap<ShardId, Vec<QuerierTombstone>> = HashMap::new();
 
         for tombstone in querier_tombstones {
             tombstones_by_shard
@@ -129,8 +128,7 @@ impl Reconciler {
             Vec::with_capacity(parquet_files.len() + ingester_partitions.len());
 
         for chunk in parquet_files.into_iter() {
-            let chunk = if let Some(tombstones) =
-                tombstones_by_shard.get(&chunk.meta().shard_id())
+            let chunk = if let Some(tombstones) = tombstones_by_shard.get(&chunk.meta().shard_id())
             {
                 let mut delete_predicates = Vec::with_capacity(tombstones.len());
                 for tombstone in tombstones {
diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs
index 66b49677a0..9589c3e051 100644
--- a/query_tests/src/scenarios/util.rs
+++ b/query_tests/src/scenarios/util.rs
@@ -14,9 +14,7 @@ use generated_types::{
 };
 use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
 use ingester::{
-    data::{
-        FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, ShardData,
-    },
+    data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, ShardData},
     lifecycle::LifecycleHandle,
     querier_handler::prepare_data_to_querier,
 };
@@ -694,10 +692,7 @@ impl MockIngester {
 
         let shards = BTreeMap::from([(
             sequencer.shard.id,
-            ShardData::new(
-                sequencer.shard.kafka_partition,
-                catalog.metric_registry(),
-            ),
+            ShardData::new(sequencer.shard.kafka_partition, catalog.metric_registry()),
         )]);
         let ingester_data = Arc::new(IngesterData::new(
             catalog.object_store(),
@@ -730,11 +725,7 @@ impl MockIngester {
 
         let should_pause = self
            .ingester_data
-            .buffer_operation(
-                self.sequencer.shard.id,
-                dml_operation,
-                &lifecycle_handle,
-            )
+            .buffer_operation(self.sequencer.shard.id, dml_operation, &lifecycle_handle)
             .await
             .unwrap();
         assert!(!should_pause);
diff --git a/write_summary/src/progress.rs b/write_summary/src/progress.rs
index bf23ef4cff..50f1c48259 100644
--- a/write_summary/src/progress.rs
+++ b/write_summary/src/progress.rs
@@ -218,9 +218,7 @@ mod tests {
         let eq = SequenceNumber::new(1);
         let gt = SequenceNumber::new(2);
 
-        let progress = ShardProgress::new()
-            .with_buffered(eq)
-            .with_persisted(lt);
+        let progress = ShardProgress::new().with_buffered(eq).with_persisted(lt);
 
         assert!(progress.readable(lt));
         assert!(progress.persisted(lt));
@@ -238,9 +236,7 @@ mod tests {
         let eq = SequenceNumber::new(1);
         let gt = SequenceNumber::new(2);
 
-        let progress = ShardProgress::new()
-            .with_buffered(eq)
-            .with_persisted(eq);
+        let progress = ShardProgress::new().with_buffered(eq).with_persisted(eq);
 
         assert!(progress.readable(lt));
         assert!(progress.persisted(lt));
@@ -282,9 +278,7 @@ mod tests {
 
         let progress1 = ShardProgress::new().with_buffered(gt);
 
-        let progress2 = ShardProgress::new()
-            .with_buffered(lt)
-            .with_persisted(eq);
+        let progress2 = ShardProgress::new().with_buffered(lt).with_persisted(eq);
 
         let expected = ShardProgress::new()
             .with_buffered(lt)