diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 94507f4b51..2a85b93c03 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -1,6 +1,7 @@ //! This module is responsible for compacting Ingester's data -use crate::data::{PersistingBatch, QueryableBatch}; +use std::sync::Arc; + use data_types::{CompactionLevel, NamespaceId, PartitionInfo}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; use iox_query::{ @@ -12,7 +13,8 @@ use iox_time::TimeProvider; use parquet_file::metadata::IoxMetadata; use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey}; use snafu::{ResultExt, Snafu}; -use std::sync::Arc; + +use crate::{data::partition::PersistingBatch, query::QueryableBatch}; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -28,7 +30,13 @@ pub enum Error { #[snafu(display("Error while executing Ingester's compaction"))] ExecutePlan { source: DataFusionError }, - #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))] + #[snafu(display( + "Error while building delete predicate from start time, {}, stop time, {}, and serialized \ + predicate, {}", + min, + max, + predicate + ))] DeletePredicate { source: predicate::delete_predicate::Error, min: String, @@ -169,6 +177,13 @@ pub async fn compact( #[cfg(test)] mod tests { + use arrow_util::assert_batches_eq; + use data_types::{Partition, PartitionId, ShardId, TableId}; + use iox_time::SystemProvider; + use mutable_batch_lp::lines_to_batches; + use schema::selection::Selection; + use uuid::Uuid; + use super::*; use crate::test_util::{ create_batches_with_influxtype, create_batches_with_influxtype_different_cardinality, @@ -180,12 +195,6 @@ mod tests { create_one_row_record_batch_with_influxtype, create_tombstone, make_meta, make_persisting_batch, make_queryable_batch, make_queryable_batch_with_deletes, }; - use arrow_util::assert_batches_eq; - use data_types::{Partition, PartitionId, ShardId, TableId}; - use iox_time::SystemProvider; - use mutable_batch_lp::lines_to_batches; - use schema::selection::Selection; - use uuid::Uuid; // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782 // where if sending in a single row it would compact into an output of two batches, one of diff --git a/ingester/src/data.rs b/ingester/src/data.rs index d34250aa41..0c29b371c6 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1,44 +1,40 @@ //! 
Data for the lifecycle of the Ingester -use crate::{ - compact::{compact_persisting_batch, CompactedStream}, - lifecycle::LifecycleHandle, -}; +use std::{collections::BTreeMap, pin::Pin, sync::Arc}; + use arrow::{error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{ - DeletePredicate, NamespaceId, PartitionId, PartitionInfo, PartitionKey, SequenceNumber, - ShardId, ShardIndex, TableId, Timestamp, Tombstone, -}; +use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex}; use datafusion::physical_plan::SendableRecordBatchStream; use dml::DmlOperation; use futures::{Stream, StreamExt}; use iox_catalog::interface::{get_table_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::SystemProvider; -use metric::{Attributes, Metric, U64Counter, U64Histogram, U64HistogramOptions}; -use mutable_batch::MutableBatch; +use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions}; use object_store::DynObjectStore; use observability_deps::tracing::{debug, warn}; -use parking_lot::RwLock; use parquet_file::storage::ParquetStorage; -use schema::selection::Selection; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{ - collections::{btree_map::Entry, BTreeMap}, - pin::Pin, - sync::Arc, -}; -use uuid::Uuid; +use snafu::{OptionExt, Snafu}; use write_summary::ShardProgress; -mod triggers; -use self::triggers::TestTriggers; +use crate::{ + compact::{compact_persisting_batch, CompactedStream}, + lifecycle::LifecycleHandle, +}; +pub mod namespace; +pub mod partition; mod query_dedup; -use query_dedup::query; +pub mod shard; +pub mod table; + +use self::{partition::PartitionStatus, shard::ShardData}; + +#[cfg(test)] +mod triggers; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -190,7 +186,7 @@ impl IngesterData { .shards .iter() .map(|(_, shard_data)| shard_data) - .find(|shard_data| shard_data.shard_index == shard_index); + .find(|shard_data| shard_data.shard_index() == shard_index); let progress = match shard_data { Some(shard_data) => shard_data.progress().await, @@ -263,7 +259,7 @@ impl Persister for IngesterData { let mut repos = self.catalog.repositories().await; let table = repos .tables() - .get_by_namespace_and_name(namespace.namespace_id, &partition_info.table_name) + .get_by_namespace_and_name(namespace.namespace_id(), &partition_info.table_name) .await? .expect("table not found in catalog"); get_table_schema_by_id(table.id, repos.as_mut()).await @@ -278,7 +274,7 @@ impl Persister for IngesterData { let compacted_stream = match compact_persisting_batch( Arc::new(SystemProvider::new()), &self.exec, - namespace.namespace_id.get(), + namespace.namespace_id().get(), &partition_info, Arc::clone(&persisting_batch), ) @@ -404,1147 +400,6 @@ impl Persister for IngesterData { } } -/// Data of a Shard -#[derive(Debug)] -pub struct ShardData { - /// The shard index for this shard - shard_index: ShardIndex, - - // New namespaces can come in at any time so we need to be able to add new ones - namespaces: RwLock>>, - - metrics: Arc, - namespace_count: U64Counter, -} - -impl ShardData { - /// Initialise a new [`ShardData`] that emits metrics to `metrics`. 
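As an illustration (not part of this diff): `ShardData` keeps its namespaces in an `RwLock<BTreeMap<String, Arc<NamespaceData>>>` and fills it lazily via the `BTreeMap` entry API in `insert_namespace` below; the same read-fast / write-on-miss pattern recurs in `NamespaceData::insert_table`. A minimal, self-contained sketch of that get-or-create pattern follows — `Cache` and `get_or_create` are hypothetical names used only for illustration:

```rust
// Illustrative sketch only -- not part of this diff.
// The read-fast / write-on-miss get-or-create pattern used by
// ShardData::insert_namespace and NamespaceData::insert_table,
// reduced to a standalone example (requires the `parking_lot` crate).
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::Arc;

use parking_lot::RwLock;

#[derive(Debug, Default)]
struct Cache {
    entries: RwLock<BTreeMap<String, Arc<String>>>,
}

impl Cache {
    /// Fast path: shared read lock, clone the Arc if the key is present.
    fn get(&self, key: &str) -> Option<Arc<String>> {
        self.entries.read().get(key).cloned()
    }

    /// Slow path: take the write lock and either insert the value or return
    /// whatever another caller inserted first (the Vacant/Occupied match).
    fn get_or_create(&self, key: &str, value: String) -> Arc<String> {
        let mut entries = self.entries.write();
        match entries.entry(key.to_string()) {
            Entry::Vacant(v) => Arc::clone(v.insert(Arc::new(value))),
            Entry::Occupied(o) => Arc::clone(o.get()),
        }
    }
}

fn main() {
    let cache = Cache::default();
    assert!(cache.get("ns").is_none());
    let first = cache.get_or_create("ns", "payload".to_string());
    let second = cache.get_or_create("ns", "ignored".to_string());
    assert!(Arc::ptr_eq(&first, &second)); // second caller gets the existing entry
}
```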
- pub fn new(shard_index: ShardIndex, metrics: Arc) -> Self { - let namespace_count = metrics - .register_metric::( - "ingester_namespaces_total", - "Number of namespaces known to the ingester", - ) - .recorder(&[]); - - Self { - shard_index, - namespaces: Default::default(), - metrics, - namespace_count, - } - } - - /// Initialize new ShardData with namespace for testing purpose only - #[cfg(test)] - pub fn new_for_test( - shard_index: ShardIndex, - namespaces: BTreeMap>, - ) -> Self { - Self { - shard_index, - namespaces: RwLock::new(namespaces), - metrics: Default::default(), - namespace_count: Default::default(), - } - } - - /// Store the write or delete in the shard. Deletes will - /// be written into the catalog before getting stored in the buffer. - /// Any writes that create new IOx partitions will have those records - /// created in the catalog before putting into the buffer. - pub async fn buffer_operation( - &self, - dml_operation: DmlOperation, - shard_id: ShardId, - catalog: &dyn Catalog, - lifecycle_handle: &dyn LifecycleHandle, - executor: &Executor, - ) -> Result { - let namespace_data = match self.namespace(dml_operation.namespace()) { - Some(d) => d, - None => { - self.insert_namespace(dml_operation.namespace(), catalog) - .await? - } - }; - - namespace_data - .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor) - .await - } - - /// Gets the namespace data out of the map - pub fn namespace(&self, namespace: &str) -> Option> { - let n = self.namespaces.read(); - n.get(namespace).cloned() - } - - /// Retrieves the namespace from the catalog and initializes an empty buffer, or - /// retrieves the buffer if some other caller gets it first - async fn insert_namespace( - &self, - namespace: &str, - catalog: &dyn Catalog, - ) -> Result> { - let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .get_by_name(namespace) - .await - .context(CatalogSnafu)? - .context(NamespaceNotFoundSnafu { namespace })?; - - let mut n = self.namespaces.write(); - - let data = match n.entry(namespace.name) { - Entry::Vacant(v) => { - let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics))); - self.namespace_count.inc(1); - Arc::clone(v) - } - Entry::Occupied(v) => Arc::clone(v.get()), - }; - - Ok(data) - } - - /// Return the progress of this shard - async fn progress(&self) -> ShardProgress { - let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect(); - - let mut progress = ShardProgress::new(); - - for namespace_data in namespaces { - progress = progress.combine(namespace_data.progress().await); - } - progress - } -} - -/// Data of a Namespace that belongs to a given Shard -#[derive(Debug)] -pub struct NamespaceData { - namespace_id: NamespaceId, - tables: RwLock>>>, - - table_count: U64Counter, - - /// The sequence number being actively written, if any. - /// - /// This is used to know when a sequence number is only partially - /// buffered for readability reporting. For example, in the - /// following diagram a write for SequenceNumber 10 is only - /// partially readable because it has been written into partitions - /// A and B but not yet C. The max buffered number on each - /// PartitionData is not sufficient to determine if the write is - /// complete. 
- /// - /// ```text - /// ╔═══════════════════════════════════════════════╗ - /// ║ ║ DML Operation (write) - /// ║ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ ║ SequenceNumber = 10 - /// ║ ┃ Data for C ┃ Data for B ┃ Data for A ┃ ║ - /// ║ ┗━━━━━━━━━━━━━┻━━━━━━━━━━━━━┻━━━━━━━━━━━━━┛ ║ - /// ║ │ │ │ ║ - /// ╚═══════════════════════╬═════════════╬═════════╝ - /// │ │ │ ┌──────────────────────────────────┐ - /// │ │ │ Partition A │ - /// │ │ └──────────▶│ max buffered = 10 │ - /// │ └──────────────────────────────────┘ - /// │ │ - /// │ ┌──────────────────────────────────┐ - /// │ │ │ Partition B │ - /// └────────────────────────▶│ max buffered = 10 │ - /// │ └──────────────────────────────────┘ - /// - /// │ - /// ┌──────────────────────────────────┐ - /// │ │ Partition C │ - /// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶│ max buffered = 7 │ - /// └──────────────────────────────────┘ - /// Write is partially buffered. It has been - /// written to Partitions A and B, but not - /// yet written to Partition C - /// PartitionData - /// (Ingester state per partition) - ///``` - buffering_sequence_number: RwLock>, - - /// Control the flow of ingest, for testing purposes - test_triggers: TestTriggers, -} - -impl NamespaceData { - /// Initialize new tables with default partition template of daily - pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self { - let table_count = metrics - .register_metric::( - "ingester_tables_total", - "Number of tables known to the ingester", - ) - .recorder(&[]); - - Self { - namespace_id, - tables: Default::default(), - table_count, - buffering_sequence_number: RwLock::new(None), - test_triggers: TestTriggers::new(), - } - } - - /// Initialize new tables with data for testing purpose only - #[cfg(test)] - pub(crate) fn new_for_test( - namespace_id: NamespaceId, - tables: BTreeMap>>, - ) -> Self { - Self { - namespace_id, - tables: RwLock::new(tables), - table_count: Default::default(), - buffering_sequence_number: RwLock::new(None), - test_triggers: TestTriggers::new(), - } - } - - /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the - /// catalog. Returns true if ingest should be paused due to memory limits set in the passed - /// lifecycle manager. - pub async fn buffer_operation( - &self, - dml_operation: DmlOperation, - shard_id: ShardId, - catalog: &dyn Catalog, - lifecycle_handle: &dyn LifecycleHandle, - executor: &Executor, - ) -> Result { - let sequence_number = dml_operation - .meta() - .sequence() - .expect("must have sequence number") - .sequence_number; - - // Note that this namespace is actively writing this sequence - // number. Since there is no namespace wide lock held during a - // write, this number is used to detect and update reported - // progress during a write - let _sequence_number_guard = - ScopedSequenceNumber::new(sequence_number, &self.buffering_sequence_number); - - match dml_operation { - DmlOperation::Write(write) => { - let mut pause_writes = false; - - // Extract the partition key derived by the router. 
- let partition_key = write - .partition_key() - .expect("no partition key in dml write") - .clone(); - - for (t, b) in write.into_tables() { - let table_data = match self.table_data(&t) { - Some(t) => t, - None => self.insert_table(shard_id, &t, catalog).await?, - }; - - { - // lock scope - let mut table_data = table_data.write().await; - let should_pause = table_data - .buffer_table_write( - sequence_number, - b, - partition_key.clone(), - shard_id, - catalog, - lifecycle_handle, - ) - .await?; - pause_writes = pause_writes || should_pause; - } - self.test_triggers.on_write().await; - } - - Ok(pause_writes) - } - DmlOperation::Delete(delete) => { - let table_name = delete.table_name().context(TableNotPresentSnafu)?; - let table_data = match self.table_data(table_name) { - Some(t) => t, - None => self.insert_table(shard_id, table_name, catalog).await?, - }; - - let mut table_data = table_data.write().await; - - table_data - .buffer_delete( - table_name, - delete.predicate(), - shard_id, - sequence_number, - catalog, - executor, - ) - .await?; - - // don't pause writes since deletes don't count towards memory limits - Ok(false) - } - } - } - - /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to - /// snapshots. Then return a vec of the snapshots and the optional persisting batch. - pub async fn snapshot( - &self, - table_name: &str, - partition_key: &PartitionKey, - ) -> Option<(Vec>, Option>)> { - if let Some(t) = self.table_data(table_name) { - let mut t = t.write().await; - - return t.partition_data.get_mut(partition_key).map(|p| { - p.data - .snapshot() - .expect("snapshot on mutable batch should never fail"); - (p.data.snapshots.to_vec(), p.data.persisting.clone()) - }); - } - - None - } - - /// Snapshots the mutable buffer for the partition, which clears it out and then moves all - /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot - /// or persist, None will be returned. - pub async fn snapshot_to_persisting( - &self, - partition_info: &PartitionInfo, - ) -> Option> { - if let Some(table_data) = self.table_data(&partition_info.table_name) { - let mut table_data = table_data.write().await; - - return table_data - .partition_data - .get_mut(&partition_info.partition.partition_key) - .and_then(|partition_data| { - partition_data.snapshot_to_persisting_batch( - partition_info.partition.shard_id, - partition_info.partition.table_id, - partition_info.partition.id, - &partition_info.table_name, - ) - }); - } - - None - } - - /// Gets the buffered table data - pub(crate) fn table_data( - &self, - table_name: &str, - ) -> Option>> { - let t = self.tables.read(); - t.get(table_name).cloned() - } - - /// Inserts the table or returns it if it happens to be inserted by some other thread - async fn insert_table( - &self, - shard_id: ShardId, - table_name: &str, - catalog: &dyn Catalog, - ) -> Result>> { - let mut repos = catalog.repositories().await; - let info = repos - .tables() - .get_table_persist_info(shard_id, self.namespace_id, table_name) - .await - .context(CatalogSnafu)? 
- .context(TableNotFoundSnafu { table_name })?; - - let mut t = self.tables.write(); - - let data = match t.entry(table_name.to_string()) { - Entry::Vacant(v) => { - let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( - info.table_id, - info.tombstone_max_sequence_number, - )))); - self.table_count.inc(1); - Arc::clone(v) - } - Entry::Occupied(v) => Arc::clone(v.get()), - }; - - Ok(data) - } - - /// Walks down the table and partition and clears the persisting batch. The sequence number is - /// the max_sequence_number for the persisted parquet file, which should be kept in the table - /// data buffer. - async fn mark_persisted( - &self, - table_name: &str, - partition_key: &PartitionKey, - sequence_number: SequenceNumber, - ) { - if let Some(t) = self.table_data(table_name) { - let mut t = t.write().await; - let partition = t.partition_data.get_mut(partition_key); - - if let Some(p) = partition { - p.data.max_persisted_sequence_number = Some(sequence_number); - p.data.persisting = None; - // clear the deletes kept for this persisting batch - p.data.deletes_during_persisting.clear(); - } - } - } - - /// Return progress from this Namespace - async fn progress(&self) -> ShardProgress { - let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect(); - - // Consolidate progtress across partitions. - let mut progress = ShardProgress::new() - // Properly account for any sequence number that is - // actively buffering and thus not yet completely - // readable. - .actively_buffering(*self.buffering_sequence_number.read()); - - for table_data in tables { - progress = progress.combine(table_data.read().await.progress()) - } - progress - } -} - -/// RAAI struct that sets buffering sequence number on creation and clears it on free -struct ScopedSequenceNumber<'a> { - sequence_number: SequenceNumber, - buffering_sequence_number: &'a RwLock>, -} - -impl<'a> ScopedSequenceNumber<'a> { - fn new( - sequence_number: SequenceNumber, - buffering_sequence_number: &'a RwLock>, - ) -> Self { - *buffering_sequence_number.write() = Some(sequence_number); - - Self { - sequence_number, - buffering_sequence_number, - } - } -} - -impl<'a> Drop for ScopedSequenceNumber<'a> { - fn drop(&mut self) { - // clear write on drop - let mut buffering_sequence_number = self.buffering_sequence_number.write(); - assert_eq!( - *buffering_sequence_number, - Some(self.sequence_number), - "multiple operations are being buffered concurrently" - ); - *buffering_sequence_number = None; - } -} - -/// Data of a Table in a given Namesapce that belongs to a given Shard -#[derive(Debug)] -pub(crate) struct TableData { - table_id: TableId, - // the max sequence number for a tombstone associated with this table - tombstone_max_sequence_number: Option, - // Map pf partition key to its data - partition_data: BTreeMap, -} - -impl TableData { - /// Initialize new table buffer - pub fn new(table_id: TableId, tombstone_max_sequence_number: Option) -> Self { - Self { - table_id, - tombstone_max_sequence_number, - partition_data: Default::default(), - } - } - - /// Initialize new table buffer for testing purpose only - #[cfg(test)] - pub fn new_for_test( - table_id: TableId, - tombstone_max_sequence_number: Option, - partitions: BTreeMap, - ) -> Self { - Self { - table_id, - tombstone_max_sequence_number, - partition_data: partitions, - } - } - - /// Return parquet_max_sequence_number - pub fn parquet_max_sequence_number(&self) -> Option { - self.partition_data - .values() - .map(|p| p.data.max_persisted_sequence_number) - 
.max() - .flatten() - } - - /// Return tombstone_max_sequence_number - #[allow(dead_code)] // Used in tests - pub fn tombstone_max_sequence_number(&self) -> Option { - self.tombstone_max_sequence_number - } - - // buffers the table write and returns true if the lifecycle manager indicates that - // ingest should be paused. - async fn buffer_table_write( - &mut self, - sequence_number: SequenceNumber, - batch: MutableBatch, - partition_key: PartitionKey, - shard_id: ShardId, - catalog: &dyn Catalog, - lifecycle_handle: &dyn LifecycleHandle, - ) -> Result { - let partition_data = match self.partition_data.get_mut(&partition_key) { - Some(p) => p, - None => { - self.insert_partition(partition_key.clone(), shard_id, catalog) - .await?; - self.partition_data.get_mut(&partition_key).unwrap() - } - }; - - // skip the write if it has already been persisted - if let Some(max) = partition_data.data.max_persisted_sequence_number { - if max >= sequence_number { - return Ok(false); - } - } - - let should_pause = lifecycle_handle.log_write( - partition_data.id, - shard_id, - sequence_number, - batch.size(), - batch.rows(), - ); - partition_data.buffer_write(sequence_number, batch)?; - - Ok(should_pause) - } - - async fn buffer_delete( - &mut self, - table_name: &str, - predicate: &DeletePredicate, - shard_id: ShardId, - sequence_number: SequenceNumber, - catalog: &dyn Catalog, - executor: &Executor, - ) -> Result<()> { - let min_time = Timestamp::new(predicate.range.start()); - let max_time = Timestamp::new(predicate.range.end()); - - let mut repos = catalog.repositories().await; - let tombstone = repos - .tombstones() - .create_or_get( - self.table_id, - shard_id, - sequence_number, - min_time, - max_time, - &predicate.expr_sql_string(), - ) - .await - .context(CatalogSnafu)?; - - // remember "persisted" state - self.tombstone_max_sequence_number = Some(sequence_number); - - // modify one partition at a time - for data in self.partition_data.values_mut() { - data.buffer_tombstone(executor, table_name, tombstone.clone()) - .await; - } - - Ok(()) - } - - pub fn unpersisted_partition_data(&self) -> Vec { - self.partition_data - .values() - .map(|p| UnpersistedPartitionData { - partition_id: p.id, - non_persisted: p - .get_non_persisting_data() - .expect("get_non_persisting should always work"), - persisting: p.get_persisting_data(), - partition_status: PartitionStatus { - parquet_max_sequence_number: p.data.max_persisted_sequence_number, - tombstone_max_sequence_number: self.tombstone_max_sequence_number, - }, - }) - .collect() - } - - async fn insert_partition( - &mut self, - partition_key: PartitionKey, - shard_id: ShardId, - catalog: &dyn Catalog, - ) -> Result<()> { - let mut repos = catalog.repositories().await; - let partition = repos - .partitions() - .create_or_get(partition_key, shard_id, self.table_id) - .await - .context(CatalogSnafu)?; - - // get info on the persisted parquet files to use later for replay or for snapshot - // information on query. 
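For reference (illustrative, not part of this diff), the `.max().flatten()` idiom used by `parquet_max_sequence_number` above works because `None` orders below any `Some`, so the fold yields the largest persisted sequence number across partitions, or `None` when nothing has been persisted at all; `max_persisted` below is a hypothetical stand-in over plain `i64` values:

```rust
// Illustrative sketch only -- not part of this diff.
// `None` sorts below any `Some`, so max() over the per-partition Option
// watermarks picks the largest persisted sequence number, and flatten()
// collapses the "no partition has persisted anything" case back to None.
fn max_persisted(per_partition: &[Option<i64>]) -> Option<i64> {
    per_partition.iter().copied().max().flatten()
}

fn main() {
    assert_eq!(max_persisted(&[None, Some(3), Some(7)]), Some(7));
    assert_eq!(max_persisted(&[None, None]), None);
    assert_eq!(max_persisted(&[]), None);
}
```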
- let files = repos - .parquet_files() - .list_by_partition_not_to_delete(partition.id) - .await - .context(CatalogSnafu)?; - // for now we just need the max persisted - let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max(); - - let mut data = PartitionData::new(partition.id); - data.data.max_persisted_sequence_number = max_persisted_sequence_number; - - self.partition_data.insert(partition.partition_key, data); - - Ok(()) - } - - /// Return progress from this Table - fn progress(&self) -> ShardProgress { - let progress = ShardProgress::new(); - let progress = match self.parquet_max_sequence_number() { - Some(n) => progress.with_persisted(n), - None => progress, - }; - - self.partition_data - .values() - .fold(progress, |progress, partition_data| { - progress.combine(partition_data.progress()) - }) - } -} - -/// Read only copy of the unpersisted data for a partition in the ingester for a specific partition. -#[derive(Debug)] -pub(crate) struct UnpersistedPartitionData { - pub partition_id: PartitionId, - pub non_persisted: Vec>, - pub persisting: Option, - pub partition_status: PartitionStatus, -} - -/// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard -#[derive(Debug)] -pub(crate) struct PartitionData { - id: PartitionId, - data: DataBuffer, -} - -impl PartitionData { - /// Initialize a new partition data buffer - pub fn new(id: PartitionId) -> Self { - Self { - id, - data: Default::default(), - } - } - - /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub fn snapshot_to_persisting_batch( - &mut self, - shard_id: ShardId, - table_id: TableId, - partition_id: PartitionId, - table_name: &str, - ) -> Option> { - self.data - .snapshot_to_persisting(shard_id, table_id, partition_id, table_name) - } - - /// Snapshot whatever is in the buffer and return a new vec of the - /// arc cloned snapshots - #[allow(dead_code)] // Used in tests - pub fn snapshot(&mut self) -> Result>> { - self.data.snapshot().context(SnapshotSnafu)?; - Ok(self.data.snapshots.to_vec()) - } - - /// Return non persisting data - pub fn get_non_persisting_data(&self) -> Result>> { - self.data.buffer_and_snapshots() - } - - /// Return persisting data - pub fn get_persisting_data(&self) -> Option { - self.data.get_persisting_data() - } - - /// Write the given mb in the buffer - pub(crate) fn buffer_write( - &mut self, - sequence_number: SequenceNumber, - mb: MutableBatch, - ) -> Result<()> { - match &mut self.data.buffer { - Some(buf) => { - buf.max_sequence_number = sequence_number.max(buf.max_sequence_number); - buf.data.extend_from(&mb).context(BufferWriteSnafu)?; - } - None => { - self.data.buffer = Some(BufferBatch { - min_sequence_number: sequence_number, - max_sequence_number: sequence_number, - data: mb, - }) - } - } - - Ok(()) - } - - /// Buffers a new tombstone: - /// . All the data in the `buffer` and `snapshots` will be replaced with one - /// tombstone-applied snapshot - /// . 
The tombstone is only added in the `deletes_during_persisting` if the `persisting` - /// exists - pub(crate) async fn buffer_tombstone( - &mut self, - executor: &Executor, - table_name: &str, - tombstone: Tombstone, - ) { - self.data.add_tombstone(tombstone.clone()); - - // ---------------------------------------------------------- - // First apply the tombstone on all in-memory & non-persisting data - // Make a QueryableBatch for all buffer + snapshots + the given tombstone - let max_sequence_number = tombstone.sequence_number; - let query_batch = match self.data.snapshot_to_queryable_batch( - table_name, - self.id, - Some(tombstone.clone()), - ) { - Some(query_batch) if !query_batch.is_empty() => query_batch, - _ => { - // No need to proceed further - return; - } - }; - - let (min_sequence_number, _) = query_batch.min_max_sequence_numbers(); - assert!(min_sequence_number <= max_sequence_number); - - // Run query on the QueryableBatch to apply the tombstone. - let stream = match query(executor, Arc::new(query_batch)).await { - Err(e) => { - // this should never error out. if it does, we need to crash hard so - // someone can take a look. - panic!("unable to apply tombstones on snapshots: {:?}", e); - } - Ok(stream) => stream, - }; - let record_batches = match datafusion::physical_plan::common::collect(stream).await { - Err(e) => { - // this should never error out. if it does, we need to crash hard so - // someone can take a look. - panic!("unable to collect record batches: {:?}", e); - } - Ok(batches) => batches, - }; - - // Merge all result record batches into one record batch - // and make a snapshot for it - let snapshot = if !record_batches.is_empty() { - let record_batch = RecordBatch::concat(&record_batches[0].schema(), &record_batches) - .unwrap_or_else(|e| { - panic!("unable to concat record batches: {:?}", e); - }); - let snapshot = SnapshotBatch { - min_sequence_number, - max_sequence_number, - data: Arc::new(record_batch), - }; - - Some(Arc::new(snapshot)) - } else { - None - }; - - // ---------------------------------------------------------- - // Add the tombstone-applied data back in as one snapshot - if let Some(snapshot) = snapshot { - self.data.snapshots.push(snapshot); - } - } - - /// Return the progress from this Partition - fn progress(&self) -> ShardProgress { - self.data.progress() - } -} - -/// Data of an IOx partition split into batches -/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐ -/// │ Buffer │ │ Snapshots │ │ Persisting │ -/// │ ┌───────────────────┐ │ │ │ │ │ -/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ -/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │ -/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ -/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ -/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ -/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ -/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ -/// │ ┌───────────────────┐ │ │ │ │ │ │ -/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... 
│ -/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │ -/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ -/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │ -/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ -/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ -/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ -/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ -/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ -/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘ -#[derive(Debug, Default)] -struct DataBuffer { - /// Buffer of incoming writes - pub(crate) buffer: Option, - - /// The max_persisted_sequence number for any parquet_file in this partition - pub(crate) max_persisted_sequence_number: Option, - - /// Buffer of tombstones whose time range may overlap with this partition. - /// All tombstones were already applied to corresponding snapshots. This list - /// only keep the ones that come during persisting. The reason - /// we keep them becasue if a query comes, we need to apply these tombstones - /// on the persiting data before sending it to the Querier - /// When the `persiting` is done and removed, this list will get empty, too - pub(crate) deletes_during_persisting: Vec, - - /// Data in `buffer` will be moved to a `snapshot` when one of these happens: - /// . A background persist is called - /// . A read request from Querier - /// The `buffer` will be empty when this happens. - pub(crate) snapshots: Vec>, - /// When a persist is called, data in `buffer` will be moved to a `snapshot` - /// and then all `snapshots` will be moved to a `persisting`. - /// Both `buffer` and 'snaphots` will be empty when this happens. - pub(crate) persisting: Option>, - // Extra Notes: - // . In MVP, we will only persist a set of snapshots at a time. - // In later version, multiple persisting operations may be happening concurrently but - // their persisted info must be added into the Catalog in their data - // ingesting order. - // . When a read request comes from a Querier, all data from `snapshots` - // and `persisting` must be sent to the Querier. - // . After the `persisting` data is persisted and successfully added - // into the Catalog, it will be removed from this Data Buffer. - // This data might be added into an extra cache to serve up to - // Queriers that may not have loaded the parquet files from object - // storage yet. But this will be decided after MVP. -} - -impl DataBuffer { - /// Add a new tombstones into the DataBuffer - pub fn add_tombstone(&mut self, tombstone: Tombstone) { - // Only keep this tombstone if some data is being persisted - if self.persisting.is_some() { - self.deletes_during_persisting.push(tombstone); - } - } - - /// Move `BufferBatch`es to a `SnapshotBatch`. 
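The diagram and fields above describe a one-way flow: writes land in `buffer`, a snapshot freezes the buffer into `snapshots`, and a persist drains every snapshot into `persisting`. A simplified, self-contained sketch of that flow (illustrative only, not part of this diff; `MiniBuffer` and its plain `Vec<String>` rows are hypothetical stand-ins for `BufferBatch`/`SnapshotBatch`/`PersistingBatch`):

```rust
// Illustrative sketch only -- not part of this diff.
// MiniBuffer is a hypothetical stand-in showing the one-way
// buffer -> snapshots -> persisting flow described above, with
// Vec<String> rows in place of BufferBatch/SnapshotBatch data.
#[derive(Debug, Default)]
struct MiniBuffer {
    buffer: Option<Vec<String>>,
    snapshots: Vec<Vec<String>>,
    persisting: Option<Vec<Vec<String>>>,
}

impl MiniBuffer {
    /// Append rows to the active buffer (like PartitionData::buffer_write).
    fn write(&mut self, rows: Vec<String>) {
        self.buffer.get_or_insert_with(Vec::new).extend(rows);
    }

    /// Freeze the buffer into an immutable snapshot, emptying the buffer.
    fn snapshot(&mut self) {
        if let Some(rows) = self.buffer.take() {
            self.snapshots.push(rows);
        }
    }

    /// Snapshot the buffer, then move every snapshot into the persisting set.
    /// Panics if a persist is already in flight, mirroring snapshot_to_persisting.
    fn snapshot_to_persisting(&mut self) {
        assert!(self.persisting.is_none(), "already persisting");
        self.snapshot();
        if !self.snapshots.is_empty() {
            self.persisting = Some(std::mem::take(&mut self.snapshots));
        }
    }
}

fn main() {
    let mut b = MiniBuffer::default();
    b.write(vec!["row 1".into()]);
    b.snapshot(); // buffer -> snapshots
    b.write(vec!["row 2".into()]);
    b.snapshot_to_persisting(); // buffer + snapshots -> persisting
    assert!(b.buffer.is_none() && b.snapshots.is_empty());
    assert_eq!(b.persisting.as_ref().unwrap().len(), 2);
}
```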
- pub fn snapshot(&mut self) -> Result<(), mutable_batch::Error> { - let snapshot = self.copy_buffer_to_snapshot()?; - if let Some(snapshot) = snapshot { - self.snapshots.push(snapshot); - self.buffer = None; - } - - Ok(()) - } - - /// Returns snapshot of the buffer but keep data in the buffer - pub fn copy_buffer_to_snapshot( - &self, - ) -> Result>, mutable_batch::Error> { - if let Some(buf) = &self.buffer { - return Ok(Some(Arc::new(SnapshotBatch { - min_sequence_number: buf.min_sequence_number, - max_sequence_number: buf.max_sequence_number, - data: Arc::new(buf.data.to_arrow(Selection::All)?), - }))); - } - - Ok(None) - } - - /// Snapshots the buffer and make a QueryableBatch for all the snapshots - /// Both buffer and snapshots will be empty after this - pub fn snapshot_to_queryable_batch( - &mut self, - table_name: &str, - partition_id: PartitionId, - tombstone: Option, - ) -> Option { - self.snapshot() - .expect("This mutable batch snapshot error should be impossible."); - - let mut data = vec![]; - std::mem::swap(&mut data, &mut self.snapshots); - - let mut tombstones = vec![]; - if let Some(tombstone) = tombstone { - tombstones.push(tombstone); - } - - // only produce batch if there is any data - if data.is_empty() { - None - } else { - Some(QueryableBatch::new( - table_name, - partition_id, - data, - tombstones, - )) - } - } - - /// Returns all existing snapshots plus data in the buffer - /// This only read data. Data in the buffer will be kept in the buffer - pub fn buffer_and_snapshots(&self) -> Result>> { - // Existing snapshots - let mut snapshots = self.snapshots.clone(); - - // copy the buffer to a snapshot - let buffer_snapshot = self - .copy_buffer_to_snapshot() - .context(BufferToSnapshotSnafu)?; - snapshots.extend(buffer_snapshot); - - Ok(snapshots) - } - - /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. - /// - /// # Panic - /// - /// Panics if there is already a persisting batch. - pub fn snapshot_to_persisting( - &mut self, - shard_id: ShardId, - table_id: TableId, - partition_id: PartitionId, - table_name: &str, - ) -> Option> { - if self.persisting.is_some() { - panic!("Unable to snapshot while persisting. 
This is an unexpected state.") - } - - if let Some(queryable_batch) = - self.snapshot_to_queryable_batch(table_name, partition_id, None) - { - let persisting_batch = Arc::new(PersistingBatch { - shard_id, - table_id, - partition_id, - object_store_id: Uuid::new_v4(), - data: Arc::new(queryable_batch), - }); - - self.persisting = Some(Arc::clone(&persisting_batch)); - - Some(persisting_batch) - } else { - None - } - } - - /// Return a QueryableBatch of the persisting batch after applying new tombstones - pub fn get_persisting_data(&self) -> Option { - let persisting = match &self.persisting { - Some(p) => p, - None => return None, - }; - - // persisting data - let mut queryable_batch = (*persisting.data).clone(); - - // Add new tombstones if any - queryable_batch.add_tombstones(&self.deletes_during_persisting); - - Some(queryable_batch) - } - - /// Return the progress in this DataBuffer - fn progress(&self) -> ShardProgress { - let progress = ShardProgress::new(); - - let progress = if let Some(buffer) = &self.buffer { - progress.combine(buffer.progress()) - } else { - progress - }; - - let progress = self.snapshots.iter().fold(progress, |progress, snapshot| { - progress.combine(snapshot.progress()) - }); - - if let Some(persisting) = &self.persisting { - persisting - .data - .data - .iter() - .fold(progress, |progress, snapshot| { - progress.combine(snapshot.progress()) - }) - } else { - progress - } - } -} - -/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the -/// ingester keep the batches of data in their ingesting order -#[derive(Debug)] -pub struct BufferBatch { - /// Sequence number of the first write in this batch - pub(crate) min_sequence_number: SequenceNumber, - /// Sequence number of the last write in this batch - pub(crate) max_sequence_number: SequenceNumber, - /// Ingesting data - pub(crate) data: MutableBatch, -} - -impl BufferBatch { - /// Return the progress in this DataBuffer - fn progress(&self) -> ShardProgress { - ShardProgress::new() - .with_buffered(self.min_sequence_number) - .with_buffered(self.max_sequence_number) - } -} - -/// SnapshotBatch contains data of many contiguous BufferBatches -#[derive(Debug, PartialEq)] -pub struct SnapshotBatch { - /// Min sequence number of its combined BufferBatches - pub(crate) min_sequence_number: SequenceNumber, - /// Max sequence number of its combined BufferBatches - pub(crate) max_sequence_number: SequenceNumber, - /// Data of its combined BufferBatches kept in one RecordBatch - pub(crate) data: Arc, -} - -impl SnapshotBatch { - /// Return only data of the given columns - pub fn scan(&self, selection: Selection<'_>) -> Result>> { - Ok(match selection { - Selection::All => Some(Arc::clone(&self.data)), - Selection::Some(columns) => { - let schema = self.data.schema(); - - let indices = columns - .iter() - .filter_map(|&column_name| { - match schema.index_of(column_name) { - Ok(idx) => Some(idx), - _ => None, // this batch does not include data of this column_name - } - }) - .collect::>(); - if indices.is_empty() { - None - } else { - Some(Arc::new( - self.data.project(&indices).context(FilterColumnSnafu {})?, - )) - } - } - }) - } - - /// Return progress in this data - fn progress(&self) -> ShardProgress { - ShardProgress::new() - .with_buffered(self.min_sequence_number) - .with_buffered(self.max_sequence_number) - } -} - -/// PersistingBatch contains all needed info and data for creating -/// a parquet file for given set of SnapshotBatches -#[derive(Debug, PartialEq, Clone)] -pub struct 
PersistingBatch { - /// Shard id of the data - pub(crate) shard_id: ShardId, - - /// Table id of the data - pub(crate) table_id: TableId, - - /// Partition Id of the data - pub(crate) partition_id: PartitionId, - - /// Id of to-be-created parquet file of this data - pub(crate) object_store_id: Uuid, - - /// data - pub(crate) data: Arc, -} - -/// Queryable data used for both query and persistence -#[derive(Debug, PartialEq, Clone)] -pub struct QueryableBatch { - /// data - pub(crate) data: Vec>, - - /// Delete predicates of the tombstones - pub(crate) delete_predicates: Vec>, - - /// This is needed to return a reference for a trait function - pub(crate) table_name: String, - - /// Partition ID - pub(crate) partition_id: PartitionId, -} - -/// Status of a partition that has unpersisted data. -/// -/// Note that this structure is specific to a partition (which itself is bound to a table and -/// shard)! -#[derive(Debug, Clone, PartialEq, Eq)] -#[allow(missing_copy_implementations)] -pub struct PartitionStatus { - /// Max sequence number persisted - pub parquet_max_sequence_number: Option, - - /// Max sequence number for a tombstone - pub tombstone_max_sequence_number: Option, -} - /// Stream of snapshots. /// /// Every snapshot is a dedicated [`SendableRecordBatchStream`]. @@ -1689,17 +544,17 @@ pub enum FlatIngesterQueryResponse { #[cfg(test)] mod tests { - use super::*; - use crate::{ - lifecycle::{LifecycleConfig, LifecycleManager}, - test_util::create_tombstone, + use std::{ + ops::DerefMut, + task::{Context, Poll}, + time::Duration, }; + use arrow::datatypes::SchemaRef; - use arrow_util::assert_batches_sorted_eq; use assert_matches::assert_matches; use data_types::{ - ColumnId, ColumnSet, CompactionLevel, NamespaceSchema, NonEmptyString, ParquetFileParams, - Sequence, TimestampRange, + ColumnId, ColumnSet, CompactionLevel, DeletePredicate, NamespaceSchema, NonEmptyString, + ParquetFileParams, Sequence, Timestamp, TimestampRange, }; use datafusion::physical_plan::RecordBatchStream; use dml::{DmlDelete, DmlMeta, DmlWrite}; @@ -1709,84 +564,15 @@ mod tests { use metric::{MetricObserver, Observation}; use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch}; use object_store::memory::InMemory; - use std::{ - ops::DerefMut, - task::{Context, Poll}, - time::Duration, + use schema::selection::Selection; + use uuid::Uuid; + + use super::*; + use crate::{ + data::namespace::NamespaceData, + lifecycle::{LifecycleConfig, LifecycleManager}, }; - #[test] - fn snapshot_empty_buffer_adds_no_snapshots() { - let mut data_buffer = DataBuffer::default(); - - data_buffer.snapshot().unwrap(); - - assert!(data_buffer.snapshots.is_empty()); - } - - #[test] - fn snapshot_buffer_batch_moves_to_snapshots() { - let mut data_buffer = DataBuffer::default(); - - let seq_num1 = SequenceNumber::new(1); - let (_, mutable_batch1) = - lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); - let buffer_batch1 = BufferBatch { - min_sequence_number: seq_num1, - max_sequence_number: seq_num1, - data: mutable_batch1, - }; - let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap(); - data_buffer.buffer = Some(buffer_batch1); - - data_buffer.snapshot().unwrap(); - - assert!(data_buffer.buffer.is_none()); - assert_eq!(data_buffer.snapshots.len(), 1); - - let snapshot = &data_buffer.snapshots[0]; - assert_eq!(snapshot.min_sequence_number, seq_num1); - assert_eq!(snapshot.max_sequence_number, seq_num1); - assert_eq!(&*snapshot.data, &record_batch1); - } - - #[test] - fn 
snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = PartitionData { - id: PartitionId::new(1), - data: Default::default(), - }; - - let seq_num1 = SequenceNumber::new(1); - // Missing tag `t1` - let (_, mut mutable_batch1) = - lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); - partition_data - .buffer_write(seq_num1, mutable_batch1.clone()) - .unwrap(); - - let seq_num2 = SequenceNumber::new(2); - // Missing field `iv` - let (_, mutable_batch2) = - lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#); - - partition_data - .buffer_write(seq_num2, mutable_batch2.clone()) - .unwrap(); - partition_data.data.snapshot().unwrap(); - - assert!(partition_data.data.buffer.is_none()); - assert_eq!(partition_data.data.snapshots.len(), 1); - - let snapshot = &partition_data.data.snapshots[0]; - assert_eq!(snapshot.min_sequence_number, seq_num1); - assert_eq!(snapshot.max_sequence_number, seq_num2); - - mutable_batch1.extend_from(&mutable_batch2).unwrap(); - let combined_record_batch = mutable_batch1.to_arrow(Selection::All).unwrap(); - assert_eq!(&*snapshot.data, &combined_record_batch); - } - #[tokio::test] async fn buffer_write_updates_lifecycle_manager_indicates_pause() { let metrics = Arc::new(metric::Registry::new()); @@ -1956,7 +742,7 @@ mod tests { assert!(n.table_data("mem").is_some()); let mem_table = mem_table.write().await; let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap(); - p.id + p.id() }; data.persist(partition_id).await; @@ -2104,8 +890,8 @@ mod tests { let mem_table = mem_table.write().await; let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap(); - table_id = mem_table.table_id; - partition_id = p.id; + table_id = mem_table.table_id(); + partition_id = p.id(); } { // verify the partition doesn't have a sort key before any data has been persisted @@ -2355,261 +1141,6 @@ mod tests { assert_progress(&data, shard_index, expected_progress).await; } - // Test deletes mixed with writes on a single parittion - #[tokio::test] - async fn writes_and_deletes() { - // Make a partition with empty DataBuffer - let s_id = 1; - let t_id = 1; - let p_id = 1; - let table_name = "restaurant"; - let mut p = PartitionData::new(PartitionId::new(p_id)); - let exec = Executor::new(1); - - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 1 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Boston day="fri",temp=50 10"#); - p.buffer_write(SequenceNumber::new(1), mb).unwrap(); - - // --- seq_num: 2 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="thu",temp=44 15"#); - - p.buffer_write(SequenceNumber::new(2), mb).unwrap(); - - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(1) - ); - assert_eq!( - p.data.buffer.as_ref().unwrap().max_sequence_number, - SequenceNumber::new(2) - ); - assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.deletes_during_persisting.len(), 0); - assert_eq!(p.data.persisting, None); - - // ------------------------------------------ - // Delete - // --- seq_num: 3 - let ts = create_tombstone( - 1, // tombstone id - t_id, // table id - s_id, // shard id - 3, // delete's seq_number - 0, // min time of data to get deleted - 20, // max time of data to get deleted - "day=thu", // delete predicate - ); - // one row will get deleted, the other is moved to snapshot - p.buffer_tombstone(&exec, "restaurant", ts).await; - - // verify data - assert!(p.data.buffer.is_none()); // 
always empty after delete - assert_eq!(p.data.snapshots.len(), 1); // one snpashot if there is data - assert_eq!(p.data.deletes_during_persisting.len(), 0); - assert_eq!(p.data.persisting, None); - // snapshot only has one row since the other one got deleted - let data = (*p.data.snapshots[0].data).clone(); - let expected = vec![ - "+--------+-----+------+--------------------------------+", - "| city | day | temp | time |", - "+--------+-----+------+--------------------------------+", - "| Boston | fri | 50 | 1970-01-01T00:00:00.000000010Z |", - "+--------+-----+------+--------------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &[data]); - assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1); - assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 3); - - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 4 - let (_, mb) = lp_to_mutable_batch( - r#" - restaurant,city=Medford day="sun",temp=55 22 - restaurant,city=Boston day="sun",temp=57 24 - "#, - ); - p.buffer_write(SequenceNumber::new(4), mb).unwrap(); - - // --- seq_num: 5 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="tue",temp=56 30"#); - - p.buffer_write(SequenceNumber::new(5), mb).unwrap(); - - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(4) - ); - assert_eq!( - p.data.buffer.as_ref().unwrap().max_sequence_number, - SequenceNumber::new(5) - ); - assert_eq!(p.data.snapshots.len(), 1); // existing sanpshot - assert_eq!(p.data.deletes_during_persisting.len(), 0); - assert_eq!(p.data.persisting, None); - - // ------------------------------------------ - // Delete - // --- seq_num: 6 - let ts = create_tombstone( - 2, // tombstone id - t_id, // table id - s_id, // shard id - 6, // delete's seq_number - 10, // min time of data to get deleted - 50, // max time of data to get deleted - "city=Boston", // delete predicate - ); - // two rows will get deleted, one from existing snapshot, one from the buffer being moved - // to snpashot - p.buffer_tombstone(&exec, "restaurant", ts).await; - - // verify data - assert!(p.data.buffer.is_none()); // always empty after delete - assert_eq!(p.data.snapshots.len(), 1); // one snpashot - assert_eq!(p.data.deletes_during_persisting.len(), 0); - assert_eq!(p.data.persisting, None); - // snapshot only has two rows since the other 2 rows with city=Boston have got deleted - let data = (*p.data.snapshots[0].data).clone(); - let expected = vec![ - "+---------+-----+------+--------------------------------+", - "| city | day | temp | time |", - "+---------+-----+------+--------------------------------+", - "| Andover | tue | 56 | 1970-01-01T00:00:00.000000030Z |", - "| Medford | sun | 55 | 1970-01-01T00:00:00.000000022Z |", - "+---------+-----+------+--------------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &[data]); - assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1); - assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 6); - - // ------------------------------------------ - // Persisting - let p_batch = p - .snapshot_to_persisting_batch( - ShardId::new(s_id), - TableId::new(t_id), - PartitionId::new(p_id), - table_name, - ) - .unwrap(); - - // verify data - assert!(p.data.buffer.is_none()); // always empty after issuing persit - assert_eq!(p.data.snapshots.len(), 0); // always empty after issuing persit - assert_eq!(p.data.deletes_during_persisting.len(), 0); // deletes not happen yet - assert_eq!(p.data.persisting, 
Some(Arc::clone(&p_batch))); - - // ------------------------------------------ - // Delete - // --- seq_num: 7 - let ts = create_tombstone( - 3, // tombstone id - t_id, // table id - s_id, // shard id - 7, // delete's seq_number - 10, // min time of data to get deleted - 50, // max time of data to get deleted - "temp=55", // delete predicate - ); - // if a query come while persisting, the row with temp=55 will be deleted before - // data is sent back to Querier - p.buffer_tombstone(&exec, "restaurant", ts).await; - - // verify data - assert!(p.data.buffer.is_none()); // always empty after delete - // no snpashots becasue buffer has not data yet and the - // snapshot was empty too - assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is - // persisting - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 8 - let (_, mb) = lp_to_mutable_batch( - r#" - restaurant,city=Wilmington day="sun",temp=55 35 - restaurant,city=Boston day="sun",temp=60 36 - restaurant,city=Boston day="sun",temp=62 38 - "#, - ); - p.buffer_write(SequenceNumber::new(8), mb).unwrap(); - - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(8) - ); // 1 newly added mutable batch of 3 rows of data - assert_eq!(p.data.snapshots.len(), 0); // still empty - assert_eq!(p.data.deletes_during_persisting.len(), 1); - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - - // ------------------------------------------ - // Take snapshot of the `buffer` - p.snapshot().unwrap(); - // verify data - assert!(p.data.buffer.is_none()); // empty after snapshot - assert_eq!(p.data.snapshots.len(), 1); // data moved from buffer - assert_eq!(p.data.deletes_during_persisting.len(), 1); - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - // snapshot has three rows moved from buffer - let data = (*p.data.snapshots[0].data).clone(); - let expected = vec![ - "+------------+-----+------+--------------------------------+", - "| city | day | temp | time |", - "+------------+-----+------+--------------------------------+", - "| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |", - "| Boston | sun | 60 | 1970-01-01T00:00:00.000000036Z |", - "| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |", - "+------------+-----+------+--------------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &[data]); - assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8); - assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8); - - // ------------------------------------------ - // Delete - // --- seq_num: 9 - let ts = create_tombstone( - 4, // tombstone id - t_id, // table id - s_id, // shard id - 9, // delete's seq_number - 10, // min time of data to get deleted - 50, // max time of data to get deleted - "temp=60", // delete predicate - ); - // the row with temp=60 will be removed from the sanphot - p.buffer_tombstone(&exec, "restaurant", ts).await; - - // verify data - assert!(p.data.buffer.is_none()); // always empty after delete - assert_eq!(p.data.snapshots.len(), 1); // new snapshot of the existing with delete applied - assert_eq!(p.data.deletes_during_persisting.len(), 2); // one more tombstone added make it 2 - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - // snapshot has only 2 rows because the row with tem=60 was removed - let data = (*p.data.snapshots[0].data).clone(); - 
let expected = vec![ - "+------------+-----+------+--------------------------------+", - "| city | day | temp | time |", - "+------------+-----+------+--------------------------------+", - "| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |", - "| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |", - "+------------+-----+------+--------------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &[data]); - assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8); - assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 9); - - exec.join().await; - } - #[tokio::test] async fn buffer_operation_ignores_already_persisted_data() { let metrics = Arc::new(metric::Registry::new()); @@ -2773,7 +1304,7 @@ mod tests { SequenceNumber::new(2) ); - assert_matches!(data.table_count.observe(), Observation::U64Counter(v) => { + assert_matches!(data.table_count().observe(), Observation::U64Counter(v) => { assert_eq!(v, 1, "unexpected table count metric value"); }); } diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs new file mode 100644 index 0000000000..d63289ad93 --- /dev/null +++ b/ingester/src/data/namespace.rs @@ -0,0 +1,367 @@ +//! Namespace level data buffer structures. + +use std::{ + collections::{btree_map::Entry, BTreeMap}, + sync::Arc, +}; + +use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId}; +use dml::DmlOperation; +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use metric::U64Counter; +use parking_lot::RwLock; +use snafu::{OptionExt, ResultExt}; +use write_summary::ShardProgress; + +#[cfg(test)] +use super::triggers::TestTriggers; +use super::{ + partition::{PersistingBatch, SnapshotBatch}, + table::TableData, +}; +use crate::lifecycle::LifecycleHandle; + +/// Data of a Namespace that belongs to a given Shard +#[derive(Debug)] +pub struct NamespaceData { + namespace_id: NamespaceId, + tables: RwLock>>>, + + table_count: U64Counter, + + /// The sequence number being actively written, if any. + /// + /// This is used to know when a sequence number is only partially + /// buffered for readability reporting. For example, in the + /// following diagram a write for SequenceNumber 10 is only + /// partially readable because it has been written into partitions + /// A and B but not yet C. The max buffered number on each + /// PartitionData is not sufficient to determine if the write is + /// complete. + /// + /// ```text + /// ╔═══════════════════════════════════════════════╗ + /// ║ ║ DML Operation (write) + /// ║ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ ║ SequenceNumber = 10 + /// ║ ┃ Data for C ┃ Data for B ┃ Data for A ┃ ║ + /// ║ ┗━━━━━━━━━━━━━┻━━━━━━━━━━━━━┻━━━━━━━━━━━━━┛ ║ + /// ║ │ │ │ ║ + /// ╚═══════════════════════╬═════════════╬═════════╝ + /// │ │ │ ┌──────────────────────────────────┐ + /// │ │ │ Partition A │ + /// │ │ └──────────▶│ max buffered = 10 │ + /// │ └──────────────────────────────────┘ + /// │ │ + /// │ ┌──────────────────────────────────┐ + /// │ │ │ Partition B │ + /// └────────────────────────▶│ max buffered = 10 │ + /// │ └──────────────────────────────────┘ + /// + /// │ + /// ┌──────────────────────────────────┐ + /// │ │ Partition C │ + /// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶│ max buffered = 7 │ + /// └──────────────────────────────────┘ + /// Write is partially buffered. 
It has been + /// written to Partitions A and B, but not + /// yet written to Partition C + /// PartitionData + /// (Ingester state per partition) + ///``` + buffering_sequence_number: RwLock>, + + /// Control the flow of ingest, for testing purposes + #[cfg(test)] + pub(crate) test_triggers: TestTriggers, +} + +impl NamespaceData { + /// Initialize new tables with default partition template of daily + pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self { + let table_count = metrics + .register_metric::( + "ingester_tables_total", + "Number of tables known to the ingester", + ) + .recorder(&[]); + + Self { + namespace_id, + tables: Default::default(), + table_count, + buffering_sequence_number: RwLock::new(None), + #[cfg(test)] + test_triggers: TestTriggers::new(), + } + } + + /// Initialize new tables with data for testing purpose only + #[cfg(test)] + pub(crate) fn new_for_test( + namespace_id: NamespaceId, + tables: BTreeMap>>, + ) -> Self { + Self { + namespace_id, + tables: RwLock::new(tables), + table_count: Default::default(), + buffering_sequence_number: RwLock::new(None), + test_triggers: TestTriggers::new(), + } + } + + /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the + /// catalog. Returns true if ingest should be paused due to memory limits set in the passed + /// lifecycle manager. + pub async fn buffer_operation( + &self, + dml_operation: DmlOperation, + shard_id: ShardId, + catalog: &dyn Catalog, + lifecycle_handle: &dyn LifecycleHandle, + executor: &Executor, + ) -> Result { + let sequence_number = dml_operation + .meta() + .sequence() + .expect("must have sequence number") + .sequence_number; + + // Note that this namespace is actively writing this sequence + // number. Since there is no namespace wide lock held during a + // write, this number is used to detect and update reported + // progress during a write + let _sequence_number_guard = + ScopedSequenceNumber::new(sequence_number, &self.buffering_sequence_number); + + match dml_operation { + DmlOperation::Write(write) => { + let mut pause_writes = false; + + // Extract the partition key derived by the router. + let partition_key = write + .partition_key() + .expect("no partition key in dml write") + .clone(); + + for (t, b) in write.into_tables() { + let table_data = match self.table_data(&t) { + Some(t) => t, + None => self.insert_table(shard_id, &t, catalog).await?, + }; + + { + // lock scope + let mut table_data = table_data.write().await; + let should_pause = table_data + .buffer_table_write( + sequence_number, + b, + partition_key.clone(), + shard_id, + catalog, + lifecycle_handle, + ) + .await?; + pause_writes = pause_writes || should_pause; + } + #[cfg(test)] + self.test_triggers.on_write().await; + } + + Ok(pause_writes) + } + DmlOperation::Delete(delete) => { + let table_name = delete.table_name().context(super::TableNotPresentSnafu)?; + let table_data = match self.table_data(table_name) { + Some(t) => t, + None => self.insert_table(shard_id, table_name, catalog).await?, + }; + + let mut table_data = table_data.write().await; + + table_data + .buffer_delete( + table_name, + delete.predicate(), + shard_id, + sequence_number, + catalog, + executor, + ) + .await?; + + // don't pause writes since deletes don't count towards memory limits + Ok(false) + } + } + } + + /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to + /// snapshots. Then return a vec of the snapshots and the optional persisting batch. 
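Earlier in this file, `buffer_operation` holds a `ScopedSequenceNumber` for the duration of a write so progress reporting can see a partially buffered sequence number; the guard itself is defined at the bottom of the file. A self-contained sketch of that set-on-create / clear-on-drop (RAII) pattern, with `ActiveMarker` as a hypothetical name (illustrative only, not part of this diff):

```rust
// Illustrative sketch only -- not part of this diff.
// ActiveMarker is a hypothetical reduction of ScopedSequenceNumber:
// record the in-flight value on creation, clear it again on drop
// (requires the `parking_lot` crate).
use parking_lot::RwLock;

struct ActiveMarker<'a> {
    value: i64,
    slot: &'a RwLock<Option<i64>>,
}

impl<'a> ActiveMarker<'a> {
    fn new(value: i64, slot: &'a RwLock<Option<i64>>) -> Self {
        *slot.write() = Some(value);
        Self { value, slot }
    }
}

impl<'a> Drop for ActiveMarker<'a> {
    fn drop(&mut self) {
        // Clear on drop; panic if some other value was marked concurrently.
        let mut slot = self.slot.write();
        assert_eq!(*slot, Some(self.value), "concurrent marker detected");
        *slot = None;
    }
}

fn main() {
    let slot = RwLock::new(None);
    {
        let _guard = ActiveMarker::new(10, &slot);
        assert_eq!(*slot.read(), Some(10)); // visible while the write is buffering
    }
    assert_eq!(*slot.read(), None); // cleared once the guard is dropped
}
```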
+ pub async fn snapshot( + &self, + table_name: &str, + partition_key: &PartitionKey, + ) -> Option<(Vec>, Option>)> { + if let Some(t) = self.table_data(table_name) { + let mut t = t.write().await; + + return t.partition_data.get_mut(partition_key).map(|p| { + p.data + .generate_snapshot() + .expect("snapshot on mutable batch should never fail"); + (p.data.snapshots.to_vec(), p.data.persisting.clone()) + }); + } + + None + } + + /// Snapshots the mutable buffer for the partition, which clears it out and then moves all + /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot + /// or persist, None will be returned. + pub async fn snapshot_to_persisting( + &self, + partition_info: &PartitionInfo, + ) -> Option> { + if let Some(table_data) = self.table_data(&partition_info.table_name) { + let mut table_data = table_data.write().await; + + return table_data + .partition_data + .get_mut(&partition_info.partition.partition_key) + .and_then(|partition_data| { + partition_data.snapshot_to_persisting_batch( + partition_info.partition.shard_id, + partition_info.partition.table_id, + partition_info.partition.id, + &partition_info.table_name, + ) + }); + } + + None + } + + /// Gets the buffered table data + pub(crate) fn table_data( + &self, + table_name: &str, + ) -> Option>> { + let t = self.tables.read(); + t.get(table_name).cloned() + } + + /// Inserts the table or returns it if it happens to be inserted by some other thread + async fn insert_table( + &self, + shard_id: ShardId, + table_name: &str, + catalog: &dyn Catalog, + ) -> Result>, super::Error> { + let mut repos = catalog.repositories().await; + let info = repos + .tables() + .get_table_persist_info(shard_id, self.namespace_id, table_name) + .await + .context(super::CatalogSnafu)? + .context(super::TableNotFoundSnafu { table_name })?; + + let mut t = self.tables.write(); + + let data = match t.entry(table_name.to_string()) { + Entry::Vacant(v) => { + let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( + info.table_id, + info.tombstone_max_sequence_number, + )))); + self.table_count.inc(1); + Arc::clone(v) + } + Entry::Occupied(v) => Arc::clone(v.get()), + }; + + Ok(data) + } + + /// Walks down the table and partition and clears the persisting batch. The sequence number is + /// the max_sequence_number for the persisted parquet file, which should be kept in the table + /// data buffer. + pub(crate) async fn mark_persisted( + &self, + table_name: &str, + partition_key: &PartitionKey, + sequence_number: SequenceNumber, + ) { + if let Some(t) = self.table_data(table_name) { + let mut t = t.write().await; + let partition = t.partition_data.get_mut(partition_key); + + if let Some(p) = partition { + p.data.mark_persisted(sequence_number); + } + } + } + + /// Return progress from this Namespace + pub(crate) async fn progress(&self) -> ShardProgress { + let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect(); + + // Consolidate progtress across partitions. + let mut progress = ShardProgress::new() + // Properly account for any sequence number that is + // actively buffering and thus not yet completely + // readable. + .actively_buffering(*self.buffering_sequence_number.read()); + + for table_data in tables { + progress = progress.combine(table_data.read().await.progress()) + } + progress + } + + /// Return the [`NamespaceId`] this [`NamespaceData`] belongs to. 
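// ---- [editor's aside: illustrative sketch, not part of this patch] ----
// `buffer_operation` above first looks a table up via `table_data` (read
// lock) and only then calls `insert_table`, which re-checks through the map's
// entry API under the write lock so two callers racing to create the same
// table end up sharing one `Arc`. A minimal std-only version of that shape,
// folded into a single function with hypothetical names (std `RwLock`
// standing in for `parking_lot`, no catalog round trip):
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::{Arc, RwLock};

#[derive(Debug, Default)]
struct TableBuffer;

fn get_or_create(
    tables: &RwLock<BTreeMap<String, Arc<TableBuffer>>>,
    name: &str,
) -> Arc<TableBuffer> {
    // Fast path: most writes hit an existing table and only need a read lock.
    if let Some(t) = tables.read().unwrap().get(name) {
        return Arc::clone(t);
    }
    // Slow path: the entry API makes the create race benign; the loser simply
    // clones the Arc inserted by the winner.
    match tables.write().unwrap().entry(name.to_string()) {
        Entry::Vacant(v) => Arc::clone(v.insert(Arc::new(TableBuffer))),
        Entry::Occupied(o) => Arc::clone(o.get()),
    }
}

fn main() {
    let tables = RwLock::new(BTreeMap::new());
    let a = get_or_create(&tables, "cpu");
    let b = get_or_create(&tables, "cpu");
    assert!(Arc::ptr_eq(&a, &b)); // both callers share the same buffer
}
// ---- [end editor's aside] ----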
+ pub fn namespace_id(&self) -> NamespaceId { + self.namespace_id + } + + #[cfg(test)] + pub fn table_count(&self) -> &U64Counter { + &self.table_count + } +} + +/// RAAI struct that sets buffering sequence number on creation and clears it on free +struct ScopedSequenceNumber<'a> { + sequence_number: SequenceNumber, + buffering_sequence_number: &'a RwLock>, +} + +impl<'a> ScopedSequenceNumber<'a> { + fn new( + sequence_number: SequenceNumber, + buffering_sequence_number: &'a RwLock>, + ) -> Self { + *buffering_sequence_number.write() = Some(sequence_number); + + Self { + sequence_number, + buffering_sequence_number, + } + } +} + +impl<'a> Drop for ScopedSequenceNumber<'a> { + fn drop(&mut self) { + // clear write on drop + let mut buffering_sequence_number = self.buffering_sequence_number.write(); + assert_eq!( + *buffering_sequence_number, + Some(self.sequence_number), + "multiple operations are being buffered concurrently" + ); + *buffering_sequence_number = None; + } +} diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs new file mode 100644 index 0000000000..32a991e9a9 --- /dev/null +++ b/ingester/src/data/partition.rs @@ -0,0 +1,566 @@ +//! Partition level data buffer structures. + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use data_types::{PartitionId, SequenceNumber, ShardId, TableId, Tombstone}; +use iox_query::exec::Executor; +use mutable_batch::MutableBatch; +use schema::selection::Selection; +use snafu::ResultExt; +use uuid::Uuid; +use write_summary::ShardProgress; + +use self::buffer::{BufferBatch, DataBuffer}; +use crate::{data::query_dedup::query, query::QueryableBatch}; + +mod buffer; + +/// Read only copy of the unpersisted data for a partition in the ingester for a specific partition. +#[derive(Debug)] +pub(crate) struct UnpersistedPartitionData { + pub partition_id: PartitionId, + pub non_persisted: Vec>, + pub persisting: Option, + pub partition_status: PartitionStatus, +} + +/// Status of a partition that has unpersisted data. +/// +/// Note that this structure is specific to a partition (which itself is bound to a table and +/// shard)! 
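// ---- [editor's aside: illustrative sketch, not part of this patch] ----
// `UnpersistedPartitionData` above exposes a partition's data as the
// not-yet-persisting snapshots plus the optional persisting batch; the moves
// between the write buffer, the snapshots, and the persisting slot are
// implemented by `DataBuffer` in partition/buffer.rs later in this diff. A
// std-only mock of those moves, with hypothetical names and `Vec<String>`
// standing in for the Arrow batches:
#[derive(Debug, Default)]
struct MockDataBuffer {
    buffer: Vec<String>,                  // incoming rows (the MutableBatch)
    snapshots: Vec<Vec<String>>,          // immutable copies (SnapshotBatch)
    persisting: Option<Vec<Vec<String>>>, // snapshots handed to persistence
}

impl MockDataBuffer {
    fn buffer_write(&mut self, row: &str) {
        self.buffer.push(row.to_string());
    }

    /// Move whatever is buffered into a new snapshot (no-op when empty).
    fn generate_snapshot(&mut self) {
        if !self.buffer.is_empty() {
            self.snapshots.push(std::mem::take(&mut self.buffer));
        }
    }

    /// Snapshot the buffer, then move every snapshot into the persisting
    /// slot. Like the real code, only one persist may be in flight at a time.
    fn snapshot_to_persisting(&mut self) -> Option<&Vec<Vec<String>>> {
        assert!(self.persisting.is_none(), "already persisting");
        self.generate_snapshot();
        if self.snapshots.is_empty() {
            return None;
        }
        self.persisting = Some(std::mem::take(&mut self.snapshots));
        self.persisting.as_ref()
    }

    /// Called once the parquet file is durable and recorded in the catalog.
    fn mark_persisted(&mut self) {
        self.persisting = None;
    }
}

fn main() {
    let mut data = MockDataBuffer::default();
    data.buffer_write("row 1");
    data.generate_snapshot(); // e.g. a query arrived
    data.buffer_write("row 2");

    assert!(data.snapshot_to_persisting().is_some());
    assert!(data.buffer.is_empty() && data.snapshots.is_empty());

    data.mark_persisted();
    assert!(data.persisting.is_none());
}
// ---- [end editor's aside] ----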
+#[derive(Debug, Clone, PartialEq, Eq)] +#[allow(missing_copy_implementations)] +pub struct PartitionStatus { + /// Max sequence number persisted + pub parquet_max_sequence_number: Option, + + /// Max sequence number for a tombstone + pub tombstone_max_sequence_number: Option, +} + +/// PersistingBatch contains all needed info and data for creating +/// a parquet file for given set of SnapshotBatches +#[derive(Debug, PartialEq, Clone)] +pub struct PersistingBatch { + /// Shard id of the data + pub(crate) shard_id: ShardId, + + /// Table id of the data + pub(crate) table_id: TableId, + + /// Partition Id of the data + pub(crate) partition_id: PartitionId, + + /// Id of to-be-created parquet file of this data + pub(crate) object_store_id: Uuid, + + /// data + pub(crate) data: Arc, +} + +/// SnapshotBatch contains data of many contiguous BufferBatches +#[derive(Debug, PartialEq)] +pub struct SnapshotBatch { + /// Min sequence number of its combined BufferBatches + pub(crate) min_sequence_number: SequenceNumber, + /// Max sequence number of its combined BufferBatches + pub(crate) max_sequence_number: SequenceNumber, + /// Data of its combined BufferBatches kept in one RecordBatch + pub(crate) data: Arc, +} + +impl SnapshotBatch { + /// Return only data of the given columns + pub fn scan(&self, selection: Selection<'_>) -> Result>, super::Error> { + Ok(match selection { + Selection::All => Some(Arc::clone(&self.data)), + Selection::Some(columns) => { + let schema = self.data.schema(); + + let indices = columns + .iter() + .filter_map(|&column_name| { + match schema.index_of(column_name) { + Ok(idx) => Some(idx), + _ => None, // this batch does not include data of this column_name + } + }) + .collect::>(); + if indices.is_empty() { + None + } else { + Some(Arc::new( + self.data + .project(&indices) + .context(super::FilterColumnSnafu {})?, + )) + } + } + }) + } + + /// Return progress in this data + fn progress(&self) -> ShardProgress { + ShardProgress::new() + .with_buffered(self.min_sequence_number) + .with_buffered(self.max_sequence_number) + } +} + +/// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard +#[derive(Debug)] +pub(crate) struct PartitionData { + id: PartitionId, + pub(crate) data: DataBuffer, +} + +impl PartitionData { + /// Initialize a new partition data buffer + pub fn new(id: PartitionId) -> Self { + Self { + id, + data: Default::default(), + } + } + + /// Snapshot anything in the buffer and move all snapshot data into a persisting batch + pub fn snapshot_to_persisting_batch( + &mut self, + shard_id: ShardId, + table_id: TableId, + partition_id: PartitionId, + table_name: &str, + ) -> Option> { + self.data + .snapshot_to_persisting(shard_id, table_id, partition_id, table_name) + } + + /// Snapshot whatever is in the buffer and return a new vec of the + /// arc cloned snapshots + #[allow(dead_code)] // Used in tests + pub fn snapshot(&mut self) -> Result>, super::Error> { + self.data + .generate_snapshot() + .context(super::SnapshotSnafu)?; + Ok(self.data.get_snapshots().to_vec()) + } + + /// Return non persisting data + pub fn get_non_persisting_data(&self) -> Result>, super::Error> { + self.data.buffer_and_snapshots() + } + + /// Return persisting data + pub fn get_persisting_data(&self) -> Option { + self.data.get_persisting_data() + } + + /// Write the given mb in the buffer + pub(crate) fn buffer_write( + &mut self, + sequence_number: SequenceNumber, + mb: MutableBatch, + ) -> Result<(), super::Error> { + match &mut self.data.buffer { + 
Some(buf) => { + buf.max_sequence_number = sequence_number.max(buf.max_sequence_number); + buf.data.extend_from(&mb).context(super::BufferWriteSnafu)?; + } + None => { + self.data.buffer = Some(BufferBatch { + min_sequence_number: sequence_number, + max_sequence_number: sequence_number, + data: mb, + }) + } + } + + Ok(()) + } + + /// Buffers a new tombstone: + /// . All the data in the `buffer` and `snapshots` will be replaced with one + /// tombstone-applied snapshot + /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` + /// exists + pub(crate) async fn buffer_tombstone( + &mut self, + executor: &Executor, + table_name: &str, + tombstone: Tombstone, + ) { + self.data.add_tombstone(tombstone.clone()); + + // ---------------------------------------------------------- + // First apply the tombstone on all in-memory & non-persisting data + // Make a QueryableBatch for all buffer + snapshots + the given tombstone + let max_sequence_number = tombstone.sequence_number; + let query_batch = match self.data.snapshot_to_queryable_batch( + table_name, + self.id, + Some(tombstone.clone()), + ) { + Some(query_batch) if !query_batch.is_empty() => query_batch, + _ => { + // No need to proceed further + return; + } + }; + + let (min_sequence_number, _) = query_batch.min_max_sequence_numbers(); + assert!(min_sequence_number <= max_sequence_number); + + // Run query on the QueryableBatch to apply the tombstone. + let stream = match query(executor, Arc::new(query_batch)).await { + Err(e) => { + // this should never error out. if it does, we need to crash hard so + // someone can take a look. + panic!("unable to apply tombstones on snapshots: {:?}", e); + } + Ok(stream) => stream, + }; + let record_batches = match datafusion::physical_plan::common::collect(stream).await { + Err(e) => { + // this should never error out. if it does, we need to crash hard so + // someone can take a look. 
+ panic!("unable to collect record batches: {:?}", e); + } + Ok(batches) => batches, + }; + + // Merge all result record batches into one record batch + // and make a snapshot for it + let snapshot = if !record_batches.is_empty() { + let record_batch = RecordBatch::concat(&record_batches[0].schema(), &record_batches) + .unwrap_or_else(|e| { + panic!("unable to concat record batches: {:?}", e); + }); + let snapshot = SnapshotBatch { + min_sequence_number, + max_sequence_number, + data: Arc::new(record_batch), + }; + + Some(Arc::new(snapshot)) + } else { + None + }; + + // ---------------------------------------------------------- + // Add the tombstone-applied data back in as one snapshot + if let Some(snapshot) = snapshot { + self.data.snapshots.push(snapshot); + } + } + + /// Return the progress from this Partition + pub(crate) fn progress(&self) -> ShardProgress { + self.data.progress() + } + + pub(crate) fn id(&self) -> PartitionId { + self.id + } +} + +#[cfg(test)] +mod tests { + use arrow_util::assert_batches_sorted_eq; + use mutable_batch_lp::test_helpers::lp_to_mutable_batch; + + use super::*; + use crate::test_util::create_tombstone; + + #[test] + fn snapshot_buffer_different_but_compatible_schemas() { + let mut partition_data = PartitionData { + id: PartitionId::new(1), + data: Default::default(), + }; + + let seq_num1 = SequenceNumber::new(1); + // Missing tag `t1` + let (_, mut mutable_batch1) = + lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); + partition_data + .buffer_write(seq_num1, mutable_batch1.clone()) + .unwrap(); + + let seq_num2 = SequenceNumber::new(2); + // Missing field `iv` + let (_, mutable_batch2) = + lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#); + + partition_data + .buffer_write(seq_num2, mutable_batch2.clone()) + .unwrap(); + partition_data.data.generate_snapshot().unwrap(); + + assert!(partition_data.data.buffer.is_none()); + assert_eq!(partition_data.data.snapshots.len(), 1); + + let snapshot = &partition_data.data.snapshots[0]; + assert_eq!(snapshot.min_sequence_number, seq_num1); + assert_eq!(snapshot.max_sequence_number, seq_num2); + + mutable_batch1.extend_from(&mutable_batch2).unwrap(); + let combined_record_batch = mutable_batch1.to_arrow(Selection::All).unwrap(); + assert_eq!(&*snapshot.data, &combined_record_batch); + } + + // Test deletes mixed with writes on a single parittion + #[tokio::test] + async fn writes_and_deletes() { + // Make a partition with empty DataBuffer + let s_id = 1; + let t_id = 1; + let p_id = 1; + let table_name = "restaurant"; + let mut p = PartitionData::new(PartitionId::new(p_id)); + let exec = Executor::new(1); + + // ------------------------------------------ + // Fill `buffer` + // --- seq_num: 1 + let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Boston day="fri",temp=50 10"#); + p.buffer_write(SequenceNumber::new(1), mb).unwrap(); + + // --- seq_num: 2 + let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="thu",temp=44 15"#); + + p.buffer_write(SequenceNumber::new(2), mb).unwrap(); + + // verify data + assert_eq!( + p.data.buffer.as_ref().unwrap().min_sequence_number, + SequenceNumber::new(1) + ); + assert_eq!( + p.data.buffer.as_ref().unwrap().max_sequence_number, + SequenceNumber::new(2) + ); + assert_eq!(p.data.snapshots.len(), 0); + assert_eq!(p.data.deletes_during_persisting().len(), 0); + assert_eq!(p.data.persisting, None); + + // ------------------------------------------ + // Delete + // --- seq_num: 3 + let ts = create_tombstone( + 1, // 
tombstone id + t_id, // table id + s_id, // shard id + 3, // delete's seq_number + 0, // min time of data to get deleted + 20, // max time of data to get deleted + "day=thu", // delete predicate + ); + // one row will get deleted, the other is moved to snapshot + p.buffer_tombstone(&exec, "restaurant", ts).await; + + // verify data + assert!(p.data.buffer.is_none()); // always empty after delete + assert_eq!(p.data.snapshots.len(), 1); // one snpashot if there is data + assert_eq!(p.data.deletes_during_persisting().len(), 0); + assert_eq!(p.data.persisting, None); + // snapshot only has one row since the other one got deleted + let data = (*p.data.snapshots[0].data).clone(); + let expected = vec![ + "+--------+-----+------+--------------------------------+", + "| city | day | temp | time |", + "+--------+-----+------+--------------------------------+", + "| Boston | fri | 50 | 1970-01-01T00:00:00.000000010Z |", + "+--------+-----+------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &[data]); + assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1); + assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 3); + + // ------------------------------------------ + // Fill `buffer` + // --- seq_num: 4 + let (_, mb) = lp_to_mutable_batch( + r#" + restaurant,city=Medford day="sun",temp=55 22 + restaurant,city=Boston day="sun",temp=57 24 + "#, + ); + p.buffer_write(SequenceNumber::new(4), mb).unwrap(); + + // --- seq_num: 5 + let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="tue",temp=56 30"#); + + p.buffer_write(SequenceNumber::new(5), mb).unwrap(); + + // verify data + assert_eq!( + p.data.buffer.as_ref().unwrap().min_sequence_number, + SequenceNumber::new(4) + ); + assert_eq!( + p.data.buffer.as_ref().unwrap().max_sequence_number, + SequenceNumber::new(5) + ); + assert_eq!(p.data.snapshots.len(), 1); // existing sanpshot + assert_eq!(p.data.deletes_during_persisting().len(), 0); + assert_eq!(p.data.persisting, None); + + // ------------------------------------------ + // Delete + // --- seq_num: 6 + let ts = create_tombstone( + 2, // tombstone id + t_id, // table id + s_id, // shard id + 6, // delete's seq_number + 10, // min time of data to get deleted + 50, // max time of data to get deleted + "city=Boston", // delete predicate + ); + // two rows will get deleted, one from existing snapshot, one from the buffer being moved + // to snpashot + p.buffer_tombstone(&exec, "restaurant", ts).await; + + // verify data + assert!(p.data.buffer.is_none()); // always empty after delete + assert_eq!(p.data.snapshots.len(), 1); // one snpashot + assert_eq!(p.data.deletes_during_persisting().len(), 0); + assert_eq!(p.data.persisting, None); + // snapshot only has two rows since the other 2 rows with city=Boston have got deleted + let data = (*p.data.snapshots[0].data).clone(); + let expected = vec![ + "+---------+-----+------+--------------------------------+", + "| city | day | temp | time |", + "+---------+-----+------+--------------------------------+", + "| Andover | tue | 56 | 1970-01-01T00:00:00.000000030Z |", + "| Medford | sun | 55 | 1970-01-01T00:00:00.000000022Z |", + "+---------+-----+------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &[data]); + assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1); + assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 6); + + // ------------------------------------------ + // Persisting + let p_batch = p + .snapshot_to_persisting_batch( + 
ShardId::new(s_id), + TableId::new(t_id), + PartitionId::new(p_id), + table_name, + ) + .unwrap(); + + // verify data + assert!(p.data.buffer.is_none()); // always empty after issuing persit + assert_eq!(p.data.snapshots.len(), 0); // always empty after issuing persit + assert_eq!(p.data.deletes_during_persisting().len(), 0); // deletes not happen yet + assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + + // ------------------------------------------ + // Delete + // --- seq_num: 7 + let ts = create_tombstone( + 3, // tombstone id + t_id, // table id + s_id, // shard id + 7, // delete's seq_number + 10, // min time of data to get deleted + 50, // max time of data to get deleted + "temp=55", // delete predicate + ); + // if a query come while persisting, the row with temp=55 will be deleted before + // data is sent back to Querier + p.buffer_tombstone(&exec, "restaurant", ts).await; + + // verify data + assert!(p.data.buffer.is_none()); // always empty after delete + // no snpashots becasue buffer has not data yet and the + // snapshot was empty too + assert_eq!(p.data.snapshots.len(), 0); + assert_eq!(p.data.deletes_during_persisting().len(), 1); // tombstone added since data is + // persisting + assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + + // ------------------------------------------ + // Fill `buffer` + // --- seq_num: 8 + let (_, mb) = lp_to_mutable_batch( + r#" + restaurant,city=Wilmington day="sun",temp=55 35 + restaurant,city=Boston day="sun",temp=60 36 + restaurant,city=Boston day="sun",temp=62 38 + "#, + ); + p.buffer_write(SequenceNumber::new(8), mb).unwrap(); + + // verify data + assert_eq!( + p.data.buffer.as_ref().unwrap().min_sequence_number, + SequenceNumber::new(8) + ); // 1 newly added mutable batch of 3 rows of data + assert_eq!(p.data.snapshots.len(), 0); // still empty + assert_eq!(p.data.deletes_during_persisting().len(), 1); + assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + + // ------------------------------------------ + // Take snapshot of the `buffer` + p.snapshot().unwrap(); + // verify data + assert!(p.data.buffer.is_none()); // empty after snapshot + assert_eq!(p.data.snapshots.len(), 1); // data moved from buffer + assert_eq!(p.data.deletes_during_persisting().len(), 1); + assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + // snapshot has three rows moved from buffer + let data = (*p.data.snapshots[0].data).clone(); + let expected = vec![ + "+------------+-----+------+--------------------------------+", + "| city | day | temp | time |", + "+------------+-----+------+--------------------------------+", + "| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |", + "| Boston | sun | 60 | 1970-01-01T00:00:00.000000036Z |", + "| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |", + "+------------+-----+------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &[data]); + assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8); + assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8); + + // ------------------------------------------ + // Delete + // --- seq_num: 9 + let ts = create_tombstone( + 4, // tombstone id + t_id, // table id + s_id, // shard id + 9, // delete's seq_number + 10, // min time of data to get deleted + 50, // max time of data to get deleted + "temp=60", // delete predicate + ); + // the row with temp=60 will be removed from the sanphot + p.buffer_tombstone(&exec, "restaurant", ts).await; + + // verify data + assert!(p.data.buffer.is_none()); // 
always empty after delete + assert_eq!(p.data.snapshots.len(), 1); // new snapshot of the existing with delete applied + assert_eq!(p.data.deletes_during_persisting().len(), 2); // one more tombstone added make it 2 + assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + // snapshot has only 2 rows because the row with tem=60 was removed + let data = (*p.data.snapshots[0].data).clone(); + let expected = vec![ + "+------------+-----+------+--------------------------------+", + "| city | day | temp | time |", + "+------------+-----+------+--------------------------------+", + "| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |", + "| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |", + "+------------+-----+------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &[data]); + assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8); + assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 9); + + exec.join().await; + } +} diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs new file mode 100644 index 0000000000..9de62192ea --- /dev/null +++ b/ingester/src/data/partition/buffer.rs @@ -0,0 +1,319 @@ +//! Data for the lifecycle of the Ingester + +use std::sync::Arc; + +use data_types::{PartitionId, SequenceNumber, ShardId, TableId, Tombstone}; +use mutable_batch::MutableBatch; +use schema::selection::Selection; +use snafu::ResultExt; +use uuid::Uuid; +use write_summary::ShardProgress; + +use super::{PersistingBatch, QueryableBatch, SnapshotBatch}; + +/// Data of an IOx partition split into batches +/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐ +/// │ Buffer │ │ Snapshots │ │ Persisting │ +/// │ ┌───────────────────┐ │ │ │ │ │ +/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ +/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │ +/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ +/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ +/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ +/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ +/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ +/// │ ┌───────────────────┐ │ │ │ │ │ │ +/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │ +/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │ +/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ +/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │ +/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ +/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ +/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ +/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ +/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ +/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘ +#[derive(Debug, Default)] +pub(crate) struct DataBuffer { + /// Buffer of incoming writes + pub(crate) buffer: Option, + + /// The max_persisted_sequence number for any parquet_file in this partition + pub(crate) max_persisted_sequence_number: Option, + + /// Buffer of tombstones whose time range may overlap with this partition. + /// All tombstones were already applied to corresponding snapshots. 
This list + /// only keep the ones that come during persisting. The reason + /// we keep them becasue if a query comes, we need to apply these tombstones + /// on the persiting data before sending it to the Querier + /// When the `persiting` is done and removed, this list will get empty, too + deletes_during_persisting: Vec, + + /// Data in `buffer` will be moved to a `snapshot` when one of these happens: + /// . A background persist is called + /// . A read request from Querier + /// The `buffer` will be empty when this happens. + pub(crate) snapshots: Vec>, + /// When a persist is called, data in `buffer` will be moved to a `snapshot` + /// and then all `snapshots` will be moved to a `persisting`. + /// Both `buffer` and 'snaphots` will be empty when this happens. + pub(crate) persisting: Option>, + // Extra Notes: + // . In MVP, we will only persist a set of snapshots at a time. + // In later version, multiple persisting operations may be happening concurrently but + // their persisted info must be added into the Catalog in their data + // ingesting order. + // . When a read request comes from a Querier, all data from `snapshots` + // and `persisting` must be sent to the Querier. + // . After the `persisting` data is persisted and successfully added + // into the Catalog, it will be removed from this Data Buffer. + // This data might be added into an extra cache to serve up to + // Queriers that may not have loaded the parquet files from object + // storage yet. But this will be decided after MVP. +} + +impl DataBuffer { + /// Add a new tombstones into the [`DataBuffer`]. + pub(super) fn add_tombstone(&mut self, tombstone: Tombstone) { + // Only keep this tombstone if some data is being persisted + if self.persisting.is_some() { + self.deletes_during_persisting.push(tombstone); + } + } + + /// If a [`BufferBatch`] exists, convert it to a [`SnapshotBatch`] and add + /// it to the list of snapshots. + /// + /// Does nothing if there is no [`BufferBatch`]. + pub(crate) fn generate_snapshot(&mut self) -> Result<(), mutable_batch::Error> { + let snapshot = self.copy_buffer_to_snapshot()?; + if let Some(snapshot) = snapshot { + self.snapshots.push(snapshot); + self.buffer = None; + } + + Ok(()) + } + + /// Returns snapshot of the buffer but keeps data in the buffer + fn copy_buffer_to_snapshot(&self) -> Result>, mutable_batch::Error> { + if let Some(buf) = &self.buffer { + return Ok(Some(Arc::new(SnapshotBatch { + min_sequence_number: buf.min_sequence_number, + max_sequence_number: buf.max_sequence_number, + data: Arc::new(buf.data.to_arrow(Selection::All)?), + }))); + } + + Ok(None) + } + + /// Snapshots the buffer and make a QueryableBatch for all the snapshots + /// Both buffer and snapshots will be empty after this + pub(super) fn snapshot_to_queryable_batch( + &mut self, + table_name: &str, + partition_id: PartitionId, + tombstone: Option, + ) -> Option { + self.generate_snapshot() + .expect("This mutable batch snapshot error should be impossible."); + + let mut data = vec![]; + std::mem::swap(&mut data, &mut self.snapshots); + + let mut tombstones = vec![]; + if let Some(tombstone) = tombstone { + tombstones.push(tombstone); + } + + // only produce batch if there is any data + if data.is_empty() { + None + } else { + Some(QueryableBatch::new( + table_name, + partition_id, + data, + tombstones, + )) + } + } + + /// Returns all existing snapshots plus data in the buffer + /// This only read data. 
Data in the buffer will be kept in the buffer + pub(super) fn buffer_and_snapshots( + &self, + ) -> Result>, crate::data::Error> { + // Existing snapshots + let mut snapshots = self.snapshots.clone(); + + // copy the buffer to a snapshot + let buffer_snapshot = self + .copy_buffer_to_snapshot() + .context(crate::data::BufferToSnapshotSnafu)?; + snapshots.extend(buffer_snapshot); + + Ok(snapshots) + } + + /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. + /// + /// # Panic + /// + /// Panics if there is already a persisting batch. + pub(super) fn snapshot_to_persisting( + &mut self, + shard_id: ShardId, + table_id: TableId, + partition_id: PartitionId, + table_name: &str, + ) -> Option> { + if self.persisting.is_some() { + panic!("Unable to snapshot while persisting. This is an unexpected state.") + } + + if let Some(queryable_batch) = + self.snapshot_to_queryable_batch(table_name, partition_id, None) + { + let persisting_batch = Arc::new(PersistingBatch { + shard_id, + table_id, + partition_id, + object_store_id: Uuid::new_v4(), + data: Arc::new(queryable_batch), + }); + + self.persisting = Some(Arc::clone(&persisting_batch)); + + Some(persisting_batch) + } else { + None + } + } + + /// Return a QueryableBatch of the persisting batch after applying new tombstones + pub(super) fn get_persisting_data(&self) -> Option { + let persisting = match &self.persisting { + Some(p) => p, + None => return None, + }; + + // persisting data + let mut queryable_batch = (*persisting.data).clone(); + + // Add new tombstones if any + queryable_batch.add_tombstones(&self.deletes_during_persisting); + + Some(queryable_batch) + } + + /// Return the progress in this DataBuffer + pub(super) fn progress(&self) -> ShardProgress { + let progress = ShardProgress::new(); + + let progress = if let Some(buffer) = &self.buffer { + progress.combine(buffer.progress()) + } else { + progress + }; + + let progress = self.snapshots.iter().fold(progress, |progress, snapshot| { + progress.combine(snapshot.progress()) + }); + + if let Some(persisting) = &self.persisting { + persisting + .data + .data + .iter() + .fold(progress, |progress, snapshot| { + progress.combine(snapshot.progress()) + }) + } else { + progress + } + } + + pub(super) fn get_snapshots(&self) -> &[Arc] { + self.snapshots.as_ref() + } + + pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) { + self.max_persisted_sequence_number = Some(up_to); + self.persisting = None; + self.deletes_during_persisting.clear() + } + + pub(crate) fn max_persisted_sequence_number(&self) -> Option { + self.max_persisted_sequence_number + } + + #[cfg(test)] + pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] { + self.deletes_during_persisting.as_ref() + } +} + +/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the +/// ingester keep the batches of data in their ingesting order +#[derive(Debug)] +pub struct BufferBatch { + /// Sequence number of the first write in this batch + pub(crate) min_sequence_number: SequenceNumber, + /// Sequence number of the last write in this batch + pub(crate) max_sequence_number: SequenceNumber, + /// Ingesting data + pub(crate) data: MutableBatch, +} + +impl BufferBatch { + /// Return the progress in this DataBuffer + fn progress(&self) -> ShardProgress { + ShardProgress::new() + .with_buffered(self.min_sequence_number) + .with_buffered(self.max_sequence_number) + } +} + +#[cfg(test)] +mod tests { + use mutable_batch_lp::test_helpers::lp_to_mutable_batch; + + use 
super::*; + + #[test] + fn snapshot_empty_buffer_adds_no_snapshots() { + let mut data_buffer = DataBuffer::default(); + + data_buffer.generate_snapshot().unwrap(); + + assert!(data_buffer.snapshots.is_empty()); + } + + #[test] + fn snapshot_buffer_batch_moves_to_snapshots() { + let mut data_buffer = DataBuffer::default(); + + let seq_num1 = SequenceNumber::new(1); + let (_, mutable_batch1) = + lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); + let buffer_batch1 = BufferBatch { + min_sequence_number: seq_num1, + max_sequence_number: seq_num1, + data: mutable_batch1, + }; + let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap(); + data_buffer.buffer = Some(buffer_batch1); + + data_buffer.generate_snapshot().unwrap(); + + assert!(data_buffer.buffer.is_none()); + assert_eq!(data_buffer.snapshots.len(), 1); + + let snapshot = &data_buffer.snapshots[0]; + assert_eq!(snapshot.min_sequence_number, seq_num1); + assert_eq!(snapshot.max_sequence_number, seq_num1); + assert_eq!(&*snapshot.data, &record_batch1); + } +} diff --git a/ingester/src/data/query_dedup.rs b/ingester/src/data/query_dedup.rs index 17e1fecd48..2fb37364ea 100644 --- a/ingester/src/data/query_dedup.rs +++ b/ingester/src/data/query_dedup.rs @@ -8,7 +8,7 @@ use iox_query::{ use observability_deps::tracing::debug; use snafu::{ResultExt, Snafu}; -use super::QueryableBatch; +use crate::query::QueryableBatch; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -88,13 +88,12 @@ pub async fn query( mod tests { use arrow_util::assert_batches_eq; + use super::*; use crate::test_util::{ create_one_record_batch_with_influxtype_no_duplicates, create_tombstone, make_queryable_batch, make_queryable_batch_with_deletes, }; - use super::*; - #[tokio::test] async fn test_query() { test_helpers::maybe_start_logging(); diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs new file mode 100644 index 0000000000..d69a062137 --- /dev/null +++ b/ingester/src/data/shard.rs @@ -0,0 +1,141 @@ +//! Shard level data buffer structures. + +use std::{ + collections::{btree_map::Entry, BTreeMap}, + sync::Arc, +}; + +use data_types::{ShardId, ShardIndex}; +use dml::DmlOperation; +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use metric::U64Counter; +use parking_lot::RwLock; +use snafu::{OptionExt, ResultExt}; +use write_summary::ShardProgress; + +use super::namespace::NamespaceData; +use crate::lifecycle::LifecycleHandle; + +/// Data of a Shard +#[derive(Debug)] +pub struct ShardData { + /// The shard index for this shard + shard_index: ShardIndex, + + // New namespaces can come in at any time so we need to be able to add new ones + namespaces: RwLock>>, + + metrics: Arc, + namespace_count: U64Counter, +} + +impl ShardData { + /// Initialise a new [`ShardData`] that emits metrics to `metrics`. 
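// ---- [editor's aside: illustrative sketch, not part of this patch] ----
// Every level of this buffer tree (shard -> namespace -> table -> partition)
// answers `progress()` by folding its children's progress together with
// `combine`, as `ShardData::progress` does just below. A std-only mock of
// that accumulation, with a hypothetical `Progress` standing in for
// `write_summary::ShardProgress`:
#[derive(Debug, Default, Clone, Copy)]
struct Progress {
    min_buffered: Option<u64>,
    max_buffered: Option<u64>,
}

impl Progress {
    fn with_buffered(self, seq: u64) -> Self {
        Self {
            min_buffered: Some(self.min_buffered.map_or(seq, |m| m.min(seq))),
            max_buffered: Some(self.max_buffered.map_or(seq, |m| m.max(seq))),
        }
    }

    fn combine(self, other: Self) -> Self {
        Self {
            min_buffered: match (self.min_buffered, other.min_buffered) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (x, None) | (None, x) => x,
            },
            max_buffered: match (self.max_buffered, other.max_buffered) {
                (Some(a), Some(b)) => Some(a.max(b)),
                (x, None) | (None, x) => x,
            },
        }
    }
}

fn main() {
    let children = [
        Progress::default().with_buffered(4).with_buffered(7),
        Progress::default().with_buffered(2),
        Progress::default(), // an empty child contributes nothing
    ];
    let total = children
        .iter()
        .fold(Progress::default(), |acc, p| acc.combine(*p));
    assert_eq!(total.min_buffered, Some(2));
    assert_eq!(total.max_buffered, Some(7));
}
// ---- [end editor's aside] ----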
+ pub fn new(shard_index: ShardIndex, metrics: Arc) -> Self { + let namespace_count = metrics + .register_metric::( + "ingester_namespaces_total", + "Number of namespaces known to the ingester", + ) + .recorder(&[]); + + Self { + shard_index, + namespaces: Default::default(), + metrics, + namespace_count, + } + } + + /// Initialize new ShardData with namespace for testing purpose only + #[cfg(test)] + pub fn new_for_test( + shard_index: ShardIndex, + namespaces: BTreeMap>, + ) -> Self { + Self { + shard_index, + namespaces: RwLock::new(namespaces), + metrics: Default::default(), + namespace_count: Default::default(), + } + } + + /// Store the write or delete in the shard. Deletes will + /// be written into the catalog before getting stored in the buffer. + /// Any writes that create new IOx partitions will have those records + /// created in the catalog before putting into the buffer. + pub async fn buffer_operation( + &self, + dml_operation: DmlOperation, + shard_id: ShardId, + catalog: &dyn Catalog, + lifecycle_handle: &dyn LifecycleHandle, + executor: &Executor, + ) -> Result { + let namespace_data = match self.namespace(dml_operation.namespace()) { + Some(d) => d, + None => { + self.insert_namespace(dml_operation.namespace(), catalog) + .await? + } + }; + + namespace_data + .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor) + .await + } + + /// Gets the namespace data out of the map + pub fn namespace(&self, namespace: &str) -> Option> { + let n = self.namespaces.read(); + n.get(namespace).cloned() + } + + /// Retrieves the namespace from the catalog and initializes an empty buffer, or + /// retrieves the buffer if some other caller gets it first + async fn insert_namespace( + &self, + namespace: &str, + catalog: &dyn Catalog, + ) -> Result, super::Error> { + let mut repos = catalog.repositories().await; + let namespace = repos + .namespaces() + .get_by_name(namespace) + .await + .context(super::CatalogSnafu)? + .context(super::NamespaceNotFoundSnafu { namespace })?; + + let mut n = self.namespaces.write(); + + let data = match n.entry(namespace.name) { + Entry::Vacant(v) => { + let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics))); + self.namespace_count.inc(1); + Arc::clone(v) + } + Entry::Occupied(v) => Arc::clone(v.get()), + }; + + Ok(data) + } + + /// Return the progress of this shard + pub(crate) async fn progress(&self) -> ShardProgress { + let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect(); + + let mut progress = ShardProgress::new(); + + for namespace_data in namespaces { + progress = progress.combine(namespace_data.progress().await); + } + progress + } + + /// Return the [`ShardIndex`] this [`ShardData`] is buffering for. + pub fn shard_index(&self) -> ShardIndex { + self.shard_index + } +} diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs new file mode 100644 index 0000000000..c84857c805 --- /dev/null +++ b/ingester/src/data/table.rs @@ -0,0 +1,208 @@ +//! Table level data buffer structures. 
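// ---- [editor's aside: illustrative sketch, not part of this patch] ----
// `TableData::parquet_max_sequence_number` below computes the table-wide max
// persisted sequence number with `.map(..).max().flatten()`. That works
// because `Option<T>` orders `None` below every `Some`, so the iterator max
// is the largest `Some` when one exists, and `flatten()` collapses the outer
// `Option` produced by `max()` (which is `None` only for an empty iterator).
// A tiny self-contained check of that idiom:
fn max_persisted(per_partition: &[Option<u64>]) -> Option<u64> {
    per_partition.iter().copied().max().flatten()
}

fn main() {
    // Mixed persisted / never-persisted partitions: the largest Some wins.
    assert_eq!(max_persisted(&[Some(3), None, Some(7)]), Some(7));
    // Nothing persisted anywhere yet.
    assert_eq!(max_persisted(&[None, None]), None);
    // No partitions at all.
    assert_eq!(max_persisted(&[]), None);
}
// ---- [end editor's aside] ----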
+ +use std::collections::BTreeMap; + +use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp}; +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use mutable_batch::MutableBatch; +use snafu::ResultExt; +use write_summary::ShardProgress; + +use super::partition::{PartitionData, PartitionStatus, UnpersistedPartitionData}; +use crate::lifecycle::LifecycleHandle; + +/// Data of a Table in a given Namesapce that belongs to a given Shard +#[derive(Debug)] +pub(crate) struct TableData { + table_id: TableId, + // the max sequence number for a tombstone associated with this table + tombstone_max_sequence_number: Option, + // Map pf partition key to its data + pub(super) partition_data: BTreeMap, +} + +impl TableData { + /// Initialize new table buffer + pub fn new(table_id: TableId, tombstone_max_sequence_number: Option) -> Self { + Self { + table_id, + tombstone_max_sequence_number, + partition_data: Default::default(), + } + } + + /// Initialize new table buffer for testing purpose only + #[cfg(test)] + pub fn new_for_test( + table_id: TableId, + tombstone_max_sequence_number: Option, + partitions: BTreeMap, + ) -> Self { + Self { + table_id, + tombstone_max_sequence_number, + partition_data: partitions, + } + } + + /// Return parquet_max_sequence_number + pub fn parquet_max_sequence_number(&self) -> Option { + self.partition_data + .values() + .map(|p| p.data.max_persisted_sequence_number()) + .max() + .flatten() + } + + /// Return tombstone_max_sequence_number + #[allow(dead_code)] // Used in tests + pub fn tombstone_max_sequence_number(&self) -> Option { + self.tombstone_max_sequence_number + } + + // buffers the table write and returns true if the lifecycle manager indicates that + // ingest should be paused. 
+ pub(super) async fn buffer_table_write( + &mut self, + sequence_number: SequenceNumber, + batch: MutableBatch, + partition_key: PartitionKey, + shard_id: ShardId, + catalog: &dyn Catalog, + lifecycle_handle: &dyn LifecycleHandle, + ) -> Result { + let partition_data = match self.partition_data.get_mut(&partition_key) { + Some(p) => p, + None => { + self.insert_partition(partition_key.clone(), shard_id, catalog) + .await?; + self.partition_data.get_mut(&partition_key).unwrap() + } + }; + + // skip the write if it has already been persisted + if let Some(max) = partition_data.data.max_persisted_sequence_number() { + if max >= sequence_number { + return Ok(false); + } + } + + let should_pause = lifecycle_handle.log_write( + partition_data.id(), + shard_id, + sequence_number, + batch.size(), + batch.rows(), + ); + partition_data.buffer_write(sequence_number, batch)?; + + Ok(should_pause) + } + + pub(super) async fn buffer_delete( + &mut self, + table_name: &str, + predicate: &DeletePredicate, + shard_id: ShardId, + sequence_number: SequenceNumber, + catalog: &dyn Catalog, + executor: &Executor, + ) -> Result<(), super::Error> { + let min_time = Timestamp::new(predicate.range.start()); + let max_time = Timestamp::new(predicate.range.end()); + + let mut repos = catalog.repositories().await; + let tombstone = repos + .tombstones() + .create_or_get( + self.table_id, + shard_id, + sequence_number, + min_time, + max_time, + &predicate.expr_sql_string(), + ) + .await + .context(super::CatalogSnafu)?; + + // remember "persisted" state + self.tombstone_max_sequence_number = Some(sequence_number); + + // modify one partition at a time + for data in self.partition_data.values_mut() { + data.buffer_tombstone(executor, table_name, tombstone.clone()) + .await; + } + + Ok(()) + } + + pub fn unpersisted_partition_data(&self) -> Vec { + self.partition_data + .values() + .map(|p| UnpersistedPartitionData { + partition_id: p.id(), + non_persisted: p + .get_non_persisting_data() + .expect("get_non_persisting should always work"), + persisting: p.get_persisting_data(), + partition_status: PartitionStatus { + parquet_max_sequence_number: p.data.max_persisted_sequence_number(), + tombstone_max_sequence_number: self.tombstone_max_sequence_number, + }, + }) + .collect() + } + + async fn insert_partition( + &mut self, + partition_key: PartitionKey, + shard_id: ShardId, + catalog: &dyn Catalog, + ) -> Result<(), super::Error> { + let mut repos = catalog.repositories().await; + let partition = repos + .partitions() + .create_or_get(partition_key, shard_id, self.table_id) + .await + .context(super::CatalogSnafu)?; + + // get info on the persisted parquet files to use later for replay or for snapshot + // information on query. 
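// ---- [editor's aside: illustrative sketch, not part of this patch] ----
// The lines just below seed the new `PartitionData` with the max sequence
// number already persisted to parquet for this partition, and
// `buffer_table_write` above uses that marker to drop writes replayed from
// the write buffer after a restart instead of buffering them twice. A
// std-only sketch of that idempotent-replay check, with hypothetical names:
struct PartitionBuffer {
    max_persisted: Option<u64>,
    buffered: Vec<u64>, // sequence numbers accepted into the in-memory buffer
}

impl PartitionBuffer {
    /// Returns true if the write was buffered, false if it was skipped
    /// because a parquet file already covers this sequence number.
    fn buffer_write(&mut self, sequence_number: u64) -> bool {
        if let Some(max) = self.max_persisted {
            if sequence_number <= max {
                return false; // already durable; buffering again would duplicate it
            }
        }
        self.buffered.push(sequence_number);
        true
    }
}

fn main() {
    // The catalog lists parquet files covering sequence numbers up to 5.
    let persisted_files = [Some(3), Some(5)];
    let max_persisted = persisted_files.iter().copied().max().flatten();

    let mut partition = PartitionBuffer { max_persisted, buffered: vec![] };
    assert!(!partition.buffer_write(4)); // replayed, already in parquet
    assert!(partition.buffer_write(6)); // genuinely new data
    assert_eq!(partition.buffered, vec![6]);
}
// ---- [end editor's aside] ----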
+ let files = repos + .parquet_files() + .list_by_partition_not_to_delete(partition.id) + .await + .context(super::CatalogSnafu)?; + // for now we just need the max persisted + let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max(); + + let mut data = PartitionData::new(partition.id); + data.data.max_persisted_sequence_number = max_persisted_sequence_number; + + self.partition_data.insert(partition.partition_key, data); + + Ok(()) + } + + /// Return progress from this Table + pub(crate) fn progress(&self) -> ShardProgress { + let progress = ShardProgress::new(); + let progress = match self.parquet_max_sequence_number() { + Some(n) => progress.with_persisted(n), + None => progress, + }; + + self.partition_data + .values() + .fold(progress, |progress, partition_data| { + progress.combine(partition_data.progress()) + }) + } + + #[cfg(test)] + pub(crate) fn table_id(&self) -> TableId { + self.table_id + } +} diff --git a/ingester/src/data/triggers/test.rs b/ingester/src/data/triggers/test.rs index f83eceb9b6..588438e0b8 100644 --- a/ingester/src/data/triggers/test.rs +++ b/ingester/src/data/triggers/test.rs @@ -1,4 +1,5 @@ use std::sync::Arc; + use tokio::sync::{Barrier, Mutex}; #[derive(Debug, Clone)] diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index e3acc9333f..a8a7308150 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -1,15 +1,7 @@ //! Ingest handler -use crate::{ - data::{IngesterData, IngesterQueryResponse, ShardData}, - lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, - poison::PoisonCabinet, - querier_handler::prepare_data_to_querier, - stream_handler::{ - sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation, - PeriodicWatermarkFetcher, SequencedStreamHandler, - }, -}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + use async_trait::async_trait; use backoff::BackoffConfig; use data_types::{Shard, ShardIndex, TopicMetadata}; @@ -26,7 +18,6 @@ use metric::{DurationHistogram, Metric, U64Counter}; use object_store::DynObjectStore; use observability_deps::tracing::*; use snafu::{ResultExt, Snafu}; -use std::{collections::BTreeMap, sync::Arc, time::Duration}; use tokio::{ sync::{Semaphore, TryAcquireError}, task::{JoinError, JoinHandle}, @@ -35,6 +26,17 @@ use tokio_util::sync::CancellationToken; use write_buffer::core::WriteBufferReading; use write_summary::ShardProgress; +use crate::{ + data::{shard::ShardData, IngesterData, IngesterQueryResponse}, + lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, + poison::PoisonCabinet, + querier_handler::prepare_data_to_querier, + stream_handler::{ + sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation, + PeriodicWatermarkFetcher, SequencedStreamHandler, + }, +}; + #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] pub enum Error { @@ -382,8 +384,8 @@ impl Drop for IngestHandlerImpl { #[cfg(test)] mod tests { - use super::*; - use crate::data::SnapshotBatch; + use std::{num::NonZeroU32, ops::DerefMut}; + use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; @@ -391,10 +393,12 @@ mod tests { use metric::{Attributes, Metric, U64Counter, U64Gauge}; use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; - use std::{num::NonZeroU32, ops::DerefMut}; use test_helpers::maybe_start_logging; use 
write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; + use super::*; + use crate::data::partition::SnapshotBatch; + #[tokio::test] async fn read_from_write_buffer_write_to_mutable_buffer() { let ingester = TestIngester::new().await; @@ -764,8 +768,7 @@ mod tests { verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| { if first_batch.min_sequence_number == SequenceNumber::new(1) { panic!( - "initialization did a seek to the beginning rather than \ - the min_unpersisted" + "initialization did a seek to the beginning rather than the min_unpersisted" ); } }) diff --git a/ingester/src/job.rs b/ingester/src/job.rs index 6b20226f23..ffec98838b 100644 --- a/ingester/src/job.rs +++ b/ingester/src/job.rs @@ -1,7 +1,8 @@ +use std::sync::Arc; + use data_types::PartitionId; use iox_time::TimeProvider; use parking_lot::Mutex; -use std::sync::Arc; use tracker::{ AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory, TaskRegistryWithMetrics, TaskTracker, diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index 75338b5e02..63a274cc4e 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -5,20 +5,22 @@ //! some absolute number and individual Parquet files that get persisted below some number. It //! is expected that they may be above or below the absolute thresholds. -use crate::{ - data::Persister, - job::{Job, JobRegistry}, - poison::{PoisonCabinet, PoisonPill}, -}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + use data_types::{PartitionId, SequenceNumber, ShardId}; use iox_time::{Time, TimeProvider}; use metric::{Metric, U64Counter}; use observability_deps::tracing::{error, info, warn}; use parking_lot::Mutex; -use std::{collections::BTreeMap, sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; use tracker::TrackedFutureExt; +use crate::{ + data::Persister, + job::{Job, JobRegistry}, + poison::{PoisonCabinet, PoisonPill}, +}; + /// API suitable for ingester tasks to query and update the [`LifecycleManager`] state. pub trait LifecycleHandle: Send + Sync + 'static { /// Logs bytes written into a partition so that it can be tracked for the manager to @@ -566,13 +568,15 @@ pub(crate) async fn run_lifecycle_manager( #[cfg(test)] mod tests { - use super::*; + use std::collections::BTreeSet; + use async_trait::async_trait; use iox_time::MockProvider; use metric::{Attributes, Registry}; - use std::collections::BTreeSet; use tokio::sync::Barrier; + use super::*; + #[derive(Default)] struct TestPersister { persist_called: Mutex>, diff --git a/ingester/src/poison.rs b/ingester/src/poison.rs index 5180e07150..9fdd2733d2 100644 --- a/ingester/src/poison.rs +++ b/ingester/src/poison.rs @@ -1,12 +1,13 @@ -use data_types::ShardIndex; -use futures::Future; -use parking_lot::{RwLock, RwLockUpgradableReadGuard}; -use pin_project::pin_project; use std::{ sync::Arc, task::{Poll, Waker}, }; +use data_types::ShardIndex; +use futures::Future; +use parking_lot::{RwLock, RwLockUpgradableReadGuard}; +use pin_project::pin_project; + #[derive(Debug, Clone, PartialEq, Eq)] #[allow(dead_code)] pub enum PoisonPill { diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 552f9eb65b..c8e9d37129 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -1,9 +1,7 @@ //! 
Handle all requests from Querier -use crate::data::{ - IngesterData, IngesterQueryPartition, IngesterQueryResponse, QueryableBatch, - UnpersistedPartitionData, -}; +use std::sync::Arc; + use arrow::error::ArrowError; use datafusion::{ error::DataFusionError, logical_plan::LogicalPlanBuilder, @@ -19,7 +17,14 @@ use observability_deps::tracing::debug; use predicate::Predicate; use schema::selection::Selection; use snafu::{ensure, ResultExt, Snafu}; -use std::sync::Arc; + +use crate::{ + data::{ + partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition, + IngesterQueryResponse, + }, + query::QueryableBatch, +}; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -282,6 +287,13 @@ pub(crate) async fn query( #[cfg(test)] mod tests { + use arrow::record_batch::RecordBatch; + use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; + use assert_matches::assert_matches; + use datafusion::logical_plan::{col, lit}; + use futures::TryStreamExt; + use predicate::Predicate; + use super::*; use crate::{ data::FlatIngesterQueryResponse, @@ -291,12 +303,6 @@ mod tests { make_queryable_batch_with_deletes, DataLocation, TEST_NAMESPACE, TEST_TABLE, }, }; - use arrow::record_batch::RecordBatch; - use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; - use assert_matches::assert_matches; - use datafusion::logical_plan::{col, lit}; - use futures::TryStreamExt; - use predicate::Predicate; #[tokio::test] async fn test_query() { diff --git a/ingester/src/query.rs b/ingester/src/query.rs index d801e19c02..ff735d96ca 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -1,6 +1,7 @@ //! Module to handle query on Ingester's data -use crate::data::{QueryableBatch, SnapshotBatch}; +use std::{any::Any, sync::Arc}; + use arrow::record_batch::RecordBatch; use arrow_util::util::ensure_schema; use data_types::{ @@ -23,7 +24,8 @@ use predicate::{ }; use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; -use std::{any::Any, sync::Arc}; + +use crate::data::partition::SnapshotBatch; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -45,6 +47,22 @@ pub enum Error { /// A specialized `Error` for Ingester's Query errors pub type Result = std::result::Result; +/// Queryable data used for both query and persistence +#[derive(Debug, PartialEq, Clone)] +pub struct QueryableBatch { + /// data + pub(crate) data: Vec>, + + /// Delete predicates of the tombstones + pub(crate) delete_predicates: Vec>, + + /// This is needed to return a reference for a trait function + pub(crate) table_name: String, + + /// Partition ID + pub(crate) partition_id: PartitionId, +} + impl QueryableBatch { /// Initilaize a QueryableBatch pub fn new( @@ -242,8 +260,6 @@ impl QueryChunk for QueryableBatch { #[cfg(test)] mod tests { - use super::*; - use crate::test_util::create_tombstone; use arrow::{ array::{ ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, @@ -253,6 +269,9 @@ mod tests { }; use data_types::{DeleteExpr, Op, Scalar, TimestampRange}; + use super::*; + use crate::test_util::create_tombstone; + #[tokio::test] async fn test_merge_batch_schema() { // Merge schema of the batches diff --git a/ingester/src/server.rs b/ingester/src/server.rs index 387619043f..c05395881c 100644 --- a/ingester/src/server.rs +++ b/ingester/src/server.rs @@ -1,10 +1,9 @@ //! Ingester server entrypoint. 
-use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use self::{grpc::GrpcDelegate, http::HttpDelegate}; use crate::handler::IngestHandler; -use std::fmt::Debug; pub mod grpc; pub mod http; diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index ad018a6397..4f06a93a46 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -1,9 +1,14 @@ //! gRPC service implementations for `ingester`. -use crate::{ - data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, - handler::IngestHandler, +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + task::Poll, }; + use arrow::error::ArrowError; use arrow_flight::{ flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, @@ -20,18 +25,15 @@ use observability_deps::tracing::{debug, info, warn}; use pin_project::pin_project; use prost::Message; use snafu::{ResultExt, Snafu}; -use std::{ - pin::Pin, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - task::Poll, -}; use tonic::{Request, Response, Streaming}; use trace::ctx::SpanContext; use write_summary::WriteSummary; +use crate::{ + data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, + handler::IngestHandler, +}; + /// This type is responsible for managing all gRPC services exposed by /// `ingester`. #[derive(Debug, Default)] @@ -465,9 +467,8 @@ mod tests { use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::selection::Selection; - use crate::data::PartitionStatus; - use super::*; + use crate::data::partition::PartitionStatus; #[tokio::test] async fn test_get_stream_empty() { diff --git a/ingester/src/server/http.rs b/ingester/src/server/http.rs index 0630365e9c..ea48a8d6b2 100644 --- a/ingester/src/server/http.rs +++ b/ingester/src/server/http.rs @@ -1,10 +1,12 @@ //! HTTP service implementations for `ingester`. -use crate::handler::IngestHandler; -use hyper::{Body, Request, Response, StatusCode}; use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; use thiserror::Error; +use crate::handler::IngestHandler; + /// Errors returned by the `router` HTTP request handler. 
#[derive(Debug, Error, Copy, Clone)] pub enum Error { diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 40e34992e7..c998c49bcb 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -1,15 +1,17 @@ -use super::DmlSink; -use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl}; +use std::{fmt::Debug, time::Duration}; + use data_types::{SequenceNumber, ShardIndex}; use dml::DmlOperation; use futures::{pin_mut, FutureExt, StreamExt}; use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, DurationCounter, DurationGauge, U64Counter}; use observability_deps::tracing::*; -use std::{fmt::Debug, time::Duration}; use tokio_util::sync::CancellationToken; use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler}; +use super::DmlSink; +use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl}; + /// When the [`LifecycleManager`] indicates that ingest should be paused because /// of memory pressure, the shard will loop, sleeping this long between /// calls to [`LifecycleHandle::can_resume_ingest()`] with the manager if it @@ -89,10 +91,13 @@ impl SequencedStreamHandler { skip_to_oldest_available: bool, ) -> Self { // TTBR - let time_to_be_readable = metrics.register_metric::( - "ingester_ttbr", - "duration of time between producer writing to consumer putting into queryable cache", - ).recorder(metric_attrs(shard_index, &topic_name, None, false)); + let time_to_be_readable = metrics + .register_metric::( + "ingester_ttbr", + "duration of time between producer writing to consumer putting into queryable \ + cache", + ) + .recorder(metric_attrs(shard_index, &topic_name, None, false)); // Lifecycle-driven ingest pause duration let pause_duration = metrics @@ -461,11 +466,8 @@ fn metric_attrs( #[cfg(test)] mod tests { - use super::*; - use crate::{ - lifecycle::{LifecycleConfig, LifecycleManager}, - stream_handler::mock_sink::MockDmlSink, - }; + use std::sync::Arc; + use assert_matches::assert_matches; use async_trait::async_trait; use data_types::{DeletePredicate, Sequence, TimestampRange}; @@ -475,12 +477,17 @@ mod tests { use metric::Metric; use mutable_batch_lp::lines_to_batches; use once_cell::sync::Lazy; - use std::sync::Arc; use test_helpers::timeout::FutureTimeout; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use write_buffer::core::WriteBufferError; + use super::*; + use crate::{ + lifecycle::{LifecycleConfig, LifecycleManager}, + stream_handler::mock_sink::MockDmlSink, + }; + static TEST_TIME: Lazy