refactor: persist() passes all necessary IDs

This commit changes the persist() call to pass through all relevant IDs
so that the impl can locate the partition in the buffer tree - this
will enable elimination of many queries against the catalog in the
future.

This commit also cleans up the persist() impl, deferring queries until
their results are actually used to avoid unnecessary load, improves
logging & error handling, and documents a TOCTOU bug in the code:

    https://github.com/influxdata/influxdb_iox/issues/5777
pull/24376/head
Dom Dwyer 2022-09-30 15:18:50 +02:00
parent f9bf86927d
commit 1a7eb47b81
6 changed files with 281 additions and 228 deletions

View File

@ -86,11 +86,8 @@ pub(crate) async fn compact_persisting_batch(
namespace_id: i64,
partition_info: &PartitionInfo,
batch: Arc<PersistingBatch>,
) -> Result<Option<CompactedStream>> {
// Nothing to compact
if batch.data.data.is_empty() {
return Ok(None);
}
) -> Result<CompactedStream> {
assert!(!batch.data.data.is_empty());
let namespace_name = &partition_info.namespace_name;
let table_name = &partition_info.table_name;
@ -141,11 +138,11 @@ pub(crate) async fn compact_persisting_batch(
sort_key: Some(metadata_sort_key),
};
Ok(Some(CompactedStream {
Ok(CompactedStream {
stream,
iox_metadata,
sort_key_update,
}))
})
}
/// Compact a given Queryable Batch
@ -254,7 +251,6 @@ mod tests {
let CompactedStream { stream, .. } =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -328,7 +324,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -426,7 +421,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -527,7 +521,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -629,7 +622,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
@ -739,7 +731,6 @@ mod tests {
sort_key_update,
} = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)

View File

@ -6,7 +6,7 @@ use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex};
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId};
use datafusion::physical_plan::SendableRecordBatchStream;
use dml::DmlOperation;
use futures::{Stream, StreamExt};
@ -220,7 +220,13 @@ impl IngesterData {
#[async_trait]
pub trait Persister: Send + Sync + 'static {
/// Persits the partition ID. Will retry forever until it succeeds.
async fn persist(&self, partition_id: PartitionId);
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
);
/// Updates the shard's `min_unpersisted_sequence_number` in the catalog.
/// This number represents the minimum that might be unpersisted, which is the
@ -235,7 +241,69 @@ pub trait Persister: Send + Sync + 'static {
#[async_trait]
impl Persister for IngesterData {
async fn persist(&self, partition_id: PartitionId) {
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
) {
// lookup the state from the ingester data. If something isn't found,
// it's unexpected. Crash so someone can take a look.
let shard_data = self
.shards
.get(&shard_id)
.unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data"));
let namespace = shard_data
.namespace_by_id(namespace_id)
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
let partition_key;
let batch;
{
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
});
let mut guard = table_data.write().await;
let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
panic!(
"partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state"
)
});
partition_key = partition.partition_key().clone();
batch = partition.snapshot_to_persisting_batch();
};
debug!(%shard_id, %namespace_id, %table_id, %partition_id, %partition_key, "persisting partition");
// Check if there is any data to persist.
let batch = match batch {
Some(v) if !v.data.data.is_empty() => v,
_ => {
warn!(
%shard_id,
%namespace_id,
%table_id,
%partition_id,
%partition_key,
"partition marked for persistence contains no data"
);
return;
}
};
// lookup column IDs from catalog
// TODO: this can be removed once the ingester uses column IDs internally as well
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
get_table_schema_by_id(table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// lookup the partition_info from the catalog
let partition_info = Backoff::new(&self.backoff_config)
.retry_all_errors("get partition_info_by_id", || async {
@ -243,217 +311,158 @@ impl Persister for IngesterData {
repos.partitions().partition_info_by_id(partition_id).await
})
.await
.expect("retry forever");
.expect("retry forever").unwrap_or_else(|| panic!("partition {partition_id} in table {table_id} in namespace {namespace_id} in shard {shard_id} has no partition info in catalog"));
// lookup the state from the ingester data. If something isn't found, it's unexpected. Crash
// so someone can take a look.
let partition_info = partition_info
.unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id));
let shard_data = self
.shards
.get(&partition_info.partition.shard_id)
.unwrap_or_else(|| {
panic!(
"shard state for {} not in ingester data",
partition_info.partition.shard_id
)
}); //{
let namespace = shard_data
.namespace(&partition_info.namespace_name)
.unwrap_or_else(|| {
panic!(
"namespace {} not in shard {} state",
partition_info.namespace_name, partition_info.partition.shard_id
)
});
debug!(?partition_id, ?partition_info, "persisting partition");
// do the CPU intensive work of compaction, de-duplication and sorting
let CompactedStream {
stream: record_stream,
iox_metadata,
sort_key_update,
} = compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id().get(),
&partition_info,
Arc::clone(&batch),
)
.await
.expect("unable to compact persisting batch");
// lookup column IDs from catalog
// TODO: this can be removed once the ingester uses column IDs internally as well
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
let table = repos
.tables()
.get_by_namespace_and_name(namespace.namespace_id(), &partition_info.table_name)
.await?
.expect("table not found in catalog");
get_table_schema_by_id(table.id, repos.as_mut()).await
})
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.store
.upload(record_stream, &iox_metadata)
.await
.expect("retry forever");
.expect("unexpected fatal persist error");
let persisting_batch = namespace
.snapshot_to_persisting(
&partition_info.table_name,
&partition_info.partition.partition_key,
)
.await;
if let Some(persisting_batch) = persisting_batch {
// do the CPU intensive work of compaction, de-duplication and sorting
let compacted_stream = match compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id().get(),
&partition_info,
Arc::clone(&persisting_batch),
)
.await
{
Err(e) => {
// this should never error out. if it does, we need to crash hard so
// someone can take a look.
panic!("unable to compact persisting batch with error: {:?}", e);
}
Ok(Some(r)) => r,
Ok(None) => {
warn!("persist called with no data");
return;
}
};
let CompactedStream {
stream: record_stream,
iox_metadata,
sort_key_update,
} = compacted_stream;
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.store
.upload(record_stream, &iox_metadata)
.await
.expect("unexpected fatal persist error");
// Update the sort key in the catalog if there are
// additional columns BEFORE adding parquet file to the
// catalog. If the order is reversed, the querier or
// compactor may see a parquet file with an inconsistent
// sort key. https://github.com/influxdata/influxdb_iox/issues/5090
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&self.backoff_config)
.retry_all_errors("update_sort_key", || async {
let mut repos = self.catalog.repositories().await;
let _partition = repos
.partitions()
.update_sort_key(partition_id, &sort_key)
.await?;
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
debug!(
?partition_id,
table = partition_info.table_name,
?new_sort_key,
"adjusted sort key during batch compact & persist"
);
}
// Add the parquet file to the catalog until succeed
let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("Unknown column").id
});
// Assert partitions are persisted in-order.
//
// It is an invariant that partitions are persisted in order so that
// both the per-shard, and per-partition watermarks are correctly
// advanced and accurate.
if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
assert!(
parquet_file.max_sequence_number > last_persist,
"out of order partition persistence, persisting {}, previously persisted {}",
parquet_file.max_sequence_number.get(),
last_persist.get(),
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
// Update the sort key in the catalog if there are
// additional columns BEFORE adding parquet file to the
// catalog. If the order is reversed, the querier or
// compactor may see a parquet file with an inconsistent
// sort key. https://github.com/influxdata/influxdb_iox/issues/5090
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
.retry_all_errors("update_sort_key", || async {
let mut repos = self.catalog.repositories().await;
let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
debug!(
?partition_id,
table_id=?parquet_file.table_id,
parquet_file_id=?parquet_file.id,
table_name=%iox_metadata.table_name,
"parquet file written to catalog"
);
let _partition = repos
.partitions()
.update_sort_key(partition_id, &sort_key)
.await?;
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the per-partition persistence watermark, so that new
// ingester instances skip the just-persisted ops during replay.
//
// This could be transactional with the above parquet insert to
// maintain catalog consistency, though in practice it is an
// unnecessary overhead - the system can tolerate replaying the ops
// that lead to this parquet file being generated, and tolerate
// creating a parquet file containing duplicate data (remedied by
// compaction).
//
// This means it is possible to observe a parquet file with a
// max_persisted_sequence_number >
// partition.persisted_sequence_number, either in-between these
// catalog updates, or for however long it takes a crashed ingester
// to restart and replay the ops, and re-persist a file containing
// the same (or subset of) data.
//
// The above is also true of the per-shard persist marker that
// governs the ingester's replay start point, which is
// non-transactionally updated after all partitions have persisted.
Backoff::new(&self.backoff_config)
.retry_all_errors("set partition persist marker", || async {
self.catalog
.repositories()
.await
.partitions()
.update_persisted_sequence_number(
parquet_file.partition_id,
parquet_file.max_sequence_number,
)
.await
})
.await
.expect("retry forever");
// Record metrics
let attributes = Attributes::from([(
"shard_id",
format!("{}", partition_info.partition.shard_id).into(),
)]);
self.persisted_file_size_bytes
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
namespace
.mark_persisted(
&partition_info.table_name,
&partition_info.partition.partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
?partition_id,
table_name=%partition_info.table_name,
partition_key=%partition_info.partition.partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
table = partition_info.table_name,
?new_sort_key,
"adjusted sort key during batch compact & persist"
);
}
// Add the parquet file to the catalog until succeed
let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("Unknown column").id
});
// Assert partitions are persisted in-order.
//
// It is an invariant that partitions are persisted in order so that
// both the per-shard, and per-partition watermarks are correctly
// advanced and accurate.
if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
assert!(
parquet_file.max_sequence_number > last_persist,
"out of order partition persistence, persisting {}, previously persisted {}",
parquet_file.max_sequence_number.get(),
last_persist.get(),
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.catalog.repositories().await;
let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
debug!(
?partition_id,
table_id=?parquet_file.table_id,
parquet_file_id=?parquet_file.id,
table_name=%iox_metadata.table_name,
"parquet file written to catalog"
);
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the per-partition persistence watermark, so that new
// ingester instances skip the just-persisted ops during replay.
//
// This could be transactional with the above parquet insert to
// maintain catalog consistency, though in practice it is an
// unnecessary overhead - the system can tolerate replaying the ops
// that lead to this parquet file being generated, and tolerate
// creating a parquet file containing duplicate data (remedied by
// compaction).
//
// This means it is possible to observe a parquet file with a
// max_persisted_sequence_number >
// partition.persisted_sequence_number, either in-between these
// catalog updates, or for however long it takes a crashed ingester
// to restart and replay the ops, and re-persist a file containing
// the same (or subset of) data.
//
// The above is also true of the per-shard persist marker that
// governs the ingester's replay start point, which is
// non-transactionally updated after all partitions have persisted.
Backoff::new(&self.backoff_config)
.retry_all_errors("set partition persist marker", || async {
self.catalog
.repositories()
.await
.partitions()
.update_persisted_sequence_number(
parquet_file.partition_id,
parquet_file.max_sequence_number,
)
.await
})
.await
.expect("retry forever");
// Record metrics
let attributes = Attributes::from([(
"shard_id",
format!("{}", partition_info.partition.shard_id).into(),
)]);
self.persisted_file_size_bytes
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
namespace
.mark_persisted(
&partition_info.table_name,
&partition_info.partition.partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
?partition_id,
table_name=%partition_info.table_name,
partition_key=%partition_info.partition.partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
);
}
async fn update_min_unpersisted_sequence_number(
@ -804,7 +813,7 @@ mod tests {
// limits)
assert!(!should_pause);
let partition_id = {
let (table_id, partition_id) = {
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let mem_table = n.table_data("mem").unwrap();
@ -813,10 +822,11 @@ mod tests {
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
p.partition_id()
(mem_table.table_id(), p.partition_id())
};
data.persist(partition_id).await;
data.persist(shard1.id, namespace.id, table_id, partition_id)
.await;
// verify that a file got put into object store
let file_paths: Vec<_> = object_store
@ -953,12 +963,13 @@ mod tests {
{
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("cpu").is_some());
let mem_table = mem_table.write().await;
table_id = mem_table.table_id();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap();
table_id = mem_table.table_id();
partition_id = p.partition_id();
}
{
@ -973,7 +984,8 @@ mod tests {
assert!(partition_info.partition.sort_key.is_empty());
}
data.persist(partition_id).await;
data.persist(shard1.id, namespace.id, table_id, partition_id)
.await;
// verify that a file got put into object store
let file_paths: Vec<_> = object_store

View File

@ -39,7 +39,6 @@ impl DoubleRef {
self.by_name.get(name).map(Arc::clone)
}
#[cfg(test)]
fn by_id(&self, id: TableId) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
self.by_id.get(&id).map(Arc::clone)
}
@ -240,6 +239,7 @@ impl NamespaceData {
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
/// or persist, None will be returned.
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot_to_persisting(
&self,
table_name: &str,
@ -266,7 +266,6 @@ impl NamespaceData {
}
/// Return the table data by ID.
#[cfg(test)]
pub(crate) fn table_id(
&self,
table_id: TableId,

View File

@ -37,7 +37,6 @@ impl DoubleRef {
self.by_name.get(name).map(Arc::clone)
}
#[cfg(test)]
fn by_id(&self, id: NamespaceId) -> Option<Arc<NamespaceData>> {
self.by_id.get(&id).map(Arc::clone)
}
@ -120,7 +119,6 @@ impl ShardData {
}
/// Gets the namespace data out of the map
#[cfg(test)]
pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
// TODO: this should be the default once IDs are pushed over the wire.
//

View File

@ -234,7 +234,7 @@ struct LifecycleStats {
}
/// The stats for a partition
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
struct PartitionLifecycleStats {
/// The shard this partition is under
shard_id: ShardId,
@ -469,6 +469,18 @@ impl LifecycleManager {
let persist_tasks: Vec<_> = to_persist
.into_iter()
.map(|s| {
// BUG: TOCTOU: memory usage released may be incorrect.
//
// Here the amount of memory to be reduced is acquired, but this
// code does not prevent continued writes adding more data to
// the partition in another thread.
//
// This may lead to more actual data being persisted than the
// call below returns to the server pool - this would slowly
// starve the ingester of memory it thinks it has.
//
// See https://github.com/influxdata/influxdb_iox/issues/5777
// Mark this partition as being persisted, and remember the
// memory allocation it had accumulated.
let partition_memory_usage = self
@ -483,7 +495,9 @@ impl LifecycleManager {
let state = Arc::clone(&self.state);
tokio::task::spawn(async move {
persister.persist(s.partition_id).await;
persister
.persist(s.shard_id, s.namespace_id, s.table_id, s.partition_id)
.await;
// Now the data has been uploaded and the memory it was
// using has been freed, released the memory capacity back
// the ingester.
@ -602,7 +616,13 @@ mod tests {
#[async_trait]
impl Persister for TestPersister {
async fn persist(&self, partition_id: PartitionId) {
async fn persist(
&self,
_shard_id: ShardId,
_namespace_id: NamespaceId,
_table_id: TableId,
partition_id: PartitionId,
) {
let mut p = self.persist_called.lock();
p.insert(partition_id);
}
@ -662,8 +682,16 @@ mod tests {
#[async_trait]
impl Persister for PausablePersister {
async fn persist(&self, partition_id: PartitionId) {
self.inner.persist(partition_id).await;
async fn persist(
&self,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
) {
self.inner
.persist(shard_id, namespace_id, table_id, partition_id)
.await;
if let Some(event) = self.event(partition_id) {
event.before.wait().await;
event.after.wait().await;

View File

@ -752,7 +752,32 @@ impl MockIngester {
.map(|f| f.id)
.collect();
self.ingester_data.persist(*partition_id).await;
let p = self
.catalog
.catalog
.repositories()
.await
.partitions()
.get_by_id(*partition_id)
.await
.unwrap()
.expect("partition not found");
let namespace_id = self
.catalog
.catalog
.repositories()
.await
.tables()
.get_by_id(p.table_id)
.await
.unwrap()
.expect("table does not exist")
.namespace_id;
self.ingester_data
.persist(p.shard_id, namespace_id, p.table_id, *partition_id)
.await;
result.extend(
self.catalog