fix: Don't assume one ingester per shard/table

pull/24376/head
Carol (Nichols || Goulding) 2022-06-20 13:29:48 -04:00
parent 4e91121e29
commit 44bce8e3ec
5 changed files with 45 additions and 34 deletions


@@ -168,7 +168,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
     /// Returns all partitions ingester(s) know about for the specified table.
     async fn partitions(
         &self,
-        sequencer_id: KafkaPartition,
+        sequencer_ids: &[KafkaPartition],
         namespace_name: Arc<str>,
         table_name: Arc<str>,
         columns: Vec<String>,
@@ -665,7 +665,7 @@ impl IngesterConnection for IngesterConnectionImpl {
     /// Retrieve chunks from the ingester for the particular table, sequencer, and predicate
     async fn partitions(
         &self,
-        sequencer_id: KafkaPartition,
+        sequencer_ids: &[KafkaPartition],
        namespace_name: Arc<str>,
        table_name: Arc<str>,
        columns: Vec<String>,
@@ -678,7 +678,7 @@ impl IngesterConnection for IngesterConnectionImpl {
             let request = GetPartitionForIngester {
                 flight_client: Arc::clone(&self.flight_client),
                 catalog_cache: Arc::clone(&self.catalog_cache),
-                ingester_address: Arc::clone(ingester_address),
+                ingester_address,
                 namespace_name: Arc::clone(&namespace_name),
                 table_name: Arc::clone(&table_name),
                 columns: columns.clone(),
@@ -688,7 +688,8 @@ impl IngesterConnection for IngesterConnectionImpl {
             let metrics = Arc::clone(&metrics);

             // wrap `execute` into an additional future so that we can measure the request time
-            // INFO: create the measurement structure outside of the async block so cancellation is always measured
+            // INFO: create the measurement structure outside of the async block so cancellation is
+            // always measured
             let measure_me = ObserveIngesterRequest::new(request.clone(), metrics);
             async move {
                 let res = execute(request.clone()).await;
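The INFO comment is the key detail of this hunk: the measurement guard is constructed before the async block, so its Drop runs even when the returned future is cancelled before completion. Below is a minimal, self-contained sketch of that idea; ObserveGuard and everything in it are invented for illustration and only mimic the role of ObserveIngesterRequest:

use std::time::Instant;

/// Invented stand-in for ObserveIngesterRequest: records how long a request
/// lived and whether it finished or was cancelled.
struct ObserveGuard {
    start: Instant,
    succeeded: bool,
}

impl ObserveGuard {
    fn new() -> Self {
        Self { start: Instant::now(), succeeded: false }
    }

    fn set_ok(mut self) {
        self.succeeded = true;
        println!("request succeeded after {:?}", self.start.elapsed());
    }
}

impl Drop for ObserveGuard {
    fn drop(&mut self) {
        if !self.succeeded {
            // Runs even if the owning future is dropped mid-flight, because
            // the guard was created eagerly, outside the async block.
            println!("request cancelled/failed after {:?}", self.start.elapsed());
        }
    }
}

fn main() {
    // Create the measurement structure OUTSIDE the async block...
    let measure_me = ObserveGuard::new();
    let fut = async move {
        // ...so that this body never needs to run for the timing to exist.
        measure_me.set_ok();
    };
    // Simulate cancellation: the future is dropped without being polled,
    // and ObserveGuard::drop still records the outcome.
    drop(fut);
}

Had the guard been created inside the async block instead, a future dropped before its first poll would never build the guard, and the cancelled request would go unmeasured.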
@@ -702,24 +703,31 @@ impl IngesterConnection for IngesterConnectionImpl {
            }
        };

-        // Look up the ingester needed for the sequencer
-        let mut ingester_partitions =
-            if let Some(ingester_address) = self.sequencer_to_ingester.get(&sequencer_id) {
-                // If found, only query that ingester
-                measured_ingester_request(ingester_address).await?
-            } else {
-                // If a specific ingester for the sequencer isn't found, query all ingesters
-                self.unique_ingester_addresses
-                    .iter()
-                    .map(move |ingester_address| measured_ingester_request(ingester_address))
-                    .collect::<FuturesUnordered<_>>()
-                    .try_collect::<Vec<_>>()
-                    .await?
-                    // We have a Vec<Vec<..>> flatten to Vec<_>
-                    .into_iter()
-                    .flatten()
-                    .collect()
-            };
+        // Look up the ingesters needed for the sequencer. Collect into a HashSet to avoid making
+        // multiple requests to the same ingester if that ingester is responsible for multiple
+        // sequencer_ids relevant to this query.
+        let relevant_ingester_addresses: HashSet<_> = sequencer_ids
+            .iter()
+            .flat_map(|sequencer_id| self.sequencer_to_ingester.get(sequencer_id).map(Arc::clone))
+            .collect();
+
+        let relevant_ingester_addresses = if relevant_ingester_addresses.is_empty() {
+            // If specific ingesters for the sequencers aren't found, query all ingesters
+            self.unique_ingester_addresses.clone()
+        } else {
+            relevant_ingester_addresses
+        };
+
+        let mut ingester_partitions: Vec<IngesterPartition> = relevant_ingester_addresses
+            .into_iter()
+            .map(move |ingester_address| measured_ingester_request(ingester_address))
+            .collect::<FuturesUnordered<_>>()
+            .try_collect::<Vec<_>>()
+            .await?
+            // We have a Vec<Vec<..>> flatten to Vec<_>
+            .into_iter()
+            .flatten()
+            .collect();

        ingester_partitions.sort_by_key(|p| p.partition_id);
        Ok(ingester_partitions)
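Taken out of its surroundings, the new lookup is a compact dedup-and-fan-out pattern: map shard IDs to ingester addresses, dedup with a HashSet, fall back to all ingesters when the mapping is empty, then issue the requests concurrently and flatten the results. A runnable sketch under assumed stand-ins follows; fetch, shard_to_addr, all_addrs, and partitions_for are invented for this example, and only FuturesUnordered and try_collect mirror the real code:

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use futures::{stream::FuturesUnordered, TryStreamExt};

// Stand-in for one ingester request returning that ingester's partitions.
async fn fetch(addr: Arc<str>) -> Result<Vec<String>, String> {
    Ok(vec![format!("partition-from-{addr}")])
}

async fn partitions_for(
    shard_ids: &[i32],
    shard_to_addr: &HashMap<i32, Arc<str>>,
    all_addrs: &[Arc<str>],
) -> Result<Vec<String>, String> {
    // Dedup addresses so an ingester owning several relevant shards is hit once.
    let relevant: HashSet<Arc<str>> = shard_ids
        .iter()
        .flat_map(|id| shard_to_addr.get(id).map(Arc::clone))
        .collect();

    // Fall back to querying every known ingester when the mapping is unknown.
    let targets: Vec<Arc<str>> = if relevant.is_empty() {
        all_addrs.to_vec()
    } else {
        relevant.into_iter().collect()
    };

    // Fan the requests out concurrently, then flatten the per-ingester results.
    let nested: Vec<Vec<String>> = targets
        .into_iter()
        .map(fetch)
        .collect::<FuturesUnordered<_>>()
        .try_collect()
        .await?;
    Ok(nested.into_iter().flatten().collect())
}

fn main() {
    let shard_to_addr = HashMap::from([(1, Arc::<str>::from("ingester-a"))]);
    let all_addrs = [Arc::<str>::from("ingester-a"), Arc::<str>::from("ingester-b")];

    // Two shard IDs owned by the same ingester produce exactly one request.
    let got = futures::executor::block_on(partitions_for(&[1, 1], &shard_to_addr, &all_addrs)).unwrap();
    assert_eq!(got, vec!["partition-from-ingester-a".to_string()]);

    // An unmapped shard ID falls back to querying every known ingester.
    let fallback = futures::executor::block_on(partitions_for(&[99], &shard_to_addr, &all_addrs)).unwrap();
    assert_eq!(fallback.len(), 2);
}

Note that FuturesUnordered yields results in completion order, not submission order, which is why the real code sorts ingester_partitions by partition_id before returning.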
@@ -1691,14 +1699,14 @@ mod tests {
         ingester_conn: &IngesterConnectionImpl,
         sequencer_id: i32,
     ) -> Result<Vec<IngesterPartition>, Error> {
-        let sequencer_id = KafkaPartition::new(sequencer_id);
+        let sequencer_ids = vec![KafkaPartition::new(sequencer_id)];
         let namespace = Arc::from("namespace");
         let table = Arc::from("table");
         let columns = vec![String::from("col")];
         let schema = schema();
         ingester_conn
             .partitions(
-                sequencer_id,
+                &sequencer_ids,
                 namespace,
                 table,
                 columns,


@@ -28,7 +28,7 @@ impl MockIngesterConnection {
 impl IngesterConnection for MockIngesterConnection {
     async fn partitions(
         &self,
-        _sequencer_id: KafkaPartition,
+        _sequencer_ids: &[KafkaPartition],
         _namespace_name: Arc<str>,
         _table_name: Arc<str>,
         _columns: Vec<String>,


@@ -60,12 +60,15 @@ impl QuerierNamespace {
             .iter()
             .map(|(table_name, table_schema)| {
                 let table_name = Arc::from(table_name.clone());
-                let sequencer_id = **sharder.shard_for_query(&table_name, &name);
                 let id = table_schema.id;
                 let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");

+                // Currently, the sharder will only return one sequencer ID per table, but in the
+                // near future, the sharder might return more than one sequencer ID for one table.
+                let sequencer_ids = vec![**sharder.shard_for_query(&table_name, &name)];
+
                 let table = Arc::new(QuerierTable::new(
-                    sequencer_id,
+                    sequencer_ids,
                     Arc::clone(&name),
                     id,
                     Arc::clone(&table_name),
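A toy illustration of the comment above: by adopting Vec<KafkaPartition> at the call site now, a future sharder that returns several shards per table only changes the expression that builds sequencer_ids. These are local stand-ins, not the real IOx types: KafkaPartition is simplified and shards_for_query is a hypothetical future method.

/// Hypothetical plural sharder interface; today's sharder exposes one shard.
#[derive(Debug, Clone, Copy, PartialEq)]
struct KafkaPartition(i32);

trait Sharder {
    fn shards_for_query(&self, table_name: &str, namespace: &str) -> Vec<KafkaPartition>;
}

/// Today's behavior: exactly one shard per table.
struct SingleShardSharder(KafkaPartition);

impl Sharder for SingleShardSharder {
    fn shards_for_query(&self, _table_name: &str, _namespace: &str) -> Vec<KafkaPartition> {
        vec![self.0] // a future sharder would return several entries here
    }
}

fn main() {
    let sharder = SingleShardSharder(KafkaPartition(0));
    // The call site is already plural, so swapping in a multi-shard sharder
    // would not change anything downstream of this line.
    let sequencer_ids = sharder.shards_for_query("table", "namespace");
    assert_eq!(sequencer_ids, vec![KafkaPartition(0)]);
}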


@@ -52,8 +52,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Table representation for the querier.
 #[derive(Debug)]
 pub struct QuerierTable {
-    /// Sequencer the table is in
-    sequencer_id: KafkaPartition,
+    /// Sequencers responsible for the table's data that need to be queried for unpersisted data
+    sequencer_ids: Vec<KafkaPartition>,

     /// Namespace the table is in
     namespace_name: Arc<str>,
@@ -80,7 +80,7 @@ pub struct QuerierTable {
 impl QuerierTable {
     /// Create new table.
     pub fn new(
-        sequencer_id: KafkaPartition,
+        sequencer_ids: Vec<KafkaPartition>,
         namespace_name: Arc<str>,
         id: TableId,
         table_name: Arc<str>,
@@ -95,7 +95,7 @@ impl QuerierTable {
         );

         Self {
-            sequencer_id,
+            sequencer_ids,
             namespace_name,
             table_name,
             id,
@@ -188,11 +188,11 @@ impl QuerierTable {
             .map(|(_, f)| f.name().to_string())
             .collect();

-        // get any chunks from the ingster
+        // get any chunks from the ingester
         let partitions_result = self
             .ingester_connection
             .partitions(
-                self.sequencer_id,
+                &self.sequencer_ids,
                 Arc::clone(&self.namespace_name),
                 Arc::clone(&self.table_name),
                 columns,


@@ -37,7 +37,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -> QuerierTable {
     let namespace_name = Arc::from(table.namespace.namespace.name.as_str());

     QuerierTable::new(
-        KafkaPartition::new(0),
+        vec![KafkaPartition::new(0)],
         namespace_name,
         table.table.id,
         table.table.name.clone().into(),