fix: Remove or rescope dead code in ingester

pull/24376/head
Carol (Nichols || Goulding) 2022-05-06 11:27:03 -04:00
parent dfced5b21c
commit 4506bf3b8f
GPG Key ID: E907EE5A736F87D4
6 changed files with 27 additions and 95 deletions
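
The pattern running through this commit is to drop the crate-wide `#![allow(dead_code)]` and instead annotate the few items that only tests call with a targeted `#[allow(dead_code)] // Used in tests`. A minimal sketch of that pattern, using a hypothetical helper name rather than code from this commit:

```rust
// Targeted allow: the dead_code lint stays active for the rest of the crate,
// so genuinely unused items are still reported by the compiler.
#[allow(dead_code)] // Used in tests
fn helper_only_called_from_tests() -> u32 {
    42
}

fn main() {}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn exercises_the_helper() {
        assert_eq!(helper_only_called_from_tests(), 42);
    }
}
```

An alternative would be `#[cfg(test)]`, which compiles the item only in test builds; keeping it behind `#[allow(dead_code)]` instead leaves the item available to non-test code if a real caller appears later.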

View File

@@ -79,7 +79,10 @@ pub enum Error {
#[snafu(display("Nothing in the Persisting list to get removed"))]
PersistingEmpty,
#[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
#[snafu(display(
"The given batch does not match any in the Persisting list. \
Nothing is removed from the Persisting list"
))]
PersistingNotMatch,
#[snafu(display("Cannot partition data: {}", source))]
@@ -154,6 +157,7 @@ impl IngesterData {
}
/// Get sequencer data for specific sequencer.
#[allow(dead_code)] // Used in tests
pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
self.sequencers.get(&sequencer_id)
}
@@ -191,21 +195,6 @@ impl IngesterData {
.await
}
/// Return table data of a given (sequencer id, namespace name, and table name)
pub(crate) fn table_data(
&self,
sequencer_id: SequencerId,
namespace_name: &str,
table_name: &str,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let sequencer_data = self.sequencers.get(&sequencer_id)?;
let namespaces = sequencer_data.namespaces.read();
let namespace_data = namespaces.get(namespace_name)?;
let tables = namespace_data.tables.read();
let table_data = tables.get(table_name)?;
Some(Arc::clone(table_data))
}
/// Return the ingestion progress for the specified kafka
/// partitions. Returns an empty `SequencerProgress` for any kafka
/// partitions that this ingester doesn't know about.
@@ -543,8 +532,9 @@ impl NamespaceData {
}
}
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog.
/// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager.
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
/// catalog. Returns true if ingest should be paused due to memory limits set in the passed
/// lifecycle manager.
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
@@ -615,8 +605,8 @@ impl NamespaceData {
}
}
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to snapshots. Then
/// return a vec of the snapshots and the optional persisting batch.
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
pub async fn snapshot(
&self,
table_name: &str,
@@ -782,6 +772,7 @@ impl TableData {
}
/// Return tombstone_max_sequence_number
#[allow(dead_code)] // Used in tests
pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
@@ -967,17 +958,9 @@ impl PartitionData {
.snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
}
/// Clears the persisting batch, updates the max_persisted_sequence_number.
fn mark_persisted(&mut self) {
if let Some(persisting) = &self.data.persisting {
let (_, max) = persisting.data.min_max_sequence_numbers();
self.data.max_persisted_sequence_number = Some(max);
}
self.data.persisting = None;
}
/// Snapshot whatever is in the buffer and return a new vec of the
/// arc cloned snapshots
#[allow(dead_code)] // Used in tests
pub fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>> {
self.data.snapshot().context(SnapshotSnafu)?;
Ok(self.data.snapshots.to_vec())
@@ -1017,8 +1000,10 @@ impl PartitionData {
}
/// Buffers a new tombstone:
/// . All the data in the `buffer` and `snapshots` will be replaced with one tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` exists
/// . All the data in the `buffer` and `snapshots` will be replaced with one
/// tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
/// exists
pub(crate) async fn buffer_tombstone(
&mut self,
executor: &Executor,
@@ -1037,7 +1022,7 @@
{
Some(query_batch) if !query_batch.is_empty() => query_batch,
_ => {
// No need to procedd further
// No need to proceed further
return;
}
};
@@ -1197,11 +1182,6 @@ impl DataBuffer {
Ok(None)
}
/// Returns true if there are no batches in the buffer or snapshots or persisting data
fn is_empty(&self) -> bool {
self.snapshots.is_empty() && self.buffer.is_none() && self.persisting.is_none()
}
/// Snapshots the buffer and make a QueryableBatch for all the snapshots
/// Both buffer and snapshots will be empty after this
pub fn snapshot_to_queryable_batch(
@@ -1246,6 +1226,7 @@ impl DataBuffer {
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
///
/// # Panic
///
/// Panics if there is already a persisting batch.
pub fn snapshot_to_persisting(
&mut self,
@@ -1275,19 +1256,6 @@ impl DataBuffer {
}
}
/// Add a persiting batch into the buffer persisting list
/// Note: For now, there is at most one persisting batch at a time but
/// the plan is to process several of them a time as needed
pub fn add_persisting_batch(&mut self, batch: Arc<PersistingBatch>) -> Result<()> {
if self.persisting.is_some() {
return Err(Error::PersistingNotEmpty);
} else {
self.persisting = Some(batch);
}
Ok(())
}
/// Return a QueryableBatch of the persisting batch after applying new tombstones
pub fn get_persisting_data(&self) -> Option<QueryableBatch> {
let persisting = match &self.persisting {
@@ -1304,22 +1272,6 @@ impl DataBuffer {
Some(queryable_batch)
}
/// Remove the given PersistingBatch that was persisted
pub fn remove_persisting_batch(&mut self, batch: &Arc<PersistingBatch>) -> Result<()> {
if let Some(persisting_batch) = &self.persisting {
if persisting_batch == batch {
// found. Remove this batch from the memory
self.persisting = None;
} else {
return Err(Error::PersistingNotMatch);
}
} else {
return Err(Error::PersistingEmpty);
}
Ok(())
}
/// Return the progress in this DataBuffer
fn progress(&self) -> SequencerProgress {
let progress = SequencerProgress::new();
@@ -1362,7 +1314,6 @@ pub struct BufferBatch {
impl BufferBatch {
/// Return the progress in this DataBuffer
fn progress(&self) -> SequencerProgress {
SequencerProgress::new()
.with_buffered(self.min_sequence_number)
@@ -1452,7 +1403,8 @@ pub struct QueryableBatch {
/// Status of a partition that has unpersisted data.
///
/// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
/// Note that this structure is specific to a partition (which itself is bound to a table and
/// sequencer)!
#[derive(Debug, Clone)]
#[allow(missing_copy_implementations)]
pub struct PartitionStatus {
@@ -1473,8 +1425,9 @@ pub struct IngesterQueryResponse {
/// Contains status for every partition that has unpersisted data.
///
/// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
/// data for this partition. In either case the querier may just read all parquet files for the missing partition.
/// If a partition does NOT appear within this map, then either all data was persisted or the
/// ingester has never seen data for this partition. In either case the querier may just read
/// all parquet files for the missing partition.
pub unpersisted_partitions: BTreeMap<PartitionId, PartitionStatus>,
/// Map each record batch to a partition ID.

View File

@@ -51,11 +51,6 @@ pub enum Error {
},
}
/// When the lifecycle manager indicates that ingest should be paused because of
/// memory pressure, the sequencer will loop, sleeping this long before checking
/// with the manager if it can resume ingest.
const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -478,6 +473,7 @@ mod tests {
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'bad_task' exited early!")]
async fn test_join_task_early_shutdown() {
let mut ingester = TestIngester::new().await.ingester;

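The hunk above adds a `#[should_panic(expected = ...)]` attribute to the early-shutdown test. As a reminder of the semantics, a self-contained sketch; the worker function and test name here are illustrative, not the ingester's actual code:

```rust
// The test passes only if the body panics and the panic message contains the
// `expected` substring; a normal return or a different message fails the test.
fn join_background_worker(name: &str) {
    panic!("Background worker '{}' exited early!", name);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[should_panic(expected = "exited early!")]
    fn worker_panics_on_early_exit() {
        join_background_worker("bad_task");
    }
}

fn main() {}
```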
View File

@@ -20,12 +20,6 @@ impl Job {
Self::Persist { .. } => "persist",
}
}
fn partition_id(&self) -> Option<PartitionId> {
match self {
Self::Persist { partition_id, .. } => Some(*partition_id),
}
}
}
/// The global job registry

View File

@@ -12,7 +12,6 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod compact;
pub mod data;

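The hunk above removes the crate-level `#![allow(dead_code)]` from lib.rs, which is what makes the targeted annotations elsewhere necessary. A minimal sketch of what rustc reports once the lint is active again (the function name is hypothetical):

```rust
// With no crate-wide allow, an unused item triggers the default dead_code lint,
// e.g. `warning: function `truly_unused` is never used`.
fn truly_unused() {}

fn main() {
    println!("compiling this file produces the dead_code warning for `truly_unused`");
}
```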
View File

@@ -8,6 +8,7 @@ use std::{
};
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum PoisonPill {
LifecyclePanic,
LifecycleExit,
@@ -48,6 +49,7 @@ impl PoisonCabinet {
}
}
#[allow(dead_code)]
pub fn add(&self, pill: PoisonPill) {
let mut inner = self.inner.write();
inner.pills.push(pill);
@@ -63,6 +65,7 @@ impl PoisonCabinet {
inner.pills.contains(pill)
}
#[allow(dead_code)]
pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
PoisonWait {
pill,

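The `#[allow(dead_code)]` additions above keep the poison-cabinet helpers compiling without the crate-wide allow; they exist so tests can inject failures into background tasks. A simplified, self-contained sketch of how such a test-only poison mechanism is typically used; the implementation below is an assumption for illustration, not the ingester's actual code:

```rust
use std::sync::{Arc, RwLock};

// A "pill" names a failure a test wants a background task to simulate.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PoisonPill {
    LifecyclePanic,
    LifecycleExit,
}

// Shared registry the background task polls for pills added by tests.
#[derive(Default, Clone)]
struct PoisonCabinet {
    pills: Arc<RwLock<Vec<PoisonPill>>>,
}

impl PoisonCabinet {
    fn add(&self, pill: PoisonPill) {
        self.pills.write().unwrap().push(pill);
    }

    fn contains(&self, pill: &PoisonPill) -> bool {
        self.pills.read().unwrap().contains(pill)
    }
}

fn main() {
    let cabinet = PoisonCabinet::default();
    cabinet.add(PoisonPill::LifecycleExit);

    // A lifecycle loop would check the cabinet and exit (or panic) on purpose,
    // letting tests exercise shutdown and panic-handling paths.
    if cabinet.contains(&PoisonPill::LifecycleExit) {
        println!("simulating an early exit of the lifecycle task");
    }
}
```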
View File

@@ -67,19 +67,6 @@ impl WriteInfoServiceImpl {
}
}
fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
use write_summary::Error;
match e {
// treat "unknown partition error" as a failed precondition
// (so the client can distinguish between "write isn't
// readable" from "we can't tell if write is readable"
e @ Error::UnknownKafkaPartition { .. } => {
tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
}
}
}
fn to_proto_status(status: KafkaPartitionWriteStatus) -> proto::KafkaPartitionStatus {
match status {
KafkaPartitionWriteStatus::KafkaPartitionUnknown => proto::KafkaPartitionStatus::Unknown,