Merge branch 'main' into cn/share-code-with-full-compaction

pull/24376/head
kodiakhq[bot] 2022-09-16 21:15:44 +00:00 committed by GitHub
commit eed31bec4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 177 additions and 144 deletions

4
Cargo.lock generated
View File

@ -1218,9 +1218,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.3"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"crypto-common",

View File

@ -170,15 +170,12 @@ impl Compactor {
let file_size_buckets = U64HistogramOptions::new([
50 * 1024, // 50KB
100 * 1024, // 100KB
300 * 1024, // 300KB
500 * 1024, // 500 KB
1024 * 1024, // 1 MB
3 * 1024 * 1024, // 3 MB
10 * 1024 * 1024, // 10 MB
30 * 1024 * 1024, // 30 MB
100 * 1024 * 1024, // 100 MB
300 * 1024 * 1024, // 300 MB
500 * 1024 * 1024, // 500 MB
u64::MAX, // Inf
]);
@ -196,17 +193,14 @@ impl Compactor {
);
let duration_histogram_options = DurationHistogramOptions::new([
Duration::from_millis(100),
Duration::from_millis(500),
Duration::from_micros(2_000),
Duration::from_millis(1_000), // 1 second
Duration::from_millis(5_000),
Duration::from_millis(15_000),
Duration::from_millis(30_000),
Duration::from_millis(60_000), // 1 minute
Duration::from_millis(5 * 60_000),
Duration::from_millis(10 * 60_000),
Duration::from_millis(20 * 60_000),
Duration::from_millis(40 * 60_000),
Duration::from_millis(15 * 60_000),
Duration::from_millis(60 * 60_000),
DURATION_MAX,
]);

View File

@ -166,7 +166,6 @@ impl IngesterData {
shard_data
.buffer_operation(
dml_operation,
shard_id,
self.catalog.as_ref(),
lifecycle_handle,
&self.exec,
@ -267,7 +266,12 @@ impl Persister for IngesterData {
.await
.expect("retry forever");
let persisting_batch = namespace.snapshot_to_persisting(&partition_info).await;
let persisting_batch = namespace
.snapshot_to_persisting(
&partition_info.table_name,
&partition_info.partition.partition_key,
)
.await;
if let Some(persisting_batch) = persisting_batch {
// do the CPU intensive work of compaction, de-duplication and sorting
@ -648,7 +652,10 @@ mod tests {
let mut shards = BTreeMap::new();
let shard_index = ShardIndex::new(0);
shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics)));
shards.insert(
shard1.id,
ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
@ -733,7 +740,7 @@ mod tests {
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, Arc::clone(&metrics)),
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
@ -838,11 +845,11 @@ mod tests {
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, Arc::clone(&metrics)),
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
shards.insert(
shard2.id,
ShardData::new(shard2.shard_index, Arc::clone(&metrics)),
ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
@ -1094,11 +1101,11 @@ mod tests {
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, Arc::clone(&metrics)),
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
shards.insert(
shard2.id,
ShardData::new(shard2.shard_index, Arc::clone(&metrics)),
ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
@ -1271,6 +1278,11 @@ mod tests {
.create_or_get("1970-01-01".into(), shard.id, table.id)
.await
.unwrap();
repos
.partitions()
.update_persisted_sequence_number(partition.id, SequenceNumber::new(1))
.await
.unwrap();
let partition2 = repos
.partitions()
.create_or_get("1970-01-02".into(), shard.id, table.id)
@ -1328,13 +1340,14 @@ mod tests {
);
let exec = Executor::new(1);
let data = NamespaceData::new(namespace.id, &*metrics);
let data = NamespaceData::new(namespace.id, shard.id, &*metrics);
// w1 should be ignored so it shouldn't be present in the buffer
// w1 should be ignored because the per-partition replay offset is set
// to 1 already, so it shouldn't be buffered and the buffer should
// remain empty.
let should_pause = data
.buffer_operation(
DmlOperation::Write(w1),
shard.id,
catalog.as_ref(),
&manager.handle(),
&exec,
@ -1356,7 +1369,6 @@ mod tests {
// w2 should be in the buffer
data.buffer_operation(
DmlOperation::Write(w2),
shard.id,
catalog.as_ref(),
&manager.handle(),
&exec,
@ -1398,7 +1410,10 @@ mod tests {
let mut shards = BTreeMap::new();
let shard_index = ShardIndex::new(0);
shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics)));
shards.insert(
shard1.id,
ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());

View File

@ -5,7 +5,7 @@ use std::{
sync::Arc,
};
use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -26,8 +26,11 @@ use crate::lifecycle::LifecycleHandle;
#[derive(Debug)]
pub struct NamespaceData {
namespace_id: NamespaceId,
tables: RwLock<BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>>,
/// The catalog ID of the shard this namespace is being populated from.
shard_id: ShardId,
tables: RwLock<BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>>,
table_count: U64Counter,
/// The sequence number being actively written, if any.
@ -78,7 +81,7 @@ pub struct NamespaceData {
impl NamespaceData {
/// Initialize new tables with default partition template of daily
pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self {
pub fn new(namespace_id: NamespaceId, shard_id: ShardId, metrics: &metric::Registry) -> Self {
let table_count = metrics
.register_metric::<U64Counter>(
"ingester_tables_total",
@ -88,6 +91,7 @@ impl NamespaceData {
Self {
namespace_id,
shard_id,
tables: Default::default(),
table_count,
buffering_sequence_number: RwLock::new(None),
@ -100,10 +104,12 @@ impl NamespaceData {
#[cfg(test)]
pub(crate) fn new_for_test(
namespace_id: NamespaceId,
shard_id: ShardId,
tables: BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>,
) -> Self {
Self {
namespace_id,
shard_id,
tables: RwLock::new(tables),
table_count: Default::default(),
buffering_sequence_number: RwLock::new(None),
@ -117,7 +123,6 @@ impl NamespaceData {
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
executor: &Executor,
@ -148,7 +153,7 @@ impl NamespaceData {
for (t, b) in write.into_tables() {
let table_data = match self.table_data(&t) {
Some(t) => t,
None => self.insert_table(shard_id, &t, catalog).await?,
None => self.insert_table(&t, catalog).await?,
};
{
@ -159,7 +164,6 @@ impl NamespaceData {
sequence_number,
b,
partition_key.clone(),
shard_id,
catalog,
lifecycle_handle,
)
@ -176,20 +180,13 @@ impl NamespaceData {
let table_name = delete.table_name().context(super::TableNotPresentSnafu)?;
let table_data = match self.table_data(table_name) {
Some(t) => t,
None => self.insert_table(shard_id, table_name, catalog).await?,
None => self.insert_table(table_name, catalog).await?,
};
let mut table_data = table_data.write().await;
table_data
.buffer_delete(
table_name,
delete.predicate(),
shard_id,
sequence_number,
catalog,
executor,
)
.buffer_delete(delete.predicate(), sequence_number, catalog, executor)
.await?;
// don't pause writes since deletes don't count towards memory limits
@ -224,22 +221,16 @@ impl NamespaceData {
/// or persist, None will be returned.
pub async fn snapshot_to_persisting(
&self,
partition_info: &PartitionInfo,
table_name: &str,
partition_key: &PartitionKey,
) -> Option<Arc<PersistingBatch>> {
if let Some(table_data) = self.table_data(&partition_info.table_name) {
if let Some(table_data) = self.table_data(table_name) {
let mut table_data = table_data.write().await;
return table_data
.partition_data
.get_mut(&partition_info.partition.partition_key)
.and_then(|partition_data| {
partition_data.snapshot_to_persisting_batch(
partition_info.partition.shard_id,
partition_info.partition.table_id,
partition_info.partition.id,
&partition_info.table_name,
)
});
.get_mut(partition_key)
.and_then(|partition_data| partition_data.snapshot_to_persisting_batch());
}
None
@ -257,14 +248,13 @@ impl NamespaceData {
/// Inserts the table or returns it if it happens to be inserted by some other thread
async fn insert_table(
&self,
shard_id: ShardId,
table_name: &str,
catalog: &dyn Catalog,
) -> Result<Arc<tokio::sync::RwLock<TableData>>, super::Error> {
let mut repos = catalog.repositories().await;
let info = repos
.tables()
.get_table_persist_info(shard_id, self.namespace_id, table_name)
.get_table_persist_info(self.shard_id, self.namespace_id, table_name)
.await
.context(super::CatalogSnafu)?
.context(super::TableNotFoundSnafu { table_name })?;
@ -275,6 +265,8 @@ impl NamespaceData {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
info.table_id,
table_name,
self.shard_id,
info.tombstone_max_sequence_number,
))));
self.table_count.inc(1);

View File

@ -111,7 +111,14 @@ impl SnapshotBatch {
/// Data of an IOx Partition of a given Table of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct PartitionData {
/// The catalog ID of the partition this buffer is for.
id: PartitionId,
/// The shard and table IDs for this partition.
shard_id: ShardId,
table_id: TableId,
/// The name of the table this partition is part of.
table_name: Arc<str>,
pub(crate) data: DataBuffer,
/// The max_persisted_sequence number for any parquet_file in this
@ -121,24 +128,27 @@ pub(crate) struct PartitionData {
impl PartitionData {
/// Initialize a new partition data buffer
pub fn new(id: PartitionId, max_persisted_sequence_number: Option<SequenceNumber>) -> Self {
pub fn new(
id: PartitionId,
shard_id: ShardId,
table_id: TableId,
table_name: Arc<str>,
max_persisted_sequence_number: Option<SequenceNumber>,
) -> Self {
Self {
id,
shard_id,
table_id,
table_name,
data: Default::default(),
max_persisted_sequence_number,
}
}
/// Snapshot anything in the buffer and move all snapshot data into a persisting batch
pub fn snapshot_to_persisting_batch(
&mut self,
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Option<Arc<PersistingBatch>> {
pub fn snapshot_to_persisting_batch(&mut self) -> Option<Arc<PersistingBatch>> {
self.data
.snapshot_to_persisting(shard_id, table_id, partition_id, table_name)
.snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name)
}
/// Snapshot whatever is in the buffer and return a new vec of the
@ -189,12 +199,7 @@ impl PartitionData {
/// tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
/// exists
pub(crate) async fn buffer_tombstone(
&mut self,
executor: &Executor,
table_name: &str,
tombstone: Tombstone,
) {
pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) {
self.data.add_tombstone(tombstone.clone());
// ----------------------------------------------------------
@ -202,7 +207,7 @@ impl PartitionData {
// Make a QueryableBatch for all buffer + snapshots + the given tombstone
let max_sequence_number = tombstone.sequence_number;
let query_batch = match self.data.snapshot_to_queryable_batch(
table_name,
&self.table_name,
self.id,
Some(tombstone.clone()),
) {
@ -292,11 +297,13 @@ mod tests {
#[test]
fn snapshot_buffer_different_but_compatible_schemas() {
let mut partition_data = PartitionData {
id: PartitionId::new(1),
data: Default::default(),
max_persisted_sequence_number: None,
};
let mut partition_data = PartitionData::new(
PartitionId::new(1),
ShardId::new(1),
TableId::new(1),
"foo".into(),
None,
);
let seq_num1 = SequenceNumber::new(1);
// Missing tag `t1`
@ -335,8 +342,13 @@ mod tests {
let s_id = 1;
let t_id = 1;
let p_id = 1;
let table_name = "restaurant";
let mut p = PartitionData::new(PartitionId::new(p_id), None);
let mut p = PartitionData::new(
PartitionId::new(p_id),
ShardId::new(s_id),
TableId::new(t_id),
"restaurant".into(),
None,
);
let exec = Executor::new(1);
// ------------------------------------------
@ -376,7 +388,7 @@ mod tests {
"day=thu", // delete predicate
);
// one row will get deleted, the other is moved to snapshot
p.buffer_tombstone(&exec, "restaurant", ts).await;
p.buffer_tombstone(&exec, ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
@ -439,7 +451,7 @@ mod tests {
);
// two rows will get deleted, one from existing snapshot, one from the buffer being moved
// to snapshot
p.buffer_tombstone(&exec, "restaurant", ts).await;
p.buffer_tombstone(&exec, ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
@ -462,14 +474,7 @@ mod tests {
// ------------------------------------------
// Persisting
let p_batch = p
.snapshot_to_persisting_batch(
ShardId::new(s_id),
TableId::new(t_id),
PartitionId::new(p_id),
table_name,
)
.unwrap();
let p_batch = p.snapshot_to_persisting_batch().unwrap();
// verify data
assert!(p.data.buffer.is_none()); // always empty after issuing persit
@ -491,7 +496,7 @@ mod tests {
);
// if a query come while persisting, the row with temp=55 will be deleted before
// data is sent back to Querier
p.buffer_tombstone(&exec, "restaurant", ts).await;
p.buffer_tombstone(&exec, ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete
@ -559,7 +564,7 @@ mod tests {
"temp=60", // delete predicate
);
// the row with temp=60 will be removed from the snapshot
p.buffer_tombstone(&exec, "restaurant", ts).await;
p.buffer_tombstone(&exec, ts).await;
// verify data
assert!(p.data.buffer.is_none()); // always empty after delete

View File

@ -109,7 +109,7 @@ impl DataBuffer {
/// Both buffer and snapshots will be empty after this
pub(super) fn snapshot_to_queryable_batch(
&mut self,
table_name: &str,
table_name: &Arc<str>,
partition_id: PartitionId,
tombstone: Option<Tombstone>,
) -> Option<QueryableBatch> {
@ -129,7 +129,7 @@ impl DataBuffer {
None
} else {
Some(QueryableBatch::new(
table_name,
Arc::clone(table_name),
partition_id,
data,
tombstones,
@ -164,7 +164,7 @@ impl DataBuffer {
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
table_name: &Arc<str>,
) -> Option<Arc<PersistingBatch>> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")

View File

@ -22,6 +22,8 @@ use crate::lifecycle::LifecycleHandle;
pub struct ShardData {
/// The shard index for this shard
shard_index: ShardIndex,
/// The catalog ID for this shard.
shard_id: ShardId,
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<String, Arc<NamespaceData>>>,
@ -32,7 +34,7 @@ pub struct ShardData {
impl ShardData {
/// Initialise a new [`ShardData`] that emits metrics to `metrics`.
pub fn new(shard_index: ShardIndex, metrics: Arc<metric::Registry>) -> Self {
pub fn new(shard_index: ShardIndex, shard_id: ShardId, metrics: Arc<metric::Registry>) -> Self {
let namespace_count = metrics
.register_metric::<U64Counter>(
"ingester_namespaces_total",
@ -42,6 +44,7 @@ impl ShardData {
Self {
shard_index,
shard_id,
namespaces: Default::default(),
metrics,
namespace_count,
@ -52,10 +55,12 @@ impl ShardData {
#[cfg(test)]
pub fn new_for_test(
shard_index: ShardIndex,
shard_id: ShardId,
namespaces: BTreeMap<String, Arc<NamespaceData>>,
) -> Self {
Self {
shard_index,
shard_id,
namespaces: RwLock::new(namespaces),
metrics: Default::default(),
namespace_count: Default::default(),
@ -69,7 +74,6 @@ impl ShardData {
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
executor: &Executor,
@ -83,7 +87,7 @@ impl ShardData {
};
namespace_data
.buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor)
.buffer_operation(dml_operation, catalog, lifecycle_handle, executor)
.await
}
@ -112,7 +116,11 @@ impl ShardData {
let data = match n.entry(namespace.name) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics)));
let v = v.insert(Arc::new(NamespaceData::new(
namespace.id,
self.shard_id,
&*self.metrics,
)));
self.namespace_count.inc(1);
Arc::clone(v)
}

View File

@ -1,6 +1,6 @@
//! Table level data buffer structures.
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};
use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp};
use iox_catalog::interface::Catalog;
@ -16,6 +16,11 @@ use crate::lifecycle::LifecycleHandle;
#[derive(Debug)]
pub(crate) struct TableData {
table_id: TableId,
table_name: Arc<str>,
/// The catalog ID of the shard this table is being populated from.
shard_id: ShardId,
// the max sequence number for a tombstone associated with this table
tombstone_max_sequence_number: Option<SequenceNumber>,
// Map of partition key to its data
@ -24,9 +29,16 @@ pub(crate) struct TableData {
impl TableData {
/// Initialize new table buffer
pub fn new(table_id: TableId, tombstone_max_sequence_number: Option<SequenceNumber>) -> Self {
pub fn new(
table_id: TableId,
table_name: &str,
shard_id: ShardId,
tombstone_max_sequence_number: Option<SequenceNumber>,
) -> Self {
Self {
table_id,
table_name: table_name.into(),
shard_id,
tombstone_max_sequence_number,
partition_data: Default::default(),
}
@ -36,11 +48,15 @@ impl TableData {
#[cfg(test)]
pub fn new_for_test(
table_id: TableId,
table_name: &str,
shard_id: ShardId,
tombstone_max_sequence_number: Option<SequenceNumber>,
partitions: BTreeMap<PartitionKey, PartitionData>,
) -> Self {
Self {
table_id,
table_name: table_name.into(),
shard_id,
tombstone_max_sequence_number,
partition_data: partitions,
}
@ -68,14 +84,13 @@ impl TableData {
sequence_number: SequenceNumber,
batch: MutableBatch,
partition_key: PartitionKey,
shard_id: ShardId,
catalog: &dyn Catalog,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
let partition_data = match self.partition_data.get_mut(&partition_key) {
Some(p) => p,
None => {
self.insert_partition(partition_key.clone(), shard_id, catalog)
self.insert_partition(partition_key.clone(), self.shard_id, catalog)
.await?;
self.partition_data.get_mut(&partition_key).unwrap()
}
@ -90,7 +105,7 @@ impl TableData {
let should_pause = lifecycle_handle.log_write(
partition_data.id(),
shard_id,
self.shard_id,
sequence_number,
batch.size(),
batch.rows(),
@ -102,9 +117,7 @@ impl TableData {
pub(super) async fn buffer_delete(
&mut self,
table_name: &str,
predicate: &DeletePredicate,
shard_id: ShardId,
sequence_number: SequenceNumber,
catalog: &dyn Catalog,
executor: &Executor,
@ -117,7 +130,7 @@ impl TableData {
.tombstones()
.create_or_get(
self.table_id,
shard_id,
self.shard_id,
sequence_number,
min_time,
max_time,
@ -131,8 +144,7 @@ impl TableData {
// modify one partition at a time
for data in self.partition_data.values_mut() {
data.buffer_tombstone(executor, table_name, tombstone.clone())
.await;
data.buffer_tombstone(executor, tombstone.clone()).await;
}
Ok(())
@ -161,26 +173,23 @@ impl TableData {
shard_id: ShardId,
catalog: &dyn Catalog,
) -> Result<(), super::Error> {
let mut repos = catalog.repositories().await;
let partition = repos
let partition = catalog
.repositories()
.await
.partitions()
.create_or_get(partition_key, shard_id, self.table_id)
.await
.context(super::CatalogSnafu)?;
// get info on the persisted parquet files to use later for replay or for snapshot
// information on query.
let files = repos
.parquet_files()
.list_by_partition_not_to_delete(partition.id)
.await
.context(super::CatalogSnafu)?;
// for now we just need the max persisted
let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max();
self.partition_data.insert(
partition.partition_key,
PartitionData::new(partition.id, max_persisted_sequence_number),
PartitionData::new(
partition.id,
shard_id,
self.table_id,
Arc::clone(&self.table_name),
partition.persisted_sequence_number,
),
);
Ok(())

View File

@ -140,7 +140,7 @@ impl IngestHandlerImpl {
for s in shard_states.values() {
shards.insert(
s.id,
ShardData::new(s.shard_index, Arc::clone(&metric_registry)),
ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)),
);
}
let data = Arc::new(IngesterData::new(

View File

@ -191,7 +191,7 @@ async fn prepare_data_to_querier_for_partition(
.persisting
.unwrap_or_else(|| {
QueryableBatch::new(
&request.table,
request.table.clone().into(),
unpersisted_partition_data.partition_id,
vec![],
vec![],

View File

@ -57,7 +57,7 @@ pub struct QueryableBatch {
pub(crate) delete_predicates: Vec<Arc<DeletePredicate>>,
/// This is needed to return a reference for a trait function
pub(crate) table_name: String,
pub(crate) table_name: Arc<str>,
/// Partition ID
pub(crate) partition_id: PartitionId,
@ -66,7 +66,7 @@ pub struct QueryableBatch {
impl QueryableBatch {
/// Initialize a QueryableBatch
pub fn new(
table_name: &str,
table_name: Arc<str>,
partition_id: PartitionId,
data: Vec<Arc<SnapshotBatch>>,
deletes: Vec<Tombstone>,
@ -75,7 +75,7 @@ impl QueryableBatch {
Self {
data,
delete_predicates,
table_name: table_name.to_string(),
table_name,
partition_id,
}
}
@ -318,7 +318,7 @@ mod tests {
// This new queryable batch will convert tombstone to delete predicates
let query_batch =
QueryableBatch::new("test_table", PartitionId::new(0), vec![], tombstones);
QueryableBatch::new("test_table".into(), PartitionId::new(0), vec![], tombstones);
let predicates = query_batch.delete_predicates();
let expected = vec![
Arc::new(DeletePredicate {

View File

@ -205,7 +205,7 @@ pub fn make_queryable_batch_with_deletes(
}
Arc::new(QueryableBatch::new(
table_name,
table_name.into(),
PartitionId::new(partition_id),
snapshots,
tombstones,
@ -685,16 +685,20 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
let data_table_id = TableId::new(2);
// Make partitions per requested
let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id, TEST_TABLE);
let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id);
// Two tables: one empty and one with data of one or two partitions
let mut tables = BTreeMap::new();
let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new(
empty_table_id,
"test_table",
shard_id,
None,
)));
let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test(
data_table_id,
"test_table",
shard_id,
None,
partitions,
)));
@ -703,14 +707,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
// Two namespaces: one empty and one with data of 2 tables
let mut namespaces = BTreeMap::new();
let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), &*metrics));
let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables));
let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), shard_id, &*metrics));
let data_ns = Arc::new(NamespaceData::new_for_test(
NamespaceId::new(2),
shard_id,
tables,
));
namespaces.insert(TEST_NAMESPACE_EMPTY.to_string(), empty_ns);
namespaces.insert(TEST_NAMESPACE.to_string(), data_ns);
// One shard that contains 2 namespaces
let shard_index = ShardIndex::new(0);
let shard_data = ShardData::new_for_test(shard_index, namespaces);
let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces);
let mut shards = BTreeMap::new();
shards.insert(shard_id, shard_data);
@ -737,12 +745,11 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
let data_table_id = TableId::new(2);
// Make partitions per requested
let partitions =
make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id, TEST_TABLE).await;
let partitions = make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id).await;
// Two tables: one empty and one with data of one or two partitions
let mut tables = BTreeMap::new();
let data_tbl = TableData::new_for_test(data_table_id, None, partitions);
let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, shard_id, None, partitions);
tables.insert(
TEST_TABLE.to_string(),
Arc::new(tokio::sync::RwLock::new(data_tbl)),
@ -750,12 +757,16 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
// Two namespaces: one empty and one with data of 2 tables
let mut namespaces = BTreeMap::new();
let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables));
let data_ns = Arc::new(NamespaceData::new_for_test(
NamespaceId::new(2),
shard_id,
tables,
));
namespaces.insert(TEST_NAMESPACE.to_string(), data_ns);
// One shard that contains 1 namespace
let shard_index = ShardIndex::new(0);
let shard_data = ShardData::new_for_test(shard_index, namespaces);
let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces);
let mut shards = BTreeMap::new();
shards.insert(shard_id, shard_data);
@ -776,7 +787,6 @@ pub(crate) fn make_partitions(
loc: DataLocation,
shard_id: ShardId,
table_id: TableId,
table_name: &str,
) -> BTreeMap<PartitionKey, PartitionData> {
// In-memory data includes these rows but split between 4 groups go into
// different batches of partition 1 or partition 2 as requested
@ -800,8 +810,7 @@ pub(crate) fn make_partitions(
// ------------------------------------------
// Build the first partition
let partition_id = PartitionId::new(1);
let (mut p1, seq_num) =
make_first_partition_data(partition_id, loc, shard_id, table_id, table_name);
let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id);
// ------------------------------------------
// Build the second partition if asked
@ -809,7 +818,7 @@ pub(crate) fn make_partitions(
let mut seq_num = seq_num.get();
if two_partitions {
let partition_id = PartitionId::new(2);
let mut p2 = PartitionData::new(partition_id, None);
let mut p2 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None);
// Group 4: in buffer of p2
// Fill `buffer`
seq_num += 1;
@ -849,7 +858,6 @@ pub(crate) async fn make_one_partition_with_tombstones(
loc: DataLocation,
shard_id: ShardId,
table_id: TableId,
table_name: &str,
) -> BTreeMap<PartitionKey, PartitionData> {
// In-memory data includes these rows but split between 4 groups go into
// different batches of partition 1 or partition 2 as requested
@ -869,8 +877,7 @@ pub(crate) async fn make_one_partition_with_tombstones(
// ];
let partition_id = PartitionId::new(1);
let (mut p1, seq_num) =
make_first_partition_data(partition_id, loc, shard_id, table_id, table_name);
let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id);
// Add tombstones
// Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new snapshot after
@ -888,7 +895,7 @@ pub(crate) async fn make_one_partition_with_tombstones(
50, // max time of data to get deleted
"city=Boston", // delete predicate
);
p1.buffer_tombstone(exec, table_name, ts).await;
p1.buffer_tombstone(exec, ts).await;
// Group 4: in buffer of p1 after the tombstone
// Fill `buffer`
@ -914,7 +921,6 @@ fn make_first_partition_data(
loc: DataLocation,
shard_id: ShardId,
table_id: TableId,
table_name: &str,
) -> (PartitionData, SequenceNumber) {
// In-memory data includes these rows but split between 3 groups go into
// different batches of partition p1
@ -933,7 +939,7 @@ fn make_first_partition_data(
// ------------------------------------------
// Build the first partition
let mut p1 = PartitionData::new(partition_id, None);
let mut p1 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None);
let mut seq_num = 0;
// --------------------
@ -952,7 +958,7 @@ fn make_first_partition_data(
if loc.contains(DataLocation::PERSISTING) {
// Move group 1 data to persisting
p1.snapshot_to_persisting_batch(shard_id, table_id, partition_id, table_name);
p1.snapshot_to_persisting_batch();
} else if loc.contains(DataLocation::SNAPSHOT) {
// move group 1 data to snapshot
p1.snapshot().unwrap();

View File

@ -694,7 +694,11 @@ impl MockIngester {
let shards = BTreeMap::from([(
shard.shard.id,
ShardData::new(shard.shard.shard_index, catalog.metric_registry()),
ShardData::new(
shard.shard.shard_index,
shard.shard.id,
catalog.metric_registry(),
),
)]);
let ingester_data = Arc::new(IngesterData::new(
catalog.object_store(),