Merge pull request #5634 from influxdata/dom/split-data

refactor(ingester): split data.rs into submodules
pull/24376/head
Dom 2022-09-14 13:44:01 +01:00 committed by GitHub
commit 9ed931271a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1819 additions and 1617 deletions

View File

@ -1,6 +1,7 @@
//! This module is responsible for compacting Ingester's data
use crate::data::{PersistingBatch, QueryableBatch};
use std::sync::Arc;
use data_types::{CompactionLevel, NamespaceId, PartitionInfo};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_query::{
@ -12,7 +13,8 @@ use iox_time::TimeProvider;
use parquet_file::metadata::IoxMetadata;
use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use crate::{data::partition::PersistingBatch, query::QueryableBatch};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -28,7 +30,13 @@ pub enum Error {
#[snafu(display("Error while executing Ingester's compaction"))]
ExecutePlan { source: DataFusionError },
#[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
#[snafu(display(
"Error while building delete predicate from start time, {}, stop time, {}, and serialized \
predicate, {}",
min,
max,
predicate
))]
DeletePredicate {
source: predicate::delete_predicate::Error,
min: String,
@ -169,6 +177,13 @@ pub async fn compact(
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::{Partition, PartitionId, ShardId, TableId};
use iox_time::SystemProvider;
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use uuid::Uuid;
use super::*;
use crate::test_util::{
create_batches_with_influxtype, create_batches_with_influxtype_different_cardinality,
@ -180,12 +195,6 @@ mod tests {
create_one_row_record_batch_with_influxtype, create_tombstone, make_meta,
make_persisting_batch, make_queryable_batch, make_queryable_batch_with_deletes,
};
use arrow_util::assert_batches_eq;
use data_types::{Partition, PartitionId, ShardId, TableId};
use iox_time::SystemProvider;
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use uuid::Uuid;
// this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
// where if sending in a single row it would compact into an output of two batches, one of

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,367 @@
//! Namespace level data buffer structures.
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use metric::U64Counter;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use write_summary::ShardProgress;
#[cfg(test)]
use super::triggers::TestTriggers;
use super::{
partition::{PersistingBatch, SnapshotBatch},
table::TableData,
};
use crate::lifecycle::LifecycleHandle;
/// Data of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub struct NamespaceData {
namespace_id: NamespaceId,
tables: RwLock<BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>>,
table_count: U64Counter,
/// The sequence number being actively written, if any.
///
/// This is used to know when a sequence number is only partially
/// buffered for readability reporting. For example, in the
/// following diagram a write for SequenceNumber 10 is only
/// partially readable because it has been written into partitions
/// A and B but not yet C. The max buffered number on each
/// PartitionData is not sufficient to determine if the write is
/// complete.
///
/// ```text
/// ╔═══════════════════════════════════════════════╗
/// ║ ║ DML Operation (write)
/// ║ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ ║ SequenceNumber = 10
/// ║ ┃ Data for C ┃ Data for B ┃ Data for A ┃ ║
/// ║ ┗━━━━━━━━━━━━━┻━━━━━━━━━━━━━┻━━━━━━━━━━━━━┛ ║
/// ║ │ │ │ ║
/// ╚═══════════════════════╬═════════════╬═════════╝
/// │ │ │ ┌──────────────────────────────────┐
/// │ │ │ Partition A │
/// │ │ └──────────▶│ max buffered = 10 │
/// │ └──────────────────────────────────┘
/// │ │
/// │ ┌──────────────────────────────────┐
/// │ │ │ Partition B │
/// └────────────────────────▶│ max buffered = 10 │
/// │ └──────────────────────────────────┘
///
/// │
/// ┌──────────────────────────────────┐
/// │ │ Partition C │
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶│ max buffered = 7 │
/// └──────────────────────────────────┘
/// Write is partially buffered. It has been
/// written to Partitions A and B, but not
/// yet written to Partition C
/// PartitionData
/// (Ingester state per partition)
///```
buffering_sequence_number: RwLock<Option<SequenceNumber>>,
/// Control the flow of ingest, for testing purposes
#[cfg(test)]
pub(crate) test_triggers: TestTriggers,
}
impl NamespaceData {
/// Initialize new tables with default partition template of daily
pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self {
let table_count = metrics
.register_metric::<U64Counter>(
"ingester_tables_total",
"Number of tables known to the ingester",
)
.recorder(&[]);
Self {
namespace_id,
tables: Default::default(),
table_count,
buffering_sequence_number: RwLock::new(None),
#[cfg(test)]
test_triggers: TestTriggers::new(),
}
}
/// Initialize new tables with data for testing purpose only
#[cfg(test)]
pub(crate) fn new_for_test(
namespace_id: NamespaceId,
tables: BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>,
) -> Self {
Self {
namespace_id,
tables: RwLock::new(tables),
table_count: Default::default(),
buffering_sequence_number: RwLock::new(None),
test_triggers: TestTriggers::new(),
}
}
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
/// catalog. Returns true if ingest should be paused due to memory limits set in the passed
/// lifecycle manager.
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
executor: &Executor,
) -> Result<bool, super::Error> {
let sequence_number = dml_operation
.meta()
.sequence()
.expect("must have sequence number")
.sequence_number;
// Note that this namespace is actively writing this sequence
// number. Since there is no namespace wide lock held during a
// write, this number is used to detect and update reported
// progress during a write
let _sequence_number_guard =
ScopedSequenceNumber::new(sequence_number, &self.buffering_sequence_number);
match dml_operation {
DmlOperation::Write(write) => {
let mut pause_writes = false;
// Extract the partition key derived by the router.
let partition_key = write
.partition_key()
.expect("no partition key in dml write")
.clone();
for (t, b) in write.into_tables() {
let table_data = match self.table_data(&t) {
Some(t) => t,
None => self.insert_table(shard_id, &t, catalog).await?,
};
{
// lock scope
let mut table_data = table_data.write().await;
let should_pause = table_data
.buffer_table_write(
sequence_number,
b,
partition_key.clone(),
shard_id,
catalog,
lifecycle_handle,
)
.await?;
pause_writes = pause_writes || should_pause;
}
#[cfg(test)]
self.test_triggers.on_write().await;
}
Ok(pause_writes)
}
DmlOperation::Delete(delete) => {
let table_name = delete.table_name().context(super::TableNotPresentSnafu)?;
let table_data = match self.table_data(table_name) {
Some(t) => t,
None => self.insert_table(shard_id, table_name, catalog).await?,
};
let mut table_data = table_data.write().await;
table_data
.buffer_delete(
table_name,
delete.predicate(),
shard_id,
sequence_number,
catalog,
executor,
)
.await?;
// don't pause writes since deletes don't count towards memory limits
Ok(false)
}
}
}
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
pub async fn snapshot(
&self,
table_name: &str,
partition_key: &PartitionKey,
) -> Option<(Vec<Arc<SnapshotBatch>>, Option<Arc<PersistingBatch>>)> {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
return t.partition_data.get_mut(partition_key).map(|p| {
p.data
.generate_snapshot()
.expect("snapshot on mutable batch should never fail");
(p.data.snapshots.to_vec(), p.data.persisting.clone())
});
}
None
}
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
/// or persist, None will be returned.
pub async fn snapshot_to_persisting(
&self,
partition_info: &PartitionInfo,
) -> Option<Arc<PersistingBatch>> {
if let Some(table_data) = self.table_data(&partition_info.table_name) {
let mut table_data = table_data.write().await;
return table_data
.partition_data
.get_mut(&partition_info.partition.partition_key)
.and_then(|partition_data| {
partition_data.snapshot_to_persisting_batch(
partition_info.partition.shard_id,
partition_info.partition.table_id,
partition_info.partition.id,
&partition_info.table_name,
)
});
}
None
}
/// Gets the buffered table data
pub(crate) fn table_data(
&self,
table_name: &str,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.get(table_name).cloned()
}
/// Inserts the table or returns it if it happens to be inserted by some other thread
async fn insert_table(
&self,
shard_id: ShardId,
table_name: &str,
catalog: &dyn Catalog,
) -> Result<Arc<tokio::sync::RwLock<TableData>>, super::Error> {
let mut repos = catalog.repositories().await;
let info = repos
.tables()
.get_table_persist_info(shard_id, self.namespace_id, table_name)
.await
.context(super::CatalogSnafu)?
.context(super::TableNotFoundSnafu { table_name })?;
let mut t = self.tables.write();
let data = match t.entry(table_name.to_string()) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
info.table_id,
info.tombstone_max_sequence_number,
))));
self.table_count.inc(1);
Arc::clone(v)
}
Entry::Occupied(v) => Arc::clone(v.get()),
};
Ok(data)
}
/// Walks down the table and partition and clears the persisting batch. The sequence number is
/// the max_sequence_number for the persisted parquet file, which should be kept in the table
/// data buffer.
pub(crate) async fn mark_persisted(
&self,
table_name: &str,
partition_key: &PartitionKey,
sequence_number: SequenceNumber,
) {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
let partition = t.partition_data.get_mut(partition_key);
if let Some(p) = partition {
p.data.mark_persisted(sequence_number);
}
}
}
/// Return progress from this Namespace
pub(crate) async fn progress(&self) -> ShardProgress {
let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect();
// Consolidate progtress across partitions.
let mut progress = ShardProgress::new()
// Properly account for any sequence number that is
// actively buffering and thus not yet completely
// readable.
.actively_buffering(*self.buffering_sequence_number.read());
for table_data in tables {
progress = progress.combine(table_data.read().await.progress())
}
progress
}
/// Return the [`NamespaceId`] this [`NamespaceData`] belongs to.
pub fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
#[cfg(test)]
pub fn table_count(&self) -> &U64Counter {
&self.table_count
}
}
/// RAAI struct that sets buffering sequence number on creation and clears it on free
struct ScopedSequenceNumber<'a> {
sequence_number: SequenceNumber,
buffering_sequence_number: &'a RwLock<Option<SequenceNumber>>,
}
impl<'a> ScopedSequenceNumber<'a> {
fn new(
sequence_number: SequenceNumber,
buffering_sequence_number: &'a RwLock<Option<SequenceNumber>>,
) -> Self {
*buffering_sequence_number.write() = Some(sequence_number);
Self {
sequence_number,
buffering_sequence_number,
}
}
}
impl<'a> Drop for ScopedSequenceNumber<'a> {
fn drop(&mut self) {
// clear write on drop
let mut buffering_sequence_number = self.buffering_sequence_number.write();
assert_eq!(
*buffering_sequence_number,
Some(self.sequence_number),
"multiple operations are being buffered concurrently"
);
*buffering_sequence_number = None;
}
}

View File

@ -0,0 +1,566 @@
//! Partition level data buffer structures.
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::{PartitionId, SequenceNumber, ShardId, TableId, Tombstone};
use iox_query::exec::Executor;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
use snafu::ResultExt;
use uuid::Uuid;
use write_summary::ShardProgress;
use self::buffer::{BufferBatch, DataBuffer};
use crate::{data::query_dedup::query, query::QueryableBatch};
mod buffer;
/// Read only copy of the unpersisted data for a partition in the ingester for a specific partition.
#[derive(Debug)]
pub(crate) struct UnpersistedPartitionData {
pub partition_id: PartitionId,
pub non_persisted: Vec<Arc<SnapshotBatch>>,
pub persisting: Option<QueryableBatch>,
pub partition_status: PartitionStatus,
}
/// Status of a partition that has unpersisted data.
///
/// Note that this structure is specific to a partition (which itself is bound to a table and
/// shard)!
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(missing_copy_implementations)]
pub struct PartitionStatus {
/// Max sequence number persisted
pub parquet_max_sequence_number: Option<SequenceNumber>,
/// Max sequence number for a tombstone
pub tombstone_max_sequence_number: Option<SequenceNumber>,
}
/// PersistingBatch contains all needed info and data for creating
/// a parquet file for given set of SnapshotBatches
#[derive(Debug, PartialEq, Clone)]
pub struct PersistingBatch {
/// Shard id of the data
pub(crate) shard_id: ShardId,
/// Table id of the data
pub(crate) table_id: TableId,
/// Partition Id of the data
pub(crate) partition_id: PartitionId,
/// Id of to-be-created parquet file of this data
pub(crate) object_store_id: Uuid,
/// data
pub(crate) data: Arc<QueryableBatch>,
}
/// SnapshotBatch contains data of many contiguous BufferBatches
#[derive(Debug, PartialEq)]
pub struct SnapshotBatch {
/// Min sequence number of its combined BufferBatches
pub(crate) min_sequence_number: SequenceNumber,
/// Max sequence number of its combined BufferBatches
pub(crate) max_sequence_number: SequenceNumber,
/// Data of its combined BufferBatches kept in one RecordBatch
pub(crate) data: Arc<RecordBatch>,
}
impl SnapshotBatch {
/// Return only data of the given columns
pub fn scan(&self, selection: Selection<'_>) -> Result<Option<Arc<RecordBatch>>, super::Error> {
Ok(match selection {
Selection::All => Some(Arc::clone(&self.data)),
Selection::Some(columns) => {
let schema = self.data.schema();
let indices = columns
.iter()
.filter_map(|&column_name| {
match schema.index_of(column_name) {
Ok(idx) => Some(idx),
_ => None, // this batch does not include data of this column_name
}
})
.collect::<Vec<_>>();
if indices.is_empty() {
None
} else {
Some(Arc::new(
self.data
.project(&indices)
.context(super::FilterColumnSnafu {})?,
))
}
}
})
}
/// Return progress in this data
fn progress(&self) -> ShardProgress {
ShardProgress::new()
.with_buffered(self.min_sequence_number)
.with_buffered(self.max_sequence_number)
}
}
/// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct PartitionData {
id: PartitionId,
pub(crate) data: DataBuffer,
}
impl PartitionData {
/// Initialize a new partition data buffer
pub fn new(id: PartitionId) -> Self {
Self {
id,
data: Default::default(),
}
}
/// Snapshot anything in the buffer and move all snapshot data into a persisting batch
pub fn snapshot_to_persisting_batch(
&mut self,
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Option<Arc<PersistingBatch>> {
self.data
.snapshot_to_persisting(shard_id, table_id, partition_id, table_name)
}
/// Snapshot whatever is in the buffer and return a new vec of the
/// arc cloned snapshots
#[allow(dead_code)] // Used in tests
pub fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
self.data
.generate_snapshot()
.context(super::SnapshotSnafu)?;
Ok(self.data.get_snapshots().to_vec())
}
/// Return non persisting data
pub fn get_non_persisting_data(&self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
self.data.buffer_and_snapshots()
}
/// Return persisting data
pub fn get_persisting_data(&self) -> Option<QueryableBatch> {
self.data.get_persisting_data()
}
/// Write the given mb in the buffer
pub(crate) fn buffer_write(
&mut self,
sequence_number: SequenceNumber,
mb: MutableBatch,
) -> Result<(), super::Error> {
match &mut self.data.buffer {
Some(buf) => {
buf.max_sequence_number = sequence_number.max(buf.max_sequence_number);
buf.data.extend_from(&mb).context(super::BufferWriteSnafu)?;
}
None => {
self.data.buffer = Some(BufferBatch {
min_sequence_number: sequence_number,
max_sequence_number: sequence_number,
data: mb,
})
}
}
Ok(())
}
/// Buffers a new tombstone:
/// . All the data in the `buffer` and `snapshots` will be replaced with one
/// tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
/// exists
pub(crate) async fn buffer_tombstone(
&mut self,
executor: &Executor,
table_name: &str,
tombstone: Tombstone,
) {
self.data.add_tombstone(tombstone.clone());
// ----------------------------------------------------------
// First apply the tombstone on all in-memory & non-persisting data
// Make a QueryableBatch for all buffer + snapshots + the given tombstone
let max_sequence_number = tombstone.sequence_number;
let query_batch = match self.data.snapshot_to_queryable_batch(
table_name,
self.id,
Some(tombstone.clone()),
) {
Some(query_batch) if !query_batch.is_empty() => query_batch,
_ => {
// No need to proceed further
return;
}
};
let (min_sequence_number, _) = query_batch.min_max_sequence_numbers();
assert!(min_sequence_number <= max_sequence_number);
// Run query on the QueryableBatch to apply the tombstone.
let stream = match query(executor, Arc::new(query_batch)).await {
Err(e) => {
// this should never error out. if it does, we need to crash hard so
// someone can take a look.
panic!("unable to apply tombstones on snapshots: {:?}", e);
}
Ok(stream) => stream,
};
let record_batches = match datafusion::physical_plan::common::collect(stream).await {
Err(e) => {
// this should never error out. if it does, we need to crash hard so
// someone can take a look.
panic!("unable to collect record batches: {:?}", e);
}
Ok(batches) => batches,
};
// Merge all result record batches into one record batch
// and make a snapshot for it
let snapshot = if !record_batches.is_empty() {
let record_batch = RecordBatch::concat(&record_batches[0].schema(), &record_batches)
.unwrap_or_else(|e| {
panic!("unable to concat record batches: {:?}", e);
});
let snapshot = SnapshotBatch {
min_sequence_number,
max_sequence_number,
data: Arc::new(record_batch),
};
Some(Arc::new(snapshot))
} else {
None
};
// ----------------------------------------------------------
// Add the tombstone-applied data back in as one snapshot
if let Some(snapshot) = snapshot {
self.data.snapshots.push(snapshot);
}
}
/// Return the progress from this Partition
pub(crate) fn progress(&self) -> ShardProgress {
self.data.progress()
}
pub(crate) fn id(&self) -> PartitionId {
self.id
}
}
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_sorted_eq;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use super::*;
use crate::test_util::create_tombstone;
#[test]
fn snapshot_buffer_different_but_compatible_schemas() {
let mut partition_data = PartitionData {
id: PartitionId::new(1),
data: Default::default(),
};
let seq_num1 = SequenceNumber::new(1);
// Missing tag `t1`
let (_, mut mutable_batch1) =
lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
partition_data
.buffer_write(seq_num1, mutable_batch1.clone())
.unwrap();
let seq_num2 = SequenceNumber::new(2);
// Missing field `iv`
let (_, mutable_batch2) =
lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#);
partition_data
.buffer_write(seq_num2, mutable_batch2.clone())
.unwrap();
partition_data.data.generate_snapshot().unwrap();
assert!(partition_data.data.buffer.is_none());
assert_eq!(partition_data.data.snapshots.len(), 1);
let snapshot = &partition_data.data.snapshots[0];
assert_eq!(snapshot.min_sequence_number, seq_num1);
assert_eq!(snapshot.max_sequence_number, seq_num2);
mutable_batch1.extend_from(&mutable_batch2).unwrap();
let combined_record_batch = mutable_batch1.to_arrow(Selection::All).unwrap();
assert_eq!(&*snapshot.data, &combined_record_batch);
}
// Test deletes mixed with writes on a single parittion
#[tokio::test]
async fn writes_and_deletes() {
// Make a partition with empty DataBuffer
let s_id = 1;
let t_id = 1;
let p_id = 1;
let table_name = "restaurant";
let mut p = PartitionData::new(PartitionId::new(p_id));
let exec = Executor::new(1);
// ------------------------------------------
// Fill `buffer`
// --- seq_num: 1
let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Boston day="fri",temp=50 10"#);
p.buffer_write(SequenceNumber::new(1), mb).unwrap();
// --- seq_num: 2
let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="thu",temp=44 15"#);
p.buffer_write(SequenceNumber::new(2), mb).unwrap();
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(1)
);
assert_eq!(
p.data.buffer.as_ref().unwrap().max_sequence_number,
SequenceNumber::new(2)
);
assert_eq!(p.data.snapshots.len(), 0);
assert_eq!(p.data.deletes_during_persisting().len(), 0);
assert_eq!(p.data.persisting, None);
// ------------------------------------------
// Delete
// --- seq_num: 3
let ts = create_tombstone(
1, // tombstone id
t_id, // table id
s_id, // shard id
3, // delete's seq_number
0, // min time of data to get deleted
20, // max time of data to get deleted
"day=thu", // delete predicate
);
// one row will get deleted, the other is moved to snapshot
p.buffer_tombstone(&exec, "restaurant", ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
assert_eq!(p.data.snapshots.len(), 1); // one snpashot if there is data
assert_eq!(p.data.deletes_during_persisting().len(), 0);
assert_eq!(p.data.persisting, None);
// snapshot only has one row since the other one got deleted
let data = (*p.data.snapshots[0].data).clone();
let expected = vec![
"+--------+-----+------+--------------------------------+",
"| city | day | temp | time |",
"+--------+-----+------+--------------------------------+",
"| Boston | fri | 50 | 1970-01-01T00:00:00.000000010Z |",
"+--------+-----+------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &[data]);
assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1);
assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 3);
// ------------------------------------------
// Fill `buffer`
// --- seq_num: 4
let (_, mb) = lp_to_mutable_batch(
r#"
restaurant,city=Medford day="sun",temp=55 22
restaurant,city=Boston day="sun",temp=57 24
"#,
);
p.buffer_write(SequenceNumber::new(4), mb).unwrap();
// --- seq_num: 5
let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="tue",temp=56 30"#);
p.buffer_write(SequenceNumber::new(5), mb).unwrap();
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(4)
);
assert_eq!(
p.data.buffer.as_ref().unwrap().max_sequence_number,
SequenceNumber::new(5)
);
assert_eq!(p.data.snapshots.len(), 1); // existing sanpshot
assert_eq!(p.data.deletes_during_persisting().len(), 0);
assert_eq!(p.data.persisting, None);
// ------------------------------------------
// Delete
// --- seq_num: 6
let ts = create_tombstone(
2, // tombstone id
t_id, // table id
s_id, // shard id
6, // delete's seq_number
10, // min time of data to get deleted
50, // max time of data to get deleted
"city=Boston", // delete predicate
);
// two rows will get deleted, one from existing snapshot, one from the buffer being moved
// to snpashot
p.buffer_tombstone(&exec, "restaurant", ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
assert_eq!(p.data.snapshots.len(), 1); // one snpashot
assert_eq!(p.data.deletes_during_persisting().len(), 0);
assert_eq!(p.data.persisting, None);
// snapshot only has two rows since the other 2 rows with city=Boston have got deleted
let data = (*p.data.snapshots[0].data).clone();
let expected = vec![
"+---------+-----+------+--------------------------------+",
"| city | day | temp | time |",
"+---------+-----+------+--------------------------------+",
"| Andover | tue | 56 | 1970-01-01T00:00:00.000000030Z |",
"| Medford | sun | 55 | 1970-01-01T00:00:00.000000022Z |",
"+---------+-----+------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &[data]);
assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 1);
assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 6);
// ------------------------------------------
// Persisting
let p_batch = p
.snapshot_to_persisting_batch(
ShardId::new(s_id),
TableId::new(t_id),
PartitionId::new(p_id),
table_name,
)
.unwrap();
// verify data
assert!(p.data.buffer.is_none()); // always empty after issuing persit
assert_eq!(p.data.snapshots.len(), 0); // always empty after issuing persit
assert_eq!(p.data.deletes_during_persisting().len(), 0); // deletes not happen yet
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// ------------------------------------------
// Delete
// --- seq_num: 7
let ts = create_tombstone(
3, // tombstone id
t_id, // table id
s_id, // shard id
7, // delete's seq_number
10, // min time of data to get deleted
50, // max time of data to get deleted
"temp=55", // delete predicate
);
// if a query come while persisting, the row with temp=55 will be deleted before
// data is sent back to Querier
p.buffer_tombstone(&exec, "restaurant", ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
// no snpashots becasue buffer has not data yet and the
// snapshot was empty too
assert_eq!(p.data.snapshots.len(), 0);
assert_eq!(p.data.deletes_during_persisting().len(), 1); // tombstone added since data is
// persisting
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// ------------------------------------------
// Fill `buffer`
// --- seq_num: 8
let (_, mb) = lp_to_mutable_batch(
r#"
restaurant,city=Wilmington day="sun",temp=55 35
restaurant,city=Boston day="sun",temp=60 36
restaurant,city=Boston day="sun",temp=62 38
"#,
);
p.buffer_write(SequenceNumber::new(8), mb).unwrap();
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(8)
); // 1 newly added mutable batch of 3 rows of data
assert_eq!(p.data.snapshots.len(), 0); // still empty
assert_eq!(p.data.deletes_during_persisting().len(), 1);
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// ------------------------------------------
// Take snapshot of the `buffer`
p.snapshot().unwrap();
// verify data
assert!(p.data.buffer.is_none()); // empty after snapshot
assert_eq!(p.data.snapshots.len(), 1); // data moved from buffer
assert_eq!(p.data.deletes_during_persisting().len(), 1);
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// snapshot has three rows moved from buffer
let data = (*p.data.snapshots[0].data).clone();
let expected = vec![
"+------------+-----+------+--------------------------------+",
"| city | day | temp | time |",
"+------------+-----+------+--------------------------------+",
"| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |",
"| Boston | sun | 60 | 1970-01-01T00:00:00.000000036Z |",
"| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |",
"+------------+-----+------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &[data]);
assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8);
assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8);
// ------------------------------------------
// Delete
// --- seq_num: 9
let ts = create_tombstone(
4, // tombstone id
t_id, // table id
s_id, // shard id
9, // delete's seq_number
10, // min time of data to get deleted
50, // max time of data to get deleted
"temp=60", // delete predicate
);
// the row with temp=60 will be removed from the sanphot
p.buffer_tombstone(&exec, "restaurant", ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
assert_eq!(p.data.snapshots.len(), 1); // new snapshot of the existing with delete applied
assert_eq!(p.data.deletes_during_persisting().len(), 2); // one more tombstone added make it 2
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
// snapshot has only 2 rows because the row with tem=60 was removed
let data = (*p.data.snapshots[0].data).clone();
let expected = vec![
"+------------+-----+------+--------------------------------+",
"| city | day | temp | time |",
"+------------+-----+------+--------------------------------+",
"| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |",
"| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |",
"+------------+-----+------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &[data]);
assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8);
assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 9);
exec.join().await;
}
}

View File

@ -0,0 +1,319 @@
//! Data for the lifecycle of the Ingester
use std::sync::Arc;
use data_types::{PartitionId, SequenceNumber, ShardId, TableId, Tombstone};
use mutable_batch::MutableBatch;
use schema::selection::Selection;
use snafu::ResultExt;
use uuid::Uuid;
use write_summary::ShardProgress;
use super::{PersistingBatch, QueryableBatch, SnapshotBatch};
/// Data of an IOx partition split into batches
/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐
/// │ Buffer │ │ Snapshots │ │ Persisting │
/// │ ┌───────────────────┐ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// │ ┌───────────────────┐ │ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │
/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │
/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘
#[derive(Debug, Default)]
pub(crate) struct DataBuffer {
/// Buffer of incoming writes
pub(crate) buffer: Option<BufferBatch>,
/// The max_persisted_sequence number for any parquet_file in this partition
pub(crate) max_persisted_sequence_number: Option<SequenceNumber>,
/// Buffer of tombstones whose time range may overlap with this partition.
/// All tombstones were already applied to corresponding snapshots. This list
/// only keep the ones that come during persisting. The reason
/// we keep them becasue if a query comes, we need to apply these tombstones
/// on the persiting data before sending it to the Querier
/// When the `persiting` is done and removed, this list will get empty, too
deletes_during_persisting: Vec<Tombstone>,
/// Data in `buffer` will be moved to a `snapshot` when one of these happens:
/// . A background persist is called
/// . A read request from Querier
/// The `buffer` will be empty when this happens.
pub(crate) snapshots: Vec<Arc<SnapshotBatch>>,
/// When a persist is called, data in `buffer` will be moved to a `snapshot`
/// and then all `snapshots` will be moved to a `persisting`.
/// Both `buffer` and 'snaphots` will be empty when this happens.
pub(crate) persisting: Option<Arc<PersistingBatch>>,
// Extra Notes:
// . In MVP, we will only persist a set of snapshots at a time.
// In later version, multiple persisting operations may be happening concurrently but
// their persisted info must be added into the Catalog in their data
// ingesting order.
// . When a read request comes from a Querier, all data from `snapshots`
// and `persisting` must be sent to the Querier.
// . After the `persisting` data is persisted and successfully added
// into the Catalog, it will be removed from this Data Buffer.
// This data might be added into an extra cache to serve up to
// Queriers that may not have loaded the parquet files from object
// storage yet. But this will be decided after MVP.
}
impl DataBuffer {
/// Add a new tombstones into the [`DataBuffer`].
pub(super) fn add_tombstone(&mut self, tombstone: Tombstone) {
// Only keep this tombstone if some data is being persisted
if self.persisting.is_some() {
self.deletes_during_persisting.push(tombstone);
}
}
/// If a [`BufferBatch`] exists, convert it to a [`SnapshotBatch`] and add
/// it to the list of snapshots.
///
/// Does nothing if there is no [`BufferBatch`].
pub(crate) fn generate_snapshot(&mut self) -> Result<(), mutable_batch::Error> {
let snapshot = self.copy_buffer_to_snapshot()?;
if let Some(snapshot) = snapshot {
self.snapshots.push(snapshot);
self.buffer = None;
}
Ok(())
}
/// Returns snapshot of the buffer but keeps data in the buffer
fn copy_buffer_to_snapshot(&self) -> Result<Option<Arc<SnapshotBatch>>, mutable_batch::Error> {
if let Some(buf) = &self.buffer {
return Ok(Some(Arc::new(SnapshotBatch {
min_sequence_number: buf.min_sequence_number,
max_sequence_number: buf.max_sequence_number,
data: Arc::new(buf.data.to_arrow(Selection::All)?),
})));
}
Ok(None)
}
/// Snapshots the buffer and make a QueryableBatch for all the snapshots
/// Both buffer and snapshots will be empty after this
pub(super) fn snapshot_to_queryable_batch(
&mut self,
table_name: &str,
partition_id: PartitionId,
tombstone: Option<Tombstone>,
) -> Option<QueryableBatch> {
self.generate_snapshot()
.expect("This mutable batch snapshot error should be impossible.");
let mut data = vec![];
std::mem::swap(&mut data, &mut self.snapshots);
let mut tombstones = vec![];
if let Some(tombstone) = tombstone {
tombstones.push(tombstone);
}
// only produce batch if there is any data
if data.is_empty() {
None
} else {
Some(QueryableBatch::new(
table_name,
partition_id,
data,
tombstones,
))
}
}
/// Returns all existing snapshots plus data in the buffer
/// This only read data. Data in the buffer will be kept in the buffer
pub(super) fn buffer_and_snapshots(
&self,
) -> Result<Vec<Arc<SnapshotBatch>>, crate::data::Error> {
// Existing snapshots
let mut snapshots = self.snapshots.clone();
// copy the buffer to a snapshot
let buffer_snapshot = self
.copy_buffer_to_snapshot()
.context(crate::data::BufferToSnapshotSnafu)?;
snapshots.extend(buffer_snapshot);
Ok(snapshots)
}
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
///
/// # Panic
///
/// Panics if there is already a persisting batch.
pub(super) fn snapshot_to_persisting(
&mut self,
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Option<Arc<PersistingBatch>> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")
}
if let Some(queryable_batch) =
self.snapshot_to_queryable_batch(table_name, partition_id, None)
{
let persisting_batch = Arc::new(PersistingBatch {
shard_id,
table_id,
partition_id,
object_store_id: Uuid::new_v4(),
data: Arc::new(queryable_batch),
});
self.persisting = Some(Arc::clone(&persisting_batch));
Some(persisting_batch)
} else {
None
}
}
/// Return a QueryableBatch of the persisting batch after applying new tombstones
pub(super) fn get_persisting_data(&self) -> Option<QueryableBatch> {
let persisting = match &self.persisting {
Some(p) => p,
None => return None,
};
// persisting data
let mut queryable_batch = (*persisting.data).clone();
// Add new tombstones if any
queryable_batch.add_tombstones(&self.deletes_during_persisting);
Some(queryable_batch)
}
/// Return the progress in this DataBuffer
pub(super) fn progress(&self) -> ShardProgress {
let progress = ShardProgress::new();
let progress = if let Some(buffer) = &self.buffer {
progress.combine(buffer.progress())
} else {
progress
};
let progress = self.snapshots.iter().fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
});
if let Some(persisting) = &self.persisting {
persisting
.data
.data
.iter()
.fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
})
} else {
progress
}
}
pub(super) fn get_snapshots(&self) -> &[Arc<SnapshotBatch>] {
self.snapshots.as_ref()
}
pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) {
self.max_persisted_sequence_number = Some(up_to);
self.persisting = None;
self.deletes_during_persisting.clear()
}
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
#[cfg(test)]
pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] {
self.deletes_during_persisting.as_ref()
}
}
/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the
/// ingester keep the batches of data in their ingesting order
#[derive(Debug)]
pub struct BufferBatch {
/// Sequence number of the first write in this batch
pub(crate) min_sequence_number: SequenceNumber,
/// Sequence number of the last write in this batch
pub(crate) max_sequence_number: SequenceNumber,
/// Ingesting data
pub(crate) data: MutableBatch,
}
impl BufferBatch {
/// Return the progress in this DataBuffer
fn progress(&self) -> ShardProgress {
ShardProgress::new()
.with_buffered(self.min_sequence_number)
.with_buffered(self.max_sequence_number)
}
}
#[cfg(test)]
mod tests {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use super::*;
#[test]
fn snapshot_empty_buffer_adds_no_snapshots() {
let mut data_buffer = DataBuffer::default();
data_buffer.generate_snapshot().unwrap();
assert!(data_buffer.snapshots.is_empty());
}
#[test]
fn snapshot_buffer_batch_moves_to_snapshots() {
let mut data_buffer = DataBuffer::default();
let seq_num1 = SequenceNumber::new(1);
let (_, mutable_batch1) =
lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
let buffer_batch1 = BufferBatch {
min_sequence_number: seq_num1,
max_sequence_number: seq_num1,
data: mutable_batch1,
};
let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap();
data_buffer.buffer = Some(buffer_batch1);
data_buffer.generate_snapshot().unwrap();
assert!(data_buffer.buffer.is_none());
assert_eq!(data_buffer.snapshots.len(), 1);
let snapshot = &data_buffer.snapshots[0];
assert_eq!(snapshot.min_sequence_number, seq_num1);
assert_eq!(snapshot.max_sequence_number, seq_num1);
assert_eq!(&*snapshot.data, &record_batch1);
}
}

View File

@ -8,7 +8,7 @@ use iox_query::{
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use super::QueryableBatch;
use crate::query::QueryableBatch;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -88,13 +88,12 @@ pub async fn query(
mod tests {
use arrow_util::assert_batches_eq;
use super::*;
use crate::test_util::{
create_one_record_batch_with_influxtype_no_duplicates, create_tombstone,
make_queryable_batch, make_queryable_batch_with_deletes,
};
use super::*;
#[tokio::test]
async fn test_query() {
test_helpers::maybe_start_logging();

141
ingester/src/data/shard.rs Normal file
View File

@ -0,0 +1,141 @@
//! Shard level data buffer structures.
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use data_types::{ShardId, ShardIndex};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use metric::U64Counter;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use write_summary::ShardProgress;
use super::namespace::NamespaceData;
use crate::lifecycle::LifecycleHandle;
/// Data of a Shard
#[derive(Debug)]
pub struct ShardData {
/// The shard index for this shard
shard_index: ShardIndex,
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<String, Arc<NamespaceData>>>,
metrics: Arc<metric::Registry>,
namespace_count: U64Counter,
}
impl ShardData {
/// Initialise a new [`ShardData`] that emits metrics to `metrics`.
pub fn new(shard_index: ShardIndex, metrics: Arc<metric::Registry>) -> Self {
let namespace_count = metrics
.register_metric::<U64Counter>(
"ingester_namespaces_total",
"Number of namespaces known to the ingester",
)
.recorder(&[]);
Self {
shard_index,
namespaces: Default::default(),
metrics,
namespace_count,
}
}
/// Initialize new ShardData with namespace for testing purpose only
#[cfg(test)]
pub fn new_for_test(
shard_index: ShardIndex,
namespaces: BTreeMap<String, Arc<NamespaceData>>,
) -> Self {
Self {
shard_index,
namespaces: RwLock::new(namespaces),
metrics: Default::default(),
namespace_count: Default::default(),
}
}
/// Store the write or delete in the shard. Deletes will
/// be written into the catalog before getting stored in the buffer.
/// Any writes that create new IOx partitions will have those records
/// created in the catalog before putting into the buffer.
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
executor: &Executor,
) -> Result<bool, super::Error> {
let namespace_data = match self.namespace(dml_operation.namespace()) {
Some(d) => d,
None => {
self.insert_namespace(dml_operation.namespace(), catalog)
.await?
}
};
namespace_data
.buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor)
.await
}
/// Gets the namespace data out of the map
pub fn namespace(&self, namespace: &str) -> Option<Arc<NamespaceData>> {
let n = self.namespaces.read();
n.get(namespace).cloned()
}
/// Retrieves the namespace from the catalog and initializes an empty buffer, or
/// retrieves the buffer if some other caller gets it first
async fn insert_namespace(
&self,
namespace: &str,
catalog: &dyn Catalog,
) -> Result<Arc<NamespaceData>, super::Error> {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.get_by_name(namespace)
.await
.context(super::CatalogSnafu)?
.context(super::NamespaceNotFoundSnafu { namespace })?;
let mut n = self.namespaces.write();
let data = match n.entry(namespace.name) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics)));
self.namespace_count.inc(1);
Arc::clone(v)
}
Entry::Occupied(v) => Arc::clone(v.get()),
};
Ok(data)
}
/// Return the progress of this shard
pub(crate) async fn progress(&self) -> ShardProgress {
let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect();
let mut progress = ShardProgress::new();
for namespace_data in namespaces {
progress = progress.combine(namespace_data.progress().await);
}
progress
}
/// Return the [`ShardIndex`] this [`ShardData`] is buffering for.
pub fn shard_index(&self) -> ShardIndex {
self.shard_index
}
}

208
ingester/src/data/table.rs Normal file
View File

@ -0,0 +1,208 @@
//! Table level data buffer structures.
use std::collections::BTreeMap;
use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use mutable_batch::MutableBatch;
use snafu::ResultExt;
use write_summary::ShardProgress;
use super::partition::{PartitionData, PartitionStatus, UnpersistedPartitionData};
use crate::lifecycle::LifecycleHandle;
/// Data of a Table in a given Namesapce that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
table_id: TableId,
// the max sequence number for a tombstone associated with this table
tombstone_max_sequence_number: Option<SequenceNumber>,
// Map pf partition key to its data
pub(super) partition_data: BTreeMap<PartitionKey, PartitionData>,
}
impl TableData {
/// Initialize new table buffer
pub fn new(table_id: TableId, tombstone_max_sequence_number: Option<SequenceNumber>) -> Self {
Self {
table_id,
tombstone_max_sequence_number,
partition_data: Default::default(),
}
}
/// Initialize new table buffer for testing purpose only
#[cfg(test)]
pub fn new_for_test(
table_id: TableId,
tombstone_max_sequence_number: Option<SequenceNumber>,
partitions: BTreeMap<PartitionKey, PartitionData>,
) -> Self {
Self {
table_id,
tombstone_max_sequence_number,
partition_data: partitions,
}
}
/// Return parquet_max_sequence_number
pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.values()
.map(|p| p.data.max_persisted_sequence_number())
.max()
.flatten()
}
/// Return tombstone_max_sequence_number
#[allow(dead_code)] // Used in tests
pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
// buffers the table write and returns true if the lifecycle manager indicates that
// ingest should be paused.
pub(super) async fn buffer_table_write(
&mut self,
sequence_number: SequenceNumber,
batch: MutableBatch,
partition_key: PartitionKey,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
let partition_data = match self.partition_data.get_mut(&partition_key) {
Some(p) => p,
None => {
self.insert_partition(partition_key.clone(), shard_id, catalog)
.await?;
self.partition_data.get_mut(&partition_key).unwrap()
}
};
// skip the write if it has already been persisted
if let Some(max) = partition_data.data.max_persisted_sequence_number() {
if max >= sequence_number {
return Ok(false);
}
}
let should_pause = lifecycle_handle.log_write(
partition_data.id(),
shard_id,
sequence_number,
batch.size(),
batch.rows(),
);
partition_data.buffer_write(sequence_number, batch)?;
Ok(should_pause)
}
pub(super) async fn buffer_delete(
&mut self,
table_name: &str,
predicate: &DeletePredicate,
shard_id: ShardId,
sequence_number: SequenceNumber,
catalog: &dyn Catalog,
executor: &Executor,
) -> Result<(), super::Error> {
let min_time = Timestamp::new(predicate.range.start());
let max_time = Timestamp::new(predicate.range.end());
let mut repos = catalog.repositories().await;
let tombstone = repos
.tombstones()
.create_or_get(
self.table_id,
shard_id,
sequence_number,
min_time,
max_time,
&predicate.expr_sql_string(),
)
.await
.context(super::CatalogSnafu)?;
// remember "persisted" state
self.tombstone_max_sequence_number = Some(sequence_number);
// modify one partition at a time
for data in self.partition_data.values_mut() {
data.buffer_tombstone(executor, table_name, tombstone.clone())
.await;
}
Ok(())
}
pub fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
self.partition_data
.values()
.map(|p| UnpersistedPartitionData {
partition_id: p.id(),
non_persisted: p
.get_non_persisting_data()
.expect("get_non_persisting should always work"),
persisting: p.get_persisting_data(),
partition_status: PartitionStatus {
parquet_max_sequence_number: p.data.max_persisted_sequence_number(),
tombstone_max_sequence_number: self.tombstone_max_sequence_number,
},
})
.collect()
}
async fn insert_partition(
&mut self,
partition_key: PartitionKey,
shard_id: ShardId,
catalog: &dyn Catalog,
) -> Result<(), super::Error> {
let mut repos = catalog.repositories().await;
let partition = repos
.partitions()
.create_or_get(partition_key, shard_id, self.table_id)
.await
.context(super::CatalogSnafu)?;
// get info on the persisted parquet files to use later for replay or for snapshot
// information on query.
let files = repos
.parquet_files()
.list_by_partition_not_to_delete(partition.id)
.await
.context(super::CatalogSnafu)?;
// for now we just need the max persisted
let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max();
let mut data = PartitionData::new(partition.id);
data.data.max_persisted_sequence_number = max_persisted_sequence_number;
self.partition_data.insert(partition.partition_key, data);
Ok(())
}
/// Return progress from this Table
pub(crate) fn progress(&self) -> ShardProgress {
let progress = ShardProgress::new();
let progress = match self.parquet_max_sequence_number() {
Some(n) => progress.with_persisted(n),
None => progress,
};
self.partition_data
.values()
.fold(progress, |progress, partition_data| {
progress.combine(partition_data.progress())
})
}
#[cfg(test)]
pub(crate) fn table_id(&self) -> TableId {
self.table_id
}
}

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use tokio::sync::{Barrier, Mutex};
#[derive(Debug, Clone)]

View File

@ -1,15 +1,7 @@
//! Ingest handler
use crate::{
data::{IngesterData, IngesterQueryResponse, ShardData},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
PeriodicWatermarkFetcher, SequencedStreamHandler,
},
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{Shard, ShardIndex, TopicMetadata};
@ -26,7 +18,6 @@ use metric::{DurationHistogram, Metric, U64Counter};
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tokio::{
sync::{Semaphore, TryAcquireError},
task::{JoinError, JoinHandle},
@ -35,6 +26,17 @@ use tokio_util::sync::CancellationToken;
use write_buffer::core::WriteBufferReading;
use write_summary::ShardProgress;
use crate::{
data::{shard::ShardData, IngesterData, IngesterQueryResponse},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
PeriodicWatermarkFetcher, SequencedStreamHandler,
},
};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
@ -382,8 +384,8 @@ impl<T> Drop for IngestHandlerImpl<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::data::SnapshotBatch;
use std::{num::NonZeroU32, ops::DerefMut};
use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@ -391,10 +393,12 @@ mod tests {
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use mutable_batch_lp::lines_to_batches;
use object_store::memory::InMemory;
use std::{num::NonZeroU32, ops::DerefMut};
use test_helpers::maybe_start_logging;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use super::*;
use crate::data::partition::SnapshotBatch;
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let ingester = TestIngester::new().await;
@ -764,8 +768,7 @@ mod tests {
verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| {
if first_batch.min_sequence_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
the min_unpersisted"
"initialization did a seek to the beginning rather than the min_unpersisted"
);
}
})

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use data_types::PartitionId;
use iox_time::TimeProvider;
use parking_lot::Mutex;
use std::sync::Arc;
use tracker::{
AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory,
TaskRegistryWithMetrics, TaskTracker,

View File

@ -5,20 +5,22 @@
//! some absolute number and individual Parquet files that get persisted below some number. It
//! is expected that they may be above or below the absolute thresholds.
use crate::{
data::Persister,
job::{Job, JobRegistry},
poison::{PoisonCabinet, PoisonPill},
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{PartitionId, SequenceNumber, ShardId};
use iox_time::{Time, TimeProvider};
use metric::{Metric, U64Counter};
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracker::TrackedFutureExt;
use crate::{
data::Persister,
job::{Job, JobRegistry},
poison::{PoisonCabinet, PoisonPill},
};
/// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
pub trait LifecycleHandle: Send + Sync + 'static {
/// Logs bytes written into a partition so that it can be tracked for the manager to
@ -566,13 +568,15 @@ pub(crate) async fn run_lifecycle_manager<P: Persister>(
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeSet;
use async_trait::async_trait;
use iox_time::MockProvider;
use metric::{Attributes, Registry};
use std::collections::BTreeSet;
use tokio::sync::Barrier;
use super::*;
#[derive(Default)]
struct TestPersister {
persist_called: Mutex<BTreeSet<PartitionId>>,

View File

@ -1,12 +1,13 @@
use data_types::ShardIndex;
use futures::Future;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
use std::{
sync::Arc,
task::{Poll, Waker},
};
use data_types::ShardIndex;
use futures::Future;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum PoisonPill {

View File

@ -1,9 +1,7 @@
//! Handle all requests from Querier
use crate::data::{
IngesterData, IngesterQueryPartition, IngesterQueryResponse, QueryableBatch,
UnpersistedPartitionData,
};
use std::sync::Arc;
use arrow::error::ArrowError;
use datafusion::{
error::DataFusionError, logical_plan::LogicalPlanBuilder,
@ -19,7 +17,14 @@ use observability_deps::tracing::debug;
use predicate::Predicate;
use schema::selection::Selection;
use snafu::{ensure, ResultExt, Snafu};
use std::sync::Arc;
use crate::{
data::{
partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
IngesterQueryResponse,
},
query::QueryableBatch,
};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -282,6 +287,13 @@ pub(crate) async fn query(
#[cfg(test)]
mod tests {
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use assert_matches::assert_matches;
use datafusion::logical_plan::{col, lit};
use futures::TryStreamExt;
use predicate::Predicate;
use super::*;
use crate::{
data::FlatIngesterQueryResponse,
@ -291,12 +303,6 @@ mod tests {
make_queryable_batch_with_deletes, DataLocation, TEST_NAMESPACE, TEST_TABLE,
},
};
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use assert_matches::assert_matches;
use datafusion::logical_plan::{col, lit};
use futures::TryStreamExt;
use predicate::Predicate;
#[tokio::test]
async fn test_query() {

View File

@ -1,6 +1,7 @@
//! Module to handle query on Ingester's data
use crate::data::{QueryableBatch, SnapshotBatch};
use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{
@ -23,7 +24,8 @@ use predicate::{
};
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::{any::Any, sync::Arc};
use crate::data::partition::SnapshotBatch;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
@ -45,6 +47,22 @@ pub enum Error {
/// A specialized `Error` for Ingester's Query errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Queryable data used for both query and persistence
#[derive(Debug, PartialEq, Clone)]
pub struct QueryableBatch {
/// data
pub(crate) data: Vec<Arc<SnapshotBatch>>,
/// Delete predicates of the tombstones
pub(crate) delete_predicates: Vec<Arc<DeletePredicate>>,
/// This is needed to return a reference for a trait function
pub(crate) table_name: String,
/// Partition ID
pub(crate) partition_id: PartitionId,
}
impl QueryableBatch {
/// Initilaize a QueryableBatch
pub fn new(
@ -242,8 +260,6 @@ impl QueryChunk for QueryableBatch {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::create_tombstone;
use arrow::{
array::{
ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
@ -253,6 +269,9 @@ mod tests {
};
use data_types::{DeleteExpr, Op, Scalar, TimestampRange};
use super::*;
use crate::test_util::create_tombstone;
#[tokio::test]
async fn test_merge_batch_schema() {
// Merge schema of the batches

View File

@ -1,10 +1,9 @@
//! Ingester server entrypoint.
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};
use self::{grpc::GrpcDelegate, http::HttpDelegate};
use crate::handler::IngestHandler;
use std::fmt::Debug;
pub mod grpc;
pub mod http;

View File

@ -1,9 +1,14 @@
//! gRPC service implementations for `ingester`.
use crate::{
data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
handler::IngestHandler,
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@ -20,18 +25,15 @@ use observability_deps::tracing::{debug, info, warn};
use pin_project::pin_project;
use prost::Message;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tonic::{Request, Response, Streaming};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use crate::{
data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
handler::IngestHandler,
};
/// This type is responsible for managing all gRPC services exposed by
/// `ingester`.
#[derive(Debug, Default)]
@ -465,9 +467,8 @@ mod tests {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::selection::Selection;
use crate::data::PartitionStatus;
use super::*;
use crate::data::partition::PartitionStatus;
#[tokio::test]
async fn test_get_stream_empty() {

View File

@ -1,10 +1,12 @@
//! HTTP service implementations for `ingester`.
use crate::handler::IngestHandler;
use hyper::{Body, Request, Response, StatusCode};
use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use thiserror::Error;
use crate::handler::IngestHandler;
/// Errors returned by the `router` HTTP request handler.
#[derive(Debug, Error, Copy, Clone)]
pub enum Error {

View File

@ -1,15 +1,17 @@
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
use std::{fmt::Debug, time::Duration};
use data_types::{SequenceNumber, ShardIndex};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
/// When the [`LifecycleManager`] indicates that ingest should be paused because
/// of memory pressure, the shard will loop, sleeping this long between
/// calls to [`LifecycleHandle::can_resume_ingest()`] with the manager if it
@ -89,10 +91,13 @@ impl<I, O> SequencedStreamHandler<I, O> {
skip_to_oldest_available: bool,
) -> Self {
// TTBR
let time_to_be_readable = metrics.register_metric::<DurationGauge>(
"ingester_ttbr",
"duration of time between producer writing to consumer putting into queryable cache",
).recorder(metric_attrs(shard_index, &topic_name, None, false));
let time_to_be_readable = metrics
.register_metric::<DurationGauge>(
"ingester_ttbr",
"duration of time between producer writing to consumer putting into queryable \
cache",
)
.recorder(metric_attrs(shard_index, &topic_name, None, false));
// Lifecycle-driven ingest pause duration
let pause_duration = metrics
@ -461,11 +466,8 @@ fn metric_attrs(
#[cfg(test)]
mod tests {
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
use std::sync::Arc;
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{DeletePredicate, Sequence, TimestampRange};
@ -475,12 +477,17 @@ mod tests {
use metric::Metric;
use mutable_batch_lp::lines_to_batches;
use once_cell::sync::Lazy;
use std::sync::Arc;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use write_buffer::core::WriteBufferError;
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
static TEST_TIME: Lazy<Time> = Lazy::new(|| SystemProvider::default().now());
static TEST_SHARD_INDEX: ShardIndex = ShardIndex::new(42);
static TEST_TOPIC_NAME: &str = "topic_name";
@ -967,8 +974,8 @@ mod tests {
// An abnormal end to the steam causes a panic, rather than a silent stream reader exit.
#[tokio::test]
#[should_panic(
expected = "shard index ShardIndex(42) stream for topic topic_name ended without \
graceful shutdown"
expected = "shard index ShardIndex(42) stream for topic topic_name ended without graceful \
shutdown"
)]
async fn test_early_stream_end_panic() {
let metrics = Arc::new(metric::Registry::default());

View File

@ -1,7 +1,3 @@
use super::sink_instrumentation::WatermarkFetcher;
use data_types::ShardIndex;
use metric::U64Counter;
use observability_deps::tracing::*;
use std::{
sync::{
atomic::{AtomicI64, Ordering},
@ -9,9 +5,15 @@ use std::{
},
time::{Duration, Instant},
};
use data_types::ShardIndex;
use metric::U64Counter;
use observability_deps::tracing::*;
use tokio::task::JoinHandle;
use write_buffer::core::WriteBufferReading;
use super::sink_instrumentation::WatermarkFetcher;
/// Periodically fetch and cache the maximum known write buffer offset
/// (watermark) from the write buffer for a given shard.
///

View File

@ -1,11 +1,13 @@
//! Compatibility layer providing a [`DmlSink`] impl for [`IngesterData`].
use super::DmlSink;
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
use std::sync::Arc;
use async_trait::async_trait;
use data_types::ShardId;
use dml::DmlOperation;
use std::sync::Arc;
use super::DmlSink;
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
/// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance.
#[derive(Debug)]

View File

@ -1,14 +1,16 @@
//! Instrumentation for [`DmlSink`] implementations.
use super::DmlSink;
use std::fmt::Debug;
use async_trait::async_trait;
use data_types::ShardIndex;
use dml::DmlOperation;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge};
use std::fmt::Debug;
use trace::span::{SpanExt, SpanRecorder};
use super::DmlSink;
/// A [`WatermarkFetcher`] abstracts a source of the write buffer high watermark
/// (max known offset).
///
@ -100,10 +102,13 @@ where
"Last consumed sequence number (e.g. Kafka offset)",
)
.recorder(attr.clone());
let write_buffer_sequence_number_lag = metrics.register_metric::<U64Gauge>(
"ingester_write_buffer_sequence_number_lag",
"The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number",
).recorder(attr.clone());
let write_buffer_sequence_number_lag = metrics
.register_metric::<U64Gauge>(
"ingester_write_buffer_sequence_number_lag",
"The difference between the the last sequence number available (e.g. Kafka \
offset) and (= minus) last consumed sequence number",
)
.recorder(attr.clone());
let write_buffer_last_ingest_ts = metrics
.register_metric::<U64Gauge>(
"ingester_write_buffer_last_ingest_ts",
@ -240,12 +245,11 @@ mod tests {
use once_cell::sync::Lazy;
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use super::*;
use crate::stream_handler::{
mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher,
};
use super::*;
/// The shard index the [`SinkInstrumentation`] under test is configured to
/// be observing for.
const SHARD_INDEX: ShardIndex = ShardIndex::new(42);

View File

@ -2,10 +2,8 @@
#![allow(missing_docs)]
use crate::data::{
IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, ShardData,
SnapshotBatch, TableData,
};
use std::{collections::BTreeMap, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bitflags::bitflags;
@ -19,9 +17,19 @@ use iox_time::{SystemProvider, Time, TimeProvider};
use object_store::memory::InMemory;
use parquet_file::metadata::IoxMetadata;
use schema::sort::SortKey;
use std::{collections::BTreeMap, sync::Arc};
use uuid::Uuid;
use crate::{
data::{
namespace::NamespaceData,
partition::{PartitionData, PersistingBatch, SnapshotBatch},
shard::ShardData,
table::TableData,
IngesterData,
},
query::QueryableBatch,
};
/// Create a persisting batch, some tombstones and corresponding metadata for them after compaction
pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tombstone>, IoxMetadata)
{

View File

@ -14,7 +14,9 @@ use generated_types::{
};
use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
use ingester::{
data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, ShardData},
data::{
shard::ShardData, FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister,
},
lifecycle::LifecycleHandle,
querier_handler::prepare_data_to_querier,
};