Merge pull request #5791 from influxdata/dom/remove-partition-queries

refactor: reference buffer tree nodes by ID
kodiakhq[bot] 2022-10-05 10:54:19 +00:00 committed by GitHub
commit e81dad972f
10 changed files with 726 additions and 301 deletions


@ -86,11 +86,8 @@ pub(crate) async fn compact_persisting_batch(
namespace_id: i64,
partition_info: &PartitionInfo,
batch: Arc<PersistingBatch>,
) -> Result<Option<CompactedStream>> {
// Nothing to compact
if batch.data.data.is_empty() {
return Ok(None);
}
) -> Result<CompactedStream> {
assert!(!batch.data.data.is_empty());
let namespace_name = &partition_info.namespace_name;
let table_name = &partition_info.table_name;
@ -141,11 +138,11 @@ pub(crate) async fn compact_persisting_batch(
sort_key: Some(metadata_sort_key),
};
Ok(Some(CompactedStream {
Ok(CompactedStream {
stream,
iox_metadata,
sort_key_update,
}))
})
}
/// Compact a given Queryable Batch
@ -254,7 +251,6 @@ mod tests {
let CompactedStream { stream, .. } =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -328,7 +324,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -426,7 +421,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -527,7 +521,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -629,7 +622,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -739,7 +731,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
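The change above tightens the contract of compact_persisting_batch: the empty-batch check moves to the caller, and the function now asserts a non-empty input and returns the CompactedStream directly instead of wrapping it in an Option. A condensed, illustrative sketch of the caller-side pattern (names taken from this diff; not compilable on its own):

    // Only invoke the compactor when the persisting batch holds data,
    // mirroring the new assert!(!batch.data.data.is_empty()) contract.
    let batch = match partition.snapshot_to_persisting_batch() {
        Some(b) if !b.data.data.is_empty() => b,
        _ => return, // nothing buffered for this partition
    };
    let CompactedStream { stream, iox_metadata, sort_key_update } =
        compact_persisting_batch(time_provider, &exec, namespace_id.get(), &partition_info, batch)
            .await
            .expect("unable to compact persisting batch");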


@ -6,7 +6,7 @@ use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex};
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId};
use datafusion::physical_plan::SendableRecordBatchStream;
use dml::DmlOperation;
use futures::{Stream, StreamExt};
@ -220,7 +220,13 @@ impl IngesterData {
#[async_trait]
pub trait Persister: Send + Sync + 'static {
/// Persists the partition ID. Will retry forever until it succeeds.
async fn persist(&self, partition_id: PartitionId);
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
);
/// Updates the shard's `min_unpersisted_sequence_number` in the catalog.
/// This number represents the minimum that might be unpersisted, which is the
@ -235,7 +241,69 @@ pub trait Persister: Send + Sync + 'static {
#[async_trait]
impl Persister for IngesterData {
async fn persist(&self, partition_id: PartitionId) {
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
) {
// lookup the state from the ingester data. If something isn't found,
// it's unexpected. Crash so someone can take a look.
let shard_data = self
.shards
.get(&shard_id)
.unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data"));
let namespace = shard_data
.namespace_by_id(namespace_id)
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
let partition_key;
let batch;
{
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
});
let mut guard = table_data.write().await;
let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
panic!(
"partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state"
)
});
partition_key = partition.partition_key().clone();
batch = partition.snapshot_to_persisting_batch();
};
debug!(%shard_id, %namespace_id, %table_id, %partition_id, %partition_key, "persisting partition");
// Check if there is any data to persist.
let batch = match batch {
Some(v) if !v.data.data.is_empty() => v,
_ => {
warn!(
%shard_id,
%namespace_id,
%table_id,
%partition_id,
%partition_key,
"partition marked for persistence contains no data"
);
return;
}
};
// lookup column IDs from catalog
// TODO: this can be removed once the ingester uses column IDs internally as well
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
get_table_schema_by_id(table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// lookup the partition_info from the catalog
let partition_info = Backoff::new(&self.backoff_config)
.retry_all_errors("get partition_info_by_id", || async {
@ -243,217 +311,158 @@ impl Persister for IngesterData {
repos.partitions().partition_info_by_id(partition_id).await
})
.await
.expect("retry forever");
.expect("retry forever").unwrap_or_else(|| panic!("partition {partition_id} in table {table_id} in namespace {namespace_id} in shard {shard_id} has no partition info in catalog"));
// lookup the state from the ingester data. If something isn't found, it's unexpected. Crash
// so someone can take a look.
let partition_info = partition_info
.unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id));
let shard_data = self
.shards
.get(&partition_info.partition.shard_id)
.unwrap_or_else(|| {
panic!(
"shard state for {} not in ingester data",
partition_info.partition.shard_id
)
});
let namespace = shard_data
.namespace(&partition_info.namespace_name)
.unwrap_or_else(|| {
panic!(
"namespace {} not in shard {} state",
partition_info.namespace_name, partition_info.partition.shard_id
)
});
debug!(?partition_id, ?partition_info, "persisting partition");
// do the CPU intensive work of compaction, de-duplication and sorting
let CompactedStream {
stream: record_stream,
iox_metadata,
sort_key_update,
} = compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id().get(),
&partition_info,
Arc::clone(&batch),
)
.await
.expect("unable to compact persisting batch");
// lookup column IDs from catalog
// TODO: this can be removed once the ingester uses column IDs internally as well
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
let table = repos
.tables()
.get_by_namespace_and_name(namespace.namespace_id(), &partition_info.table_name)
.await?
.expect("table not found in catalog");
get_table_schema_by_id(table.id, repos.as_mut()).await
})
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.store
.upload(record_stream, &iox_metadata)
.await
.expect("retry forever");
.expect("unexpected fatal persist error");
let persisting_batch = namespace
.snapshot_to_persisting(
&partition_info.table_name,
&partition_info.partition.partition_key,
)
.await;
if let Some(persisting_batch) = persisting_batch {
// do the CPU intensive work of compaction, de-duplication and sorting
let compacted_stream = match compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id().get(),
&partition_info,
Arc::clone(&persisting_batch),
)
.await
{
Err(e) => {
// this should never error out. if it does, we need to crash hard so
// someone can take a look.
panic!("unable to compact persisting batch with error: {:?}", e);
}
Ok(Some(r)) => r,
Ok(None) => {
warn!("persist called with no data");
return;
}
};
let CompactedStream {
stream: record_stream,
iox_metadata,
sort_key_update,
} = compacted_stream;
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.store
.upload(record_stream, &iox_metadata)
.await
.expect("unexpected fatal persist error");
// Update the sort key in the catalog if there are
// additional columns BEFORE adding parquet file to the
// catalog. If the order is reversed, the querier or
// compactor may see a parquet file with an inconsistent
// sort key. https://github.com/influxdata/influxdb_iox/issues/5090
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&self.backoff_config)
.retry_all_errors("update_sort_key", || async {
let mut repos = self.catalog.repositories().await;
let _partition = repos
.partitions()
.update_sort_key(partition_id, &sort_key)
.await?;
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
debug!(
?partition_id,
table = partition_info.table_name,
?new_sort_key,
"adjusted sort key during batch compact & persist"
);
}
// Add the parquet file to the catalog until it succeeds
let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("Unknown column").id
});
// Assert partitions are persisted in-order.
//
// It is an invariant that partitions are persisted in order so that
// both the per-shard, and per-partition watermarks are correctly
// advanced and accurate.
if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
assert!(
parquet_file.max_sequence_number > last_persist,
"out of order partition persistence, persisting {}, previously persisted {}",
parquet_file.max_sequence_number.get(),
last_persist.get(),
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
// Update the sort key in the catalog if there are
// additional columns BEFORE adding parquet file to the
// catalog. If the order is reversed, the querier or
// compactor may see a parquet file with an inconsistent
// sort key. https://github.com/influxdata/influxdb_iox/issues/5090
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
.retry_all_errors("update_sort_key", || async {
let mut repos = self.catalog.repositories().await;
let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
debug!(
?partition_id,
table_id=?parquet_file.table_id,
parquet_file_id=?parquet_file.id,
table_name=%iox_metadata.table_name,
"parquet file written to catalog"
);
let _partition = repos
.partitions()
.update_sort_key(partition_id, &sort_key)
.await?;
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the per-partition persistence watermark, so that new
// ingester instances skip the just-persisted ops during replay.
//
// This could be transactional with the above parquet insert to
// maintain catalog consistency, though in practice it is an
// unnecessary overhead - the system can tolerate replaying the ops
// that lead to this parquet file being generated, and tolerate
// creating a parquet file containing duplicate data (remedied by
// compaction).
//
// This means it is possible to observe a parquet file with a
// max_persisted_sequence_number >
// partition.persisted_sequence_number, either in-between these
// catalog updates, or for however long it takes a crashed ingester
// to restart and replay the ops, and re-persist a file containing
// the same (or subset of) data.
//
// The above is also true of the per-shard persist marker that
// governs the ingester's replay start point, which is
// non-transactionally updated after all partitions have persisted.
Backoff::new(&self.backoff_config)
.retry_all_errors("set partition persist marker", || async {
self.catalog
.repositories()
.await
.partitions()
.update_persisted_sequence_number(
parquet_file.partition_id,
parquet_file.max_sequence_number,
)
.await
})
.await
.expect("retry forever");
// Record metrics
let attributes = Attributes::from([(
"shard_id",
format!("{}", partition_info.partition.shard_id).into(),
)]);
self.persisted_file_size_bytes
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
namespace
.mark_persisted(
&partition_info.table_name,
&partition_info.partition.partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
?partition_id,
table_name=%partition_info.table_name,
partition_key=%partition_info.partition.partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
table = partition_info.table_name,
?new_sort_key,
"adjusted sort key during batch compact & persist"
);
}
// Add the parquet file to the catalog until it succeeds
let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("Unknown column").id
});
// Assert partitions are persisted in-order.
//
// It is an invariant that partitions are persisted in order so that
// both the per-shard, and per-partition watermarks are correctly
// advanced and accurate.
if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
assert!(
parquet_file.max_sequence_number > last_persist,
"out of order partition persistence, persisting {}, previously persisted {}",
parquet_file.max_sequence_number.get(),
last_persist.get(),
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.catalog.repositories().await;
let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
debug!(
?partition_id,
table_id=?parquet_file.table_id,
parquet_file_id=?parquet_file.id,
table_name=%iox_metadata.table_name,
"parquet file written to catalog"
);
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the per-partition persistence watermark, so that new
// ingester instances skip the just-persisted ops during replay.
//
// This could be transactional with the above parquet insert to
// maintain catalog consistency, though in practice it is an
// unnecessary overhead - the system can tolerate replaying the ops
// that lead to this parquet file being generated, and tolerate
// creating a parquet file containing duplicate data (remedied by
// compaction).
//
// This means it is possible to observe a parquet file with a
// max_persisted_sequence_number >
// partition.persisted_sequence_number, either in-between these
// catalog updates, or for however long it takes a crashed ingester
// to restart and replay the ops, and re-persist a file containing
// the same (or subset of) data.
//
// The above is also true of the per-shard persist marker that
// governs the ingester's replay start point, which is
// non-transactionally updated after all partitions have persisted.
Backoff::new(&self.backoff_config)
.retry_all_errors("set partition persist marker", || async {
self.catalog
.repositories()
.await
.partitions()
.update_persisted_sequence_number(
parquet_file.partition_id,
parquet_file.max_sequence_number,
)
.await
})
.await
.expect("retry forever");
// Record metrics
let attributes = Attributes::from([(
"shard_id",
format!("{}", partition_info.partition.shard_id).into(),
)]);
self.persisted_file_size_bytes
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
namespace
.mark_persisted(
&partition_info.table_name,
&partition_info.partition.partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
?partition_id,
table_name=%partition_info.table_name,
partition_key=%partition_info.partition.partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
);
}
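Taken together, the new persist() above replaces the old name-based catalog round trip with an ID-addressed walk of the ingester's own buffer tree. A condensed sketch of that lookup chain using the accessors added in this change (panics and logging elided; illustrative rather than compilable):

    // shard -> namespace -> table -> partition, all addressed by catalog IDs.
    let shard = self.shards.get(&shard_id).expect("unknown shard");
    let namespace = shard.namespace_by_id(namespace_id).expect("unknown namespace");
    let table = namespace.table_id(table_id).expect("unknown table");
    let batch = {
        // The table-level write lock covers both the partition lookup and
        // the snapshot of its buffer into a persisting batch.
        let mut table = table.write().await;
        let partition = table.get_partition(partition_id).expect("unknown partition");
        partition.snapshot_to_persisting_batch()
    };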
async fn update_min_unpersisted_sequence_number(
@ -804,17 +813,20 @@ mod tests {
// limits)
assert!(!should_pause);
let partition_id = {
let (table_id, partition_id) = {
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("mem").is_some());
let mem_table = mem_table.write().await;
let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap();
p.id()
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
(mem_table.table_id(), p.partition_id())
};
data.persist(partition_id).await;
data.persist(shard1.id, namespace.id, table_id, partition_id)
.await;
// verify that a file got put into object store
let file_paths: Vec<_> = object_store
@ -951,11 +963,14 @@ mod tests {
{
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("cpu").is_some());
let mem_table = mem_table.write().await;
let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap();
let mem_table = mem_table.write().await;
table_id = mem_table.table_id();
partition_id = p.id();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
partition_id = p.partition_id();
}
{
// verify the partition doesn't have a sort key before any data has been persisted
@ -969,7 +984,8 @@ mod tests {
assert!(partition_info.partition.sort_key.is_empty());
}
data.persist(partition_id).await;
data.persist(shard1.id, namespace.id, table_id, partition_id)
.await;
// verify that a file got put into object store
let file_paths: Vec<_> = object_store
@ -1352,7 +1368,7 @@ mod tests {
{
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let p = table.partition_data.get(&"1970-01-01".into()).unwrap();
let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(1))
@ -1368,7 +1384,7 @@ mod tests {
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let partition = table.partition_data.get(&"1970-01-01".into()).unwrap();
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(2)


@ -1,11 +1,8 @@
//! Namespace level data buffer structures.
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -16,12 +13,37 @@ use write_summary::ShardProgress;
#[cfg(test)]
use super::triggers::TestTriggers;
use super::{
partition::{resolver::PartitionProvider, PersistingBatch},
table::TableData,
};
use super::{partition::resolver::PartitionProvider, table::TableData};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_name: HashMap<Arc<str>, Arc<tokio::sync::RwLock<TableData>>>,
by_id: HashMap<TableId, Arc<tokio::sync::RwLock<TableData>>>,
}
impl DoubleRef {
fn insert(&mut self, t: TableData) -> Arc<tokio::sync::RwLock<TableData>> {
let name = Arc::clone(t.table_name());
let id = t.table_id();
let t = Arc::new(tokio::sync::RwLock::new(t));
self.by_name.insert(name, Arc::clone(&t));
self.by_id.insert(id, Arc::clone(&t));
t
}
fn by_name(&self, name: &str) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
self.by_name.get(name).map(Arc::clone)
}
fn by_id(&self, id: TableId) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
self.by_id.get(&id).map(Arc::clone)
}
}
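The DoubleRef above keeps a single Arc'd TableData reachable by either its name or its TableId. A standalone, runnable illustration of the same pattern with generic placeholder types (this is not the crate's definition, just the shape of it):

    use std::{collections::HashMap, sync::Arc};

    /// Both indexes hold a clone of the same Arc, so a lookup by name and a
    /// lookup by ID resolve to the same shared state.
    #[derive(Debug, Default)]
    struct DoubleRef<V> {
        by_name: HashMap<String, Arc<V>>,
        by_id: HashMap<u64, Arc<V>>,
    }

    impl<V> DoubleRef<V> {
        fn insert(&mut self, name: String, id: u64, value: V) -> Arc<V> {
            let value = Arc::new(value);
            self.by_name.insert(name, Arc::clone(&value));
            self.by_id.insert(id, Arc::clone(&value));
            value
        }

        fn by_name(&self, name: &str) -> Option<Arc<V>> {
            self.by_name.get(name).map(Arc::clone)
        }

        fn by_id(&self, id: u64) -> Option<Arc<V>> {
            self.by_id.get(&id).map(Arc::clone)
        }
    }

    fn main() {
        let mut map = DoubleRef::default();
        map.insert("bananas".to_string(), 42, "table state");
        // Both forms of lookup return the same Arc.
        assert!(Arc::ptr_eq(
            &map.by_name("bananas").unwrap(),
            &map.by_id(42).unwrap(),
        ));
    }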
/// Data of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct NamespaceData {
@ -30,7 +52,7 @@ pub(crate) struct NamespaceData {
/// The catalog ID of the shard this namespace is being populated from.
shard_id: ShardId,
tables: RwLock<BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>>,
tables: RwLock<DoubleRef>,
table_count: U64Counter,
/// The resolver of `(shard_id, table_id, partition_key)` to
@ -198,12 +220,12 @@ impl NamespaceData {
partition_key: &PartitionKey,
) -> Option<(
Vec<Arc<super::partition::SnapshotBatch>>,
Option<Arc<PersistingBatch>>,
Option<Arc<super::partition::PersistingBatch>>,
)> {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
return t.partition_data.get_mut(partition_key).map(|p| {
return t.get_partition_by_key_mut(partition_key).map(|p| {
p.data
.generate_snapshot()
.expect("snapshot on mutable batch should never fail");
@ -217,17 +239,17 @@ impl NamespaceData {
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
/// or persist, None will be returned.
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot_to_persisting(
&self,
table_name: &str,
partition_key: &PartitionKey,
) -> Option<Arc<PersistingBatch>> {
) -> Option<Arc<super::partition::PersistingBatch>> {
if let Some(table_data) = self.table_data(table_name) {
let mut table_data = table_data.write().await;
return table_data
.partition_data
.get_mut(partition_key)
.get_partition_by_key_mut(partition_key)
.and_then(|partition_data| partition_data.snapshot_to_persisting_batch());
}
@ -240,7 +262,16 @@ impl NamespaceData {
table_name: &str,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.get(table_name).cloned()
t.by_name(table_name)
}
/// Return the table data by ID.
pub(crate) fn table_id(
&self,
table_id: TableId,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.by_id(table_id)
}
/// Inserts the table or returns it if it happens to be inserted by some other thread
@ -259,23 +290,22 @@ impl NamespaceData {
let mut t = self.tables.write();
let data = match t.entry(table_name.to_string()) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
Ok(match t.by_name(table_name) {
Some(v) => v,
None => {
self.table_count.inc(1);
// Insert the table and then return a ref to it.
t.insert(TableData::new(
info.table_id,
table_name,
self.shard_id,
self.namespace_id,
info.tombstone_max_sequence_number,
Arc::clone(&self.partition_provider),
))));
self.table_count.inc(1);
Arc::clone(v)
))
}
Entry::Occupied(v) => Arc::clone(v.get()),
};
Ok(data)
})
}
/// Walks down the table and partition and clears the persisting batch. The sequence number is
@ -289,7 +319,7 @@ impl NamespaceData {
) {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
let partition = t.partition_data.get_mut(partition_key);
let partition = t.get_partition_by_key_mut(partition_key);
if let Some(p) = partition {
p.mark_persisted(sequence_number);
@ -299,7 +329,7 @@ impl NamespaceData {
/// Return progress from this Namespace
pub(super) async fn progress(&self) -> ShardProgress {
let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect();
let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect();
// Consolidate progress across partitions.
let mut progress = ShardProgress::new()
@ -357,3 +387,84 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
*buffering_sequence_number = None;
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::{PartitionId, ShardIndex};
use metric::{Attributes, Metric};
use crate::{
data::partition::{resolver::MockPartitionProvider, PartitionData},
lifecycle::mock_handle::MockLifecycleHandle,
test_util::{make_write_op, populate_catalog},
};
use super::*;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
#[tokio::test]
async fn test_namespace_double_ref() {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let exec = Executor::new(1);
// Populate the catalog with the shard / namespace / table
let (shard_id, ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PartitionId::new(0),
PartitionKey::from("banana-split"),
shard_id,
ns_id,
table_id,
TABLE_NAME.into(),
None,
),
));
let ns = NamespaceData::new(ns_id, shard_id, partition_provider, &*metrics);
// Assert the namespace does not contain the test data
assert!(ns.table_data(TABLE_NAME).is_none());
assert!(ns.table_id(table_id).is_none());
// Write some test data
ns.buffer_operation(
DmlOperation::Write(make_write_op(
&PartitionKey::from("banana-split"),
SHARD_INDEX,
NAMESPACE_NAME,
0,
r#"bananas,city=Medford day="sun",temp=55 22"#,
)),
&catalog,
&MockLifecycleHandle::default(),
&exec,
)
.await
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(ns.table_data(TABLE_NAME).is_some());
assert!(ns.table_id(table_id).is_some());
// And the table counter metric should increase
let tables = metrics
.get_instrument::<Metric<U64Counter>>("ingester_tables_total")
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(tables, 1);
}
}


@ -302,7 +302,7 @@ impl PartitionData {
self.data.progress()
}
pub(super) fn id(&self) -> PartitionId {
pub(super) fn partition_id(&self) -> PartitionId {
self.id
}


@ -221,7 +221,7 @@ mod tests {
)
.await;
assert_eq!(got.id(), PARTITION_ID);
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
@ -255,7 +255,7 @@ mod tests {
)
.await;
assert_eq!(got.id(), PARTITION_ID);
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
@ -307,7 +307,7 @@ mod tests {
)
.await;
assert_eq!(got.id(), other_key_id);
assert_eq!(got.partition_id(), other_key_id);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
@ -346,7 +346,7 @@ mod tests {
)
.await;
assert_eq!(got.id(), PARTITION_ID);
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), other_table);
assert_eq!(got.table_name(), TABLE_NAME);
@ -385,7 +385,7 @@ mod tests {
)
.await;
assert_eq!(got.id(), PARTITION_ID);
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), other_shard);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);


@ -82,7 +82,7 @@ mod tests {
Arc::clone(&table_name),
)
.await;
assert_eq!(got.id(), partition);
assert_eq!(got.partition_id(), partition);
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(*got.table_name(), *table_name);
}


@ -1,11 +1,8 @@
//! Shard level data buffer structures.
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};
use data_types::{ShardId, ShardIndex};
use data_types::{NamespaceId, ShardId, ShardIndex};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -17,6 +14,34 @@ use write_summary::ShardProgress;
use super::{namespace::NamespaceData, partition::resolver::PartitionProvider};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`NamespaceData`] can be looked up by name, or
/// ID.
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_name: HashMap<String, Arc<NamespaceData>>,
by_id: HashMap<NamespaceId, Arc<NamespaceData>>,
}
impl DoubleRef {
fn insert(&mut self, name: String, ns: NamespaceData) -> Arc<NamespaceData> {
let id = ns.namespace_id();
let ns = Arc::new(ns);
self.by_name.insert(name, Arc::clone(&ns));
self.by_id.insert(id, Arc::clone(&ns));
ns
}
fn by_name(&self, name: &str) -> Option<Arc<NamespaceData>> {
self.by_name.get(name).map(Arc::clone)
}
fn by_id(&self, id: NamespaceId) -> Option<Arc<NamespaceData>> {
self.by_id.get(&id).map(Arc::clone)
}
}
/// Data of a Shard
#[derive(Debug)]
pub(crate) struct ShardData {
@ -32,7 +57,7 @@ pub(crate) struct ShardData {
partition_provider: Arc<dyn PartitionProvider>,
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<String, Arc<NamespaceData>>>,
namespaces: RwLock<DoubleRef>,
metrics: Arc<metric::Registry>,
namespace_count: U64Counter,
@ -90,7 +115,17 @@ impl ShardData {
/// Gets the namespace data out of the map
pub(crate) fn namespace(&self, namespace: &str) -> Option<Arc<NamespaceData>> {
let n = self.namespaces.read();
n.get(namespace).cloned()
n.by_name(namespace)
}
/// Gets the namespace data out of the map
pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
// TODO: this should be the default once IDs are pushed over the wire.
//
// At which point the map should be indexed by IDs, instead of namespace
// names.
let n = self.namespaces.read();
n.by_id(namespace_id)
}
/// Retrieves the namespace from the catalog and initializes an empty buffer, or
@ -110,26 +145,34 @@ impl ShardData {
let mut n = self.namespaces.write();
let data = match n.entry(namespace.name) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(NamespaceData::new(
namespace.id,
self.shard_id,
Arc::clone(&self.partition_provider),
&*self.metrics,
)));
Ok(match n.by_name(&namespace.name) {
Some(v) => v,
None => {
self.namespace_count.inc(1);
Arc::clone(v)
}
Entry::Occupied(v) => Arc::clone(v.get()),
};
Ok(data)
// Insert the namespace and then return a ref to it.
n.insert(
namespace.name,
NamespaceData::new(
namespace.id,
self.shard_id,
Arc::clone(&self.partition_provider),
&*self.metrics,
),
)
}
})
}
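This swaps the BTreeMap Entry-based get-or-create for an explicit lookup-then-insert against the DoubleRef, still done under the write lock so two concurrent callers cannot both create the namespace. A standalone, runnable sketch of that shape using std types, with a plain String standing in for NamespaceData:

    use std::{
        collections::HashMap,
        sync::{Arc, RwLock},
    };

    /// Look up the namespace by name under the write lock, creating and
    /// inserting it only if it is not already present.
    fn get_or_create(
        namespaces: &RwLock<HashMap<String, Arc<String>>>,
        name: &str,
    ) -> Arc<String> {
        let mut guard = namespaces.write().unwrap();
        match guard.get(name) {
            Some(existing) => Arc::clone(existing),
            None => {
                // The real code also bumps the namespace_count metric here.
                let created = Arc::new(format!("state for {name}"));
                guard.insert(name.to_string(), Arc::clone(&created));
                created
            }
        }
    }

    fn main() {
        let namespaces = RwLock::new(HashMap::new());
        let a = get_or_create(&namespaces, "platanos");
        let b = get_or_create(&namespaces, "platanos");
        // The second call observes the entry created by the first.
        assert!(Arc::ptr_eq(&a, &b));
    }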
/// Return the progress of this shard
pub(super) async fn progress(&self) -> ShardProgress {
let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect();
let namespaces: Vec<_> = self
.namespaces
.read()
.by_id
.values()
.map(Arc::clone)
.collect();
let mut progress = ShardProgress::new();
@ -144,3 +187,90 @@ impl ShardData {
self.shard_index
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::{PartitionId, PartitionKey, ShardIndex};
use metric::{Attributes, Metric};
use crate::{
data::partition::{resolver::MockPartitionProvider, PartitionData},
lifecycle::mock_handle::MockLifecycleHandle,
test_util::{make_write_op, populate_catalog},
};
use super::*;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
#[tokio::test]
async fn test_shard_double_ref() {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let exec = Executor::new(1);
// Populate the catalog with the shard / namespace / table
let (shard_id, ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PartitionId::new(0),
PartitionKey::from("banana-split"),
shard_id,
ns_id,
table_id,
TABLE_NAME.into(),
None,
),
));
let shard = ShardData::new(
SHARD_INDEX,
shard_id,
partition_provider,
Arc::clone(&metrics),
);
// Assert the namespace does not contain the test data
assert!(shard.namespace(NAMESPACE_NAME).is_none());
assert!(shard.namespace_by_id(ns_id).is_none());
// Write some test data
shard
.buffer_operation(
DmlOperation::Write(make_write_op(
&PartitionKey::from("banana-split"),
SHARD_INDEX,
NAMESPACE_NAME,
0,
r#"bananas,city=Medford day="sun",temp=55 22"#,
)),
&catalog,
&MockLifecycleHandle::default(),
&exec,
)
.await
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(shard.namespace(NAMESPACE_NAME).is_some());
assert!(shard.namespace_by_id(ns_id).is_some());
// And the namespace counter metric should increase
let tables = metrics
.get_instrument::<Metric<U64Counter>>("ingester_namespaces_total")
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(tables, 1);
}
}


@ -1,9 +1,10 @@
//! Table level data buffer structures.
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::HashMap, sync::Arc};
use data_types::{
DeletePredicate, NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp,
DeletePredicate, NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId,
Timestamp,
};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -16,6 +17,39 @@ use super::partition::{
};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_key: HashMap<PartitionKey, PartitionData>,
by_id: HashMap<PartitionId, PartitionKey>,
}
impl DoubleRef {
fn insert(&mut self, ns: PartitionData) {
let id = ns.partition_id();
let key = ns.partition_key().clone();
assert!(self.by_key.insert(key.clone(), ns).is_none());
assert!(self.by_id.insert(id, key).is_none());
}
#[cfg(test)]
fn by_key(&self, key: &PartitionKey) -> Option<&PartitionData> {
self.by_key.get(key)
}
fn by_key_mut(&mut self, key: &PartitionKey) -> Option<&mut PartitionData> {
self.by_key.get_mut(key)
}
fn by_id_mut(&mut self, id: PartitionId) -> Option<&mut PartitionData> {
let key = self.by_id.get(&id)?.clone();
self.by_key_mut(&key)
}
}
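Unlike the namespace and shard maps, PartitionData is not wrapped in an Arc here: by_key owns the data and by_id only stores the PartitionKey, so an ID lookup resolves in two hops without duplicating the value. A standalone, runnable sketch of that indirection with placeholder types:

    use std::collections::HashMap;

    /// by_key owns the buffered bytes; by_id maps an ID to the owning key.
    #[derive(Debug, Default)]
    struct PartitionMap {
        by_key: HashMap<String, Vec<u8>>,
        by_id: HashMap<u64, String>,
    }

    impl PartitionMap {
        fn insert(&mut self, id: u64, key: String, data: Vec<u8>) {
            // Mirrors the assert!()s above: inserting the same partition
            // twice is a logic error.
            assert!(self.by_id.insert(id, key.clone()).is_none());
            assert!(self.by_key.insert(key, data).is_none());
        }

        fn by_id_mut(&mut self, id: u64) -> Option<&mut Vec<u8>> {
            // First hop: ID -> key. Second hop: key -> owned data.
            let key = self.by_id.get(&id)?.clone();
            self.by_key.get_mut(&key)
        }
    }

    fn main() {
        let mut map = PartitionMap::default();
        map.insert(0, "1970-01-01".to_string(), vec![1, 2, 3]);
        map.by_id_mut(0).unwrap().push(4);
        // The mutation through the ID index is visible through the key index.
        assert_eq!(map.by_key["1970-01-01"], vec![1, 2, 3, 4]);
    }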
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
@ -34,8 +68,8 @@ pub(crate) struct TableData {
/// `(key, shard, table)` triplet.
partition_provider: Arc<dyn PartitionProvider>,
// Map pf partition key to its data
pub(super) partition_data: BTreeMap<PartitionKey, PartitionData>,
// Map of partition key to its data
partition_data: DoubleRef,
}
impl TableData {
@ -71,6 +105,7 @@ impl TableData {
/// Return parquet_max_sequence_number
pub(super) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.by_key
.values()
.map(|p| p.max_persisted_sequence_number())
.max()
@ -92,7 +127,7 @@ impl TableData {
partition_key: PartitionKey,
lifecycle_handle: &dyn LifecycleHandle,
) -> Result<bool, super::Error> {
let partition_data = match self.partition_data.get_mut(&partition_key) {
let partition_data = match self.partition_data.by_key.get_mut(&partition_key) {
Some(p) => p,
None => {
let p = self
@ -105,12 +140,9 @@ impl TableData {
Arc::clone(&self.table_name),
)
.await;
// Add the partition to the map.
assert!(self
.partition_data
.insert(partition_key.clone(), p)
.is_none());
self.partition_data.get_mut(&partition_key).unwrap()
// Add the double-referenced partition to the map.
self.partition_data.insert(p);
self.partition_data.by_key_mut(&partition_key).unwrap()
}
};
@ -131,7 +163,7 @@ impl TableData {
// op may fail which would lead to a write being recorded, but not
// applied.
let should_pause = lifecycle_handle.log_write(
partition_data.id(),
partition_data.partition_id(),
self.shard_id,
self.namespace_id,
self.table_id,
@ -171,18 +203,45 @@ impl TableData {
self.tombstone_max_sequence_number = Some(sequence_number);
// modify one partition at a time
for data in self.partition_data.values_mut() {
for data in self.partition_data.by_key.values_mut() {
data.buffer_tombstone(executor, tombstone.clone()).await;
}
Ok(())
}
/// Return the [`PartitionData`] for the specified ID.
#[allow(unused)]
pub(crate) fn get_partition(
&mut self,
partition_id: PartitionId,
) -> Option<&mut PartitionData> {
self.partition_data.by_id_mut(partition_id)
}
/// Return the [`PartitionData`] for the specified partition key.
#[cfg(test)]
pub(crate) fn get_partition_by_key(
&self,
partition_key: &PartitionKey,
) -> Option<&PartitionData> {
self.partition_data.by_key(partition_key)
}
/// Return the [`PartitionData`] for the specified partition key.
pub(crate) fn get_partition_by_key_mut(
&mut self,
partition_key: &PartitionKey,
) -> Option<&mut PartitionData> {
self.partition_data.by_key_mut(partition_key)
}
pub(crate) fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
self.partition_data
.by_key
.values()
.map(|p| UnpersistedPartitionData {
partition_id: p.id(),
partition_id: p.partition_id(),
non_persisted: p
.get_non_persisting_data()
.expect("get_non_persisting should always work"),
@ -204,16 +263,22 @@ impl TableData {
};
self.partition_data
.by_key
.values()
.fold(progress, |progress, partition_data| {
progress.combine(partition_data.progress())
})
}
#[cfg(test)]
/// Returns the catalog ID of this table.
pub(super) fn table_id(&self) -> TableId {
self.table_id
}
/// Returns the name of this table.
pub(crate) fn table_name(&self) -> &Arc<str> {
&self.table_name
}
}
#[cfg(test)]
@ -243,6 +308,65 @@ mod tests {
const PARTITION_KEY: &str = "platanos";
const PARTITION_ID: PartitionId = PartitionId::new(0);
#[tokio::test]
async fn test_partition_double_ref() {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the shard / namespace / table
let (shard_id, ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PARTITION_ID,
PARTITION_KEY.into(),
shard_id,
ns_id,
table_id,
TABLE_NAME.into(),
None,
),
));
let mut table = TableData::new(
table_id,
TABLE_NAME,
shard_id,
ns_id,
None,
partition_provider,
);
let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)
.unwrap()
.remove(TABLE_NAME)
.unwrap();
// Assert the table does not contain the test partition
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none());
// Write some test data
let pause = table
.buffer_table_write(
SequenceNumber::new(42),
batch,
PARTITION_KEY.into(),
&MockLifecycleHandle::default(),
)
.await
.expect("buffer op should succeed");
assert!(!pause);
// Referencing the partition should succeed
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
assert!(table.partition_data.by_id_mut(PARTITION_ID).is_some());
}
#[tokio::test]
async fn test_bad_write_memory_counting() {
let metrics = Arc::new(metric::Registry::default());
@ -286,7 +410,7 @@ mod tests {
let handle = MockLifecycleHandle::default();
// Assert the table does not contain the test partition
assert!(table.partition_data.get(&PARTITION_KEY.into()).is_none());
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
// Write some test data
let pause = table
@ -301,7 +425,7 @@ mod tests {
assert!(!pause);
// Referencing the partition should succeed
assert!(table.partition_data.get(&PARTITION_KEY.into()).is_some());
assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
// And the lifecycle handle was called with the expected values
assert_eq!(


@ -234,7 +234,7 @@ struct LifecycleStats {
}
/// The stats for a partition
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
struct PartitionLifecycleStats {
/// The shard this partition is under
shard_id: ShardId,
@ -469,6 +469,18 @@ impl LifecycleManager {
let persist_tasks: Vec<_> = to_persist
.into_iter()
.map(|s| {
// BUG: TOCTOU: memory usage released may be incorrect.
//
// Here the amount of memory to be reduced is acquired, but this
// code does not prevent continued writes adding more data to
// the partition in another thread.
//
// This may lead to more actual data being persisted than the
// call below returns to the server pool - this would slowly
// starve the ingester of memory it thinks it has.
//
// See https://github.com/influxdata/influxdb_iox/issues/5777
// Mark this partition as being persisted, and remember the
// memory allocation it had accumulated.
let partition_memory_usage = self
@ -483,7 +495,9 @@ impl LifecycleManager {
let state = Arc::clone(&self.state);
tokio::task::spawn(async move {
persister.persist(s.partition_id).await;
persister
.persist(s.shard_id, s.namespace_id, s.table_id, s.partition_id)
.await;
// Now the data has been uploaded and the memory it was
// using has been freed, release the memory capacity back
// to the ingester.
@ -602,7 +616,13 @@ mod tests {
#[async_trait]
impl Persister for TestPersister {
async fn persist(&self, partition_id: PartitionId) {
async fn persist(
&self,
_shard_id: ShardId,
_namespace_id: NamespaceId,
_table_id: TableId,
partition_id: PartitionId,
) {
let mut p = self.persist_called.lock();
p.insert(partition_id);
}
@ -662,8 +682,16 @@ mod tests {
#[async_trait]
impl Persister for PausablePersister {
async fn persist(&self, partition_id: PartitionId) {
self.inner.persist(partition_id).await;
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
) {
self.inner
.persist(shard_id, namespace_id, table_id, partition_id)
.await;
if let Some(event) = self.event(partition_id) {
event.before.wait().await;
event.after.wait().await;
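The TOCTOU note added above is worth spelling out: the lifecycle manager snapshots a partition's memory footprint, then spawns the persist task, and later releases only the snapshotted amount, so bytes written in between are persisted but never handed back to the pool. A standalone, runnable sketch of that accounting gap, with a bare AtomicUsize standing in for the real memory pool:

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        // The pool currently accounts 1 KiB of buffered data for a partition.
        let pool_used = AtomicUsize::new(1024);

        // The lifecycle manager records how much it will release once the
        // partition has been persisted.
        let to_release = pool_used.load(Ordering::Relaxed);

        // Before the spawned persist task runs, another write grows the same
        // partition (the race issue #5777 describes).
        pool_used.fetch_add(512, Ordering::Relaxed);

        // persist(shard_id, namespace_id, table_id, partition_id) then writes
        // *all* 1536 buffered bytes to object storage, but only the
        // snapshotted 1024 bytes are returned to the pool.
        pool_used.fetch_sub(to_release, Ordering::Relaxed);

        // 512 phantom bytes remain accounted, slowly starving the ingester of
        // memory it thinks it still holds.
        assert_eq!(pool_used.load(Ordering::Relaxed), 512);
    }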


@ -752,7 +752,32 @@ impl MockIngester {
.map(|f| f.id)
.collect();
self.ingester_data.persist(*partition_id).await;
let p = self
.catalog
.catalog
.repositories()
.await
.partitions()
.get_by_id(*partition_id)
.await
.unwrap()
.expect("partition not found");
let namespace_id = self
.catalog
.catalog
.repositories()
.await
.tables()
.get_by_id(p.table_id)
.await
.unwrap()
.expect("table does not exist")
.namespace_id;
self.ingester_data
.persist(p.shard_id, namespace_id, p.table_id, *partition_id)
.await;
result.extend(
self.catalog