Merge branch 'main' into cn/compact-filtered-files

pull/24376/head
kodiakhq[bot] 2022-07-21 20:52:40 +00:00 committed by GitHub
commit 0af73acfa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 243 additions and 98 deletions

View File

@ -0,0 +1 @@
ALTER TABLE parquet_file DROP COLUMN min_sequence_number;

View File

@ -146,6 +146,7 @@ impl NamespaceCache {
&self,
name: Arc<str>,
should_cover: &[(&str, &HashSet<ColumnId>)],
span: Option<Span>,
) -> Option<Arc<NamespaceSchema>> {
self.backend.remove_if(&name, |cached_namespace| {
if let Some(namespace) = cached_namespace.as_ref() {
@ -164,9 +165,8 @@ impl NamespaceCache {
}
});
// TODO(marco): pass span
self.cache
.get(name, ((), None))
.get(name, ((), span))
.await
.map(|n| Arc::clone(&n.schema))
}
@ -223,7 +223,7 @@ mod tests {
);
let schema1_a = cache
.schema(Arc::from(String::from("ns1")), &[])
.schema(Arc::from(String::from("ns1")), &[], None)
.await
.unwrap();
let expected_schema_1 = NamespaceSchema {
@ -288,7 +288,7 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let schema2 = cache
.schema(Arc::from(String::from("ns2")), &[])
.schema(Arc::from(String::from("ns2")), &[], None)
.await
.unwrap();
let expected_schema_2 = NamespaceSchema {
@ -313,7 +313,7 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
let schema1_b = cache
.schema(Arc::from(String::from("ns1")), &[])
.schema(Arc::from(String::from("ns1")), &[], None)
.await
.unwrap();
assert!(Arc::ptr_eq(&schema1_a, &schema1_b));
@ -323,7 +323,7 @@ mod tests {
catalog.mock_time_provider().inc(TTL_EXISTING);
let schema1_c = cache
.schema(Arc::from(String::from("ns1")), &[])
.schema(Arc::from(String::from("ns1")), &[], None)
.await
.unwrap();
assert_eq!(schema1_c.as_ref(), schema1_a.as_ref());
@ -344,18 +344,24 @@ mod tests {
true,
);
let none = cache.schema(Arc::from(String::from("foo")), &[]).await;
let none = cache
.schema(Arc::from(String::from("foo")), &[], None)
.await;
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let none = cache.schema(Arc::from(String::from("foo")), &[]).await;
let none = cache
.schema(Arc::from(String::from("foo")), &[], None)
.await;
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
// cache timeout
catalog.mock_time_provider().inc(TTL_NON_EXISTING);
let none = cache.schema(Arc::from(String::from("foo")), &[]).await;
let none = cache
.schema(Arc::from(String::from("foo")), &[], None)
.await;
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
}
@ -374,14 +380,14 @@ mod tests {
);
// ========== namespace unknown ==========
assert!(cache.schema(Arc::from("ns1"), &[]).await.is_none());
assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
assert!(cache.schema(Arc::from("ns1"), &[]).await.is_none());
assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
@ -390,13 +396,13 @@ mod tests {
let ns1 = catalog.create_namespace("ns1").await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4);
@ -405,13 +411,13 @@ mod tests {
let t1 = ns1.create_table("t1").await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
@ -421,19 +427,27 @@ mod tests {
let c2 = t1.create_column("c2", ColumnType::Bool).await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))])
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([c1.column.id]))])
.schema(
Arc::from("ns1"),
&[("t1", &HashSet::from([c1.column.id]))],
None
)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([c2.column.id]))])
.schema(
Arc::from("ns1"),
&[("t1", &HashSet::from([c2.column.id]))],
None
)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);

View File

@ -173,9 +173,8 @@ impl ParquetFileCache {
}
/// Get list of cached parquet files, by table id
pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
// TODO(marco): pass span
self.cache.get(table_id, ((), None)).await
pub async fn get(&self, table_id: TableId, span: Option<Span>) -> Arc<CachedParquetFiles> {
self.cache.get(table_id, ((), span)).await
}
/// Mark the entry for table_id as expired (and needs a refresh)
@ -249,7 +248,7 @@ mod tests {
let tfile = partition.create_parquet_file(builder).await;
let cache = make_cache(&catalog);
let cached_files = cache.get(table.table.id).await.vec();
let cached_files = cache.get(table.table.id, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile.parquet_file;
@ -257,7 +256,7 @@ mod tests {
// validate a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.get(table.table.id).await;
cache.get(table.table.id, None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
}
@ -276,12 +275,12 @@ mod tests {
let cache = make_cache(&catalog);
let cached_files = cache.get(table1.table.id).await.vec();
let cached_files = cache.get(table1.table.id, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile1.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
let cached_files = cache.get(table2.table.id).await.vec();
let cached_files = cache.get(table2.table.id, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile2.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
@ -297,7 +296,7 @@ mod tests {
let different_catalog = TestCatalog::new();
let cache = make_cache(&different_catalog);
let cached_files = cache.get(table.table.id).await.vec();
let cached_files = cache.get(table.table.id, None).await.vec();
assert!(cached_files.is_empty());
}
@ -313,14 +312,14 @@ mod tests {
assert!(single_file_size < two_file_size);
let cache = make_cache(&catalog);
let cached_files = cache.get(table_id).await;
let cached_files = cache.get(table_id, None).await;
assert_eq!(cached_files.size(), single_file_size);
// add a second file, and force the cache to find it
let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
partition.create_parquet_file(builder).await;
cache.expire(table_id);
let cached_files = cache.get(table_id).await;
let cached_files = cache.get(table_id, None).await;
assert_eq!(cached_files.size(), two_file_size);
}
@ -349,7 +348,7 @@ mod tests {
let cache = make_cache(&catalog);
let table_id = table.table.id;
assert_eq!(
cache.get(table_id).await.ids(),
cache.get(table_id, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
@ -358,7 +357,7 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
assert_eq!(
cache.get(table_id).await.ids(),
cache.get(table_id, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@ -367,7 +366,7 @@ mod tests {
// should not expire anything
cache.expire_on_newly_persisted_files(table_id, None);
assert_eq!(
cache.get(table_id).await.ids(),
cache.get(table_id, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@ -381,7 +380,7 @@ mod tests {
let tfile1_10 = partition.create_parquet_file(builder).await;
// cache doesn't have tfile1_10
assert_eq!(
cache.get(table_id).await.ids(),
cache.get(table_id, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
@ -389,7 +388,7 @@ mod tests {
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
// now cache has tfile1_10 (yay!)
assert_eq!(
cache.get(table_id).await.ids(),
cache.get(table_id, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
@ -402,16 +401,16 @@ mod tests {
let table_id = table.table.id;
// no parquet files, should be none
assert!(cache.get(table_id).await.files.is_empty());
assert!(cache.get(table_id, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be cached
assert!(cache.get(table_id).await.files.is_empty());
assert!(cache.get(table_id, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Calls to expire if there is no known persisted file, should still be cached
cache.expire_on_newly_persisted_files(table_id, None);
assert!(cache.get(table_id).await.files.is_empty());
assert!(cache.get(table_id, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// make a new parquet file
@ -424,12 +423,12 @@ mod tests {
let tfile = partition.create_parquet_file(builder).await;
// cache is stale
assert!(cache.get(table_id).await.files.is_empty());
assert!(cache.get(table_id, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call to expire with knowledge of new file, will cause a cache refresh
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
assert_eq!(cache.get(table_id).await.ids(), ids(&[&tfile]));
assert_eq!(cache.get(table_id, None).await.ids(), ids(&[&tfile]));
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}

View File

@ -108,9 +108,8 @@ impl PartitionCache {
}
/// Get sequencer ID.
pub async fn sequencer_id(&self, partition_id: PartitionId) -> SequencerId {
// TODO(marco): pass span
self.cache.get(partition_id, ((), None)).await.sequencer_id
pub async fn sequencer_id(&self, partition_id: PartitionId, span: Option<Span>) -> SequencerId {
self.cache.get(partition_id, ((), span)).await.sequencer_id
}
/// Get sort key
@ -120,6 +119,7 @@ impl PartitionCache {
&self,
partition_id: PartitionId,
should_cover: &HashSet<&str>,
span: Option<Span>,
) -> Arc<Option<SortKey>> {
self.backend.remove_if(&partition_id, |cached_partition| {
if let Some(sort_key) = cached_partition.sort_key.as_ref().as_ref() {
@ -134,8 +134,7 @@ impl PartitionCache {
}
});
// TODO(marco): pass span
self.cache.get(partition_id, ((), None)).await.sort_key
self.cache.get(partition_id, ((), span)).await.sort_key
}
}
@ -195,15 +194,15 @@ mod tests {
true,
);
let id1 = cache.sequencer_id(p1.id).await;
let id1 = cache.sequencer_id(p1.id, None).await;
assert_eq!(id1, s1.sequencer.id);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let id2 = cache.sequencer_id(p2.id).await;
let id2 = cache.sequencer_id(p2.id, None).await;
assert_eq!(id2, s2.sequencer.id);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let id1 = cache.sequencer_id(p1.id).await;
let id1 = cache.sequencer_id(p1.id, None).await;
assert_eq!(id1, s1.sequencer.id);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
}
@ -238,15 +237,15 @@ mod tests {
true,
);
let sort_key1 = cache.sort_key(p1.id, &HashSet::new()).await;
let sort_key1 = cache.sort_key(p1.id, &HashSet::new(), None).await;
assert_eq!(sort_key1.as_ref(), &p1.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let sort_key2 = cache.sort_key(p2.id, &HashSet::new()).await;
let sort_key2 = cache.sort_key(p2.id, &HashSet::new(), None).await;
assert_eq!(sort_key2.as_ref(), &p2.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let sort_key1 = cache.sort_key(p1.id, &HashSet::new()).await;
let sort_key1 = cache.sort_key(p1.id, &HashSet::new(), None).await;
assert_eq!(sort_key1.as_ref(), &p1.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
}
@ -287,16 +286,16 @@ mod tests {
true,
);
cache.sequencer_id(p2.id).await;
cache.sort_key(p3.id, &HashSet::new()).await;
cache.sequencer_id(p2.id, None).await;
cache.sort_key(p3.id, &HashSet::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
cache.sequencer_id(p1.id).await;
cache.sort_key(p2.id, &HashSet::new()).await;
cache.sequencer_id(p1.id, None).await;
cache.sort_key(p2.id, &HashSet::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
cache.sort_key(p1.id, &HashSet::new()).await;
cache.sequencer_id(p2.id).await;
cache.sort_key(p1.id, &HashSet::new(), None).await;
cache.sequencer_id(p2.id, None).await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
}
@ -320,18 +319,18 @@ mod tests {
true,
);
let sort_key = cache.sort_key(p_id, &HashSet::new()).await;
let sort_key = cache.sort_key(p_id, &HashSet::new(), None).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
// requesting nothing will not expire
assert!(p_sort_key.is_none());
let sort_key = cache.sort_key(p_id, &HashSet::new()).await;
let sort_key = cache.sort_key(p_id, &HashSet::new(), None).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
// but requesting something will expire
let sort_key = cache.sort_key(p_id, &HashSet::from(["foo"])).await;
let sort_key = cache.sort_key(p_id, &HashSet::from(["foo"]), None).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
@ -342,7 +341,7 @@ mod tests {
// expire & fetch
let p_sort_key = p.partition.sort_key();
let sort_key = cache.sort_key(p_id, &HashSet::from(["foo"])).await;
let sort_key = cache.sort_key(p_id, &HashSet::from(["foo"]), None).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
@ -353,13 +352,15 @@ mod tests {
HashSet::from(["bar"]),
HashSet::from(["foo", "bar"]),
] {
let sort_key = cache.sort_key(p_id, &should_cover).await;
let sort_key = cache.sort_key(p_id, &should_cover, None).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
}
// unknown columns expire
let sort_key = cache.sort_key(p_id, &HashSet::from(["foo", "x"])).await;
let sort_key = cache
.sort_key(p_id, &HashSet::from(["foo", "x"]), None)
.await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
}

View File

@ -432,7 +432,13 @@ impl ChunkAdapter {
let namespace_schema = self
.catalog_cache
.namespace()
.schema(namespace_name, &[(&table_name, &file_column_ids)])
.schema(
namespace_name,
&[(&table_name, &file_column_ids)],
span_recorder
.span()
.map(|span| span.child("cache GET namespace schema")),
)
.await?;
let table_schema_catalog = namespace_schema.tables.get(table_name.as_ref())?;
let column_id_lookup = table_schema_catalog.column_id_map();
@ -464,7 +470,13 @@ impl ChunkAdapter {
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(parquet_file.partition_id, &relevant_pk_columns)
.sort_key(
parquet_file.partition_id,
&relevant_pk_columns,
span_recorder
.span()
.map(|span| span.child("cache GET partition sort key")),
)
.await;
let partition_sort_key_ref = partition_sort_key
.as_ref()

View File

@ -14,7 +14,7 @@ use service_common::QueryDatabaseProvider;
use sharder::JumpHash;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
use trace::span::Span;
use trace::span::{Span, SpanRecorder};
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
};
@ -78,8 +78,8 @@ pub struct QuerierDatabase {
impl QueryDatabaseProvider for QuerierDatabase {
type Db = QuerierNamespace;
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
self.namespace(name).await
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>> {
self.namespace(name, span).await
}
async fn acquire_semaphore(&self, span: Option<Span>) -> InstrumentedAsyncOwnedSemaphorePermit {
@ -151,7 +151,8 @@ impl QuerierDatabase {
///
/// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER
/// a semaphore permit was acquired since this lowers the chance that we obtain stale data.
pub async fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
pub async fn namespace(&self, name: &str, span: Option<Span>) -> Option<Arc<QuerierNamespace>> {
let span_recorder = SpanRecorder::new(span);
let name = Arc::from(name.to_owned());
let schema = self
.catalog_cache
@ -160,6 +161,9 @@ impl QuerierDatabase {
Arc::clone(&name),
// we have no specific need for any tables or columns at this point, so nothing to cover
&[],
span_recorder
.span()
.map(|span| span.child("cache GET namespace schema")),
)
.await?;
Some(Arc::new(QuerierNamespace::new(
@ -304,8 +308,8 @@ mod tests {
catalog.create_namespace("ns1").await;
assert!(db.namespace("ns1").await.is_some());
assert!(db.namespace("ns2").await.is_none());
assert!(db.namespace("ns1", None).await.is_some());
assert!(db.namespace("ns2", None).await.is_none());
}
#[tokio::test]

View File

@ -256,6 +256,10 @@ impl<'a> ObserveIngesterRequest<'a> {
}
}
fn span_recorder(&self) -> &SpanRecorder {
&self.span_recorder
}
fn set_ok(mut self) {
self.res = Some(Ok(()));
self.span_recorder.ok("done");
@ -441,7 +445,10 @@ struct GetPartitionForIngester<'a> {
}
/// Fetches the partitions for a single ingester
async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPartition>> {
async fn execute(
request: GetPartitionForIngester<'_>,
span_recorder: &SpanRecorder,
) -> Result<Vec<IngesterPartition>> {
let GetPartitionForIngester {
flight_client,
catalog_cache,
@ -514,8 +521,15 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPar
}
// reconstruct partitions
let mut decoder =
IngesterStreamDecoder::new(ingester_address, table_name, catalog_cache, expected_schema);
let mut decoder = IngesterStreamDecoder::new(
ingester_address,
table_name,
catalog_cache,
expected_schema,
span_recorder
.span()
.map(|span| span.child("IngesterStreamDecoder")),
);
for (msg, md) in messages {
decoder.register(msg, md).await?;
}
@ -535,6 +549,7 @@ struct IngesterStreamDecoder {
table_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
expected_schema: Arc<Schema>,
span_recorder: SpanRecorder,
}
impl IngesterStreamDecoder {
@ -544,6 +559,7 @@ impl IngesterStreamDecoder {
table_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
expected_schema: Arc<Schema>,
span: Option<Span>,
) -> Self {
Self {
finished_partitions: HashMap::new(),
@ -553,6 +569,7 @@ impl IngesterStreamDecoder {
table_name,
catalog_cache,
expected_schema,
span_recorder: SpanRecorder::new(span),
}
}
@ -590,7 +607,13 @@ impl IngesterStreamDecoder {
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(current_partition.partition_id(), &primary_key)
.sort_key(
current_partition.partition_id(),
&primary_key,
self.span_recorder
.span()
.map(|span| span.child("cache GET partition sort key")),
)
.await;
let current_partition = current_partition.with_partition_sort_key(partition_sort_key);
self.finished_partitions
@ -626,7 +649,12 @@ impl IngesterStreamDecoder {
let sequencer_id = self
.catalog_cache
.partition()
.sequencer_id(partition_id)
.sequencer_id(
partition_id,
self.span_recorder
.span()
.map(|span| span.child("cache GET partition sequencer ID")),
)
.await;
// Use a temporary empty partition sort key. We are going to fetch this AFTER we know all chunks because
@ -686,14 +714,16 @@ impl IngesterStreamDecoder {
let mut ids: Vec<_> = self.finished_partitions.keys().copied().collect();
ids.sort();
Ok(ids
let partitions = ids
.into_iter()
.map(|id| {
self.finished_partitions
.remove(&id)
.expect("just got key from this map")
})
.collect())
.collect();
self.span_recorder.ok("finished");
Ok(partitions)
}
}
@ -754,7 +784,7 @@ impl IngesterConnection for IngesterConnectionImpl {
// always measured
let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder);
async move {
let res = execute(request.clone()).await;
let res = execute(request.clone(), measure_me.span_recorder()).await;
match &res {
Ok(_) => measure_me.set_ok(),

View File

@ -174,7 +174,12 @@ impl QuerierTable {
.span()
.map(|span| span.child("ingester partitions"))
),
catalog_cache.parquet_file().get(self.id()),
catalog_cache.parquet_file().get(
self.id(),
span_recorder
.span()
.map(|span| span.child("cache GET parquet_file (pre-warm"))
),
catalog_cache.tombstone().get(
self.id(),
span_recorder
@ -199,7 +204,12 @@ impl QuerierTable {
// Now fetch the actual contents of the catalog we need
let (parquet_files, tombstones) = join!(
catalog_cache.parquet_file().get(self.id()),
catalog_cache.parquet_file().get(
self.id(),
span_recorder
.span()
.map(|span| span.child("cache GET parquet_file"))
),
catalog_cache.tombstone().get(
self.id(),
span_recorder

View File

@ -72,7 +72,14 @@ impl Reconciler {
chunks.extend(self.build_ingester_chunks(ingester_partitions));
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
let chunks = self.sync_partition_sort_keys(chunks).await;
let chunks = self
.sync_partition_sort_keys(
chunks,
span_recorder
.span()
.map(|span| span.child("sync_partition_sort_key")),
)
.await;
let chunks: Vec<Arc<dyn QueryChunk>> = chunks
.into_iter()
@ -217,7 +224,10 @@ impl Reconciler {
async fn sync_partition_sort_keys(
&self,
chunks: Vec<Box<dyn UpdatableQuerierChunk>>,
span: Option<Span>,
) -> Vec<Box<dyn UpdatableQuerierChunk>> {
let span_recorder = SpanRecorder::new(span);
// collect columns
let chunk_schemas: Vec<_> = chunks
.iter()
@ -237,7 +247,15 @@ impl Reconciler {
let mut sort_keys: HashMap<PartitionId, Arc<Option<SortKey>>> =
HashMap::with_capacity(all_columns.len());
for (partition_id, columns) in all_columns.into_iter() {
let sort_key = partition_cache.sort_key(partition_id, &columns).await;
let sort_key = partition_cache
.sort_key(
partition_id,
&columns,
span_recorder
.span()
.map(|span| span.child("cache GET partition sort key")),
)
.await;
sort_keys.insert(partition_id, sort_key);
}

View File

@ -20,7 +20,7 @@ pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
type Db: ExecutionContextProvider + QueryDatabase;
/// Get database if it exists.
async fn db(&self, name: &str) -> Option<Arc<Self::Db>>;
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>>;
/// Acquire concurrency-limiting semaphore
async fn acquire_semaphore(&self, span: Option<Span>) -> InstrumentedAsyncOwnedSemaphorePermit;

View File

@ -61,7 +61,7 @@ impl QueryDatabaseProvider for TestDatabaseStore {
type Db = TestDatabase;
/// Retrieve the database specified name
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
async fn db(&self, name: &str, _span: Option<Span>) -> Option<Arc<Self::Db>> {
let databases = self.databases.lock();
databases.get(name).cloned()

View File

@ -210,10 +210,16 @@ where
let database =
DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?;
let db =
self.server.db(&database).await.ok_or_else(|| {
tonic::Status::not_found(format!("Unknown namespace: {database}"))
})?;
let db = self
.server
.db(
&database,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?;
let ctx = db.new_query_context(span_ctx);
let query_completed_token =

View File

@ -246,7 +246,12 @@ where
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -290,7 +295,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -366,7 +376,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -442,7 +457,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -506,7 +526,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -604,7 +629,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -709,7 +739,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -766,7 +801,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -833,7 +873,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
@ -902,7 +947,12 @@ where
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.db(
&db_name,
span_ctx
.as_ref()
.map(|span_ctx| span_ctx.child("get namespace")),
)
.await
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;