Merge pull request #6003 from influxdata/dom/router-tableid

refactor(router): pass TableId through handler stack
kodiakhq[bot] 2022-11-02 10:31:56 +00:00 committed by GitHub
commit eed62c4709
6 changed files with 91 additions and 43 deletions
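In brief: the router's write path previously passed per-table batches keyed by table name; this PR re-keys them by the catalog-assigned TableId (keeping the name alongside) from schema validation onwards. A type-level sketch of the flow, using hypothetical aliases rather than the real IOx types (MutableBatch, Partitioned, data_types::TableId):

    use std::collections::HashMap;

    // Hypothetical stand-ins, for illustration only.
    type TableId = i64; // catalog-assigned table ID
    type Batch = Vec<String>; // stands in for mutable_batch::MutableBatch

    // Line-protocol parsing produces name-keyed batches...
    type ParsedWrite = HashMap<String, Batch>;

    // ...schema validation re-keys them by table ID, keeping the name...
    type ValidatedWrite = HashMap<TableId, (String, Batch)>;

    // ...and the partitioner and sharder preserve that keying downstream.
    type PartitionedWrite = Vec<(String, ValidatedWrite)>; // (partition key, tables)

    fn main() {
        let _flow: (ParsedWrite, ValidatedWrite, PartitionedWrite) =
            (HashMap::new(), HashMap::new(), Vec::new());
    }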


@@ -289,12 +289,11 @@ pub async fn create_router_server_type(
     let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?;

     // Initialise the API delegates
-    let handler_stack = Arc::new(handler_stack);
     let http = HttpDelegate::new(
         common_state.run_config().max_http_request_size,
         request_limit,
         namespace_resolver,
-        Arc::clone(&handler_stack),
+        handler_stack,
         &metrics,
     );
     let grpc = GrpcDelegate::new(schema_catalog, object_store, shard_service);

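With that, the Arc around the handler stack is gone: it was only ever cloned into the single HttpDelegate, so the delegate can own the stack outright. A minimal sketch of the ownership pattern, with a hypothetical Handler/Delegate pair standing in for the router's DmlHandler and HttpDelegate:

    // Stand-in for the DmlHandler trait bound.
    trait Handler {
        fn call(&self) -> &'static str;
    }

    struct Stack;
    impl Handler for Stack {
        fn call(&self) -> &'static str {
            "handled"
        }
    }

    // Owning `D` by value still allows shared `&self` access per request;
    // an Arc is only warranted when a second owner actually exists.
    struct Delegate<D> {
        handler: D,
    }

    impl<D: Handler> Delegate<D> {
        fn new(handler: D) -> Self {
            Self { handler }
        }

        fn serve(&self) -> &'static str {
            self.handler.call()
        }
    }

    fn main() {
        // Construction moves the stack in, as in the hunk above.
        let delegate = Delegate::new(Stack);
        assert_eq!(delegate.serve(), "handled");
    }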

@@ -1,5 +1,7 @@
 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate};
+use data_types::{
+    DatabaseName, DeletePredicate, NamespaceId, PartitionKey, PartitionTemplate, TableId,
+};
 use hashbrown::HashMap;
 use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
 use observability_deps::tracing::*;
@@ -64,7 +66,7 @@ impl DmlHandler for Partitioner {
     type WriteError = PartitionError;
     type DeleteError = PartitionError;

-    type WriteInput = HashMap<String, MutableBatch>;
+    type WriteInput = HashMap<TableId, (String, MutableBatch)>;
     type WriteOutput = Vec<Partitioned<Self::WriteInput>>;

     /// Partition the per-table [`MutableBatch`].
@@ -76,9 +78,10 @@
         _span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, Self::WriteError> {
         // A collection of partition-keyed, per-table MutableBatch instances.
-        let mut partitions: HashMap<PartitionKey, HashMap<_, MutableBatch>> = HashMap::default();
+        let mut partitions: HashMap<PartitionKey, HashMap<_, (String, MutableBatch)>> =
+            HashMap::default();

-        for (table_name, batch) in batch {
+        for (table_id, (table_name, batch)) in batch {
             // Partition the table batch according to the configured partition
             // template and write it into the partition-keyed map.
             for (partition_key, partition_payload) in
@@ -87,10 +90,12 @@
             let partition = partitions.entry(partition_key).or_default();
             let table_batch = partition
                 .raw_entry_mut()
-                .from_key(&table_name)
-                .or_insert_with(|| (table_name.to_owned(), MutableBatch::default()));
+                .from_key(&table_id)
+                .or_insert_with(|| {
+                    (table_id, (table_name.to_owned(), MutableBatch::default()))
+                });

-            partition_payload.write_to_batch(table_batch.1)?;
+            partition_payload.write_to_batch(&mut table_batch.1 .1)?;
             }
         }
@@ -119,9 +124,17 @@ mod tests {
     use super::*;

-    /// The default timestamp applied to test LP if the write does not specify
-    /// one.
-    const DEFAULT_TIMESTAMP_NANOS: i64 = 42000000000000000;
+    // Parse `lp` into a table-keyed MutableBatch map.
+    fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
+        let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
+            .expect("failed to build test writes from LP");
+        writes
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
+            .collect()
+    }

     // Generate a test case that partitions "lp".
     //
@@ -144,7 +157,7 @@
             let partitioner = Partitioner::new(partition_template);
             let ns = DatabaseName::new("bananas").expect("valid db name");

-            let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIMESTAMP_NANOS).expect("failed to parse test LP");
+            let writes = lp_to_writes($lp);

             let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await;
             assert_matches!(handler_ret, $($want_handler_ret)+);
@@ -156,8 +169,7 @@
                 // Extract the table names in this partition
                 let mut tables = partition
                     .payload
-                    .keys()
-                    .cloned()
+                    .values().map(|v| v.0.clone())
                     .collect::<Vec<String>>();
                 tables.sort();

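The upsert in the partitioner uses hashbrown's raw entry API so that, on first sight of a table, the TableId key and the (name, batch) value are inserted in one step without re-hashing. A simplified, self-contained version of the pattern, with i64 and Vec<String> standing in for TableId and MutableBatch, and assuming a hashbrown release that still exposes raw_entry_mut (as the code above does):

    use hashbrown::HashMap;

    fn main() {
        // Per-partition map: table ID -> (table name, accumulated batch).
        let mut partition: HashMap<i64, (String, Vec<String>)> = HashMap::default();

        let (table_id, table_name) = (42_i64, "bananas");

        // Look up by ID; on a miss, create key and value together.
        let table_batch = partition
            .raw_entry_mut()
            .from_key(&table_id)
            .or_insert_with(|| (table_id, (table_name.to_owned(), Vec::new())));

        // `.1` selects the (name, batch) value and `.1 .1` the batch itself,
        // the same tuple-index chain as `&mut table_batch.1 .1` in the diff.
        table_batch.1 .1.push("bananas,tag1=A val=42i".to_owned());

        assert_eq!(partition[&table_id].0, "bananas");
    }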

@@ -1,7 +1,7 @@
 use std::{ops::DerefMut, sync::Arc};

 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema};
+use data_types::{DatabaseName, DeletePredicate, NamespaceId, NamespaceSchema, TableId};
 use hashbrown::HashMap;
 use iox_catalog::{
     interface::{get_schema_by_name, Catalog, Error as CatalogError},
@@ -146,8 +146,10 @@
     type WriteError = SchemaError;
     type DeleteError = SchemaError;

+    // Accepts a map of "TableName -> MutableBatch"
     type WriteInput = HashMap<String, MutableBatch>;
-    type WriteOutput = Self::WriteInput;
+    // And returns a map of TableId -> (TableName, MutableBatch)
+    type WriteOutput = HashMap<TableId, (String, MutableBatch)>;

     /// Validate the schema of all the writes in `batches`.
     ///
@@ -270,18 +272,31 @@
         // (before passing through the write) in order to allow subsequent,
         // parallel requests to use it while waiting on this request to
         // complete.
-        match maybe_new_schema {
+        let latest_schema = match maybe_new_schema {
             Some(v) => {
                 // This call MAY overwrite a more-up-to-date cache entry if
                 // racing with another request for the same namespace, but the
                 // cache will eventually converge in subsequent requests.
-                self.cache.put_schema(namespace.clone(), v);
+                self.cache.put_schema(namespace.clone(), Arc::clone(&v));
                 trace!(%namespace, "schema cache updated");
+                v
             }
             None => {
                 trace!(%namespace, "schema unchanged");
+                schema
             }
-        }
+        };
+
+        // Map the "TableName -> Data" into "(TableName, TableId) -> Data" for
+        // downstream handlers.
+        let batches = batches
+            .into_iter()
+            .map(|(name, data)| {
+                let id = latest_schema.tables.get(&name).unwrap().id;
+                (id, (name, data))
+            })
+            .collect();

         Ok(batches)
     }
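The closing map is the heart of the change: validation has already ensured every table in the write exists in latest_schema, so the unwrap() cannot fire. The same re-keying with plain standard-library types standing in for the schema and batches:

    use std::collections::HashMap;

    fn main() {
        // Stand-in for `latest_schema.tables`: name -> catalog ID.
        let schema_tables: HashMap<&str, i64> = HashMap::from([("cpu", 1), ("mem", 2)]);

        // Stand-in for the validated, name-keyed batches.
        let batches: HashMap<&str, &str> =
            HashMap::from([("cpu", "cpu v=1"), ("mem", "mem v=2")]);

        // Re-key "name -> data" as "id -> (name, data)"; the lookup is
        // infallible here for the same reason as in the handler.
        let by_id: HashMap<i64, (&str, &str)> = batches
            .into_iter()
            .map(|(name, data)| (schema_tables[name], (name, data)))
            .collect();

        assert_eq!(by_id[&1], ("cpu", "cpu v=1"));
    }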
@@ -554,7 +569,11 @@
     #[tokio::test]
     async fn test_write_ok() {
-        let (catalog, _namespace) = test_setup().await;
+        let (catalog, namespace) = test_setup().await;
+
+        // Create the table so that there is a known ID that must be returned.
+        let want_id = namespace.create_table("bananas").await.table.id;

         let metrics = Arc::new(metric::Registry::default());
         let handler = SchemaValidator::new(
             catalog.catalog(),
@@ -563,7 +582,7 @@
         );

         let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
-        handler
+        let got = handler
             .write(&*NAMESPACE, NamespaceId::new(42), writes, None)
             .await
             .expect("request should succeed");
@@ -573,6 +592,10 @@
         assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
         assert_cache(&handler, "bananas", "val", ColumnType::I64);
         assert_cache(&handler, "bananas", "time", ColumnType::Time);
+
+        // Validate the table ID mapping.
+        let (name, _data) = got.get(&want_id).expect("table not in output");
+        assert_eq!(name, "bananas");
     }

     #[tokio::test]


@@ -6,7 +6,7 @@ use std::{
 };

 use async_trait::async_trait;
-use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString};
+use data_types::{DatabaseName, DeletePredicate, NamespaceId, NonEmptyString, TableId};
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use futures::{stream::FuturesUnordered, StreamExt};
 use hashbrown::HashMap;
@@ -87,7 +87,7 @@
     type WriteError = ShardError;
     type DeleteError = ShardError;

-    type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
+    type WriteInput = Partitioned<HashMap<TableId, (String, MutableBatch)>>;
     type WriteOutput = Vec<DmlMeta>;

     /// Shard `writes` and dispatch the resultant DML operations.
@@ -98,25 +98,37 @@
         writes: Self::WriteInput,
         span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, ShardError> {
-        let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
-
         // Extract the partition key & DML writes.
         let (partition_key, writes) = writes.into_parts();

+        // Sets of maps collated by destination shard for batching/merging of
+        // shard data.
+        let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
+        let mut table_ids: HashMap<_, HashMap<String, TableId>> = HashMap::new();
+
         // Shard each entry in `writes` and collate them into one DML operation
         // per shard to maximise the size of each write, and therefore increase
         // the effectiveness of compression of ops in the write buffer.
-        for (table, batch) in writes.into_iter() {
-            let shard = self.sharder.shard(&table, namespace, &batch);
+        for (table_id, (table_name, batch)) in writes.into_iter() {
+            let shard = self.sharder.shard(&table_name, namespace, &batch);
+
             let existing = collated
-                .entry(shard)
+                .entry(Arc::clone(&shard))
                 .or_default()
-                .insert(table, batch.clone());
+                .insert(table_name.clone(), batch);
+            assert!(existing.is_none());
+
+            let existing = table_ids
+                .entry(shard)
+                .or_default()
+                .insert(table_name.clone(), table_id);
             assert!(existing.is_none());
         }

+        // This will be used in a future PR, and eliminated in a dead code pass
+        // by LLVM in the meantime.
+        let _ = table_ids;
+
         let iter = collated.into_iter().map(|(shard, batch)| {
             let dml = DmlWrite::new(
                 namespace,
@@ -226,9 +238,15 @@ mod tests {
     use crate::dml_handlers::DmlHandler;

     // Parse `lp` into a table-keyed MutableBatch map.
-    fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
+    fn lp_to_writes(lp: &str) -> Partitioned<HashMap<TableId, (String, MutableBatch)>> {
         let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
             .expect("failed to build test writes from LP");
+
+        let writes = writes
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
+            .collect();
+
         Partitioned::new("key".into(), writes)
     }

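For context, the collation loop above batches every table bound for the same shard into one map so each shard receives a single merged DML write, with a parallel map carrying the table IDs for the follow-up PR. A toy version in which a byte-sum modulo stands in for the real JumpHash sharder over Arc<Shard> (a deliberate simplification):

    use std::collections::HashMap;

    // Toy sharder; the router actually jump-hashes (table, namespace).
    fn shard_for(table: &str, shards: usize) -> usize {
        table.bytes().map(usize::from).sum::<usize>() % shards
    }

    fn main() {
        // TableId -> (table name, batch payload).
        let writes: HashMap<i64, (&str, &str)> =
            HashMap::from([(1, ("cpu", "cpu v=1")), (2, ("mem", "mem v=2"))]);

        // One batch map and one ID map per destination shard, as above.
        let mut collated: HashMap<usize, HashMap<&str, &str>> = HashMap::new();
        let mut table_ids: HashMap<usize, HashMap<&str, i64>> = HashMap::new();

        for (table_id, (table_name, batch)) in writes {
            let shard = shard_for(table_name, 4);
            // A table appears at most once per write, hence the asserts
            // mirrored from the real handler.
            assert!(collated
                .entry(shard)
                .or_default()
                .insert(table_name, batch)
                .is_none());
            assert!(table_ids
                .entry(shard)
                .or_default()
                .insert(table_name, table_id)
                .is_none());
        }
    }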

@@ -2,7 +2,7 @@
 mod delete_predicate;

-use std::{str::Utf8Error, sync::Arc, time::Instant};
+use std::{str::Utf8Error, time::Instant};

 use bytes::{Bytes, BytesMut};
 use data_types::{org_and_bucket_to_database, OrgBucketMappingError};
@@ -229,7 +229,7 @@ pub struct HttpDelegate<D, N, T = SystemProvider> {
     max_request_bytes: usize,
     time_provider: T,
     namespace_resolver: N,
-    dml_handler: Arc<D>,
+    dml_handler: D,

     // A request limiter to restrict the number of simultaneous requests this
     // router services.
@@ -259,7 +259,7 @@
         max_request_bytes: usize,
         max_requests: usize,
         namespace_resolver: N,
-        dml_handler: Arc<D>,
+        dml_handler: D,
         metrics: &metric::Registry,
     ) -> Self {
         let write_metric_lines = metrics


@@ -1,7 +1,9 @@
 use std::{collections::BTreeSet, iter, string::String, sync::Arc};

 use assert_matches::assert_matches;
-use data_types::{ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TemplatePart, TopicId};
+use data_types::{
+    ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TableId, TemplatePart, TopicId,
+};
 use dml::DmlOperation;
 use hashbrown::HashMap;
 use hyper::{Body, Request, StatusCode};
@@ -50,7 +52,7 @@ type HttpDelegateStack = HttpDelegate<
     WriteSummaryAdapter<
         FanOutAdaptor<
             ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
-            Vec<Partitioned<HashMap<String, MutableBatch>>>,
+            Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
         >,
     >,
 >,
@@ -116,13 +118,7 @@
             iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
         );

-        let delegate = HttpDelegate::new(
-            1024,
-            100,
-            namespace_resolver,
-            Arc::new(handler_stack),
-            &metrics,
-        );
+        let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);

         Self {
             delegate,