refactor: parallel column resolution

A quick change to perform the ColumnRepo::create_or_get() calls in
parallel (up to a maximum of 3 in flight at any one time) to hide the
per-call latency and reduce the overall duration of the schema
validation call.

The in-flight limit is enforced to avoid starving the DB connection pool
of connections.
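
The bounded fan-out is the standard futures::stream::buffer_unordered
pattern. A minimal standalone sketch of the same shape (resolve_column
and the column names are hypothetical stand-ins for
ColumnRepo::create_or_get() and an incoming batch; assumes the futures
and tokio crates):

use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for ColumnRepo::create_or_get(): resolve one
// column name against the catalog, returning it (or a DB error).
async fn resolve_column(name: &'static str) -> Result<&'static str, String> {
    Ok(name)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let columns = ["time", "host", "usage_user"];

    // One future per unresolved column, but at most 3 polled concurrently:
    // the per-call latency overlaps, while column resolution can never
    // consume the whole DB connection pool.
    stream::iter(columns.into_iter().map(resolve_column))
        .buffer_unordered(3)
        .try_for_each(|col| async move {
            println!("resolved column {col}");
            Ok(())
        })
        .await
}

The cap of 3 trades latency hiding against pool pressure; raising
MAX_CONNECTIONS from 5 to 10 in the Postgres catalog (below) presumably
provides the matching headroom.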
pull/24376/head
Dom Dwyer 2022-02-24 20:47:57 +00:00
parent 723a0c659f
commit b07f15bec7
5 changed files with 66 additions and 36 deletions


@@ -1141,7 +1141,6 @@ mod tests {
     use mutable_batch_lp::lines_to_batches;
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use object_store::ObjectStoreApi;
-    use std::ops::DerefMut;
     use std::time::Duration;
     use test_helpers::assert_error;
     use time::Time;
@@ -1368,13 +1367,13 @@ mod tests {
             lines_to_batches("mem foo=1 10", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50),
         );
-        let _ = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut())
+        std::mem::drop(repos);
+        let _ = validate_or_insert_schema(w1.tables(), &schema, &*catalog)
             .await
             .unwrap()
             .unwrap();
-        std::mem::drop(repos);
         let pause_size = w1.size() + 1;
         let manager = LifecycleManager::new(
             LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)),
@@ -1438,7 +1437,9 @@ mod tests {
             lines_to_batches("mem foo=1 10", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50),
         );
-        let schema = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut())
+        // drop repos so the mem catalog won't deadlock.
+        std::mem::drop(repos);
+        let schema = validate_or_insert_schema(w1.tables(), &schema, &*catalog)
             .await
             .unwrap()
             .unwrap();
@@ -1448,7 +1449,7 @@ mod tests {
             lines_to_batches("cpu foo=1 10", 1).unwrap(),
             DmlMeta::sequenced(Sequence::new(2, 1), ignored_ts, None, 50),
         );
-        let _ = validate_or_insert_schema(w2.tables(), &schema, repos.deref_mut())
+        let _ = validate_or_insert_schema(w2.tables(), &schema, &*catalog)
             .await
             .unwrap()
             .unwrap();
@@ -1459,8 +1460,6 @@ mod tests {
             DmlMeta::sequenced(Sequence::new(1, 2), ignored_ts, None, 50),
         );
-        // drop repos so the mem catalog won't deadlock.
-        std::mem::drop(repos);
         let manager = LifecycleManager::new(
             LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
             Arc::new(metric::Registry::new()),


@@ -417,7 +417,7 @@ mod tests {
     use iox_catalog::validate_or_insert_schema;
     use metric::{Attributes, Metric, U64Counter, U64Gauge};
     use mutable_batch_lp::lines_to_batches;
-    use std::{num::NonZeroU32, ops::DerefMut};
+    use std::num::NonZeroU32;
     use time::Time;
     use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
@@ -430,7 +430,6 @@ mod tests {
             ingester.kafka_topic.id,
             ingester.query_pool.id,
         );
-        let mut txn = ingester.catalog.start_transaction().await.unwrap();
         let ingest_ts1 = Time::from_timestamp_millis(42);
         let ingest_ts2 = Time::from_timestamp_millis(1337);
         let w1 = DmlWrite::new(
@@ -438,7 +437,7 @@ mod tests {
             lines_to_batches("mem foo=1 10", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(0, 0), ingest_ts1, None, 50),
         );
-        let schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut())
+        let schema = validate_or_insert_schema(w1.tables(), &schema, &*ingester.catalog)
             .await
             .unwrap()
             .unwrap();
@@ -448,7 +447,7 @@ mod tests {
             lines_to_batches("cpu bar=2 20\ncpu bar=3 30", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(0, 7), ingest_ts2, None, 150),
         );
-        let _schema = validate_or_insert_schema(w2.tables(), &schema, txn.deref_mut())
+        let _schema = validate_or_insert_schema(w2.tables(), &schema, &*ingester.catalog)
             .await
             .unwrap()
             .unwrap();
@@ -458,12 +457,11 @@ mod tests {
             lines_to_batches("a b=2 200", 0).unwrap(),
             DmlMeta::sequenced(Sequence::new(0, 9), ingest_ts2, None, 150),
         );
-        let _schema = validate_or_insert_schema(w3.tables(), &schema, txn.deref_mut())
+        let _schema = validate_or_insert_schema(w3.tables(), &schema, &*ingester.catalog)
             .await
             .unwrap()
             .unwrap();
         ingester.write_buffer_state.push_write(w3);
-        txn.commit().await.unwrap();
         // give the writes some time to go through the buffer. Exit once we've verified there's
         // data in there from both writes.


@@ -16,7 +16,8 @@ use crate::interface::{
     SequencerId, TableSchema, Transaction,
 };
-use interface::{ParquetFile, ProcessedTombstone, RepoCollection, Tombstone};
+use futures::{future, stream, StreamExt, TryStreamExt};
+use interface::{Catalog, ParquetFile, ProcessedTombstone, Tombstone};
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::BTreeMap};
@@ -44,12 +45,12 @@ pub mod postgres;
 pub async fn validate_or_insert_schema<'a, T, U, R>(
     tables: T,
     schema: &NamespaceSchema,
-    repos: &mut R,
+    repos: &R,
 ) -> Result<Option<NamespaceSchema>>
 where
     T: IntoIterator<IntoIter = U, Item = (&'a str, &'a MutableBatch)> + Send + Sync,
     U: Iterator<Item = T::Item> + Send,
-    R: RepoCollection + ?Sized,
+    R: Catalog + ?Sized,
 {
     let tables = tables.into_iter();
@@ -70,10 +71,10 @@ async fn validate_mutable_batch<R>(
     mb: &MutableBatch,
     table_name: &str,
     schema: &mut Cow<'_, NamespaceSchema>,
-    repos: &mut R,
+    catalog: &R,
 ) -> Result<()>
 where
-    R: RepoCollection + ?Sized,
+    R: Catalog + ?Sized,
 {
     // Check if the table exists in the schema.
     //
@@ -82,6 +83,8 @@ where
     let mut table = match schema.tables.get(table_name) {
         Some(t) => Cow::Borrowed(t),
         None => {
+            let mut repos = catalog.repositories().await;
             // The table does not exist in the cached schema.
             //
             // Attempt to create the table in the catalog, or load an existing
@@ -110,13 +113,15 @@ where
         }
     };
+    let table_id = table.id;
     // The table is now in the schema (either by virtue of it already existing,
     // or through adding it above).
     //
     // If the table itself needs to be updated during column validation it
     // becomes a Cow::owned() copy and the modified copy should be inserted into
     // the schema before returning.
+    let mut column_futures = Vec::default();
     for (name, col) in mb.columns() {
         // Check if the column exists in the cached schema.
         //
@@ -139,16 +144,40 @@ where
             None => {
                 // The column does not exist in the cache, create/get it from
                 // the catalog, and add it to the table.
-                let column = repos
-                    .columns()
-                    .create_or_get(name.as_str(), table.id, ColumnType::from(col.influx_type()))
-                    .await?;
-                table.to_mut().add_column(&column);
+                column_futures.push({
+                    async {
+                        catalog
+                            .repositories()
+                            .await
+                            .columns()
+                            .create_or_get(
+                                name.as_str(),
+                                table_id,
+                                ColumnType::from(col.influx_type()),
+                            )
+                            .await
+                    }
+                });
             }
         };
     }
+    // Execute up to a maximum of 3 column resolution futures at a time to hide
+    // the call latency while avoiding monopolisation of the underlying
+    // connection pool.
+    //
+    // Any error causes an early return, possibly after some column futures have
+    // been executed to completion.
+    stream::iter(column_futures)
+        .buffer_unordered(3)
+        .try_for_each(|col| {
+            future::ready({
+                table.to_mut().add_column(&col);
+                Ok(())
+            })
+        })
+        .await?;
     if let Cow::Owned(table) = table {
         // The table schema was mutated and needs inserting into the namespace
         // schema to make the changes visible to the caller.
@@ -248,11 +277,15 @@ mod tests {
     use pretty_assertions::assert_eq;
     const NAMESPACE_NAME: &str = "bananas";
-        let repo = MemCatalog::new();
-        let mut txn = repo.start_transaction().await.unwrap();
-        let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap();
-        let namespace = txn
+        let catalog = MemCatalog::new();
+        let mut txn = catalog.start_transaction().await.unwrap();
+        let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap();
+        txn.commit().await.expect("txn commit failed");
+        let namespace = catalog
+            .repositories()
+            .await
             .namespaces()
             .create(NAMESPACE_NAME, "inf", kafka_topic.id, query_pool.id)
             .await
@@ -275,7 +308,7 @@ mod tests {
         let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp.as_str(), 42)
             .expect("failed to build test writes from LP");
-        let got = validate_or_insert_schema(writes.iter().map(|(k, v)| (k.as_str(), v)), &schema, txn.deref_mut())
+        let got = validate_or_insert_schema(writes.iter().map(|(k, v)| (k.as_str(), v)), &schema, &catalog)
             .await;
         match got {
@@ -295,7 +328,8 @@ mod tests {
                 // Invariant: in absence of concurrency, the schema within
                 // the database must always match the incrementally built
                 // cached schema.
-                let db_schema = get_schema_by_name(NAMESPACE_NAME, txn.deref_mut())
+                let mut repo = catalog.repositories().await;
+                let db_schema = get_schema_by_name(NAMESPACE_NAME, repo.deref_mut())
                     .await
                     .expect("database failed to query for namespace schema");
                 assert_eq!(schema, db_schema, "schema in DB and cached schema differ");


@@ -19,7 +19,7 @@ use sqlx_hotswap_pool::HotSwapPool;
 use std::{sync::Arc, time::Duration};
 use uuid::Uuid;
-const MAX_CONNECTIONS: u32 = 5;
+const MAX_CONNECTIONS: u32 = 10;
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
 const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
 /// the default schema name to use in Postgres


@@ -127,8 +127,6 @@ where
         batches: Self::WriteInput,
         _span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, Self::WriteError> {
-        let mut repos = self.catalog.repositories().await;
         // Load the namespace schema from the cache, falling back to pulling it
        // from the global catalog (if it exists).
         let schema = self.cache.get_schema(namespace);
@@ -137,6 +135,7 @@ where
             None => {
                 // Pull the schema from the global catalog or error if it does
                 // not exist.
+                let mut repos = self.catalog.repositories().await;
                 let schema = get_schema_by_name(namespace, repos.deref_mut())
                     .await
                     .map_err(|e| {
@@ -156,7 +155,7 @@ where
         let maybe_new_schema = validate_or_insert_schema(
             batches.iter().map(|(k, v)| (k.as_str(), v)),
             &schema,
-            repos.deref_mut(),
+            &*self.catalog,
         )
         .await
         .map_err(|e| {