Merge pull request #6170 from influxdata/cn/remove-table-name

fix: Remove table names from DmlWrite
pull/24376/head
kodiakhq[bot] 2022-11-18 15:48:45 +00:00 committed by GitHub
commit 1a68da02ef
24 changed files with 241 additions and 343 deletions

Cargo.lock generated
View File

@ -4048,6 +4048,7 @@ dependencies = [
"dml",
"futures",
"generated_types",
"hashbrown 0.13.1",
"influxdb_iox_client",
"ingester",
"iox_catalog",

View File

@ -177,9 +177,8 @@ impl From<DmlDelete> for DmlOperation {
pub struct DmlWrite {
/// The namespace being written to
namespace_id: NamespaceId,
/// Writes to individual tables keyed by table name
tables: HashMap<String, MutableBatch>,
table_ids: HashMap<String, TableId>,
/// Writes to individual tables keyed by table ID
table_ids: HashMap<TableId, MutableBatch>,
/// Write metadata
meta: DmlMeta,
min_timestamp: i64,
@ -195,22 +194,19 @@ impl DmlWrite {
///
/// Panics if
///
/// - `tables` is empty
/// - `table_ids` is empty
/// - a MutableBatch is empty
/// - a MutableBatch lacks an i64 "time" column
pub fn new(
namespace_id: NamespaceId,
tables: HashMap<String, MutableBatch>,
table_ids: HashMap<String, TableId>,
table_ids: HashMap<TableId, MutableBatch>,
partition_key: PartitionKey,
meta: DmlMeta,
) -> Self {
assert_ne!(tables.len(), 0);
// A simple (and incomplete) sanity check.
assert_eq!(tables.len(), table_ids.len());
assert_ne!(table_ids.len(), 0);
let mut stats = StatValues::new_empty();
for (table_name, table) in &tables {
for (table_id, table) in &table_ids {
match table
.column(schema::TIME_COLUMN_NAME)
.expect("time")
@ -219,14 +215,13 @@ impl DmlWrite {
Statistics::I64(col_stats) => stats.update_from(&col_stats),
s => unreachable!(
"table \"{}\" has unexpected type for time column: {}",
table_name,
table_id,
s.type_name()
),
};
}
Self {
tables,
table_ids,
partition_key,
meta,
@ -248,33 +243,23 @@ impl DmlWrite {
/// Returns an iterator over the per-table writes within this [`DmlWrite`]
/// in no particular order
pub fn tables(&self) -> impl Iterator<Item = (&str, &MutableBatch)> + '_ {
self.tables.iter().map(|(k, v)| (k.as_str(), v))
pub fn tables(&self) -> impl Iterator<Item = (&TableId, &MutableBatch)> + '_ {
self.table_ids.iter()
}
/// Consumes `self`, returning an iterator of the table data, name & ID
/// contained within it.
pub fn into_tables(self) -> impl Iterator<Item = (String, TableId, MutableBatch)> {
self.tables.into_iter().map(move |(name, data)| {
(
name.clone(),
*self
.table_ids
.get(&name)
.expect("no table ID found for table"),
data,
)
})
/// Consumes `self`, returning an iterator of the table ID and data contained within it.
pub fn into_tables(self) -> impl Iterator<Item = (TableId, MutableBatch)> {
self.table_ids.into_iter()
}
/// Gets the write for a given table
pub fn table(&self, name: &str) -> Option<&MutableBatch> {
self.tables.get(name)
pub fn table(&self, id: &TableId) -> Option<&MutableBatch> {
self.table_ids.get(id)
}
/// Returns the number of tables within this write
pub fn table_count(&self) -> usize {
self.tables.len()
self.table_ids.len()
}
/// Returns the minimum timestamp in the write
@ -292,15 +277,10 @@ impl DmlWrite {
/// This includes `Self`.
pub fn size(&self) -> usize {
std::mem::size_of::<Self>()
+ self
.tables
.iter()
.map(|(k, v)| std::mem::size_of_val(k) + k.capacity() + v.size())
.sum::<usize>()
+ self
.table_ids
.keys()
.map(|k| std::mem::size_of_val(k) + k.capacity() + std::mem::size_of::<TableId>())
.values()
.map(|v| std::mem::size_of::<TableId>() + v.size())
.sum::<usize>()
+ self.meta.size()
+ std::mem::size_of::<NamespaceId>()
@ -313,11 +293,6 @@ impl DmlWrite {
&self.partition_key
}
/// Return the map of [`TableId`] to table names for this batch.
pub fn table_id(&self, name: &str) -> Option<TableId> {
self.table_ids.get(name).cloned()
}
/// Return the [`NamespaceId`] to which this [`DmlWrite`] should be applied.
pub fn namespace_id(&self) -> NamespaceId {
self.namespace_id
@ -430,14 +405,14 @@ pub mod test_util {
assert_eq!(a.table_count(), b.table_count());
for (table_name, a_batch) in a.tables() {
let b_batch = b.table(table_name).expect("table not found");
for (table_id, a_batch) in a.tables() {
let b_batch = b.table(table_id).expect("table not found");
assert_eq!(
pretty_format_batches(&[a_batch.to_arrow(Projection::All).unwrap()]).unwrap(),
pretty_format_batches(&[b_batch.to_arrow(Projection::All).unwrap()]).unwrap(),
"batches for table \"{}\" differ",
table_name
table_id
);
}
}
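
With these changes, a caller builds the per-table map keyed by TableId before constructing the write. A minimal sketch of the new construction path, based on the signatures above and the test helpers later in this diff (the namespace ID, table IDs, and line protocol are illustrative):

use data_types::{NamespaceId, PartitionKey, TableId};
use dml::{DmlMeta, DmlWrite};
use mutable_batch_lp::lines_to_batches;

fn example_write() -> DmlWrite {
    // Parse line protocol into name-keyed batches, then re-key them by table ID.
    let tables_by_id: hashbrown::HashMap<_, _> = lines_to_batches("cpu,host=a usage=1 42", 0)
        .unwrap()
        .into_iter()
        .enumerate()
        .map(|(i, (_name, batch))| (TableId::new(i as _), batch))
        .collect();

    DmlWrite::new(
        NamespaceId::new(42),
        tables_by_id,
        PartitionKey::from("1970-01-01"),
        DmlMeta::unsequenced(None),
    )
}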

View File

@ -24,7 +24,9 @@ message DatabaseBatch {
}
message TableBatch {
string table_name = 1;
// Was the table name for this data.
reserved "table_name";
reserved 1;
// The catalog ID for this table.
int64 table_id = 4;
@ -59,7 +61,6 @@ message PackedStrings {
repeated uint32 offsets = 2;
}
// A dictionary containing a list of string values combined with a values array of
// indexes into this dictionary
//

View File

@ -760,10 +760,16 @@ mod tests {
namespace.id,
&table1,
);
validate_or_insert_schema(table1_write.tables(), &schema, repos.deref_mut())
.await
.unwrap()
.unwrap();
validate_or_insert_schema(
table1_write
.tables()
.map(|(_id, batch)| (table1.name.as_str(), batch)),
&schema,
repos.deref_mut(),
)
.await
.unwrap()
.unwrap();
let table2_write = Self::arbitrary_write_with_seq_num_at_time(
1,
@ -773,10 +779,16 @@ mod tests {
namespace.id,
&table2,
);
validate_or_insert_schema(table2_write.tables(), &schema, repos.deref_mut())
.await
.unwrap()
.unwrap();
validate_or_insert_schema(
table2_write
.tables()
.map(|(_id, batch)| (table2.name.as_str(), batch)),
&schema,
repos.deref_mut(),
)
.await
.unwrap()
.unwrap();
(namespace, table1, table2, shard1, shard2)
};
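
Because DmlWrite::tables() now yields table IDs rather than names, the schema-validation calls above map the iterator back to a table name the test already knows. A minimal sketch of that adapter for a single-table write (the function name is hypothetical):

use dml::DmlWrite;
use mutable_batch::MutableBatch;

/// Pair each batch of a single-table write with the caller-supplied name,
/// matching the `(&str, &MutableBatch)` shape expected by `validate_or_insert_schema`.
fn named_batches<'a>(
    write: &'a DmlWrite,
    table_name: &'a str,
) -> impl Iterator<Item = (&'a str, &'a MutableBatch)> + 'a {
    write.tables().map(move |(_id, batch)| (table_name, batch))
}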

View File

@ -184,7 +184,7 @@ impl NamespaceData {
// Extract the partition key derived by the router.
let partition_key = write.partition_key().clone();
for (_table_name, table_id, b) in write.into_tables() {
for (table_id, b) in write.into_tables() {
// Grab a reference to the table data, or insert a new
// TableData for it.
let table_data = self.tables.get_or_insert_with(&table_id, || {
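
The ingester now consumes a write by table ID with no name lookup in between. A minimal sketch of iterating the owned batches via the new into_tables(), using only the size() accessor shown earlier in this diff (the aggregation itself is purely illustrative):

use dml::DmlWrite;

fn total_batch_size(write: DmlWrite) -> usize {
    // `into_tables()` yields owned (TableId, MutableBatch) pairs.
    write.into_tables().map(|(_table_id, batch)| batch.size()).sum()
}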

View File

@ -395,19 +395,16 @@ impl<T> Drop for IngestHandlerImpl<T> {
#[cfg(test)]
mod tests {
use std::{num::NonZeroU32, ops::DerefMut};
use data_types::{Namespace, NamespaceId, NamespaceSchema, Sequence, SequenceNumber, TableId};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
use iox_time::Time;
use mutable_batch_lp::lines_to_batches;
use super::*;
use crate::test_util::make_write_op;
use data_types::{Namespace, NamespaceId, PartitionKey, SequenceNumber, TableId};
use dml::DmlWrite;
use iox_catalog::mem::MemCatalog;
use object_store::memory::InMemory;
use std::num::NonZeroU32;
use test_helpers::maybe_start_logging;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use super::*;
#[tokio::test]
async fn test_shutdown() {
let (ingester, _, _) = ingester_test_setup(vec![], 0, true).await;
@ -498,19 +495,7 @@ mod tests {
let write_buffer_state =
MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
let schema = NamespaceSchema::new(
namespace.id,
topic.id,
query_pool.id,
namespace.max_columns_per_table,
namespace.retention_period_ns,
);
for write_operation in write_operations {
validate_or_insert_schema(write_operation.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
write_buffer_state.push_write(write_operation);
}
txn.commit().await.unwrap();
@ -545,24 +530,36 @@ mod tests {
(ingester, shard, namespace)
}
fn dml_write(table_name: &str, sequence_number: i64) -> DmlWrite {
let partition_key = PartitionKey::from("1970-01-01");
let shard_index = ShardIndex::new(0);
let namespace_id = NamespaceId::new(1);
let table_id = TableId::new(1);
let timestamp = 42;
make_write_op(
&partition_key,
shard_index,
namespace_id,
table_name,
table_id,
sequence_number,
&format!(
"{} foo=1 {}\n{} foo=2 {}",
table_name,
timestamp,
table_name,
timestamp + 10
),
)
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn sequence_number_no_longer_exists() {
maybe_start_logging();
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
let write_operations = vec![DmlWrite::new(
NamespaceId::new(1),
lines_to_batches("cpu bar=2 20", 0).unwrap(),
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(10)),
ingest_ts1,
None,
150,
),
)];
let write_operations = vec![dml_write("cpu", 10)];
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 2, false).await;
tokio::time::timeout(Duration::from_millis(1000), ingester.join())
@ -577,19 +574,8 @@ mod tests {
async fn sequence_number_after_watermark() {
maybe_start_logging();
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
let write_operations = vec![DmlWrite::new(
NamespaceId::new(1),
lines_to_batches("cpu bar=2 20", 0).unwrap(),
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(2)),
ingest_ts1,
None,
150,
),
)];
let write_operations = vec![dml_write("cpu", 2)];
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, false).await;
tokio::time::timeout(Duration::from_millis(1100), ingester.join())
@ -604,19 +590,8 @@ mod tests {
async fn sequence_number_after_watermark_skip_to_oldest_available() {
maybe_start_logging();
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
let write_operations = vec![DmlWrite::new(
NamespaceId::new(1),
lines_to_batches("cpu bar=2 20", 0).unwrap(),
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(2)),
ingest_ts1,
None,
150,
),
)];
let write_operations = vec![dml_write("cpu", 2)];
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, true).await;
tokio::time::timeout(Duration::from_millis(1100), ingester.join())

View File

@ -1,21 +1,19 @@
//! A handler of streamed ops from a write buffer.
use std::{fmt::Debug, time::Duration};
use data_types::{SequenceNumber, ShardId, ShardIndex};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
use observability_deps::tracing::*;
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
use super::DmlSink;
use crate::{
data::DmlApplyAction,
lifecycle::{LifecycleHandle, LifecycleHandleImpl},
};
use data_types::{SequenceNumber, ShardId, ShardIndex};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
/// When the [`LifecycleManager`] indicates that ingest should be paused because
/// of memory pressure, the shard will loop, sleeping this long between
@ -510,8 +508,11 @@ fn metric_attrs(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, Sequence, TableId, TimestampRange};
@ -521,17 +522,12 @@ mod tests {
use metric::Metric;
use mutable_batch_lp::lines_to_batches;
use once_cell::sync::Lazy;
use std::sync::Arc;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use write_buffer::core::WriteBufferError;
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
static TEST_TIME: Lazy<Time> = Lazy::new(|| SystemProvider::default().now());
static TEST_SHARD_INDEX: ShardIndex = ShardIndex::new(42);
static TEST_TOPIC_NAME: &str = "topic_name";
@ -539,10 +535,10 @@ mod tests {
// Return a DmlWrite with the given namespace ID and a single table.
fn make_write(namespace_id: i64, write_time: u64) -> DmlWrite {
let tables = lines_to_batches("bananas level=42 4242", 0).unwrap();
let ids = tables
.keys()
let tables_by_ids = tables
.into_iter()
.enumerate()
.map(|(i, v)| (v.clone(), TableId::new(i as _)))
.map(|(i, (_k, v))| (TableId::new(i as _), v))
.collect();
let sequence = DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
@ -554,8 +550,7 @@ mod tests {
);
DmlWrite::new(
NamespaceId::new(namespace_id),
tables,
ids,
tables_by_ids,
"1970-01-01".into(),
sequence,
)

View File

@ -236,8 +236,10 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::stream_handler::{
mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher,
};
use assert_matches::assert_matches;
use data_types::{NamespaceId, Sequence, SequenceNumber, ShardId, TableId};
use dml::{DmlMeta, DmlWrite};
@ -245,13 +247,9 @@ mod tests {
use metric::{Metric, MetricObserver, Observation};
use mutable_batch_lp::lines_to_batches;
use once_cell::sync::Lazy;
use std::sync::Arc;
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use super::*;
use crate::stream_handler::{
mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher,
};
/// The shard index the [`SinkInstrumentation`] under test is configured to
/// be observing for.
const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
@ -272,12 +270,18 @@ mod tests {
/// Return a DmlWrite with the given metadata and a single table.
fn make_write(meta: DmlMeta) -> DmlWrite {
let tables = lines_to_batches("bananas level=42 4242", 0).unwrap();
let ids = tables
.keys()
let tables_by_ids = tables
.into_iter()
.enumerate()
.map(|(i, v)| (v.clone(), TableId::new(i as _)))
.map(|(i, (_k, v))| (TableId::new(i as _), v))
.collect();
DmlWrite::new(NamespaceId::new(42), tables, ids, "1970-01-01".into(), meta)
DmlWrite::new(
NamespaceId::new(42),
tables_by_ids,
"1970-01-01".into(),
meta,
)
}
/// Extract the metric with the given name from `metrics`.

View File

@ -594,15 +594,15 @@ pub(crate) fn make_write_op(
sequence_number: i64,
lines: &str,
) -> DmlWrite {
let tables = lines_to_batches(lines, 0).unwrap();
assert_eq!(tables.len(), 1);
assert!(tables.get(table_name).is_some());
let mut tables_by_name = lines_to_batches(lines, 0).unwrap();
assert_eq!(tables_by_name.len(), 1);
let ids = [(table_name.into(), table_id)].into_iter().collect();
let tables_by_id = [(table_id, tables_by_name.remove(table_name).unwrap())]
.into_iter()
.collect();
DmlWrite::new(
namespace_id,
tables,
ids,
tables_by_id,
partition_key.clone(),
DmlMeta::sequenced(
Sequence {

View File

@ -210,49 +210,6 @@ impl TestContext {
ns
}
/// Enqueue the specified `op` into the write buffer for the ingester to
/// consume.
///
/// This call takes care of validating the schema of `op` and populating the
/// catalog with any new schema elements.
///
/// # Panics
///
/// This method panics if the namespace for `op` does not exist, or the
/// schema is invalid or conflicts with the existing namespace schema.
#[track_caller]
pub async fn enqueue_write(&mut self, op: DmlWrite) -> SequenceNumber {
let schema = self
.namespaces
.get_mut(&op.namespace_id())
.expect("namespace does not exist");
// Pull the sequence number out of the op to return it back to the user
// for simplicity.
let offset = op
.meta()
.sequence()
.expect("write must be sequenced")
.sequence_number;
// Perform schema validation, populating the catalog.
let mut repo = self.catalog.repositories().await;
if let Some(new) = validate_or_insert_schema(op.tables(), schema, repo.as_mut())
.await
.expect("failed schema validation for enqueuing write")
{
// Retain the updated schema.
debug!(?schema, "updated test context schema");
*schema = new;
}
// Push the write into the write buffer.
self.write_buffer_state.push_write(op);
debug!(?offset, "enqueued write in write buffer");
offset
}
/// A helper wrapper over [`Self::enqueue_write()`] for line-protocol.
#[track_caller]
pub async fn write_lp(
@ -274,34 +231,49 @@ impl TestContext {
.expect("namespace does not exist")
.id;
// Build the TableId -> TableName map, upserting the tables in the
let schema = self
.namespaces
.get_mut(&namespace_id)
.expect("namespace does not exist");
let batches = lines_to_batches(lp, 0).unwrap();
validate_or_insert_schema(
batches
.iter()
.map(|(table_name, batch)| (table_name.as_str(), batch)),
schema,
self.catalog.repositories().await.as_mut(),
)
.await
.expect("failed schema validation for enqueuing write");
// Build the TableId -> Batch map, upserting the tables into the catalog in the
// process.
let ids = lines_to_batches(lp, 0)
.unwrap()
.keys()
.map(|v| {
let batches_by_ids = batches
.into_iter()
.map(|(table_name, batch)| {
let catalog = Arc::clone(&self.catalog);
async move {
let id = catalog
.repositories()
.await
.tables()
.create_or_get(v, namespace_id)
.create_or_get(&table_name, namespace_id)
.await
.expect("table should create OK")
.id;
(v.clone(), id)
(id, batch)
}
})
.collect::<FuturesUnordered<_>>()
.collect::<hashbrown::HashMap<_, _>>()
.await;
self.enqueue_write(DmlWrite::new(
let op = DmlWrite::new(
namespace_id,
lines_to_batches(lp, 0).unwrap(),
ids.clone(),
batches_by_ids,
partition_key,
DmlMeta::sequenced(
Sequence::new(TEST_SHARD_INDEX, SequenceNumber::new(sequence_number)),
@ -309,8 +281,21 @@ impl TestContext {
None,
50,
),
))
.await
);
// Pull the sequence number out of the op to return it back to the user
// for simplicity.
let offset = op
.meta()
.sequence()
.expect("write must be sequenced")
.sequence_number;
// Push the write into the write buffer.
self.write_buffer_state.push_write(op);
debug!(?offset, "enqueued write in write buffer");
offset
}
/// Return the [`TableId`] in the catalog for `name`, or panic.

View File

@ -1,14 +1,13 @@
//! Code to decode [`MutableBatch`] from pbdata protobuf
use hashbrown::{HashMap, HashSet};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use generated_types::influxdata::pbdata::v1::{
column::{SemanticType, Values as PbValues},
Column as PbColumn, DatabaseBatch, PackedStrings, TableBatch,
};
use hashbrown::{HashMap, HashSet};
use mutable_batch::{writer::Writer, MutableBatch};
use schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
/// Error type for line protocol conversion
#[derive(Debug, Snafu)]
@ -57,24 +56,17 @@ pub enum Error {
/// Result type for pbdata conversion
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Decodes a [`DatabaseBatch`] to a map of [`MutableBatch`] keyed by table name
pub fn decode_database_batch(
database_batch: &DatabaseBatch,
) -> Result<(HashMap<String, MutableBatch>, HashMap<String, i64>)> {
let mut name_to_data = HashMap::with_capacity(database_batch.table_batches.len());
let mut id_to_name = HashMap::with_capacity(database_batch.table_batches.len());
/// Decodes a [`DatabaseBatch`] to a map of [`MutableBatch`] keyed by table ID
pub fn decode_database_batch(database_batch: &DatabaseBatch) -> Result<HashMap<i64, MutableBatch>> {
let mut id_to_data = HashMap::with_capacity(database_batch.table_batches.len());
for table_batch in &database_batch.table_batches {
let (_, batch) = name_to_data
.raw_entry_mut()
.from_key(table_batch.table_name.as_str())
.or_insert_with(|| (table_batch.table_name.clone(), MutableBatch::new()));
id_to_name.insert(table_batch.table_name.clone(), table_batch.table_id);
let batch = id_to_data.entry(table_batch.table_id).or_default();
write_table_batch(batch, table_batch)?;
}
Ok((name_to_data, id_to_name))
Ok(id_to_data)
}
/// Writes the provided [`TableBatch`] to a [`MutableBatch`] on error any changes made
@ -591,7 +583,6 @@ mod tests {
#[test]
fn test_basic() {
let mut table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![
with_strings(
column("tag1", SemanticType::Tag),
@ -710,7 +701,6 @@ mod tests {
#[test]
fn test_strings() {
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![
with_packed_strings(
column("tag1", SemanticType::Tag),
@ -789,7 +779,6 @@ mod tests {
// Try to write 6 rows expecting an error
let mut try_write = |other: PbColumn, expected_err: &str| {
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![
with_i64(
column("time", SemanticType::Time),
@ -893,7 +882,6 @@ mod tests {
fn test_optimization_trim_null_masks() {
// See https://github.com/influxdata/influxdb-pb-data-protocol#optimization-1-trim-null-masks
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![
with_i64(
column("i64", SemanticType::Field),
@ -938,7 +926,6 @@ mod tests {
fn test_optimization_omit_null_masks() {
// See https://github.com/influxdata/influxdb-pb-data-protocol#optimization-1b-omit-empty-null-masks
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![with_i64(
column("time", SemanticType::Time),
vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
@ -975,7 +962,6 @@ mod tests {
fn test_optimization_trim_repeated_tail_values() {
// See https://github.com/influxdata/influxdb-pb-data-protocol#optimization-2-trim-repeated-tail-values
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![
with_strings(
column("f_s", SemanticType::Field),
@ -1075,7 +1061,6 @@ mod tests {
// we need at least one value though
let table_batch = TableBatch {
table_name: "table".to_string(),
columns: vec![with_i64(column("time", SemanticType::Time), vec![], vec![])],
row_count: 9,
table_id: 42,
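
A small usage sketch of the new decode signature, assuming the decoder lives at mutable_batch_pb::decode as implied by this file (the DatabaseBatch value is whatever arrived over the wire):

use generated_types::influxdata::pbdata::v1::DatabaseBatch;
use mutable_batch_pb::decode::decode_database_batch;

fn decoded_table_count(batch: &DatabaseBatch) -> usize {
    // The returned map is keyed by the raw i64 catalog table ID.
    decode_database_batch(batch)
        .expect("invalid database batch")
        .len()
}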

View File

@ -16,19 +16,7 @@ pub fn encode_write(database_id: i64, write: &DmlWrite) -> DatabaseBatch {
DatabaseBatch {
table_batches: write
.tables()
.map(|(table_name, batch)| {
// Temporary code.
//
// Once only IDs are pushed over the network this extra lookup
// can be removed.
let table_id = write.table_id(table_name).unwrap_or_else(|| {
panic!(
"no table ID mapping found for namespace ID {} table {}",
database_id, table_name
)
});
encode_batch(table_name, table_id.get(), batch)
})
.map(|(table_id, batch)| encode_batch(table_id.get(), batch))
.collect(),
partition_key: write.partition_key().to_string(),
database_id,
@ -36,9 +24,8 @@ pub fn encode_write(database_id: i64, write: &DmlWrite) -> DatabaseBatch {
}
/// Convert a [`MutableBatch`] to [`TableBatch`]
pub fn encode_batch(table_name: &str, table_id: i64, batch: &MutableBatch) -> TableBatch {
pub fn encode_batch(table_id: i64, batch: &MutableBatch) -> TableBatch {
TableBatch {
table_name: table_name.to_string(),
columns: batch
.columns()
.filter_map(|(column_name, column)| {
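
A usage sketch of the name-free encoder, mirroring the round-trip tests in the next file and assuming the mutable_batch_pb::encode module path (the table ID 42 is arbitrary):

use mutable_batch_lp::lines_to_batches;
use mutable_batch_pb::encode::encode_batch;

fn encode_one_table() {
    let mut batches = lines_to_batches("cpu usage=1 100", 0).unwrap();
    let batch = batches.remove("cpu").unwrap();
    // Only the catalog table ID is serialized; the table name no longer appears on the wire.
    let encoded = encode_batch(42, &batch);
    assert_eq!(encoded.table_id, 42);
}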

View File

@ -30,7 +30,7 @@ fn test_encode_decode() {
assert_batches_eq!(expected, &[batch.to_arrow(Projection::All).unwrap()]);
let encoded = encode_batch("foo", 42, &batch);
let encoded = encode_batch(42, &batch);
assert_eq!(encoded.table_id, 42);
let mut batch = MutableBatch::new();
@ -140,7 +140,7 @@ fn test_encode_decode_null_columns_issue_4272() {
.write_to_batch(&mut got)
.expect("should write");
let encoded = encode_batch("bananas", 24, &got);
let encoded = encode_batch(24, &got);
assert_eq!(encoded.table_id, 24);
let mut batch = MutableBatch::new();
@ -164,7 +164,7 @@ fn test_encode_decode_null_columns_issue_4272() {
.write_to_batch(&mut got)
.expect("should write");
let encoded = encode_batch("bananas", 42, &got);
let encoded = encode_batch(42, &got);
assert_eq!(encoded.table_id, 42);
let mut batch = MutableBatch::new();

View File

@ -13,16 +13,15 @@ fn generate_pbdata_bytes() -> Vec<(String, (usize, Bytes))> {
.into_iter()
.map(|(bench, lp)| {
let batches = lines_to_batches(&lp, 0).unwrap();
let ids = batches
.keys()
let data = batches
.into_iter()
.enumerate()
.map(|(i, name)| (name.clone(), TableId::new(i as _)))
.map(|(idx, (_table_name, batch))| (TableId::new(idx as _), batch))
.collect();
let write = DmlWrite::new(
NamespaceId::new(42),
batches,
ids,
data,
"bananas".into(),
Default::default(),
);

View File

@ -17,6 +17,7 @@ datafusion_util = { path = "../datafusion_util" }
dml = { path = "../dml" }
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown = { workspace = true }
influxdb_iox_client = { path = "../influxdb_iox_client" }
ingester = { path = "../ingester" }
iox_catalog = { path = "../iox_catalog" }

View File

@ -786,19 +786,21 @@ impl MockIngester {
}
let (mutable_batches, _stats) = converter.finish().unwrap();
// set up catalog
let tables = {
// sort names so that IDs are deterministic
let mut table_names: Vec<_> = mutable_batches.keys().cloned().collect();
table_names.sort();
// sort names so that IDs are deterministic
let mut table_names: Vec<_> = mutable_batches.keys().cloned().collect();
table_names.sort();
// set up catalog, map from catalog id to batch
let mut tables = Vec::with_capacity(table_names.len());
let mut batches_by_id = hashbrown::HashMap::with_capacity(table_names.len());
for table_name in table_names {
let table = self.ns.create_table(&table_name).await;
let table_id = table.table.id;
tables.push(table);
batches_by_id.insert(table_id, mutable_batches.get(&table_name).unwrap().clone());
}
let mut tables = vec![];
for table_name in table_names {
let table = self.ns.create_table(&table_name).await;
tables.push(table);
}
tables
};
let mut partition_ids = vec![];
for table in &tables {
let partition = table
@ -808,12 +810,7 @@ impl MockIngester {
partition_ids.push(partition.partition.id);
}
let ids = tables
.iter()
.map(|v| (v.table.name.clone(), v.table.id))
.collect();
for table in tables {
for table in &tables {
let schema = mutable_batches
.get(&table.table.name)
.unwrap()
@ -836,8 +833,7 @@ impl MockIngester {
);
let op = DmlOperation::Write(DmlWrite::new(
self.ns.namespace.id,
mutable_batches,
ids,
batches_by_id,
PartitionKey::from(partition_key),
meta,
));

View File

@ -146,7 +146,7 @@ where
type WriteError = SchemaError;
type DeleteError = SchemaError;
// Accepts a map of "TableName -> MutableBatch"
// Accepts a map of TableName -> MutableBatch
type WriteInput = HashMap<String, MutableBatch>;
// And returns a map of TableId -> (TableName, MutableBatch)
type WriteOutput = HashMap<TableId, (String, MutableBatch)>;
@ -287,7 +287,7 @@ where
}
};
// Map the "TableName -> Data" into "(TableName, TableId) -> Data" for
// Map the "TableName -> Data" into "TableId -> (TableName, Data)" for
// downstream handlers.
let batches = batches
.into_iter()
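
A minimal sketch of the shape change those comments describe, with a hypothetical lookup_id closure standing in for the handler's catalog-backed schema validation, and hashbrown's HashMap as used elsewhere in this diff:

use data_types::TableId;
use hashbrown::HashMap;
use mutable_batch::MutableBatch;

fn rekey(
    input: HashMap<String, MutableBatch>,
    mut lookup_id: impl FnMut(&str) -> TableId,
) -> HashMap<TableId, (String, MutableBatch)> {
    input
        .into_iter()
        .map(|(name, batch)| {
            // Keep the name alongside the data for downstream handlers.
            let id = lookup_id(&name);
            (id, (name, batch))
        })
        .collect()
}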

View File

@ -103,8 +103,7 @@ where
// Sets of maps collated by destination shard for batching/merging of
// shard data.
let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
let mut table_ids: HashMap<_, HashMap<String, TableId>> = HashMap::new();
let mut collated: HashMap<_, HashMap<TableId, MutableBatch>> = HashMap::new();
// Shard each entry in `writes` and collate them into one DML operation
// per shard to maximise the size of each write, and therefore increase
@ -115,13 +114,7 @@ where
let existing = collated
.entry(Arc::clone(&shard))
.or_default()
.insert(table_name.clone(), batch);
assert!(existing.is_none());
let existing = table_ids
.entry(shard)
.or_default()
.insert(table_name.clone(), table_id);
.insert(table_id, batch);
assert!(existing.is_none());
}
@ -129,7 +122,6 @@ where
let dml = DmlWrite::new(
namespace_id,
batch,
table_ids.remove(&shard).unwrap(),
partition_key.clone(),
DmlMeta::unsequenced(span_ctx.clone()),
);
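
A minimal sketch of the collation step above, with Shard as a stand-in for the router's shard handle type:

use data_types::TableId;
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use std::hash::Hash;

fn collate<Shard: Hash + Eq>(
    writes: impl IntoIterator<Item = (Shard, TableId, MutableBatch)>,
) -> HashMap<Shard, HashMap<TableId, MutableBatch>> {
    let mut collated: HashMap<Shard, HashMap<TableId, MutableBatch>> = HashMap::new();
    for (shard, table_id, batch) in writes {
        // Each table in a write maps to exactly one shard, so no entry is ever replaced.
        let existing = collated.entry(shard).or_default().insert(table_id, batch);
        assert!(existing.is_none());
    }
    collated
}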

View File

@ -163,6 +163,26 @@ impl TestContext {
pub fn metrics(&self) -> &Registry {
self.metrics.as_ref()
}
/// Return the [`TableId`] in the catalog for `name` in `namespace`, or panic.
pub async fn table_id(&self, namespace: &str, name: &str) -> TableId {
let mut repos = self.catalog.repositories().await;
let namespace_id = repos
.namespaces()
.get_by_name(namespace)
.await
.expect("query failed")
.expect("namespace does not exist")
.id;
repos
.tables()
.get_by_namespace_and_name(namespace_id, name)
.await
.expect("query failed")
.expect("no table entry for the specified namespace/table name pair")
.id
}
}
impl Default for TestContext {
@ -200,7 +220,8 @@ async fn test_write_ok() {
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
assert_eq!(writes.len(), 1);
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
assert!(w.table("platanos").is_some());
let table_id = ctx.table_id("bananas_test", "platanos").await;
assert!(w.table(&table_id).is_some());
});
// Ensure the catalog saw the namespace creation
@ -495,10 +516,9 @@ async fn test_write_propagate_ids() {
assert_eq!(writes.len(), 1);
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
assert_eq!(w.namespace_id(), ns.id);
assert!(w.table("platanos").is_some());
for (name, id) in ids {
assert_eq!(w.table_id(name).unwrap(), id);
for id in ids.values() {
assert!(w.table(id).is_some());
}
});
}

View File

@ -162,7 +162,7 @@ pub fn decode(
match payload {
Payload::Write(write) => {
let (tables, ids) = decode_database_batch(&write).map_err(|e| {
let tables = decode_database_batch(&write).map_err(|e| {
WriteBufferError::invalid_data(format!(
"failed to decode database batch: {}",
e
@ -179,8 +179,10 @@ pub fn decode(
Ok(DmlOperation::Write(DmlWrite::new(
NamespaceId::new(write.database_id),
tables,
ids.into_iter().map(|(k, v)| (k, TableId::new(v))).collect(),
tables
.into_iter()
.map(|(k, v)| (TableId::new(k), v))
.collect(),
partition_key,
meta,
)))
@ -304,12 +306,11 @@ mod tests {
#[test]
fn test_dml_write_round_trip() {
let (data, ids) = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100");
let data = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100");
let w = DmlWrite::new(
NamespaceId::new(42),
data,
ids,
PartitionKey::from("2022-01-01"),
DmlMeta::default(),
);
@ -338,27 +339,13 @@ mod tests {
assert_eq!(w.table_count(), got.table_count());
assert_eq!(w.min_timestamp(), got.min_timestamp());
assert_eq!(w.max_timestamp(), got.max_timestamp());
assert!(got.table("bananas").is_some());
// Validate the writes & table names all appear in the DML writes.
let mut a = w.tables().map(|(name, _)| name).collect::<Vec<_>>();
// Validate the table IDs all appear in the DML writes.
let mut a = w.tables().map(|(id, _)| id).collect::<Vec<_>>();
a.sort_unstable();
let mut b = got.tables().map(|(name, _)| name).collect::<Vec<_>>();
let mut b = got.tables().map(|(id, _)| id).collect::<Vec<_>>();
b.sort_unstable();
assert_eq!(a, b);
// Validate both DML writes contain the same mappings of name -> table
// ID.
let a = a
.into_iter()
.map(|name| w.table_id(name).expect("table ID map entry missing"))
.collect::<Vec<_>>();
let b = b
.into_iter()
.map(|name| got.table_id(name).expect("table ID map entry missing"))
.collect::<Vec<_>>();
assert_eq!(a, b);
}
}

View File

@ -371,23 +371,16 @@ pub mod test_utils {
test_flush(&adapter).await;
}
/// Parse the provided line-protocol and return both the table ID indexed
/// data map, and the table ID to table name map.
/// Parse the provided line-protocol and return the table ID indexed data map.
///
/// Makes up the namespace ID & table IDs.
pub fn lp_to_batches(lp: &str) -> (HashMap<String, MutableBatch>, HashMap<String, TableId>) {
pub fn lp_to_batches(lp: &str) -> HashMap<TableId, MutableBatch> {
mutable_batch_lp::lines_to_batches(lp, 0)
.unwrap()
.into_iter()
.enumerate()
.map(|(idx, (name, data))| {
let idx = idx as i64;
let name_mapping = (name.clone(), data);
let id_mapping = (name, TableId::new(idx));
(name_mapping, id_mapping)
})
.unzip()
.map(|(idx, (_table_name, batch))| (TableId::new(idx as _), batch))
.collect()
}
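
A quick usage sketch of the updated helper, as if written inside this test_utils module; the line protocol mirrors the codec round-trip test earlier in this diff:

fn lp_to_batches_example() {
    let tables = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100");
    // Two measurements parse into two entries, each keyed by a made-up TableId.
    assert_eq!(tables.len(), 2);
}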
/// Writes line protocol and returns the [`DmlWrite`] that was written.
@ -398,12 +391,11 @@ pub mod test_utils {
partition_key: PartitionKey,
span_context: Option<&SpanContext>,
) -> DmlWrite {
let (tables, names) = lp_to_batches(lp);
let tables = lp_to_batches(lp);
let write = DmlWrite::new(
NamespaceId::new(42),
tables,
names,
partition_key,
DmlMeta::unsequenced(span_context.cloned()),
);
@ -1219,11 +1211,10 @@ pub mod test_utils {
{
let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
let (tables, names) = lp_to_batches("upc user=1 100");
let tables = lp_to_batches("upc user=1 100");
let write = DmlWrite::new(
NamespaceId::new(42),
tables,
names,
"bananas".into(),
Default::default(),
);

View File

@ -834,11 +834,10 @@ mod tests {
partition_key: impl Into<PartitionKey> + Send,
) -> DmlMeta {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let (tables, names) = lp_to_batches("table foo=1");
let tables = lp_to_batches("table foo=1");
let write = DmlWrite::new(
NamespaceId::new(42),
tables,
names,
partition_key.into(),
DmlMeta::unsequenced(Some(span_ctx)),
);

View File

@ -223,18 +223,13 @@ mod tests {
writer.commit();
let mut m = HashMap::default();
m.insert("table".to_string(), batch);
m.insert(TableId::new(24), batch);
let span = SpanContext::new(Arc::new(LogTraceCollector::new()));
let ids = [("table".to_string(), TableId::new(24))]
.into_iter()
.collect();
DmlOperation::Write(DmlWrite::new(
NamespaceId::new(42),
m,
ids,
"1970-01-01".into(),
DmlMeta::unsequenced(Some(span)),
))

View File

@ -175,12 +175,11 @@ impl MockBufferSharedState {
/// Push line protocol data with placeholder values used for write metadata
pub fn push_lp(&self, sequence: Sequence, lp: &str) {
let (tables, names) = lp_to_batches(lp);
let tables = lp_to_batches(lp);
let meta = DmlMeta::sequenced(sequence, iox_time::Time::from_timestamp_nanos(0), None, 0);
self.push_write(DmlWrite::new(
NamespaceId::new(42),
tables,
names,
"test-partition".into(),
meta,
))
@ -902,11 +901,10 @@ mod tests {
async fn test_always_error_write() {
let writer = MockBufferForWritingThatAlwaysErrors {};
let (tables, names) = lp_to_batches("upc user=1 100");
let tables = lp_to_batches("upc user=1 100");
let operation = DmlOperation::Write(DmlWrite::new(
NamespaceId::new(42),
tables,
names,
"bananas".into(),
Default::default(),
));