Merge branch 'main' into dom/fix-sort-key-retry
commit
7f01911c81
|
@ -208,7 +208,8 @@ impl QuerierTable {
|
|||
);
|
||||
|
||||
let (predicate, retention_delete_pred) = match self.namespace_retention_period {
|
||||
// The retention is not fininte, add predicate to filter out data outside retention period
|
||||
// The retention is not fininte, add predicate to filter out data outside retention
|
||||
// period
|
||||
Some(retention_period) => {
|
||||
let retention_time_ns = self
|
||||
.chunk_adapter
|
||||
|
@ -218,11 +219,12 @@ impl QuerierTable {
|
|||
.timestamp_nanos()
|
||||
- retention_period.as_nanos() as i64;
|
||||
|
||||
// Add predicate to only keep chunks inside the retention period: time >= retention_period
|
||||
// Add predicate to only keep chunks inside the retention period: time >=
|
||||
// retention_period
|
||||
let predicate = predicate.clone().with_retention(retention_time_ns);
|
||||
|
||||
// Expression used to add to delete predicate to delete data older than retention period
|
||||
// time < retention_time
|
||||
// Expression used to add to delete predicate to delete data older than retention
|
||||
// period time < retention_time
|
||||
let retention_delete_pred = Some(DeletePredicate::retention_delete_predicate(
|
||||
retention_time_ns,
|
||||
));
|
||||
|
@ -375,7 +377,12 @@ impl QuerierTable {
|
|||
&predicate,
|
||||
)
|
||||
.context(ChunkPruningSnafu)?;
|
||||
debug!(%predicate, num_initial_chunks, num_final_chunks=chunks.len(), "pruned with pushed down predicates");
|
||||
debug!(
|
||||
%predicate,
|
||||
num_initial_chunks,
|
||||
num_final_chunks=chunks.len(),
|
||||
"pruned with pushed down predicates"
|
||||
);
|
||||
Ok(chunks)
|
||||
}
|
||||
|
||||
|
@ -442,7 +449,19 @@ impl QuerierTable {
|
|||
.map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
|
||||
|
||||
// get cached table w/o any must-coverage information
|
||||
let Some(cached_table) = self.chunk_adapter.catalog_cache().namespace().get(Arc::clone(&self.namespace_name), &[], span_recorder.child_span("get namespace")).await.and_then(|ns| ns.tables.get(&self.table_name).cloned()) else {return Ok(vec![])};
|
||||
let Some(cached_table) = self.chunk_adapter
|
||||
.catalog_cache()
|
||||
.namespace()
|
||||
.get(
|
||||
Arc::clone(&self.namespace_name),
|
||||
&[],
|
||||
span_recorder.child_span("get namespace")
|
||||
)
|
||||
.await
|
||||
.and_then(|ns| ns.tables.get(&self.table_name).cloned())
|
||||
else {
|
||||
return Ok(vec![])
|
||||
};
|
||||
|
||||
// get any chunks from the ingester(s)
|
||||
let partitions_result = ingester_connection
|
||||
|
@ -459,19 +478,21 @@ impl QuerierTable {
|
|||
|
||||
let partitions = partitions_result?;
|
||||
|
||||
// check that partitions from ingesters don't overlap
|
||||
let mut seen = HashMap::with_capacity(partitions.len());
|
||||
for partition in &partitions {
|
||||
match seen.entry(partition.partition_id()) {
|
||||
Entry::Occupied(o) => {
|
||||
return Err(Error::IngestersOverlap {
|
||||
ingester1: Arc::clone(o.get()),
|
||||
ingester2: Arc::clone(partition.ingester()),
|
||||
partition: partition.partition_id(),
|
||||
})
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(Arc::clone(partition.ingester()));
|
||||
if !self.rpc_write() {
|
||||
// check that partitions from ingesters don't overlap
|
||||
let mut seen = HashMap::with_capacity(partitions.len());
|
||||
for partition in &partitions {
|
||||
match seen.entry(partition.partition_id()) {
|
||||
Entry::Occupied(o) => {
|
||||
return Err(Error::IngestersOverlap {
|
||||
ingester1: Arc::clone(o.get()),
|
||||
ingester2: Arc::clone(partition.ingester()),
|
||||
partition: partition.partition_id(),
|
||||
})
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(Arc::clone(partition.ingester()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -915,8 +936,8 @@ mod tests {
|
|||
// ( ingester in-mem data )
|
||||
//
|
||||
//
|
||||
// However there is no way to split the parquet data into the "wanted" and "ignored" part because we don't have
|
||||
// row-level sequence numbers.
|
||||
// However there is no way to split the parquet data into the "wanted" and "ignored" part
|
||||
// because we don't have row-level sequence numbers.
|
||||
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol("table foo=1 11")
|
||||
|
@ -1021,7 +1042,8 @@ mod tests {
|
|||
),
|
||||
)
|
||||
.with_ingester_partition(
|
||||
// this chunk is filtered out because it has no record batches but the reconciling still takes place
|
||||
// this chunk is filtered out because it has no record batches but the reconciling
|
||||
// still takes place
|
||||
builder2.with_ingester_chunk_id(u128::MAX).build(
|
||||
// parquet max persisted sequence number
|
||||
Some(SequenceNumber::new(3)),
|
||||
|
@ -1057,7 +1079,8 @@ mod tests {
|
|||
// parquet chunks have predicate attached
|
||||
assert_eq!(chunks[0].delete_predicates().len(), 1);
|
||||
assert_eq!(chunks[1].delete_predicates().len(), 2);
|
||||
// ingester in-mem chunk doesn't need predicates, because the ingester has already materialized them for us
|
||||
// ingester in-mem chunk doesn't need predicates, because the ingester has already
|
||||
// materialized them for us
|
||||
assert_eq!(chunks[2].delete_predicates().len(), 0);
|
||||
|
||||
// check spans
|
||||
|
|
Loading…
Reference in New Issue