refactor: use propagated partition key in ingester

Changes the ingester to use the partition key derived in the router and
transmitted over the Kafka API boundary.

This should have no observable behavioural change, but makes the
ingester more resilient: we no longer assume the partitioning algorithm
produces the same value in both the router (where data is partitioned)
and the ingester (where data is persisted, segregated by partition key).

This is a prerequisite for allowing users to specify partitioning
schemes.
pull/24376/head
Dom Dwyer 2022-06-20 15:16:15 +01:00
parent 70337087a8
commit 75a3fd5e1e
7 changed files with 14 additions and 121 deletions
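
In outline, the flow this change relies on, as a minimal sketch with simplified stand-in types (not the real dml crate API): the router derives the partition key once, attaches it to the write that crosses the Kafka boundary, and the ingester reads it back instead of recomputing it.

// Minimal sketch of the propagated-partition-key flow. `DmlWrite` is a
// simplified stand-in for the real dml crate type; only the shape matters.
#[derive(Debug)]
struct DmlWrite {
    // Derived by the router, then carried across the Kafka boundary.
    partition_key: Option<String>,
}

impl DmlWrite {
    fn partition_key(&self) -> Option<&str> {
        self.partition_key.as_deref()
    }
}

// Router side: derive the key once and attach it to the write.
fn route(partition_key: &str) -> DmlWrite {
    DmlWrite {
        partition_key: Some(partition_key.to_string()),
    }
}

// Ingester side: trust the propagated key instead of re-deriving it.
fn ingest(write: &DmlWrite) -> &str {
    write.partition_key().expect("no partition key in dml write")
}

fn main() {
    let write = route("2022-06-20");
    assert_eq!(ingest(&write), "2022-06-20");
}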


@@ -1,10 +1,7 @@
 //! Data for the lifecycle of the Ingester
 
 use crate::{
-    compact::compact_persisting_batch,
-    lifecycle::LifecycleHandle,
-    partioning::{Partitioner, PartitionerError},
-    querier_handler::query,
+    compact::compact_persisting_batch, lifecycle::LifecycleHandle, querier_handler::query,
 };
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 use arrow_util::optimize::{optimize_record_batch, optimize_schema};
@@ -90,9 +87,6 @@ pub enum Error {
     ))]
     PersistingNotMatch,
 
-    #[snafu(display("Cannot partition data: {}", source))]
-    Partitioning { source: PartitionerError },
-
     #[snafu(display("Snapshot error: {}", source))]
     Snapshot { source: mutable_batch::Error },
@@ -126,9 +120,6 @@ pub struct IngesterData {
     /// get ingested.
     sequencers: BTreeMap<SequencerId, SequencerData>,
 
-    /// Partitioner.
-    partitioner: Arc<dyn Partitioner>,
-
     /// Executor for running queries and compacting and persisting
     exec: Arc<Executor>,
@@ -142,7 +133,6 @@ impl IngesterData {
         object_store: Arc<DynObjectStore>,
         catalog: Arc<dyn Catalog>,
         sequencers: BTreeMap<SequencerId, SequencerData>,
-        partitioner: Arc<dyn Partitioner>,
         exec: Arc<Executor>,
         backoff_config: BackoffConfig,
     ) -> Self {
@@ -150,7 +140,6 @@ impl IngesterData {
             store: ParquetStorage::new(object_store),
             catalog,
             sequencers,
-            partitioner,
             exec,
             backoff_config,
         }
@@ -194,7 +183,6 @@ impl IngesterData {
                 sequencer_id,
                 self.catalog.as_ref(),
                 lifecycle_handle,
-                self.partitioner.as_ref(),
                 &self.exec,
             )
             .await
@@ -446,7 +434,6 @@ impl SequencerData {
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
         lifecycle_handle: &dyn LifecycleHandle,
-        partitioner: &dyn Partitioner,
         executor: &Executor,
     ) -> Result<bool> {
         let namespace_data = match self.namespace(dml_operation.namespace()) {
@@ -463,7 +450,6 @@ impl SequencerData {
                 sequencer_id,
                 catalog,
                 lifecycle_handle,
-                partitioner,
                 executor,
             )
             .await
@@ -613,7 +599,6 @@ impl NamespaceData {
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
         lifecycle_handle: &dyn LifecycleHandle,
-        partitioner: &dyn Partitioner,
         executor: &Executor,
     ) -> Result<bool> {
         let sequence_number = dml_operation
@@ -633,6 +618,12 @@ impl NamespaceData {
             DmlOperation::Write(write) => {
                 let mut pause_writes = false;
 
+                // Extract the partition key derived by the router.
+                let partition_key = write
+                    .partition_key()
+                    .expect("no partition key in dml write")
+                    .clone();
+
                 for (t, b) in write.into_tables() {
                     let table_data = match self.table_data(&t) {
                         Some(t) => t,
@@ -646,10 +637,10 @@ impl NamespaceData {
                         .buffer_table_write(
                             sequence_number,
                             b,
+                            partition_key.clone(),
                             sequencer_id,
                             catalog,
                             lifecycle_handle,
-                            partitioner,
                         )
                         .await?;
                     pause_writes = pause_writes || should_pause;
@@ -901,15 +892,11 @@ impl TableData {
         &mut self,
         sequence_number: SequenceNumber,
         batch: MutableBatch,
+        partition_key: PartitionKey,
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
         lifecycle_handle: &dyn LifecycleHandle,
-        partitioner: &dyn Partitioner,
     ) -> Result<bool> {
-        let partition_key = partitioner
-            .partition_key(&batch)
-            .context(PartitioningSnafu)?;
-
         let partition_data = match self.partition_data.get_mut(&partition_key) {
             Some(p) => p,
             None => {
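
To make the new shape concrete, here is a sketch of the buffer path under heavily simplified signatures (the real method also threads the sequence number, catalog, and lifecycle handle, and returns whether to pause writes): the PartitionKey now arrives as an argument and is only used to find, or lazily create, the matching partition buffer.

use std::collections::BTreeMap;

// Stand-ins for data_types::PartitionKey and the buffered data.
type PartitionKey = String;
type Batch = String;

#[derive(Default)]
struct TableData {
    partition_data: BTreeMap<PartitionKey, Vec<Batch>>,
}

impl TableData {
    // Before this commit the key was derived here via a Partitioner;
    // now it is supplied by the caller, already derived in the router.
    fn buffer_table_write(&mut self, batch: Batch, partition_key: PartitionKey) {
        self.partition_data
            .entry(partition_key)
            .or_default()
            .push(batch);
    }
}

fn main() {
    let mut table = TableData::default();
    table.buffer_table_write("cpu row".to_string(), "2022-06-20".to_string());
    assert_eq!(table.partition_data.len(), 1);
}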
@@ -1678,7 +1665,6 @@ mod tests {
     use super::*;
     use crate::{
         lifecycle::{LifecycleConfig, LifecycleManager},
-        partioning::DefaultPartitioner,
         test_util::create_tombstone,
     };
     use arrow::datatypes::SchemaRef;
@@ -1807,7 +1793,6 @@ mod tests {
            Arc::clone(&object_store),
            Arc::clone(&catalog),
            sequencers,
-            Arc::new(DefaultPartitioner::default()),
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
        ));
@@ -1901,7 +1886,6 @@ mod tests {
            Arc::clone(&object_store),
            Arc::clone(&catalog),
            sequencers,
-            Arc::new(DefaultPartitioner::default()),
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
        ));
@@ -2097,7 +2081,6 @@ mod tests {
            Arc::clone(&object_store),
            Arc::clone(&catalog),
            sequencers,
-            Arc::new(DefaultPartitioner::default()),
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
        ));
@@ -2557,7 +2540,6 @@ mod tests {
            Arc::clone(&metrics),
            Arc::new(SystemProvider::new()),
        );
-        let partitioner = DefaultPartitioner::default();
        let exec = Executor::new(1);
 
        let data = NamespaceData::new(namespace.id, &*metrics);
@@ -2569,7 +2551,6 @@ mod tests {
                sequencer.id,
                catalog.as_ref(),
                &manager.handle(),
-                &partitioner,
                &exec,
            )
            .await
@@ -2592,7 +2573,6 @@ mod tests {
                sequencer.id,
                catalog.as_ref(),
                &manager.handle(),
-                &partitioner,
                &exec,
            )
            .await
@@ -2643,7 +2623,6 @@ mod tests {
            Arc::clone(&object_store),
            Arc::clone(&catalog),
            sequencers,
-            Arc::new(DefaultPartitioner::default()),
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
        ));


@@ -3,7 +3,6 @@
 use crate::{
     data::{IngesterData, IngesterQueryResponse, SequencerData},
     lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
-    partioning::DefaultPartitioner,
     poison::PoisonCabinet,
     querier_handler::prepare_data_to_querier,
     stream_handler::{
@@ -156,7 +155,6 @@ impl IngestHandlerImpl {
            object_store,
            catalog,
            sequencers,
-            Arc::new(DefaultPartitioner::default()),
            exec,
            BackoffConfig::default(),
        ));


@@ -18,7 +18,6 @@ pub mod data;
 pub mod handler;
 mod job;
 pub mod lifecycle;
-pub mod partioning;
 mod poison;
 pub mod querier_handler;
 pub mod query;


@@ -1,41 +0,0 @@
-//! Temporary partioning implementation until partitions are properly transmitted by the router.
-use chrono::{format::StrftimeItems, TimeZone, Utc};
-use data_types::PartitionKey;
-use mutable_batch::{column::ColumnData, MutableBatch};
-use schema::TIME_COLUMN_NAME;
-
-/// Error for [`Partitioner`]
-pub type PartitionerError = Box<dyn std::error::Error + Send + Sync + 'static>;
-
-/// Interface to partition data within the ingester.
-///
-/// This is a temporary solution until the partition keys/IDs are properly transmitted by the router.
-pub trait Partitioner: std::fmt::Debug + Send + Sync + 'static {
-    /// Calculate partition key for given mutable batch.
-    fn partition_key(&self, batch: &MutableBatch) -> Result<PartitionKey, PartitionerError>;
-}
-
-/// Default partitioner that matches the current router implementation.
-#[derive(Debug, Default)]
-#[allow(missing_copy_implementations)]
-pub struct DefaultPartitioner {}
-
-impl Partitioner for DefaultPartitioner {
-    fn partition_key(&self, batch: &MutableBatch) -> Result<PartitionKey, PartitionerError> {
-        let (_, col) = batch
-            .columns()
-            .find(|(name, _)| *name == TIME_COLUMN_NAME)
-            .unwrap();
-
-        let timestamp = match col.data() {
-            ColumnData::I64(_, s) => s.min.unwrap(),
-            _ => return Err(String::from("Time column not present").into()),
-        };
-
-        Ok(format!(
-            "{}",
-            Utc.timestamp_nanos(timestamp)
-                .format_with_items(StrftimeItems::new("%Y-%m-%d"))
-        )
-        .into())
-    }
-}
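
For reference, the key the deleted DefaultPartitioner produced was the minimum value of the batch's time column, rendered as a UTC calendar day. A standalone sketch of just that derivation, using chrono's plain format call in place of the StrftimeItems variant above:

use chrono::{TimeZone, Utc};

/// Render a nanosecond timestamp as the YYYY-MM-DD partition key the
/// default (router-matching) partitioner used.
fn day_partition_key(min_timestamp_nanos: i64) -> String {
    Utc.timestamp_nanos(min_timestamp_nanos)
        .format("%Y-%m-%d")
        .to_string()
}

fn main() {
    // 2022-06-20T00:00:00Z expressed in nanoseconds since the epoch.
    assert_eq!(day_partition_key(1_655_683_200_000_000_000), "2022-06-20");
}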


@@ -872,9 +872,7 @@ mod tests {
                Ok(DmlOperation::Write(make_write("good_op", 2)))
            ]],
            sink_rets = [
-                Err(crate::data::Error::Partitioning {
-                    source: String::from("Time column not present").into()
-                }),
+                Err(crate::data::Error::TableNotPresent),
                Ok(true),
            ],
            want_ttbr = 2,


@@ -2,12 +2,9 @@
 #![allow(missing_docs)]
 
-use crate::{
-    data::{
-        IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
-        SnapshotBatch, TableData,
-    },
-    partioning::DefaultPartitioner,
+use crate::data::{
+    IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
+    SnapshotBatch, TableData,
 };
 use arrow::record_batch::RecordBatch;
 use arrow_util::assert_batches_eq;
@@ -708,7 +705,6 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
        object_store,
        catalog,
        sequencers,
-        Arc::new(DefaultPartitioner::default()),
        exec,
        backoff::BackoffConfig::default(),
    )
@@ -753,7 +749,6 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
        object_store,
        catalog,
        sequencers,
-        Arc::new(DefaultPartitioner::default()),
        exec,
        backoff::BackoffConfig::default(),
    )


@@ -18,13 +18,11 @@ use ingester::{
        FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, SequencerData,
    },
    lifecycle::LifecycleHandle,
-    partioning::{Partitioner, PartitionerError},
    querier_handler::prepare_data_to_querier,
 };
 use iox_catalog::interface::get_schema_by_name;
 use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer};
 use itertools::Itertools;
-use mutable_batch::MutableBatch;
 use mutable_batch_lp::LinesConverter;
 use parquet_file::storage::ParquetStorage;
 use querier::{
@@ -38,7 +36,6 @@ use std::{
    fmt::Display,
    fmt::Write,
    sync::Arc,
-    sync::Mutex,
 };
 
 // Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
@@ -623,9 +620,6 @@ struct MockIngester {
    /// Sequencer used for testing.
    sequencer: Arc<TestSequencer>,
 
-    /// Special partitioner that lets us control to which partition we write.
-    partitioner: Arc<ConstantPartitioner>,
-
    /// Memory of partition keys for certain sequence numbers.
    ///
    /// This is currently required because [`DmlWrite`] does not carry partiion information so we
@@ -657,12 +651,10 @@ impl MockIngester {
                catalog.metric_registry(),
            ),
        )]);
-        let partitioner = Arc::new(ConstantPartitioner::default());
        let ingester_data = Arc::new(IngesterData::new(
            catalog.object_store(),
            catalog.catalog(),
            sequencers,
-            Arc::clone(&partitioner) as _,
            catalog.exec(),
            BackoffConfig::default(),
        ));
@@ -671,7 +663,6 @@ impl MockIngester {
            catalog,
            ns,
            sequencer,
-            partitioner,
            partition_keys: Default::default(),
            ingester_data,
            sequence_counter: 0,
@@ -687,13 +678,6 @@ impl MockIngester {
    async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
        let lifecycle_handle = NoopLifecycleHandle {};
 
-        // set up partitioner for writes
-        if matches!(dml_operation, DmlOperation::Write(_)) {
-            let sequence_number = dml_operation.meta().sequence().unwrap().sequence_number;
-            self.partitioner
-                .set(self.partition_keys.get(&sequence_number).unwrap().clone());
-        }
-
        let should_pause = self
            .ingester_data
            .buffer_operation(
@@ -781,7 +765,7 @@ impl MockIngester {
        let op = DmlOperation::Write(DmlWrite::new(
            self.ns.namespace.name.clone(),
            mutable_batches,
-            None,
+            Some(PartitionKey::from(partition_key)),
            meta,
        ));
        (op, partition_ids)
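
With the key carried on the operation itself, the mock no longer needs shared mutable state to steer partitioning. A sketch of the simplification, using stand-in types rather than the real dml crate:

type PartitionKey = String;

// Stand-in for the real DmlWrite: the test now sets the key directly
// on the write instead of priming a ConstantPartitioner behind a Mutex.
struct DmlWrite {
    partition_key: Option<PartitionKey>,
}

impl DmlWrite {
    fn new(partition_key: Option<PartitionKey>) -> Self {
        Self { partition_key }
    }
}

fn main() {
    // "part_a" is a hypothetical key for illustration only.
    let op = DmlWrite::new(Some(PartitionKey::from("part_a")));
    assert_eq!(op.partition_key.as_deref(), Some("part_a"));
}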
@@ -896,25 +880,6 @@ impl LifecycleHandle for NoopLifecycleHandle {
    }
 }
 
-/// Special partitioner that returns a constant values.
-#[derive(Debug, Default)]
-struct ConstantPartitioner {
-    partition_key: Mutex<String>,
-}
-
-impl ConstantPartitioner {
-    /// Set partition key.
-    fn set(&self, partition_key: String) {
-        *self.partition_key.lock().unwrap() = partition_key;
-    }
-}
-
-impl Partitioner for ConstantPartitioner {
-    fn partition_key(&self, _batch: &MutableBatch) -> Result<PartitionKey, PartitionerError> {
-        Ok(self.partition_key.lock().unwrap().clone().into())
-    }
-}
-
 #[async_trait]
 impl IngesterFlightClient for MockIngester {
    async fn query(