refactor: allow ingester to be integrated into query tests (#4427)

* refactor: improve `IngesterData` public interface

* feat: impl `Debug` for `Test{Namespace,Sequencer}`

* refactor: trait interface for `LifecycleHandle`

This is required to mock the lifecycle for query tests.

* refactor: trait for partitioner
pull/24376/head
Marco Neumann 2022-04-26 15:44:30 +02:00 committed by GitHub
parent 401009a7b6
commit bd600bbac6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 183 additions and 83 deletions

View File

@ -1,13 +1,15 @@
//! Data for the lifecycle of the Ingester
use crate::{
compact::compact_persisting_batch, lifecycle::LifecycleHandle, persist::persist,
compact::compact_persisting_batch,
lifecycle::LifecycleHandle,
partioning::{Partitioner, PartitionerError},
persist::persist,
querier_handler::query,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use chrono::{format::StrftimeItems, TimeZone, Utc};
use data_types2::{
DeletePredicate, KafkaPartition, NamespaceId, PartitionId, PartitionInfo, SequenceNumber,
SequencerId, TableId, Timestamp, Tombstone,
@ -17,13 +19,13 @@ use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_time::SystemProvider;
use metric::U64Counter;
use mutable_batch::{column::ColumnData, MutableBatch};
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::warn;
use parking_lot::RwLock;
use predicate::Predicate;
use query::exec::Executor;
use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
use schema::{selection::Selection, Schema};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{btree_map::Entry, BTreeMap},
@ -80,8 +82,8 @@ pub enum Error {
#[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
PersistingNotMatch,
#[snafu(display("Time column not present"))]
TimeColumnNotPresent,
#[snafu(display("Cannot partition data: {}", source))]
Partitioning { source: PartitionerError },
#[snafu(display("Snapshot error: {}", source))]
Snapshot { source: mutable_batch::Error },
@ -106,20 +108,61 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct IngesterData {
/// Object store for persistence of parquet files
pub(crate) object_store: Arc<DynObjectStore>,
object_store: Arc<DynObjectStore>,
/// The global catalog for schema, parquet files and tombstones
pub(crate) catalog: Arc<dyn Catalog>,
catalog: Arc<dyn Catalog>,
/// This map gets set up on initialization of the ingester so it won't ever be modified.
/// The content of each SequenceData will get changed when more namespaces and tables
/// get ingested.
pub(crate) sequencers: BTreeMap<SequencerId, SequencerData>,
sequencers: BTreeMap<SequencerId, SequencerData>,
/// Partitioner.
partitioner: Arc<dyn Partitioner>,
/// Executor for running queries and compacting and persisting
pub(crate) exec: Arc<Executor>,
exec: Arc<Executor>,
/// Backoff config
pub(crate) backoff_config: BackoffConfig,
backoff_config: BackoffConfig,
}
impl IngesterData {
/// Create new instance.
///
/// Every argument is stored as-is in the field of the same name:
///
/// * `object_store` - object store for persistence of parquet files
/// * `catalog` - the global catalog for schema, parquet files and tombstones
/// * `sequencers` - per-sequencer data, keyed by sequencer ID
/// * `partitioner` - computes the partition key for incoming batches
/// * `exec` - executor for running queries and compacting and persisting
/// * `backoff_config` - backoff config
pub fn new(
object_store: Arc<DynObjectStore>,
catalog: Arc<dyn Catalog>,
sequencers: BTreeMap<SequencerId, SequencerData>,
partitioner: Arc<dyn Partitioner>,
exec: Arc<Executor>,
backoff_config: BackoffConfig,
) -> Self {
Self {
object_store,
catalog,
sequencers,
partitioner,
exec,
backoff_config,
}
}
/// Executor for running queries and compacting and persisting
///
/// Returns a borrow of the shared handle; clone the `Arc` if ownership is needed.
pub(crate) fn exec(&self) -> &Arc<Executor> {
&self.exec
}
/// Get sequencer data for specific sequencer.
///
/// Returns `None` if no entry for `sequencer_id` exists in the sequencer map.
pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
self.sequencers.get(&sequencer_id)
}
/// Get iterator over sequencers (ID and data).
///
/// Items are yielded in the key order of the underlying `BTreeMap`.
pub(crate) fn sequencers(&self) -> impl Iterator<Item = (&SequencerId, &SequencerData)> {
self.sequencers.iter()
}
/// Store the write or delete in the in memory buffer. Deletes will
/// be written into the catalog before getting stored in the buffer.
/// Any writes that create new IOx partitions will have those records
@ -130,7 +173,7 @@ impl IngesterData {
&self,
sequencer_id: SequencerId,
dml_operation: DmlOperation,
lifecycle_handle: &LifecycleHandle,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool> {
let sequencer_data = self
.sequencers
@ -142,6 +185,7 @@ impl IngesterData {
sequencer_id,
self.catalog.as_ref(),
lifecycle_handle,
self.partitioner.as_ref(),
&self.exec,
)
.await
@ -384,7 +428,8 @@ impl SequencerData {
dml_operation: DmlOperation,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
lifecycle_handle: &LifecycleHandle,
lifecycle_handle: &dyn LifecycleHandle,
partitioner: &dyn Partitioner,
executor: &Executor,
) -> Result<bool> {
let namespace_data = match self.namespace(dml_operation.namespace()) {
@ -401,6 +446,7 @@ impl SequencerData {
sequencer_id,
catalog,
lifecycle_handle,
partitioner,
executor,
)
.await
@ -500,7 +546,8 @@ impl NamespaceData {
dml_operation: DmlOperation,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
lifecycle_handle: &LifecycleHandle,
lifecycle_handle: &dyn LifecycleHandle,
partitioner: &dyn Partitioner,
executor: &Executor,
) -> Result<bool> {
let sequence_number = dml_operation
@ -529,6 +576,7 @@ impl NamespaceData {
sequencer_id,
catalog,
lifecycle_handle,
partitioner,
)
.await?;
@ -742,22 +790,12 @@ impl TableData {
batch: MutableBatch,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
lifecycle_handle: &LifecycleHandle,
lifecycle_handle: &dyn LifecycleHandle,
partitioner: &dyn Partitioner,
) -> Result<bool> {
let (_, col) = batch
.columns()
.find(|(name, _)| *name == TIME_COLUMN_NAME)
.unwrap();
let timestamp = match col.data() {
ColumnData::I64(_, s) => s.min.unwrap(),
_ => return Err(Error::TimeColumnNotPresent),
};
let partition_key = format!(
"{}",
Utc.timestamp_nanos(timestamp)
.format_with_items(StrftimeItems::new("%Y-%m-%d"))
);
let partition_key = partitioner
.partition_key(&batch)
.context(PartitioningSnafu)?;
let partition_data = match self.partition_data.get_mut(&partition_key) {
Some(p) => p,
@ -1456,6 +1494,7 @@ mod tests {
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
partioning::DefaultPartitioner,
test_util::create_tombstone,
};
use arrow_util::assert_batches_sorted_eq;
@ -1572,13 +1611,14 @@ mod tests {
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
let data = Arc::new(IngesterData {
object_store: Arc::clone(&object_store),
catalog: Arc::clone(&catalog),
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
sequencers,
exec: Arc::new(Executor::new(1)),
backoff_config: BackoffConfig::default(),
});
Arc::new(DefaultPartitioner::default()),
Arc::new(Executor::new(1)),
BackoffConfig::default(),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@ -1659,13 +1699,14 @@ mod tests {
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
let data = Arc::new(IngesterData {
object_store: Arc::clone(&object_store),
catalog: Arc::clone(&catalog),
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
sequencers,
exec: Arc::new(Executor::new(1)),
backoff_config: BackoffConfig::default(),
});
Arc::new(DefaultPartitioner::default()),
Arc::new(Executor::new(1)),
BackoffConfig::default(),
));
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@ -2161,6 +2202,7 @@ mod tests {
Arc::clone(&metrics),
Arc::new(SystemProvider::new()),
);
let partitioner = DefaultPartitioner::default();
let exec = Executor::new(1);
let data = NamespaceData::new(namespace.id, &*metrics);
@ -2172,6 +2214,7 @@ mod tests {
sequencer.id,
catalog.as_ref(),
&manager.handle(),
&partitioner,
&exec,
)
.await
@ -2194,6 +2237,7 @@ mod tests {
sequencer.id,
catalog.as_ref(),
&manager.handle(),
&partitioner,
&exec,
)
.await

View File

@ -3,6 +3,7 @@
use crate::{
data::{IngesterData, IngesterQueryResponse, SequencerData},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
partioning::DefaultPartitioner,
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
@ -129,13 +130,14 @@ impl IngestHandlerImpl {
SequencerData::new(s.kafka_partition, Arc::clone(&metric_registry)),
);
}
let data = Arc::new(IngesterData {
let data = Arc::new(IngesterData::new(
object_store,
catalog,
sequencers,
Arc::new(DefaultPartitioner::default()),
exec,
backoff_config: BackoffConfig::default(),
});
BackoffConfig::default(),
));
let ingester_data = Arc::clone(&data);
let kafka_topic_name = topic.name.clone();
@ -263,12 +265,12 @@ impl IngestHandler for IngestHandlerImpl {
}
}
self.data.exec.join().await;
self.data.exec().join().await;
}
fn shutdown(&self) {
self.shutdown.cancel();
self.data.exec.shutdown();
self.data.exec().shutdown();
}
/// Return the ingestion progress from each sequencer
@ -363,12 +365,7 @@ mod tests {
loop {
let mut has_measurement = false;
if let Some(data) = ingester
.ingester
.data
.sequencers
.get(&ingester.sequencer.id)
{
if let Some(data) = ingester.ingester.data.sequencer(ingester.sequencer.id) {
if let Some(data) = data.namespace(&ingester.namespace.name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("a", "1970-01-01").await {
@ -600,8 +597,7 @@ mod tests {
if let Some(data) = ingester
.data
.sequencers
.get(&sequencer.id)
.sequencer(sequencer.id)
{
if let Some(data) = data.namespace(&namespace.name) {
// verify there's data in the buffer

View File

@ -19,6 +19,7 @@ pub mod data;
pub mod handler;
mod job;
pub mod lifecycle;
pub mod partioning;
pub mod persist;
mod poison;
pub mod querier_handler;

View File

@ -19,13 +19,31 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracker::TrackedFutureExt;
/// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
///
/// This is a trait (rather than a concrete type) so that the lifecycle can be replaced by
/// another implementation, e.g. a mock in tests.
pub trait LifecycleHandle: Send + Sync + 'static {
/// Logs bytes written into a partition so that it can be tracked for the manager to
/// trigger persistence. Returns true if the ingester should pause consuming from the
/// write buffer so that persistence can catch up and free up memory.
///
/// * `partition_id` - the partition the bytes were written into
/// * `sequencer_id` - the sequencer the write belongs to
/// * `sequence_number` - the sequence number of the write
/// * `bytes_written` - number of bytes written by this write
fn log_write(
&self,
partition_id: PartitionId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
bytes_written: usize,
) -> bool;
/// Returns true if the `total_bytes` tracked by the manager is less than the pause amount.
/// As persistence runs, the `total_bytes` go down.
fn can_resume_ingest(&self) -> bool;
}
/// A handle for sequencer consumers to interact with the global
/// [`LifecycleManager`] instance.
///
/// This handle presents an API suitable for ingester tasks to query and update
/// the [`LifecycleManager`] state.
#[derive(Debug, Clone)]
pub struct LifecycleHandle {
pub struct LifecycleHandleImpl {
time_provider: Arc<dyn TimeProvider>,
config: Arc<LifecycleConfig>,
@ -34,11 +52,8 @@ pub struct LifecycleHandle {
state: Arc<Mutex<LifecycleState>>,
}
impl LifecycleHandle {
/// Logs bytes written into a partition so that it can be tracked for the manager to
/// trigger persistence. Returns true if the ingester should pause consuming from the
/// write buffer so that persistence can catch up and free up memory.
pub fn log_write(
impl LifecycleHandle for LifecycleHandleImpl {
fn log_write(
&self,
partition_id: PartitionId,
sequencer_id: SequencerId,
@ -67,9 +82,7 @@ impl LifecycleHandle {
s.total_bytes > self.config.pause_ingest_size
}
/// Returns true if the `total_bytes` tracked by the manager is less than the pause amount.
/// As persistence runs, the `total_bytes` go down.
pub fn can_resume_ingest(&self) -> bool {
fn can_resume_ingest(&self) -> bool {
let s = self.state.lock();
s.total_bytes < self.config.pause_ingest_size
}
@ -239,8 +252,8 @@ impl LifecycleManager {
}
/// Acquire a shareable [`LifecycleHandle`] for this manager instance.
pub fn handle(&self) -> LifecycleHandle {
LifecycleHandle {
pub fn handle(&self) -> LifecycleHandleImpl {
LifecycleHandleImpl {
time_provider: Arc::clone(&self.time_provider),
config: Arc::clone(&self.config),
state: Arc::clone(&self.state),

View File

@ -0,0 +1,39 @@
//! Temporary partitioning implementation until partitions are properly transmitted by the router.
use chrono::{format::StrftimeItems, TimeZone, Utc};
use mutable_batch::{column::ColumnData, MutableBatch};
use schema::TIME_COLUMN_NAME;
/// Error for [`Partitioner`]
///
/// A boxed, type-erased error so that each [`Partitioner`] implementation can report its own
/// failure kinds.
pub type PartitionerError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Interface to partition data within the ingester.
///
/// This is a temporary solution until the partition keys/IDs are properly transmitted by the router.
pub trait Partitioner: std::fmt::Debug + Send + Sync + 'static {
/// Calculate partition key for given mutable batch.
///
/// Returns the key as a string, or a [`PartitionerError`] if no key can be derived from
/// `batch`.
fn partition_key(&self, batch: &MutableBatch) -> Result<String, PartitionerError>;
}
/// Default partitioner that matches the current router implementation.
///
/// The struct carries no state; construct it via [`Default`].
#[derive(Debug, Default)]
// NOTE(review): `Copy` is intentionally not derived, presumably to leave room for adding
// non-`Copy` state later without an API break - confirm.
#[allow(missing_copy_implementations)]
pub struct DefaultPartitioner {}
impl Partitioner for DefaultPartitioner {
    /// Calculate the partition key for `batch` from its time column.
    ///
    /// The key is the minimum statistic of the time column, interpreted as nanoseconds since
    /// the Unix epoch in UTC and formatted as `%Y-%m-%d`.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) if
    ///
    /// * the batch has no time column,
    /// * the time column is not an `I64` column, or
    /// * the time column carries no minimum statistic.
    fn partition_key(&self, batch: &MutableBatch) -> Result<String, PartitionerError> {
        // Malformed batches must surface as an error so the caller can handle it through
        // the `Result`; a panic here would take down the ingest task.
        let (_, col) = batch
            .columns()
            .find(|(name, _)| *name == TIME_COLUMN_NAME)
            .ok_or("Time column not present")?;

        let timestamp = match col.data() {
            ColumnData::I64(_, stats) => stats
                .min
                .ok_or("Time column has no minimum timestamp")?,
            _ => return Err(String::from("Time column not present").into()),
        };

        Ok(Utc
            .timestamp_nanos(timestamp)
            .format_with_items(StrftimeItems::new("%Y-%m-%d"))
            .to_string())
    }
}

View File

@ -93,7 +93,7 @@ pub async fn prepare_data_to_querier(
let mut found_namespace = false;
let mut batches = vec![];
let mut batch_partition_ids = vec![];
for sequencer_data in ingest_data.sequencers.values() {
for (_sequencer_id, sequencer_data) in ingest_data.sequencers() {
let namespace_data = match sequencer_data.namespace(&request.namespace) {
Some(namespace_data) => {
found_namespace = true;
@ -125,7 +125,7 @@ pub async fn prepare_data_to_querier(
// extract payload
let partition_id = partition.partition_id;
let (schema, batch) =
prepare_data_to_querier_for_partition(&ingest_data.exec, partition, request)
prepare_data_to_querier_for_partition(ingest_data.exec(), partition, request)
.await?;
schema_merger = schema_merger
.merge(schema.as_ref())

View File

@ -9,7 +9,7 @@ use observability_deps::tracing::*;
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferError, WriteBufferErrorKind};
use crate::lifecycle::LifecycleHandle;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
use super::DmlSink;
@ -44,7 +44,7 @@ pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
/// request ingest be paused to control memory pressure.
///
/// [`LifecycleManager`]: crate::lifecycle::LifecycleManager
lifecycle_handle: LifecycleHandle,
lifecycle_handle: LifecycleHandleImpl,
// Metrics
time_provider: T,
@ -72,7 +72,7 @@ impl<I, O> SequencedStreamHandler<I, O> {
pub fn new(
stream: I,
sink: O,
lifecycle_handle: LifecycleHandle,
lifecycle_handle: LifecycleHandleImpl,
kafka_topic_name: String,
kafka_partition: KafkaPartition,
metrics: &metric::Registry,
@ -694,7 +694,7 @@ mod tests {
Ok(DmlOperation::Write(make_write("good_op", 2)))
],
sink_rets = [
Err(crate::data::Error::TimeColumnNotPresent),
Err(crate::data::Error::Partitioning{source: String::from("Time column not present").into()}),
Ok(true),
],
want_ttbr = 2,

View File

@ -6,7 +6,7 @@ use async_trait::async_trait;
use data_types2::SequencerId;
use dml::DmlOperation;
use crate::{data::IngesterData, lifecycle::LifecycleHandle};
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
use super::DmlSink;
@ -14,7 +14,7 @@ use super::DmlSink;
#[derive(Debug)]
pub struct IngestSinkAdaptor {
ingest_data: Arc<IngesterData>,
lifecycle_handle: LifecycleHandle,
lifecycle_handle: LifecycleHandleImpl,
sequencer_id: SequencerId,
}
@ -23,7 +23,7 @@ impl IngestSinkAdaptor {
/// implementation.
pub fn new(
ingest_data: Arc<IngesterData>,
lifecycle_handle: LifecycleHandle,
lifecycle_handle: LifecycleHandleImpl,
sequencer_id: SequencerId,
) -> Self {
Self {

View File

@ -1,9 +1,12 @@
//! Test setups and data for ingester crate
#![allow(missing_docs)]
use crate::data::{
IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
SnapshotBatch, TableData,
use crate::{
data::{
IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
SnapshotBatch, TableData,
},
partioning::DefaultPartitioner,
};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
@ -710,13 +713,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
sequencers.insert(seq_id, seq_data);
// Ingester data that includes one sequencer/shard
IngesterData {
IngesterData::new(
object_store,
catalog,
sequencers,
Arc::new(DefaultPartitioner::default()),
exec,
backoff_config: backoff::BackoffConfig::default(),
}
backoff::BackoffConfig::default(),
)
}
pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
@ -754,13 +758,14 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
sequencers.insert(seq_id, seq_data);
// Ingester data that includes one sequencer/shard
IngesterData {
IngesterData::new(
object_store,
catalog,
sequencers,
Arc::new(DefaultPartitioner::default()),
exec,
backoff_config: backoff::BackoffConfig::default(),
}
backoff::BackoffConfig::default(),
)
}
/// Make data for one or two partitions per requested

View File

@ -219,6 +219,7 @@ impl TestCatalog {
}
/// A test namespace
#[derive(Debug)]
#[allow(missing_docs)]
pub struct TestNamespace {
pub catalog: Arc<TestCatalog>,
@ -264,6 +265,7 @@ impl TestNamespace {
}
/// A test sequencer with its namespace in the catalog
#[derive(Debug)]
#[allow(missing_docs)]
pub struct TestSequencer {
pub catalog: Arc<TestCatalog>,