Merge pull request #4532 from influxdata/cn/more-cleanup

fix: Remove or rescope dead code warnings
pull/24376/head
kodiakhq[bot] 2022-05-09 14:05:00 +00:00 committed by GitHub
commit de32b1ca54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 49 additions and 129 deletions

View File

@ -13,7 +13,7 @@ use backoff::{Backoff, BackoffConfig};
use bytes::Bytes;
use data_types::{
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId,
TablePartition, Timestamp, Tombstone, TombstoneId,
Timestamp, Tombstone, TombstoneId,
};
use datafusion::error::DataFusionError;
use iox_catalog::interface::{Catalog, Transaction};
@ -286,21 +286,6 @@ impl Compactor {
.context(Level0Snafu)
}
async fn level_1_parquet_files(
&self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
let mut repos = self.catalog.repositories().await;
repos
.parquet_files()
.level_1(table_partition, min_time, max_time)
.await
.context(Level1Snafu)
}
async fn update_to_level_1(&self, parquet_file_ids: &[ParquetFileId]) -> Result<()> {
let mut repos = self.catalog.repositories().await;

View File

@ -48,6 +48,7 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
#[derive(Debug)]
pub struct CompactorHandlerImpl {
/// Data to compact
#[allow(dead_code)]
compactor_data: Arc<Compactor>,
/// A token that is used to trigger shutdown of the background worker

View File

@ -10,7 +10,6 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod compact;
pub mod garbage_collector;

View File

@ -156,6 +156,7 @@ impl CompactedData {
/// Information needed to update the catalog after compacting a group of files
#[derive(Debug)]
pub struct CatalogUpdate {
#[allow(dead_code)]
pub(crate) meta: IoxMetadata,
pub(crate) tombstones: BTreeMap<TombstoneId, Tombstone>,
pub(crate) parquet_file: ParquetFileParams,

View File

@ -48,7 +48,6 @@ impl Task {
}
/// The type of error that is returned from tasks in this module
#[allow(dead_code)]
pub type Error = tokio::sync::oneshot::error::RecvError;
/// Job within the executor.

View File

@ -79,7 +79,10 @@ pub enum Error {
#[snafu(display("Nothing in the Persisting list to get removed"))]
PersistingEmpty,
#[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
#[snafu(display(
"The given batch does not match any in the Persisting list. \
Nothing is removed from the Persisting list"
))]
PersistingNotMatch,
#[snafu(display("Cannot partition data: {}", source))]
@ -154,6 +157,7 @@ impl IngesterData {
}
/// Get sequencer data for specific sequencer.
#[allow(dead_code)] // Used in tests
pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
self.sequencers.get(&sequencer_id)
}
@ -191,21 +195,6 @@ impl IngesterData {
.await
}
/// Return table data of a given (sequencer id, namespace name, and table name)
pub(crate) fn table_data(
&self,
sequencer_id: SequencerId,
namespace_name: &str,
table_name: &str,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let sequencer_data = self.sequencers.get(&sequencer_id)?;
let namespaces = sequencer_data.namespaces.read();
let namespace_data = namespaces.get(namespace_name)?;
let tables = namespace_data.tables.read();
let table_data = tables.get(table_name)?;
Some(Arc::clone(table_data))
}
/// Return the ingestion progress for the specified kafka
/// partitions. Returns an empty `SequencerProgress` for any kafka
/// partitions that this ingester doesn't know about.
@ -543,8 +532,9 @@ impl NamespaceData {
}
}
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog.
/// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager.
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
/// catalog. Returns true if ingest should be paused due to memory limits set in the passed
/// lifecycle manager.
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
@ -615,8 +605,8 @@ impl NamespaceData {
}
}
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to snapshots. Then
/// return a vec of the snapshots and the optional persisting batch.
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
pub async fn snapshot(
&self,
table_name: &str,
@ -782,6 +772,7 @@ impl TableData {
}
/// Return tombstone_max_sequence_number
#[allow(dead_code)] // Used in tests
pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
@ -967,17 +958,9 @@ impl PartitionData {
.snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
}
/// Clears the persisting batch, updates the max_persisted_sequence_number.
fn mark_persisted(&mut self) {
if let Some(persisting) = &self.data.persisting {
let (_, max) = persisting.data.min_max_sequence_numbers();
self.data.max_persisted_sequence_number = Some(max);
}
self.data.persisting = None;
}
/// Snapshot whatever is in the buffer and return a new vec of the
/// arc cloned snapshots
#[allow(dead_code)] // Used in tests
pub fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>> {
self.data.snapshot().context(SnapshotSnafu)?;
Ok(self.data.snapshots.to_vec())
@ -1017,8 +1000,10 @@ impl PartitionData {
}
/// Buffers a new tombstone:
/// . All the data in the `buffer` and `snapshots` will be replaced with one tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` exists
/// . All the data in the `buffer` and `snapshots` will be replaced with one
/// tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
/// exists
pub(crate) async fn buffer_tombstone(
&mut self,
executor: &Executor,
@ -1037,7 +1022,7 @@ impl PartitionData {
{
Some(query_batch) if !query_batch.is_empty() => query_batch,
_ => {
// No need to procedd further
// No need to proceed further
return;
}
};
@ -1197,11 +1182,6 @@ impl DataBuffer {
Ok(None)
}
/// Returns true if there are no batches in the buffer or snapshots or persisting data
fn is_empty(&self) -> bool {
self.snapshots.is_empty() && self.buffer.is_none() && self.persisting.is_none()
}
/// Snapshots the buffer and make a QueryableBatch for all the snapshots
/// Both buffer and snapshots will be empty after this
pub fn snapshot_to_queryable_batch(
@ -1246,6 +1226,7 @@ impl DataBuffer {
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
///
/// # Panic
///
/// Panics if there is already a persisting batch.
pub fn snapshot_to_persisting(
&mut self,
@ -1275,19 +1256,6 @@ impl DataBuffer {
}
}
/// Add a persiting batch into the buffer persisting list
/// Note: For now, there is at most one persisting batch at a time but
/// the plan is to process several of them a time as needed
pub fn add_persisting_batch(&mut self, batch: Arc<PersistingBatch>) -> Result<()> {
if self.persisting.is_some() {
return Err(Error::PersistingNotEmpty);
} else {
self.persisting = Some(batch);
}
Ok(())
}
/// Return a QueryableBatch of the persisting batch after applying new tombstones
pub fn get_persisting_data(&self) -> Option<QueryableBatch> {
let persisting = match &self.persisting {
@ -1304,22 +1272,6 @@ impl DataBuffer {
Some(queryable_batch)
}
/// Remove the given PersistingBatch that was persisted
pub fn remove_persisting_batch(&mut self, batch: &Arc<PersistingBatch>) -> Result<()> {
if let Some(persisting_batch) = &self.persisting {
if persisting_batch == batch {
// found. Remove this batch from the memory
self.persisting = None;
} else {
return Err(Error::PersistingNotMatch);
}
} else {
return Err(Error::PersistingEmpty);
}
Ok(())
}
/// Return the progress in this DataBuffer
fn progress(&self) -> SequencerProgress {
let progress = SequencerProgress::new();
@ -1362,7 +1314,6 @@ pub struct BufferBatch {
impl BufferBatch {
/// Return the progress in this DataBuffer
fn progress(&self) -> SequencerProgress {
SequencerProgress::new()
.with_buffered(self.min_sequence_number)
@ -1452,7 +1403,8 @@ pub struct QueryableBatch {
/// Status of a partition that has unpersisted data.
///
/// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
/// Note that this structure is specific to a partition (which itself is bound to a table and
/// sequencer)!
#[derive(Debug, Clone)]
#[allow(missing_copy_implementations)]
pub struct PartitionStatus {
@ -1473,8 +1425,9 @@ pub struct IngesterQueryResponse {
/// Contains status for every partition that has unpersisted data.
///
/// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
/// data for this partition. In either case the querier may just read all parquet files for the missing partition.
/// If a partition does NOT appear within this map, then either all data was persisted or the
/// ingester has never seen data for this partition. In either case the querier may just read
/// all parquet files for the missing partition.
pub unpersisted_partitions: BTreeMap<PartitionId, PartitionStatus>,
/// Map each record batch to a partition ID.

View File

@ -51,11 +51,6 @@ pub enum Error {
},
}
/// When the lifecycle manager indicates that ingest should be paused because of
/// memory pressure, the sequencer will loop, sleeping this long before checking
/// with the manager if it can resume ingest.
const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -478,6 +473,7 @@ mod tests {
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'bad_task' exited early!")]
async fn test_join_task_early_shutdown() {
let mut ingester = TestIngester::new().await.ingester;

View File

@ -20,12 +20,6 @@ impl Job {
Self::Persist { .. } => "persist",
}
}
fn partition_id(&self) -> Option<PartitionId> {
match self {
Self::Persist { partition_id, .. } => Some(*partition_id),
}
}
}
/// The global job registry

View File

@ -12,7 +12,6 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod compact;
pub mod data;

View File

@ -8,6 +8,7 @@ use std::{
};
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum PoisonPill {
LifecyclePanic,
LifecycleExit,
@ -48,6 +49,7 @@ impl PoisonCabinet {
}
}
#[allow(dead_code)]
pub fn add(&self, pill: PoisonPill) {
let mut inner = self.inner.write();
inner.pills.push(pill);
@ -63,6 +65,7 @@ impl PoisonCabinet {
inner.pills.contains(pill)
}
#[allow(dead_code)]
pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
PoisonWait {
pill,

View File

@ -67,19 +67,6 @@ impl WriteInfoServiceImpl {
}
}
fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
use write_summary::Error;
match e {
// treat "unknown partition error" as a failed precondition
// (so the client can distinguish between "write isn't
// readable" from "we can't tell if write is readable"
e @ Error::UnknownKafkaPartition { .. } => {
tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
}
}
}
fn to_proto_status(status: KafkaPartitionWriteStatus) -> proto::KafkaPartitionStatus {
match status {
KafkaPartitionWriteStatus::KafkaPartitionUnknown => proto::KafkaPartitionStatus::Unknown,

View File

@ -19,7 +19,6 @@ use data_types::{
use mutable_batch::MutableBatch;
use std::{borrow::Cow, collections::BTreeMap};
#[allow(dead_code)]
const SHARED_KAFKA_TOPIC: &str = "iox-shared";
const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
const TIME_COLUMN: &str = "time";

View File

@ -183,7 +183,6 @@ impl MeasurementGenerator {
Ok(Self {
measurement: Arc::new(Mutex::new(Measurement {
name: measurement_name,
id: measurement_id,
tag_pairs,
generated_tag_sets,
tag_ordering,
@ -207,8 +206,6 @@ impl MeasurementGenerator {
#[derive(Debug)]
pub struct Measurement {
name: String,
#[allow(dead_code)]
id: usize,
tag_pairs: Vec<Arc<TagPair>>,
generated_tag_sets: Arc<Vec<TagSet>>,
tag_ordering: Vec<TagOrdering>,

View File

@ -54,15 +54,6 @@ pub enum Error {
type Result<T, E = Error> = std::result::Result<T, E>;
/// A collection of pre-generated values.
#[derive(Debug)]
pub struct GeneratedValueCollection {
#[allow(dead_code)]
name: String,
#[allow(dead_code)]
values: Vec<GeneratedValue>,
}
/// A single generated value's id and tag key/value pair.
#[derive(Debug)]
pub struct GeneratedValue {

View File

@ -9,6 +9,5 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod util;

View File

@ -159,11 +159,13 @@ where
}
/// Reference to inner backend.
#[allow(dead_code)]
pub fn inner_backend(&self) -> &dyn CacheBackend<K = K, V = V> {
self.inner_backend.as_ref()
}
/// Reference to TTL provider.
#[allow(dead_code)]
pub fn ttl_provider(&self) -> &Arc<dyn TtlProvider<K = K, V = V>> {
&self.ttl_provider
}

View File

@ -177,6 +177,7 @@ where
/// Side-load an entry into the cache.
///
/// This will also complete a currently running request for this key.
#[allow(dead_code)]
pub async fn set(&self, k: K, v: V) {
let maybe_join_handle = {
let mut state = self.state.lock();
@ -239,6 +240,7 @@ struct RunningQuery<V> {
recv: SharedReceiver<V>,
/// A sender that enables setting entries while the query is running.
#[allow(dead_code)]
set: Sender<V>,
/// A handle for the task that is currently executing the query.
@ -246,7 +248,8 @@ struct RunningQuery<V> {
/// The handle can be used to abort the running query, e.g. when dropping the cache.
join_handle: JoinHandle<()>,
/// Tag so that queries for the same key (e.g. when starting, side-loading, starting again) can be told apart.
/// Tag so that queries for the same key (e.g. when starting, side-loading, starting again) can
/// be told apart.
tag: u64,
}

View File

@ -180,6 +180,7 @@ pub struct ParquetChunkAdapter {
metric_registry: Arc<metric::Registry>,
/// Time provider.
#[allow(dead_code)]
time_provider: Arc<dyn TimeProvider>,
}

View File

@ -33,12 +33,15 @@ pub struct QuerierDatabase {
chunk_adapter: Arc<ParquetChunkAdapter>,
/// Metric registry
#[allow(dead_code)]
metric_registry: Arc<metric::Registry>,
/// Namespaces.
#[allow(dead_code)]
namespaces: RwLock<HashMap<Arc<str>, Arc<QuerierNamespace>>>,
/// Object store.
#[allow(dead_code)]
object_store: Arc<DynObjectStore>,
/// Executor for queries.

View File

@ -34,6 +34,7 @@ pub trait QuerierHandler: Send + Sync {
type SharedJoinHandle = Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>;
/// Convert a [`JoinHandle`] into a [`SharedJoinHandle`].
#[allow(dead_code)]
fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
handle.map_err(Arc::new).boxed().shared()
}
@ -51,6 +52,7 @@ pub struct QuerierHandlerImpl {
shutdown: CancellationToken,
/// Poison pills for testing.
#[allow(dead_code)]
poison_cabinet: Arc<PoisonCabinet>,
}

View File

@ -18,6 +18,7 @@ impl MockIngesterConnection {
}
/// Set next response for this connection.
#[allow(dead_code)]
pub fn next_response(&self, response: super::Result<Vec<Arc<super::IngesterPartition>>>) {
*self.next_response.lock() = Some(response);
}

View File

@ -8,7 +8,6 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod cache;
mod cache_system;

View File

@ -38,6 +38,7 @@ pub struct QuerierNamespace {
exec: Arc<Executor>,
/// Connection to ingester
#[allow(dead_code)]
ingester_connection: Arc<dyn IngesterConnection>,
/// Query log.

View File

@ -30,6 +30,7 @@ impl PoisonCabinetInner {
#[derive(Debug)]
pub struct PoisonCabinet {
#[allow(dead_code)]
inner: Arc<RwLock<PoisonCabinetInner>>,
}
@ -43,6 +44,7 @@ impl PoisonCabinet {
}
}
#[allow(dead_code)]
pub fn add(&self, pill: PoisonPill) {
let mut inner = self.inner.write();
inner.pills.push(pill);
@ -52,12 +54,14 @@ impl PoisonCabinet {
}
}
#[allow(dead_code)]
pub fn contains(&self, pill: &PoisonPill) -> bool {
let inner = self.inner.read();
inner.pills.contains(pill)
}
#[allow(dead_code)]
pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
PoisonWait {
pill,

View File

@ -88,6 +88,7 @@ impl QuerierTable {
}
/// Table ID.
#[allow(dead_code)]
pub fn id(&self) -> TableId {
self.id
}