From 257c155d1ebd8cfbcb3c2b7007c0020eb727efb1 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 21 Dec 2022 11:18:47 -0500
Subject: [PATCH 1/2] chore: Line wrapping at 100 cols

---
 querier/src/table/mod.rs | 41 +++++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 10 deletions(-)

diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 45fe65d849..0c8b5fc86a 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -208,7 +208,8 @@ impl QuerierTable {
         );
 
         let (predicate, retention_delete_pred) = match self.namespace_retention_period {
-            // The retention is not fininte, add predicate to filter out data outside retention period
+            // The retention is not fininte, add predicate to filter out data outside retention
+            // period
             Some(retention_period) => {
                 let retention_time_ns = self
                     .chunk_adapter
@@ -218,11 +219,12 @@ impl QuerierTable {
                     .timestamp_nanos()
                     - retention_period.as_nanos() as i64;
 
-                // Add predicate to only keep chunks inside the retention period: time >= retention_period
+                // Add predicate to only keep chunks inside the retention period: time >=
+                // retention_period
                 let predicate = predicate.clone().with_retention(retention_time_ns);
 
-                // Expression used to add to delete predicate to delete data older than retention period
-                // time < retention_time
+                // Expression used to add to delete predicate to delete data older than retention
+                // period time < retention_time
                 let retention_delete_pred = Some(DeletePredicate::retention_delete_predicate(
                     retention_time_ns,
                 ));
@@ -375,7 +377,12 @@ impl QuerierTable {
                 &predicate,
             )
             .context(ChunkPruningSnafu)?;
-        debug!(%predicate, num_initial_chunks, num_final_chunks=chunks.len(), "pruned with pushed down predicates");
+        debug!(
+            %predicate,
+            num_initial_chunks,
+            num_final_chunks=chunks.len(),
+            "pruned with pushed down predicates"
+        );
 
         Ok(chunks)
     }
@@ -442,7 +449,19 @@ impl QuerierTable {
             .map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
 
         // get cached table w/o any must-coverage information
-        let Some(cached_table) = self.chunk_adapter.catalog_cache().namespace().get(Arc::clone(&self.namespace_name), &[], span_recorder.child_span("get namespace")).await.and_then(|ns| ns.tables.get(&self.table_name).cloned()) else {return Ok(vec![])};
+        let Some(cached_table) = self.chunk_adapter
+            .catalog_cache()
+            .namespace()
+            .get(
+                Arc::clone(&self.namespace_name),
+                &[],
+                span_recorder.child_span("get namespace")
+            )
+            .await
+            .and_then(|ns| ns.tables.get(&self.table_name).cloned())
+        else {
+            return Ok(vec![])
+        };
 
         // get any chunks from the ingester(s)
         let partitions_result = ingester_connection
@@ -915,8 +934,8 @@ mod tests {
         // ( ingester in-mem data )
         //
         //
-        // However there is no way to split the parquet data into the "wanted" and "ignored" part because we don't have
-        // row-level sequence numbers.
+        // However there is no way to split the parquet data into the "wanted" and "ignored" part
+        // because we don't have row-level sequence numbers.
 
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol("table foo=1 11")
@@ -1021,7 +1040,8 @@ mod tests {
                 ),
             )
             .with_ingester_partition(
-                // this chunk is filtered out because it has no record batches but the reconciling still takes place
+                // this chunk is filtered out because it has no record batches but the reconciling
+                // still takes place
                 builder2.with_ingester_chunk_id(u128::MAX).build(
                     // parquet max persisted sequence number
                     Some(SequenceNumber::new(3)),
@@ -1057,7 +1077,8 @@ mod tests {
         // parquet chunks have predicate attached
         assert_eq!(chunks[0].delete_predicates().len(), 1);
         assert_eq!(chunks[1].delete_predicates().len(), 2);
-        // ingester in-mem chunk doesn't need predicates, because the ingester has already materialized them for us
+        // ingester in-mem chunk doesn't need predicates, because the ingester has already
+        // materialized them for us
         assert_eq!(chunks[2].delete_predicates().len(), 0);
 
         // check spans

From 56ba3b17ded9ba0b6eb158d37318eeca6c4904d6 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 21 Dec 2022 11:19:10 -0500
Subject: [PATCH 2/2] fix: Allow partitions from ingesters to overlap in RPC write mode

This was added in c82d0d8ca6dc02dcdd40a4c656a1ee51f3f9bfee with the
comment:

> Right now this would clearly indicate a bug and before I am trying to
> understand some prod issues, I wanna rule that one out.

In the RPC write path, this isn't a bug, it's quite expected.
---
 querier/src/table/mod.rs | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 0c8b5fc86a..bd923c28ed 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -478,19 +478,21 @@ impl QuerierTable {
 
         let partitions = partitions_result?;
 
-        // check that partitions from ingesters don't overlap
-        let mut seen = HashMap::with_capacity(partitions.len());
-        for partition in &partitions {
-            match seen.entry(partition.partition_id()) {
-                Entry::Occupied(o) => {
-                    return Err(Error::IngestersOverlap {
-                        ingester1: Arc::clone(o.get()),
-                        ingester2: Arc::clone(partition.ingester()),
-                        partition: partition.partition_id(),
-                    })
-                }
-                Entry::Vacant(v) => {
-                    v.insert(Arc::clone(partition.ingester()));
+        if !self.rpc_write() {
+            // check that partitions from ingesters don't overlap
+            let mut seen = HashMap::with_capacity(partitions.len());
+            for partition in &partitions {
+                match seen.entry(partition.partition_id()) {
+                    Entry::Occupied(o) => {
+                        return Err(Error::IngestersOverlap {
+                            ingester1: Arc::clone(o.get()),
+                            ingester2: Arc::clone(partition.ingester()),
+                            partition: partition.partition_id(),
+                        })
+                    }
+                    Entry::Vacant(v) => {
+                        v.insert(Arc::clone(partition.ingester()));
+                    }
                 }
             }
         }
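
Note: the sketch below is a simplified, self-contained illustration of the behaviour patch 2 introduces, not the project's actual types or API. It shows the duplicate-partition check built on HashMap's Entry API running only when RPC write mode is off, and overlapping partition IDs from different ingesters being accepted otherwise. The names Partition, check_overlap, and the rpc_write flag are hypothetical stand-ins.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the querier's partition metadata types.
type PartitionId = i64;
type IngesterName = Arc<str>;

struct Partition {
    id: PartitionId,
    ingester: IngesterName,
}

// Mirrors the shape of the guarded check in patch 2: in RPC write mode,
// overlapping partitions from ingesters are expected, so the check is
// skipped; otherwise the first overlap found is reported as an error.
fn check_overlap(partitions: &[Partition], rpc_write: bool) -> Result<(), String> {
    if rpc_write {
        return Ok(());
    }

    let mut seen: HashMap<PartitionId, IngesterName> = HashMap::with_capacity(partitions.len());
    for p in partitions {
        match seen.entry(p.id) {
            // An earlier entry already claimed this partition id: overlap.
            Entry::Occupied(o) => {
                return Err(format!(
                    "partition {} reported by both {} and {}",
                    p.id,
                    o.get(),
                    p.ingester
                ));
            }
            // First sighting: remember which ingester reported it.
            Entry::Vacant(v) => {
                v.insert(Arc::clone(&p.ingester));
            }
        }
    }
    Ok(())
}

fn main() {
    let partitions = vec![
        Partition { id: 1, ingester: Arc::from("ingester-a") },
        Partition { id: 1, ingester: Arc::from("ingester-b") },
    ];

    // Pre-patch behaviour (and non-RPC-write mode after the patch): error.
    assert!(check_overlap(&partitions, false).is_err());
    // RPC write mode after the patch: the same overlap is accepted.
    assert!(check_overlap(&partitions, true).is_ok());
}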