diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 4667e9fa22..6428932717 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -13,7 +13,7 @@ use backoff::{Backoff, BackoffConfig};
 use bytes::Bytes;
 use data_types::{
     ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId,
-    TablePartition, Timestamp, Tombstone, TombstoneId,
+    Timestamp, Tombstone, TombstoneId,
 };
 use datafusion::error::DataFusionError;
 use iox_catalog::interface::{Catalog, Transaction};
@@ -286,21 +286,6 @@ impl Compactor {
             .context(Level0Snafu)
     }
 
-    async fn level_1_parquet_files(
-        &self,
-        table_partition: TablePartition,
-        min_time: Timestamp,
-        max_time: Timestamp,
-    ) -> Result> {
-        let mut repos = self.catalog.repositories().await;
-
-        repos
-            .parquet_files()
-            .level_1(table_partition, min_time, max_time)
-            .await
-            .context(Level1Snafu)
-    }
-
     async fn update_to_level_1(&self, parquet_file_ids: &[ParquetFileId]) -> Result<()> {
         let mut repos = self.catalog.repositories().await;
 
diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
index 533ac414ba..1763363245 100644
--- a/compactor/src/handler.rs
+++ b/compactor/src/handler.rs
@@ -48,6 +48,7 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
 #[derive(Debug)]
 pub struct CompactorHandlerImpl {
     /// Data to compact
+    #[allow(dead_code)]
     compactor_data: Arc,
 
     /// A token that is used to trigger shutdown of the background worker
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index 058967618c..3acd6a70b7 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -10,7 +10,6 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-#![allow(dead_code)]
 
 pub mod compact;
 pub mod garbage_collector;
diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs
index 5d37afb5a6..e67873a123 100644
--- a/compactor/src/utils.rs
+++ b/compactor/src/utils.rs
@@ -156,6 +156,7 @@ impl CompactedData {
 /// Information needed to update the catalog after compacting a group of files
 #[derive(Debug)]
 pub struct CatalogUpdate {
+    #[allow(dead_code)]
     pub(crate) meta: IoxMetadata,
     pub(crate) tombstones: BTreeMap,
     pub(crate) parquet_file: ParquetFileParams,
diff --git a/executor/src/lib.rs b/executor/src/lib.rs
index c23dbd8ebd..75a4c930b4 100644
--- a/executor/src/lib.rs
+++ b/executor/src/lib.rs
@@ -48,7 +48,6 @@ impl Task {
 }
 
 /// The type of error that is returned from tasks in this module
-#[allow(dead_code)]
 pub type Error = tokio::sync::oneshot::error::RecvError;
 
 /// Job within the executor.
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 2cbf704447..8ae1bad6c0 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -79,7 +79,10 @@ pub enum Error {
     #[snafu(display("Nothing in the Persisting list to get removed"))]
     PersistingEmpty,
 
-    #[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
+    #[snafu(display(
+        "The given batch does not match any in the Persisting list. \
+        Nothing is removed from the Persisting list"
+    ))]
     PersistingNotMatch,
 
     #[snafu(display("Cannot partition data: {}", source))]
@@ -154,6 +157,7 @@ impl IngesterData {
     }
 
     /// Get sequencer data for specific sequencer.
+    #[allow(dead_code)] // Used in tests
    pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
         self.sequencers.get(&sequencer_id)
     }
@@ -191,21 +195,6 @@ impl IngesterData {
         .await
     }
 
-    /// Return table data of a given (sequencer id, namespace name, and table name)
-    pub(crate) fn table_data(
-        &self,
-        sequencer_id: SequencerId,
-        namespace_name: &str,
-        table_name: &str,
-    ) -> Option>> {
-        let sequencer_data = self.sequencers.get(&sequencer_id)?;
-        let namespaces = sequencer_data.namespaces.read();
-        let namespace_data = namespaces.get(namespace_name)?;
-        let tables = namespace_data.tables.read();
-        let table_data = tables.get(table_name)?;
-        Some(Arc::clone(table_data))
-    }
-
     /// Return the ingestion progress for the specified kafka
     /// partitions. Returns an empty `SequencerProgress` for any kafka
     /// partitions that this ingester doesn't know about.
@@ -543,8 +532,9 @@ impl NamespaceData {
         }
     }
 
-    /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog.
-    /// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager.
+    /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
+    /// catalog. Returns true if ingest should be paused due to memory limits set in the passed
+    /// lifecycle manager.
     pub async fn buffer_operation(
         &self,
         dml_operation: DmlOperation,
@@ -615,8 +605,8 @@ impl NamespaceData {
         }
     }
 
-    /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to snapshots. Then
-    /// return a vec of the snapshots and the optional persisting batch.
+    /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
+    /// snapshots. Then return a vec of the snapshots and the optional persisting batch.
     pub async fn snapshot(
         &self,
         table_name: &str,
@@ -782,6 +772,7 @@ impl TableData {
     }
 
     /// Return tombstone_max_sequence_number
+    #[allow(dead_code)] // Used in tests
     pub fn tombstone_max_sequence_number(&self) -> Option {
         self.tombstone_max_sequence_number
     }
@@ -967,17 +958,9 @@ impl PartitionData {
             .snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
     }
 
-    /// Clears the persisting batch, updates the max_persisted_sequence_number.
-    fn mark_persisted(&mut self) {
-        if let Some(persisting) = &self.data.persisting {
-            let (_, max) = persisting.data.min_max_sequence_numbers();
-            self.data.max_persisted_sequence_number = Some(max);
-        }
-        self.data.persisting = None;
-    }
-
     /// Snapshot whatever is in the buffer and return a new vec of the
     /// arc cloned snapshots
+    #[allow(dead_code)] // Used in tests
     pub fn snapshot(&mut self) -> Result>> {
         self.data.snapshot().context(SnapshotSnafu)?;
         Ok(self.data.snapshots.to_vec())
@@ -1017,8 +1000,10 @@ impl PartitionData {
     }
 
     /// Buffers a new tombstone:
-    /// . All the data in the `buffer` and `snapshots` will be replaced with one tombstone-applied snapshot
-    /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` exists
+    /// . All the data in the `buffer` and `snapshots` will be replaced with one
+    ///   tombstone-applied snapshot
+    /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
+    ///   exists
     pub(crate) async fn buffer_tombstone(
         &mut self,
         executor: &Executor,
@@ -1037,7 +1022,7 @@ impl PartitionData {
         {
             Some(query_batch) if !query_batch.is_empty() => query_batch,
             _ => {
-                // No need to procedd further
+                // No need to proceed further
                 return;
             }
         };
@@ -1197,11 +1182,6 @@ impl DataBuffer {
         Ok(None)
     }
 
-    /// Returns true if there are no batches in the buffer or snapshots or persisting data
-    fn is_empty(&self) -> bool {
-        self.snapshots.is_empty() && self.buffer.is_none() && self.persisting.is_none()
-    }
-
     /// Snapshots the buffer and make a QueryableBatch for all the snapshots
     /// Both buffer and snapshots will be empty after this
     pub fn snapshot_to_queryable_batch(
@@ -1246,6 +1226,7 @@ impl DataBuffer {
     /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
     ///
     /// # Panic
+    ///
     /// Panics if there is already a persisting batch.
     pub fn snapshot_to_persisting(
         &mut self,
@@ -1275,19 +1256,6 @@ impl DataBuffer {
         }
     }
 
-    /// Add a persiting batch into the buffer persisting list
-    /// Note: For now, there is at most one persisting batch at a time but
-    /// the plan is to process several of them a time as needed
-    pub fn add_persisting_batch(&mut self, batch: Arc) -> Result<()> {
-        if self.persisting.is_some() {
-            return Err(Error::PersistingNotEmpty);
-        } else {
-            self.persisting = Some(batch);
-        }
-
-        Ok(())
-    }
-
     /// Return a QueryableBatch of the persisting batch after applying new tombstones
     pub fn get_persisting_data(&self) -> Option {
         let persisting = match &self.persisting {
@@ -1304,22 +1272,6 @@ impl DataBuffer {
         Some(queryable_batch)
     }
 
-    /// Remove the given PersistingBatch that was persisted
-    pub fn remove_persisting_batch(&mut self, batch: &Arc) -> Result<()> {
-        if let Some(persisting_batch) = &self.persisting {
-            if persisting_batch == batch {
-                // found. Remove this batch from the memory
-                self.persisting = None;
-            } else {
-                return Err(Error::PersistingNotMatch);
-            }
-        } else {
-            return Err(Error::PersistingEmpty);
-        }
-
-        Ok(())
-    }
-
     /// Return the progress in this DataBuffer
     fn progress(&self) -> SequencerProgress {
         let progress = SequencerProgress::new();
@@ -1362,7 +1314,6 @@ pub struct BufferBatch {
 
 impl BufferBatch {
     /// Return the progress in this DataBuffer
-
     fn progress(&self) -> SequencerProgress {
         SequencerProgress::new()
             .with_buffered(self.min_sequence_number)
@@ -1452,7 +1403,8 @@ pub struct QueryableBatch {
 
 /// Status of a partition that has unpersisted data.
 ///
-/// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
+/// Note that this structure is specific to a partition (which itself is bound to a table and
+/// sequencer)!
 #[derive(Debug, Clone)]
 #[allow(missing_copy_implementations)]
 pub struct PartitionStatus {
@@ -1473,8 +1425,9 @@ pub struct IngesterQueryResponse {
 
     /// Contains status for every partition that has unpersisted data.
     ///
-    /// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
-    /// data for this partition. In either case the querier may just read all parquet files for the missing partition.
+    /// If a partition does NOT appear within this map, then either all data was persisted or the
+    /// ingester has never seen data for this partition. In either case the querier may just read
+    /// all parquet files for the missing partition.
     pub unpersisted_partitions: BTreeMap,
 
     /// Map each record batch to a partition ID.
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 24c58e36c3..c7179e14d2 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -51,11 +51,6 @@ pub enum Error {
     },
 }
 
-/// When the lifecycle manager indicates that ingest should be paused because of
-/// memory pressure, the sequencer will loop, sleeping this long before checking
-/// with the manager if it can resume ingest.
-const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
-
 /// A specialized `Error` for Catalog errors
 pub type Result = std::result::Result;
 
@@ -478,6 +473,7 @@ mod tests {
             .unwrap();
     }
 
+    #[tokio::test]
     #[should_panic(expected = "Background worker 'bad_task' exited early!")]
     async fn test_join_task_early_shutdown() {
         let mut ingester = TestIngester::new().await.ingester;
diff --git a/ingester/src/job.rs b/ingester/src/job.rs
index 89aa2d3376..6b20226f23 100644
--- a/ingester/src/job.rs
+++ b/ingester/src/job.rs
@@ -20,12 +20,6 @@ impl Job {
             Self::Persist { .. } => "persist",
         }
     }
-
-    fn partition_id(&self) -> Option {
-        match self {
-            Self::Persist { partition_id, .. } => Some(*partition_id),
-        }
-    }
 }
 
 /// The global job registry
diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs
index 97824b36bd..a4534839d9 100644
--- a/ingester/src/lib.rs
+++ b/ingester/src/lib.rs
@@ -12,7 +12,6 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-#![allow(dead_code)]
 
 pub mod compact;
 pub mod data;
diff --git a/ingester/src/poison.rs b/ingester/src/poison.rs
index 746cd05c97..c8a887deb4 100644
--- a/ingester/src/poison.rs
+++ b/ingester/src/poison.rs
@@ -8,6 +8,7 @@ use std::{
 };
 
 #[derive(Debug, Clone, PartialEq, Eq)]
+#[allow(dead_code)]
 pub enum PoisonPill {
     LifecyclePanic,
     LifecycleExit,
@@ -48,6 +49,7 @@ impl PoisonCabinet {
         }
     }
 
+    #[allow(dead_code)]
     pub fn add(&self, pill: PoisonPill) {
         let mut inner = self.inner.write();
         inner.pills.push(pill);
@@ -63,6 +65,7 @@ impl PoisonCabinet {
         inner.pills.contains(pill)
     }
 
+    #[allow(dead_code)]
     pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
         PoisonWait {
             pill,
diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs
index d80a7b21ee..b51c47ccd6 100644
--- a/ingester/src/server/grpc.rs
+++ b/ingester/src/server/grpc.rs
@@ -67,19 +67,6 @@ impl WriteInfoServiceImpl {
     }
 }
 
-fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
-    use write_summary::Error;
-
-    match e {
-        // treat "unknown partition error" as a failed precondition
-        // (so the client can distinguish between "write isn't
-        // readable" from "we can't tell if write is readable"
-        e @ Error::UnknownKafkaPartition { .. } => {
-            tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
-        }
-    }
-}
-
 fn to_proto_status(status: KafkaPartitionWriteStatus) -> proto::KafkaPartitionStatus {
     match status {
         KafkaPartitionWriteStatus::KafkaPartitionUnknown => proto::KafkaPartitionStatus::Unknown,
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index dddf90764c..7cab6a73a6 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -19,7 +19,6 @@ use data_types::{
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::BTreeMap};
 
-#[allow(dead_code)]
 const SHARED_KAFKA_TOPIC: &str = "iox-shared";
 const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
 const TIME_COLUMN: &str = "time";
diff --git a/iox_data_generator/src/measurement.rs b/iox_data_generator/src/measurement.rs
index b56b779048..6ab8ee092d 100644
--- a/iox_data_generator/src/measurement.rs
+++ b/iox_data_generator/src/measurement.rs
@@ -183,7 +183,6 @@ impl MeasurementGenerator {
         Ok(Self {
             measurement: Arc::new(Mutex::new(Measurement {
                 name: measurement_name,
-                id: measurement_id,
                 tag_pairs,
                 generated_tag_sets,
                 tag_ordering,
@@ -207,8 +206,6 @@ impl MeasurementGenerator {
 #[derive(Debug)]
 pub struct Measurement {
     name: String,
-    #[allow(dead_code)]
-    id: usize,
     tag_pairs: Vec>,
     generated_tag_sets: Arc>,
     tag_ordering: Vec,
diff --git a/iox_data_generator/src/tag_set.rs b/iox_data_generator/src/tag_set.rs
index f3e1e8599a..1f5e8d0a5d 100644
--- a/iox_data_generator/src/tag_set.rs
+++ b/iox_data_generator/src/tag_set.rs
@@ -54,15 +54,6 @@ pub enum Error {
 
 type Result = std::result::Result;
 
-/// A collection of pre-generated values.
-#[derive(Debug)]
-pub struct GeneratedValueCollection {
-    #[allow(dead_code)]
-    name: String,
-    #[allow(dead_code)]
-    values: Vec,
-}
-
 /// A single generated value's id and tag key/value pair.
 #[derive(Debug)]
 pub struct GeneratedValue {
diff --git a/iox_tests/src/lib.rs b/iox_tests/src/lib.rs
index 0455c5aacc..d344a5be44 100644
--- a/iox_tests/src/lib.rs
+++ b/iox_tests/src/lib.rs
@@ -9,6 +9,5 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-#![allow(dead_code)]
 
 pub mod util;
diff --git a/querier/src/cache_system/backend/ttl.rs b/querier/src/cache_system/backend/ttl.rs
index 928060c7aa..73ef28356d 100644
--- a/querier/src/cache_system/backend/ttl.rs
+++ b/querier/src/cache_system/backend/ttl.rs
@@ -159,11 +159,13 @@ where
     }
 
     /// Reference to inner backend.
+    #[allow(dead_code)]
     pub fn inner_backend(&self) -> &dyn CacheBackend {
         self.inner_backend.as_ref()
     }
 
     /// Reference to TTL provider.
+    #[allow(dead_code)]
     pub fn ttl_provider(&self) -> &Arc> {
         &self.ttl_provider
     }
diff --git a/querier/src/cache_system/driver.rs b/querier/src/cache_system/driver.rs
index bcef99c3b6..d6d960f340 100644
--- a/querier/src/cache_system/driver.rs
+++ b/querier/src/cache_system/driver.rs
@@ -177,6 +177,7 @@ where
     /// Side-load an entry into the cache.
     ///
     /// This will also complete a currently running request for this key.
+    #[allow(dead_code)]
     pub async fn set(&self, k: K, v: V) {
         let maybe_join_handle = {
             let mut state = self.state.lock();
@@ -239,6 +240,7 @@ struct RunningQuery {
     recv: SharedReceiver,
 
     /// A sender that enables setting entries while the query is running.
+    #[allow(dead_code)]
     set: Sender,
 
     /// A handle for the task that is currently executing the query.
@@ -246,7 +248,8 @@ struct RunningQuery {
     /// The handle can be used to abort the running query, e.g. when dropping the cache.
     join_handle: JoinHandle<()>,
 
-    /// Tag so that queries for the same key (e.g. when starting, side-loading, starting again) can be told apart.
+    /// Tag so that queries for the same key (e.g. when starting, side-loading, starting again) can
+    /// be told apart.
     tag: u64,
 }
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index 29ce1afca9..b38b3178c7 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -180,6 +180,7 @@ pub struct ParquetChunkAdapter {
     metric_registry: Arc,
 
     /// Time provider.
+    #[allow(dead_code)]
     time_provider: Arc,
 }
 
diff --git a/querier/src/database.rs b/querier/src/database.rs
index 0da42b7455..8b41bf9e86 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -33,12 +33,15 @@ pub struct QuerierDatabase {
     chunk_adapter: Arc,
 
     /// Metric registry
+    #[allow(dead_code)]
     metric_registry: Arc,
 
     /// Namespaces.
+    #[allow(dead_code)]
     namespaces: RwLock, Arc>>,
 
     /// Object store.
+    #[allow(dead_code)]
     object_store: Arc,
 
     /// Executor for queries.
diff --git a/querier/src/handler.rs b/querier/src/handler.rs
index ead0507d95..55651cefac 100644
--- a/querier/src/handler.rs
+++ b/querier/src/handler.rs
@@ -34,6 +34,7 @@ pub trait QuerierHandler: Send + Sync {
 type SharedJoinHandle = Shared>>>;
 
 /// Convert a [`JoinHandle`] into a [`SharedJoinHandle`].
+#[allow(dead_code)]
 fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
     handle.map_err(Arc::new).boxed().shared()
 }
@@ -51,6 +52,7 @@ pub struct QuerierHandlerImpl {
     shutdown: CancellationToken,
 
     /// Poison pills for testing.
+    #[allow(dead_code)]
     poison_cabinet: Arc,
 }
 
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index 7400c4a7d5..e09f890505 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -18,6 +18,7 @@ impl MockIngesterConnection {
     }
 
     /// Set next response for this connection.
+    #[allow(dead_code)]
     pub fn next_response(&self, response: super::Result>>) {
         *self.next_response.lock() = Some(response);
     }
diff --git a/querier/src/lib.rs b/querier/src/lib.rs
index 8c4b1df7ae..3aa663e316 100644
--- a/querier/src/lib.rs
+++ b/querier/src/lib.rs
@@ -8,7 +8,6 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-#![allow(dead_code)]
 
 pub mod cache;
 mod cache_system;
diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index 053d01970b..6a86e80695 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -38,6 +38,7 @@ pub struct QuerierNamespace {
     exec: Arc,
 
     /// Connection to ingester
+    #[allow(dead_code)]
     ingester_connection: Arc,
 
     /// Query log.
diff --git a/querier/src/poison.rs b/querier/src/poison.rs
index d9c037603a..e42cfde8bd 100644
--- a/querier/src/poison.rs
+++ b/querier/src/poison.rs
@@ -30,6 +30,7 @@ impl PoisonCabinetInner {
 
 #[derive(Debug)]
 pub struct PoisonCabinet {
+    #[allow(dead_code)]
     inner: Arc>,
 }
 
@@ -43,6 +44,7 @@ impl PoisonCabinet {
         }
     }
 
+    #[allow(dead_code)]
     pub fn add(&self, pill: PoisonPill) {
         let mut inner = self.inner.write();
         inner.pills.push(pill);
@@ -52,12 +54,14 @@ impl PoisonCabinet {
         }
     }
 
+    #[allow(dead_code)]
     pub fn contains(&self, pill: &PoisonPill) -> bool {
         let inner = self.inner.read();
 
         inner.pills.contains(pill)
     }
 
+    #[allow(dead_code)]
     pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
         PoisonWait {
             pill,
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 30f66a42b1..d03db81636 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -88,6 +88,7 @@ impl QuerierTable {
     }
 
     /// Table ID.
+    #[allow(dead_code)]
     pub fn id(&self) -> TableId {
         self.id
     }
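
Review note (not part of the patch): the recurring pattern above is to drop crate-wide `#![allow(dead_code)]` attributes from each `lib.rs`, delete items that are genuinely unused, and mark the remaining intentionally-unused items with a targeted, item-level `#[allow(dead_code)]` plus a short justification. A minimal sketch of that pattern, using hypothetical names that do not come from the patch:

```rust
// Before: a crate-level attribute in lib.rs silences the dead_code lint for
// the whole crate, so genuinely unused items are never reported.
// #![allow(dead_code)]

/// After: the exemption moves to the specific item and carries a reason, so
/// the compiler still flags anything else that later becomes unused.
#[derive(Debug)]
pub struct ExampleHandler {
    #[allow(dead_code)] // Used in tests
    state: u64,
}

impl ExampleHandler {
    /// Accessor currently exercised only by tests.
    #[allow(dead_code)] // Used in tests
    pub fn state(&self) -> u64 {
        self.state
    }
}
```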