diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs
index 8c23987422..903bfb4150 100644
--- a/ioxd_router/src/lib.rs
+++ b/ioxd_router/src/lib.rs
@@ -289,12 +289,11 @@ pub async fn create_router_server_type(
     let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?;
 
     // Initialise the API delegates
-    let handler_stack = Arc::new(handler_stack);
     let http = HttpDelegate::new(
         common_state.run_config().max_http_request_size,
         request_limit,
        namespace_resolver,
-        Arc::clone(&handler_stack),
+        handler_stack,
         &metrics,
     );
     let grpc = GrpcDelegate::new(schema_catalog, object_store, shard_service);
diff --git a/router/src/dml_handlers/partitioner.rs b/router/src/dml_handlers/partitioner.rs
index 0bd948b413..81ab22f115 100644
--- a/router/src/dml_handlers/partitioner.rs
+++ b/router/src/dml_handlers/partitioner.rs
@@ -1,5 +1,7 @@
 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate};
+use data_types::{
+    DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate, TableId,
+};
 use hashbrown::HashMap;
 use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
 use observability_deps::tracing::*;
@@ -64,7 +66,7 @@ impl DmlHandler for Partitioner {
     type WriteError = PartitionError;
     type DeleteError = PartitionError;
 
-    type WriteInput = HashMap<String, MutableBatch>;
+    type WriteInput = HashMap<TableId, (String, MutableBatch)>;
     type WriteOutput = Vec<Partitioned<Self::WriteInput>>;
 
     /// Partition the per-table [`MutableBatch`].
@@ -76,9 +78,10 @@ impl DmlHandler for Partitioner {
         _span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, Self::WriteError> {
         // A collection of partition-keyed, per-table MutableBatch instances.
-        let mut partitions: HashMap<PartitionKey, HashMap<_, MutableBatch>> = HashMap::default();
+        let mut partitions: HashMap<PartitionKey, HashMap<TableId, (String, MutableBatch)>> =
+            HashMap::default();
 
-        for (table_name, batch) in batch {
+        for (table_id, (table_name, batch)) in batch {
             // Partition the table batch according to the configured partition
             // template and write it into the partition-keyed map.
             for (partition_key, partition_payload) in
@@ -87,10 +90,12 @@ impl DmlHandler for Partitioner {
                 let partition = partitions.entry(partition_key).or_default();
                 let table_batch = partition
                     .raw_entry_mut()
-                    .from_key(&table_name)
-                    .or_insert_with(|| (table_name.to_owned(), MutableBatch::default()));
+                    .from_key(&table_id)
+                    .or_insert_with(|| {
+                        (table_id, (table_name.to_owned(), MutableBatch::default()))
+                    });
 
-                partition_payload.write_to_batch(table_batch.1)?;
+                partition_payload.write_to_batch(&mut table_batch.1 .1)?;
             }
         }
 
@@ -119,9 +124,17 @@ mod tests {
 
     use super::*;
 
-    /// The default timestamp applied to test LP if the write does not specify
-    /// one.
-    const DEFAULT_TIMESTAMP_NANOS: i64 = 42000000000000000;
+    // Parse `lp` into a table-keyed MutableBatch map.
+    fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
+        let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
+            .expect("failed to build test writes from LP");
+
+        writes
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
+            .collect()
+    }
 
     // Generate a test case that partitions "lp".
     //
@@ -144,7 +157,7 @@ mod tests {
                 let partitioner = Partitioner::new(partition_template);
                 let ns = DatabaseName::new("bananas").expect("valid db name");
 
-                let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP");
+                let writes = lp_to_writes($lp);
                 let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await;
                 assert_matches!(handler_ret, $($want_handler_ret)+);
 
@@ -156,8 +169,7 @@ mod tests {
                     // Extract the table names in this partition
                     let mut tables = partition
                         .payload
-                        .keys()
-                        .cloned()
+                        .values().map(|v| v.0.clone())
                         .collect::<Vec<_>>();
                     tables.sort();
 
diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs
index 8a2660292a..55652aa114 100644
--- a/router/src/dml_handlers/schema_validation.rs
+++ b/router/src/dml_handlers/schema_validation.rs
@@ -1,7 +1,7 @@
 use std::{ops::DerefMut, sync::Arc};
 
 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema};
+use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema, TableId};
 use hashbrown::HashMap;
 use iox_catalog::{
     interface::{get_schema_by_name, Catalog, Error as CatalogError},
@@ -146,8 +146,10 @@ where
     type WriteError = SchemaError;
     type DeleteError = SchemaError;
 
+    // Accepts a map of "TableName -> MutableBatch"
     type WriteInput = HashMap<String, MutableBatch>;
-    type WriteOutput = Self::WriteInput;
+    // And returns a map of TableId -> (TableName, MutableBatch)
+    type WriteOutput = HashMap<TableId, (String, MutableBatch)>;
 
     /// Validate the schema of all the writes in `batches`.
     ///
@@ -270,18 +272,31 @@ where
         // (before passing through the write) in order to allow subsequent,
         // parallel requests to use it while waiting on this request to
         // complete.
-        match maybe_new_schema {
+        let latest_schema = match maybe_new_schema {
             Some(v) => {
                 // This call MAY overwrite a more-up-to-date cache entry if
                 // racing with another request for the same namespace, but the
                 // cache will eventually converge in subsequent requests.
-                self.cache.put_schema(namespace.clone(), v);
+                self.cache.put_schema(namespace.clone(), Arc::clone(&v));
                 trace!(%namespace, "schema cache updated");
+                v
             }
             None => {
                 trace!(%namespace, "schema unchanged");
+                schema
             }
-        }
+        };
+
+        // Map the "TableName -> Data" into "(TableName, TableId) -> Data" for
+        // downstream handlers.
+        let batches = batches
+            .into_iter()
+            .map(|(name, data)| {
+                let id = latest_schema.tables.get(&name).unwrap().id;
+
+                (id, (name, data))
+            })
+            .collect();
 
         Ok(batches)
     }
@@ -554,7 +569,11 @@ mod tests {
 
     #[tokio::test]
     async fn test_write_ok() {
-        let (catalog, _namespace) = test_setup().await;
+        let (catalog, namespace) = test_setup().await;
+
+        // Create the table so that there is a known ID that must be returned.
+        let want_id = namespace.create_table("bananas").await.table.id;
+
         let metrics = Arc::new(metric::Registry::default());
         let handler = SchemaValidator::new(
             catalog.catalog(),
@@ -563,7 +582,7 @@
         );
 
         let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
-        handler
+        let got = handler
             .write(&*NAMESPACE, NamespaceId::new(42), writes, None)
             .await
             .expect("request should succeed");
@@ -573,6 +592,10 @@
         assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
         assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
         assert_cache(&handler, "bananas", "val", ColumnType::I64);
         assert_cache(&handler, "bananas", "time", ColumnType::Time);
+
+        // Validate the table ID mapping.
+        let (name, _data) = got.get(&want_id).expect("table not in output");
+        assert_eq!(name, "bananas");
     }
 
     #[tokio::test]
diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs
index 11ef89b132..c03db24030 100644
--- a/router/src/dml_handlers/sharded_write_buffer.rs
+++ b/router/src/dml_handlers/sharded_write_buffer.rs
@@ -6,7 +6,7 @@ use std::{
 };
 
 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString};
+use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString, TableId};
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use futures::{stream::FuturesUnordered, StreamExt};
 use hashbrown::HashMap;
@@ -87,7 +87,7 @@ where
     type WriteError = ShardError;
     type DeleteError = ShardError;
 
-    type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
+    type WriteInput = Partitioned<HashMap<TableId, (String, MutableBatch)>>;
     type WriteOutput = Vec<DmlMeta>;
 
     /// Shard `writes` and dispatch the resultant DML operations.
@@ -98,25 +98,37 @@ where
         writes: Self::WriteInput,
         span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, Self::WriteError> {
-        let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
-
         // Extract the partition key & DML writes.
         let (partition_key, writes) = writes.into_parts();
 
+        // Sets of maps collated by destination shard for batching/merging of
+        // shard data.
+        let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
+        let mut table_ids: HashMap<_, HashMap<String, TableId>> = HashMap::new();
+
         // Shard each entry in `writes` and collate them into one DML operation
         // per shard to maximise the size of each write, and therefore increase
         // the effectiveness of compression of ops in the write buffer.
-        for (table, batch) in writes.into_iter() {
-            let shard = self.sharder.shard(&table, namespace, &batch);
+        for (table_id, (table_name, batch)) in writes.into_iter() {
+            let shard = self.sharder.shard(&table_name, namespace, &batch);
 
             let existing = collated
+                .entry(Arc::clone(&shard))
+                .or_default()
+                .insert(table_name.clone(), batch);
+            assert!(existing.is_none());
+
+            let existing = table_ids
                 .entry(shard)
                 .or_default()
-                .insert(table, batch.clone());
-
+                .insert(table_name.clone(), table_id);
             assert!(existing.is_none());
         }
 
+        // This will be used in a future PR, and eliminated in a dead code pass
+        // by LLVM in the meantime.
+        let _ = table_ids;
+
         let iter = collated.into_iter().map(|(shard, batch)| {
             let dml = DmlWrite::new(
                 namespace,
@@ -226,9 +238,15 @@ mod tests {
     use crate::dml_handlers::DmlHandler;
 
     // Parse `lp` into a table-keyed MutableBatch map.
-    fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
+    fn lp_to_writes(lp: &str) -> Partitioned<HashMap<TableId, (String, MutableBatch)>> {
         let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
             .expect("failed to build test writes from LP");
+
+        let writes = writes
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
+            .collect();
 
         Partitioned::new("key".into(), writes)
     }
diff --git a/router/src/server/http.rs b/router/src/server/http.rs
index 8206e7d783..56c204021a 100644
--- a/router/src/server/http.rs
+++ b/router/src/server/http.rs
@@ -2,7 +2,7 @@
 
 mod delete_predicate;
 
-use std::{str::Utf8Error, sync::Arc, time::Instant};
+use std::{str::Utf8Error, time::Instant};
 
 use bytes::{Bytes, BytesMut};
 use data_types::{org_and_bucket_to_database, OrgBucketMappingError};
@@ -229,7 +229,7 @@ pub struct HttpDelegate<D, N, T = SystemProvider> {
     max_request_bytes: usize,
     time_provider: T,
     namespace_resolver: N,
-    dml_handler: Arc<D>,
+    dml_handler: D,
 
     // A request limiter to restrict the number of simultaneous requests this
     // router services.
@@ -259,7 +259,7 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
         max_request_bytes: usize,
         max_requests: usize,
         namespace_resolver: N,
-        dml_handler: Arc<D>,
+        dml_handler: D,
         metrics: &metric::Registry,
     ) -> Self {
         let write_metric_lines = metrics
diff --git a/router/tests/http.rs b/router/tests/http.rs
index f4698c766a..8945c427b0 100644
--- a/router/tests/http.rs
+++ b/router/tests/http.rs
@@ -1,7 +1,9 @@
 use std::{collections::BTreeSet, iter, string::String, sync::Arc};
 
 use assert_matches::assert_matches;
-use data_types::{ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TemplatePart, TopicId};
+use data_types::{
+    ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TableId, TemplatePart, TopicId,
+};
 use dml::DmlOperation;
 use hashbrown::HashMap;
 use hyper::{Body, Request, StatusCode};
@@ -50,7 +52,7 @@ type HttpDelegateStack = HttpDelegate<
         WriteSummaryAdapter<
             FanOutAdaptor<
                 ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
-                Vec<Partitioned<HashMap<String, MutableBatch>>>,
+                Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
             >,
         >,
     >,
@@ -116,13 +118,7 @@ impl TestContext {
             iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
         );
 
-        let delegate = HttpDelegate::new(
-            1024,
-            100,
-            namespace_resolver,
-            Arc::new(handler_stack),
-            &metrics,
-        );
+        let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
 
         Self {
             delegate,
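
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the payload shape the write path carries after this change: per-table batches keyed by TableId, with the table name kept next to the data, mirroring the `lp_to_writes` test helpers above. `TableId`, `MutableBatch`, and `key_by_table_id` here are local stand-ins invented for the example, not the real types from the `data_types` and `mutable_batch` crates.

    use std::collections::HashMap;

    // Local stand-ins for data_types::TableId and mutable_batch::MutableBatch
    // so the sketch compiles on its own; they are not the real IOx types.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    struct TableId(i64);

    #[derive(Debug, Default)]
    struct MutableBatch {
        rows: usize,
    }

    // Hypothetical helper mirroring the `lp_to_writes` test functions in the
    // patch: key each per-table batch by a synthetic TableId while keeping
    // the table name next to the data for sharding and partitioning.
    fn key_by_table_id(
        writes: Vec<(String, MutableBatch)>,
    ) -> HashMap<TableId, (String, MutableBatch)> {
        writes
            .into_iter()
            .enumerate()
            .map(|(i, (name, data))| (TableId(i as i64), (name, data)))
            .collect()
    }

    fn main() {
        let writes = vec![
            ("bananas".to_string(), MutableBatch { rows: 1 }),
            ("platanos".to_string(), MutableBatch { rows: 2 }),
        ];

        for (id, (name, batch)) in key_by_table_id(writes) {
            println!("{id:?} -> table {name} ({} rows)", batch.rows);
        }
    }

Keeping the name in the value position lets downstream handlers address a table by its catalog ID while the sharder still hashes on the table name, which is what the ShardedWriteBuffer change above relies on.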