From 75a3fd5e1ed7292a8d76b220d2e52fc158e3c615 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 20 Jun 2022 15:16:15 +0100 Subject: [PATCH] refactor: use propagated partition key in ingester Changes the ingester to use the partition key derived in the router, and transmitted over through the kafka API boundary. This should have no observable behavioural change, but be more resilient as we're no longer assuming the partitioning algorithm produces the same value in both the router (where data is partitioned) and the ingester (where data is persisted, segregated by partition key). This is a pre-requisite to allowing the user to specify partitioning schemes. --- ingester/src/data.rs | 39 ++++++------------------ ingester/src/handler.rs | 2 -- ingester/src/lib.rs | 1 - ingester/src/partioning.rs | 41 -------------------------- ingester/src/stream_handler/handler.rs | 4 +-- ingester/src/test_util.rs | 11 ++----- query_tests/src/scenarios/util.rs | 37 +---------------------- 7 files changed, 14 insertions(+), 121 deletions(-) delete mode 100644 ingester/src/partioning.rs diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 6cd5dd5278..67713938c6 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1,10 +1,7 @@ //! Data for the lifecycle of the Ingester use crate::{ - compact::compact_persisting_batch, - lifecycle::LifecycleHandle, - partioning::{Partitioner, PartitionerError}, - querier_handler::query, + compact::compact_persisting_batch, lifecycle::LifecycleHandle, querier_handler::query, }; use arrow::{error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; @@ -90,9 +87,6 @@ pub enum Error { ))] PersistingNotMatch, - #[snafu(display("Cannot partition data: {}", source))] - Partitioning { source: PartitionerError }, - #[snafu(display("Snapshot error: {}", source))] Snapshot { source: mutable_batch::Error }, @@ -126,9 +120,6 @@ pub struct IngesterData { /// get ingested. sequencers: BTreeMap, - /// Partitioner. - partitioner: Arc, - /// Executor for running queries and compacting and persisting exec: Arc, @@ -142,7 +133,6 @@ impl IngesterData { object_store: Arc, catalog: Arc, sequencers: BTreeMap, - partitioner: Arc, exec: Arc, backoff_config: BackoffConfig, ) -> Self { @@ -150,7 +140,6 @@ impl IngesterData { store: ParquetStorage::new(object_store), catalog, sequencers, - partitioner, exec, backoff_config, } @@ -194,7 +183,6 @@ impl IngesterData { sequencer_id, self.catalog.as_ref(), lifecycle_handle, - self.partitioner.as_ref(), &self.exec, ) .await @@ -446,7 +434,6 @@ impl SequencerData { sequencer_id: SequencerId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, - partitioner: &dyn Partitioner, executor: &Executor, ) -> Result { let namespace_data = match self.namespace(dml_operation.namespace()) { @@ -463,7 +450,6 @@ impl SequencerData { sequencer_id, catalog, lifecycle_handle, - partitioner, executor, ) .await @@ -613,7 +599,6 @@ impl NamespaceData { sequencer_id: SequencerId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, - partitioner: &dyn Partitioner, executor: &Executor, ) -> Result { let sequence_number = dml_operation @@ -633,6 +618,12 @@ impl NamespaceData { DmlOperation::Write(write) => { let mut pause_writes = false; + // Extract the partition key derived by the router. + let partition_key = write + .partition_key() + .expect("no partition key in dml write") + .clone(); + for (t, b) in write.into_tables() { let table_data = match self.table_data(&t) { Some(t) => t, @@ -646,10 +637,10 @@ impl NamespaceData { .buffer_table_write( sequence_number, b, + partition_key.clone(), sequencer_id, catalog, lifecycle_handle, - partitioner, ) .await?; pause_writes = pause_writes || should_pause; @@ -901,15 +892,11 @@ impl TableData { &mut self, sequence_number: SequenceNumber, batch: MutableBatch, + partition_key: PartitionKey, sequencer_id: SequencerId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, - partitioner: &dyn Partitioner, ) -> Result { - let partition_key = partitioner - .partition_key(&batch) - .context(PartitioningSnafu)?; - let partition_data = match self.partition_data.get_mut(&partition_key) { Some(p) => p, None => { @@ -1678,7 +1665,6 @@ mod tests { use super::*; use crate::{ lifecycle::{LifecycleConfig, LifecycleManager}, - partioning::DefaultPartitioner, test_util::create_tombstone, }; use arrow::datatypes::SchemaRef; @@ -1807,7 +1793,6 @@ mod tests { Arc::clone(&object_store), Arc::clone(&catalog), sequencers, - Arc::new(DefaultPartitioner::default()), Arc::new(Executor::new(1)), BackoffConfig::default(), )); @@ -1901,7 +1886,6 @@ mod tests { Arc::clone(&object_store), Arc::clone(&catalog), sequencers, - Arc::new(DefaultPartitioner::default()), Arc::new(Executor::new(1)), BackoffConfig::default(), )); @@ -2097,7 +2081,6 @@ mod tests { Arc::clone(&object_store), Arc::clone(&catalog), sequencers, - Arc::new(DefaultPartitioner::default()), Arc::new(Executor::new(1)), BackoffConfig::default(), )); @@ -2557,7 +2540,6 @@ mod tests { Arc::clone(&metrics), Arc::new(SystemProvider::new()), ); - let partitioner = DefaultPartitioner::default(); let exec = Executor::new(1); let data = NamespaceData::new(namespace.id, &*metrics); @@ -2569,7 +2551,6 @@ mod tests { sequencer.id, catalog.as_ref(), &manager.handle(), - &partitioner, &exec, ) .await @@ -2592,7 +2573,6 @@ mod tests { sequencer.id, catalog.as_ref(), &manager.handle(), - &partitioner, &exec, ) .await @@ -2643,7 +2623,6 @@ mod tests { Arc::clone(&object_store), Arc::clone(&catalog), sequencers, - Arc::new(DefaultPartitioner::default()), Arc::new(Executor::new(1)), BackoffConfig::default(), )); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 7a8cd80871..2a769865cd 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -3,7 +3,6 @@ use crate::{ data::{IngesterData, IngesterQueryResponse, SequencerData}, lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, - partioning::DefaultPartitioner, poison::PoisonCabinet, querier_handler::prepare_data_to_querier, stream_handler::{ @@ -156,7 +155,6 @@ impl IngestHandlerImpl { object_store, catalog, sequencers, - Arc::new(DefaultPartitioner::default()), exec, BackoffConfig::default(), )); diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index bdfc7d919f..2e261e10e1 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -18,7 +18,6 @@ pub mod data; pub mod handler; mod job; pub mod lifecycle; -pub mod partioning; mod poison; pub mod querier_handler; pub mod query; diff --git a/ingester/src/partioning.rs b/ingester/src/partioning.rs deleted file mode 100644 index b07154ec26..0000000000 --- a/ingester/src/partioning.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! Temporary partioning implementation until partitions are properly transmitted by the router. -use chrono::{format::StrftimeItems, TimeZone, Utc}; -use data_types::PartitionKey; -use mutable_batch::{column::ColumnData, MutableBatch}; -use schema::TIME_COLUMN_NAME; - -/// Error for [`Partitioner`] -pub type PartitionerError = Box; - -/// Interface to partition data within the ingester. -/// -/// This is a temporary solution until the partition keys/IDs are properly transmitted by the router. -pub trait Partitioner: std::fmt::Debug + Send + Sync + 'static { - /// Calculate partition key for given mutable batch. - fn partition_key(&self, batch: &MutableBatch) -> Result; -} - -/// Default partitioner that matches the current router implementation. -#[derive(Debug, Default)] -#[allow(missing_copy_implementations)] -pub struct DefaultPartitioner {} - -impl Partitioner for DefaultPartitioner { - fn partition_key(&self, batch: &MutableBatch) -> Result { - let (_, col) = batch - .columns() - .find(|(name, _)| *name == TIME_COLUMN_NAME) - .unwrap(); - let timestamp = match col.data() { - ColumnData::I64(_, s) => s.min.unwrap(), - _ => return Err(String::from("Time column not present").into()), - }; - - Ok(format!( - "{}", - Utc.timestamp_nanos(timestamp) - .format_with_items(StrftimeItems::new("%Y-%m-%d")) - ) - .into()) - } -} diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 1d9a415694..83c09f45c6 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -872,9 +872,7 @@ mod tests { Ok(DmlOperation::Write(make_write("good_op", 2))) ]], sink_rets = [ - Err(crate::data::Error::Partitioning { - source: String::from("Time column not present").into() - }), + Err(crate::data::Error::TableNotPresent), Ok(true), ], want_ttbr = 2, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index db7b84e148..2650a4d6d5 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -2,12 +2,9 @@ #![allow(missing_docs)] -use crate::{ - data::{ - IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData, - SnapshotBatch, TableData, - }, - partioning::DefaultPartitioner, +use crate::data::{ + IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData, + SnapshotBatch, TableData, }; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; @@ -708,7 +705,6 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa object_store, catalog, sequencers, - Arc::new(DefaultPartitioner::default()), exec, backoff::BackoffConfig::default(), ) @@ -753,7 +749,6 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa object_store, catalog, sequencers, - Arc::new(DefaultPartitioner::default()), exec, backoff::BackoffConfig::default(), ) diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 240e05c41e..3b98d9ba30 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -18,13 +18,11 @@ use ingester::{ FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, SequencerData, }, lifecycle::LifecycleHandle, - partioning::{Partitioner, PartitionerError}, querier_handler::prepare_data_to_querier, }; use iox_catalog::interface::get_schema_by_name; use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer}; use itertools::Itertools; -use mutable_batch::MutableBatch; use mutable_batch_lp::LinesConverter; use parquet_file::storage::ParquetStorage; use querier::{ @@ -38,7 +36,6 @@ use std::{ fmt::Display, fmt::Write, sync::Arc, - sync::Mutex, }; // Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle @@ -623,9 +620,6 @@ struct MockIngester { /// Sequencer used for testing. sequencer: Arc, - /// Special partitioner that lets us control to which partition we write. - partitioner: Arc, - /// Memory of partition keys for certain sequence numbers. /// /// This is currently required because [`DmlWrite`] does not carry partiion information so we @@ -657,12 +651,10 @@ impl MockIngester { catalog.metric_registry(), ), )]); - let partitioner = Arc::new(ConstantPartitioner::default()); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(), catalog.catalog(), sequencers, - Arc::clone(&partitioner) as _, catalog.exec(), BackoffConfig::default(), )); @@ -671,7 +663,6 @@ impl MockIngester { catalog, ns, sequencer, - partitioner, partition_keys: Default::default(), ingester_data, sequence_counter: 0, @@ -687,13 +678,6 @@ impl MockIngester { async fn buffer_operation(&mut self, dml_operation: DmlOperation) { let lifecycle_handle = NoopLifecycleHandle {}; - // set up partitioner for writes - if matches!(dml_operation, DmlOperation::Write(_)) { - let sequence_number = dml_operation.meta().sequence().unwrap().sequence_number; - self.partitioner - .set(self.partition_keys.get(&sequence_number).unwrap().clone()); - } - let should_pause = self .ingester_data .buffer_operation( @@ -781,7 +765,7 @@ impl MockIngester { let op = DmlOperation::Write(DmlWrite::new( self.ns.namespace.name.clone(), mutable_batches, - None, + Some(PartitionKey::from(partition_key)), meta, )); (op, partition_ids) @@ -896,25 +880,6 @@ impl LifecycleHandle for NoopLifecycleHandle { } } -/// Special partitioner that returns a constant values. -#[derive(Debug, Default)] -struct ConstantPartitioner { - partition_key: Mutex, -} - -impl ConstantPartitioner { - /// Set partition key. - fn set(&self, partition_key: String) { - *self.partition_key.lock().unwrap() = partition_key; - } -} - -impl Partitioner for ConstantPartitioner { - fn partition_key(&self, _batch: &MutableBatch) -> Result { - Ok(self.partition_key.lock().unwrap().clone().into()) - } -} - #[async_trait] impl IngesterFlightClient for MockIngester { async fn query(