From 44bce8e3ecb7af7e618db27c521d75cf2b0a46eb Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Mon, 20 Jun 2022 13:29:48 -0400
Subject: [PATCH] fix: Don't assume one ingester per shard/table

---
 querier/src/ingester/mod.rs       | 56 ++++++++++++++++++-------------
 querier/src/ingester/test_util.rs |  2 +-
 querier/src/namespace/mod.rs      |  7 ++--
 querier/src/table/mod.rs          | 12 +++----
 querier/src/table/test_util.rs    |  2 +-
 5 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 63757352eb..a7be2b3d83 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -168,7 +168,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
     /// Returns all partitions ingester(s) know about for the specified table.
     async fn partitions(
         &self,
-        sequencer_id: KafkaPartition,
+        sequencer_ids: &[KafkaPartition],
         namespace_name: Arc<str>,
         table_name: Arc<str>,
         columns: Vec<String>,
@@ -665,7 +665,7 @@ impl IngesterConnection for IngesterConnectionImpl {
     /// Retrieve chunks from the ingester for the particular table, sequencer, and predicate
     async fn partitions(
         &self,
-        sequencer_id: KafkaPartition,
+        sequencer_ids: &[KafkaPartition],
         namespace_name: Arc<str>,
         table_name: Arc<str>,
         columns: Vec<String>,
@@ -678,7 +678,7 @@ impl IngesterConnection for IngesterConnectionImpl {
             let request = GetPartitionForIngester {
                 flight_client: Arc::clone(&self.flight_client),
                 catalog_cache: Arc::clone(&self.catalog_cache),
-                ingester_address: Arc::clone(ingester_address),
+                ingester_address,
                 namespace_name: Arc::clone(&namespace_name),
                 table_name: Arc::clone(&table_name),
                 columns: columns.clone(),
@@ -688,7 +688,8 @@ impl IngesterConnection for IngesterConnectionImpl {
             let metrics = Arc::clone(&metrics);
 
             // wrap `execute` into an additional future so that we can measure the request time
-            // INFO: create the measurement structure outside of the async block so cancellation is always measured
+            // INFO: create the measurement structure outside of the async block so cancellation is
+            // always measured
             let measure_me = ObserveIngesterRequest::new(request.clone(), metrics);
             async move {
                 let res = execute(request.clone()).await;
@@ -702,24 +703,31 @@ impl IngesterConnection for IngesterConnectionImpl {
             }
         };
 
-        // Look up the ingester needed for the sequencer
-        let mut ingester_partitions =
-            if let Some(ingester_address) = self.sequencer_to_ingester.get(&sequencer_id) {
-                // If found, only query that ingester
-                measured_ingester_request(ingester_address).await?
-            } else {
-                // If a specific ingester for the sequencer isn't found, query all ingesters
-                self.unique_ingester_addresses
-                    .iter()
-                    .map(move |ingester_address| measured_ingester_request(ingester_address))
-                    .collect::<FuturesUnordered<_>>()
-                    .try_collect::<Vec<_>>()
-                    .await?
-                    // We have a Vec<Vec<_>>, flatten to Vec<_>
-                    .into_iter()
-                    .flatten()
-                    .collect()
-            };
+        // Look up the ingesters needed for the sequencers. Collect into a HashSet to avoid making
+        // multiple requests to the same ingester if that ingester is responsible for multiple
+        // sequencer_ids relevant to this query.
+        let relevant_ingester_addresses: HashSet<_> = sequencer_ids
+            .iter()
+            .flat_map(|sequencer_id| self.sequencer_to_ingester.get(sequencer_id).map(Arc::clone))
+            .collect();
+
+        let relevant_ingester_addresses = if relevant_ingester_addresses.is_empty() {
+            // If specific ingesters for the sequencers aren't found, query all ingesters
+            self.unique_ingester_addresses.clone()
+        } else {
+            relevant_ingester_addresses
+        };
+
+        let mut ingester_partitions: Vec<IngesterPartition> = relevant_ingester_addresses
+            .into_iter()
+            .map(move |ingester_address| measured_ingester_request(ingester_address))
+            .collect::<FuturesUnordered<_>>()
+            .try_collect::<Vec<_>>()
+            .await?
+            // We have a Vec<Vec<_>>, flatten to Vec<_>
+            .into_iter()
+            .flatten()
+            .collect();
 
         ingester_partitions.sort_by_key(|p| p.partition_id);
         Ok(ingester_partitions)
@@ -1691,14 +1699,14 @@ mod tests {
         ingester_conn: &IngesterConnectionImpl,
         sequencer_id: i32,
     ) -> Result<Vec<IngesterPartition>, Error> {
-        let sequencer_id = KafkaPartition::new(sequencer_id);
+        let sequencer_ids = vec![KafkaPartition::new(sequencer_id)];
         let namespace = Arc::from("namespace");
         let table = Arc::from("table");
         let columns = vec![String::from("col")];
         let schema = schema();
         ingester_conn
             .partitions(
-                sequencer_id,
+                &sequencer_ids,
                 namespace,
                 table,
                 columns,
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index a50a8d3716..b22be36c96 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -28,7 +28,7 @@ impl MockIngesterConnection {
 impl IngesterConnection for MockIngesterConnection {
     async fn partitions(
         &self,
-        _sequencer_id: KafkaPartition,
+        _sequencer_ids: &[KafkaPartition],
         _namespace_name: Arc<str>,
         _table_name: Arc<str>,
         _columns: Vec<String>,
diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index ba8f0a0506..115975f1bc 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -60,12 +60,15 @@ impl QuerierNamespace {
             .iter()
             .map(|(table_name, table_schema)| {
                 let table_name = Arc::from(table_name.clone());
-                let sequencer_id = **sharder.shard_for_query(&table_name, &name);
                 let id = table_schema.id;
                 let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
 
+                // Currently, the sharder will only return one sequencer ID per table, but in the
+                // near future, the sharder might return more than one sequencer ID for one table.
+                let sequencer_ids = vec![**sharder.shard_for_query(&table_name, &name)];
+
                 let table = Arc::new(QuerierTable::new(
-                    sequencer_id,
+                    sequencer_ids,
                     Arc::clone(&name),
                     id,
                     Arc::clone(&table_name),
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 1bad3dc5e2..e3b5f1570e 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -52,8 +52,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Table representation for the querier.
 #[derive(Debug)]
 pub struct QuerierTable {
-    /// Sequencer the table is in
-    sequencer_id: KafkaPartition,
+    /// Sequencers responsible for the table's data that need to be queried for unpersisted data
+    sequencer_ids: Vec<KafkaPartition>,
 
     /// Namespace the table is in
     namespace_name: Arc<str>,
@@ -80,7 +80,7 @@ pub struct QuerierTable {
 impl QuerierTable {
     /// Create new table.
     pub fn new(
-        sequencer_id: KafkaPartition,
+        sequencer_ids: Vec<KafkaPartition>,
         namespace_name: Arc<str>,
         id: TableId,
         table_name: Arc<str>,
@@ -95,7 +95,7 @@ impl QuerierTable {
         );
 
         Self {
-            sequencer_id,
+            sequencer_ids,
             namespace_name,
             table_name,
             id,
@@ -188,11 +188,11 @@ impl QuerierTable {
             .map(|(_, f)| f.name().to_string())
             .collect();
 
-        // get any chunks from the ingster
+        // get any chunks from the ingester
         let partitions_result = self
             .ingester_connection
             .partitions(
-                self.sequencer_id,
+                &self.sequencer_ids,
                 Arc::clone(&self.namespace_name),
                 Arc::clone(&self.table_name),
                 columns,
diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs
index 10f8211ce8..c41751cf91 100644
--- a/querier/src/table/test_util.rs
+++ b/querier/src/table/test_util.rs
@@ -37,7 +37,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) ->
     let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
 
     QuerierTable::new(
-        KafkaPartition::new(0),
+        vec![KafkaPartition::new(0)],
         namespace_name,
         table.table.id,
         table.table.name.clone().into(),
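--
The heart of this change is the shard-to-ingester lookup: map each relevant
sequencer ID to its ingester address, deduplicate the addresses so an ingester
that serves several shards is queried only once, and fall back to querying
every known ingester when no mapping is configured. Below is a minimal
standalone sketch of that pattern; `ShardId`, `relevant_addresses`, and the
plain `String` addresses are hypothetical stand-ins for `KafkaPartition`, the
inlined lookup in `IngesterConnectionImpl::partitions`, and the `Arc<str>`
ingester addresses, not the crate's actual API.

use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for `KafkaPartition`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ShardId(i32);

/// Returns the deduplicated set of ingester addresses to query for the given
/// shards, falling back to all known ingesters when no mapping matches.
fn relevant_addresses(
    shard_to_ingester: &HashMap<ShardId, String>,
    all_ingesters: &HashSet<String>,
    shard_ids: &[ShardId],
) -> HashSet<String> {
    // Collecting into a HashSet performs the deduplication.
    let relevant: HashSet<String> = shard_ids
        .iter()
        .flat_map(|id| shard_to_ingester.get(id).cloned())
        .collect();

    if relevant.is_empty() {
        // No shard-specific mapping found: query every known ingester.
        all_ingesters.clone()
    } else {
        relevant
    }
}

fn main() {
    let mapping: HashMap<ShardId, String> = [
        (ShardId(1), "ingester-a:8082".to_string()),
        (ShardId(2), "ingester-a:8082".to_string()),
        (ShardId(3), "ingester-b:8082".to_string()),
    ]
    .into_iter()
    .collect();
    let all: HashSet<String> = mapping.values().cloned().collect();

    // Shards 1 and 2 share an ingester, so only two addresses come back.
    let addrs = relevant_addresses(&mapping, &all, &[ShardId(1), ShardId(2), ShardId(3)]);
    assert_eq!(addrs.len(), 2);
}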