refactor(router): pass TableId through DmlHandlers

Changes the DML handler transformers to pass through the TableId once it
has been resolved during schema validation.

This value is collated by shard, and then unused. This collated TableId
map will be used in a follow-up PR.
pull/24376/head
Dom Dwyer 2022-10-27 10:14:20 +02:00
parent 2e74727baf
commit b78433bd5d
4 changed files with 86 additions and 31 deletions

View File

@ -1,5 +1,7 @@
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate};
use data_types::{
DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate, TableId,
};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
@ -64,7 +66,7 @@ impl DmlHandler for Partitioner {
type WriteError = PartitionError;
type DeleteError = PartitionError;
type WriteInput = HashMap<String, MutableBatch>;
type WriteInput = HashMap<TableId, (String, MutableBatch)>;
type WriteOutput = Vec<Partitioned<Self::WriteInput>>;
/// Partition the per-table [`MutableBatch`].
@ -76,9 +78,10 @@ impl DmlHandler for Partitioner {
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
// A collection of partition-keyed, per-table MutableBatch instances.
let mut partitions: HashMap<PartitionKey, HashMap<_, MutableBatch>> = HashMap::default();
let mut partitions: HashMap<PartitionKey, HashMap<_, (String, MutableBatch)>> =
HashMap::default();
for (table_name, batch) in batch {
for (table_id, (table_name, batch)) in batch {
// Partition the table batch according to the configured partition
// template and write it into the partition-keyed map.
for (partition_key, partition_payload) in
@ -87,10 +90,12 @@ impl DmlHandler for Partitioner {
let partition = partitions.entry(partition_key).or_default();
let table_batch = partition
.raw_entry_mut()
.from_key(&table_name)
.or_insert_with(|| (table_name.to_owned(), MutableBatch::default()));
.from_key(&table_id)
.or_insert_with(|| {
(table_id, (table_name.to_owned(), MutableBatch::default()))
});
partition_payload.write_to_batch(table_batch.1)?;
partition_payload.write_to_batch(&mut table_batch.1 .1)?;
}
}
@ -119,9 +124,17 @@ mod tests {
use super::*;
/// The default timestamp applied to test LP if the write does not specify
/// one.
const DEFAULT_TIMESTAMP_NANOS: i64 = 42000000000000000;
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
.collect()
}
// Generate a test case that partitions "lp".
//
@ -144,7 +157,7 @@ mod tests {
let partitioner = Partitioner::new(partition_template);
let ns = DatabaseName::new("bananas").expect("valid db name");
let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP");
let writes = lp_to_writes($lp);
let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await;
assert_matches!(handler_ret, $($want_handler_ret)+);
@ -156,8 +169,7 @@ mod tests {
// Extract the table names in this partition
let mut tables = partition
.payload
.keys()
.cloned()
.values().map(|v| v.0.clone())
.collect::<Vec<String>>();
tables.sort();

View File

@ -1,7 +1,7 @@
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema};
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema, TableId};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
@ -146,8 +146,10 @@ where
type WriteError = SchemaError;
type DeleteError = SchemaError;
// Accepts a map of "TableName -> MutableBatch"
type WriteInput = HashMap<String, MutableBatch>;
type WriteOutput = Self::WriteInput;
// And returns a map of TableId -> (TableName, MutableBatch)
type WriteOutput = HashMap<TableId, (String, MutableBatch)>;
/// Validate the schema of all the writes in `batches`.
///
@ -270,18 +272,31 @@ where
// (before passing through the write) in order to allow subsequent,
// parallel requests to use it while waiting on this request to
// complete.
match maybe_new_schema {
let latest_schema = match maybe_new_schema {
Some(v) => {
// This call MAY overwrite a more-up-to-date cache entry if
// racing with another request for the same namespace, but the
// cache will eventually converge in subsequent requests.
self.cache.put_schema(namespace.clone(), v);
self.cache.put_schema(namespace.clone(), Arc::clone(&v));
trace!(%namespace, "schema cache updated");
v
}
None => {
trace!(%namespace, "schema unchanged");
schema
}
}
};
// Map the "TableName -> Data" into "(TableName, TableId) -> Data" for
// downstream handlers.
let batches = batches
.into_iter()
.map(|(name, data)| {
let id = latest_schema.tables.get(&name).unwrap().id;
(id, (name, data))
})
.collect();
Ok(batches)
}
@ -554,7 +569,11 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let (catalog, _namespace) = test_setup().await;
let (catalog, namespace) = test_setup().await;
// Create the table so that the is a known ID that must be returned.
let want_id = namespace.create_table("bananas").await.table.id;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
@ -563,7 +582,7 @@ mod tests {
);
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
handler
let got = handler
.write(&*NAMESPACE, NamespaceId::new(42), writes, None)
.await
.expect("request should succeed");
@ -573,6 +592,10 @@ mod tests {
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
assert_cache(&handler, "bananas", "val", ColumnType::I64);
assert_cache(&handler, "bananas", "time", ColumnType::Time);
// Validate the table ID mapping.
let (name, _data) = got.get(&want_id).expect("table not in output");
assert_eq!(name, "bananas");
}
#[tokio::test]

View File

@ -6,7 +6,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString};
use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString, TableId};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::HashMap;
@ -87,7 +87,7 @@ where
type WriteError = ShardError;
type DeleteError = ShardError;
type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
type WriteInput = Partitioned<HashMap<TableId, (String, MutableBatch)>>;
type WriteOutput = Vec<DmlMeta>;
/// Shard `writes` and dispatch the resultant DML operations.
@ -98,25 +98,37 @@ where
writes: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, ShardError> {
let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
// Extract the partition key & DML writes.
let (partition_key, writes) = writes.into_parts();
// Sets of maps collated by destination shard for batching/merging of
// shard data.
let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
let mut table_ids: HashMap<_, HashMap<String, TableId>> = HashMap::new();
// Shard each entry in `writes` and collate them into one DML operation
// per shard to maximise the size of each write, and therefore increase
// the effectiveness of compression of ops in the write buffer.
for (table, batch) in writes.into_iter() {
let shard = self.sharder.shard(&table, namespace, &batch);
for (table_id, (table_name, batch)) in writes.into_iter() {
let shard = self.sharder.shard(&table_name, namespace, &batch);
let existing = collated
.entry(Arc::clone(&shard))
.or_default()
.insert(table_name.clone(), batch);
assert!(existing.is_none());
let existing = table_ids
.entry(shard)
.or_default()
.insert(table, batch.clone());
.insert(table_name.clone(), table_id);
assert!(existing.is_none());
}
// This will be used in a future PR, and eliminated in a dead code pass
// by LLVM in the meantime.
let _ = table_ids;
let iter = collated.into_iter().map(|(shard, batch)| {
let dml = DmlWrite::new(
namespace,
@ -226,9 +238,15 @@ mod tests {
use crate::dml_handlers::DmlHandler;
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
fn lp_to_writes(lp: &str) -> Partitioned<HashMap<TableId, (String, MutableBatch)>> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
let writes = writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
.collect();
Partitioned::new("key".into(), writes)
}

View File

@ -1,7 +1,9 @@
use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use assert_matches::assert_matches;
use data_types::{ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TemplatePart, TopicId};
use data_types::{
ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TableId, TemplatePart, TopicId,
};
use dml::DmlOperation;
use hashbrown::HashMap;
use hyper::{Body, Request, StatusCode};
@ -50,7 +52,7 @@ type HttpDelegateStack = HttpDelegate<
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
Vec<Partitioned<HashMap<String, MutableBatch>>>,
Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
>,
>,
>,