From 1a7eb47b8109deb0eef5366ae758640902c2b5ac Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 30 Sep 2022 15:18:50 +0200 Subject: [PATCH] refactor: persist() passes all necessary IDs This commit changes the persist() call so that it passes through all relevant IDs so that the impl can locate the partition in the buffer tree - this will enable elimination of many queries against the catalog in the future. This commit also cleans up the persist() impl, deferring queries until the result will be used to avoid unnecessary load, improves logging & error handling, and documents a TOCTOU bug in code: https://github.com/influxdata/influxdb_iox/issues/5777 --- ingester/src/compact.rs | 17 +- ingester/src/data.rs | 422 +++++++++++++++--------------- ingester/src/data/namespace.rs | 3 +- ingester/src/data/shard.rs | 2 - ingester/src/lifecycle.rs | 38 ++- query_tests/src/scenarios/util.rs | 27 +- 6 files changed, 281 insertions(+), 228 deletions(-) diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 040a1c983c..ce516ffe85 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -86,11 +86,8 @@ pub(crate) async fn compact_persisting_batch( namespace_id: i64, partition_info: &PartitionInfo, batch: Arc, -) -> Result> { - // Nothing to compact - if batch.data.data.is_empty() { - return Ok(None); - } +) -> Result { + assert!(!batch.data.data.is_empty()); let namespace_name = &partition_info.namespace_name; let table_name = &partition_info.table_name; @@ -141,11 +138,11 @@ pub(crate) async fn compact_persisting_batch( sort_key: Some(metadata_sort_key), }; - Ok(Some(CompactedStream { + Ok(CompactedStream { stream, iox_metadata, sort_key_update, - })) + }) } /// Compact a given Queryable Batch @@ -254,7 +251,6 @@ mod tests { let CompactedStream { stream, .. } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) @@ -328,7 +324,6 @@ mod tests { sort_key_update, } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) @@ -426,7 +421,6 @@ mod tests { sort_key_update, } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) @@ -527,7 +521,6 @@ mod tests { sort_key_update, } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) @@ -629,7 +622,6 @@ mod tests { sort_key_update, } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) @@ -739,7 +731,6 @@ mod tests { sort_key_update, } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch) .await - .unwrap() .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 9c26a09a20..89486ed2df 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -6,7 +6,7 @@ use arrow::{error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex}; +use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId}; use datafusion::physical_plan::SendableRecordBatchStream; use dml::DmlOperation; use futures::{Stream, StreamExt}; @@ -220,7 +220,13 @@ impl IngesterData { #[async_trait] pub trait Persister: Send + Sync + 'static { /// Persits the partition ID. Will retry forever until it succeeds. - async fn persist(&self, partition_id: PartitionId); + async fn persist( + &self, + shard_id: ShardId, + namespace_id: NamespaceId, + table_id: TableId, + partition_id: PartitionId, + ); /// Updates the shard's `min_unpersisted_sequence_number` in the catalog. /// This number represents the minimum that might be unpersisted, which is the @@ -235,7 +241,69 @@ pub trait Persister: Send + Sync + 'static { #[async_trait] impl Persister for IngesterData { - async fn persist(&self, partition_id: PartitionId) { + async fn persist( + &self, + shard_id: ShardId, + namespace_id: NamespaceId, + table_id: TableId, + partition_id: PartitionId, + ) { + // lookup the state from the ingester data. If something isn't found, + // it's unexpected. Crash so someone can take a look. + let shard_data = self + .shards + .get(&shard_id) + .unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data")); + let namespace = shard_data + .namespace_by_id(namespace_id) + .unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state")); + + let partition_key; + let batch; + { + let table_data = namespace.table_id(table_id).unwrap_or_else(|| { + panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state") + }); + + let mut guard = table_data.write().await; + let partition = guard.get_partition(partition_id).unwrap_or_else(|| { + panic!( + "partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state" + ) + }); + + partition_key = partition.partition_key().clone(); + batch = partition.snapshot_to_persisting_batch(); + }; + + debug!(%shard_id, %namespace_id, %table_id, %partition_id, %partition_key, "persisting partition"); + + // Check if there is any data to persist. + let batch = match batch { + Some(v) if !v.data.data.is_empty() => v, + _ => { + warn!( + %shard_id, + %namespace_id, + %table_id, + %partition_id, + %partition_key, + "partition marked for persistence contains no data" + ); + return; + } + }; + + // lookup column IDs from catalog + // TODO: this can be removed once the ingester uses column IDs internally as well + let table_schema = Backoff::new(&self.backoff_config) + .retry_all_errors("get table schema", || async { + let mut repos = self.catalog.repositories().await; + get_table_schema_by_id(table_id, repos.as_mut()).await + }) + .await + .expect("retry forever"); + // lookup the partition_info from the catalog let partition_info = Backoff::new(&self.backoff_config) .retry_all_errors("get partition_info_by_id", || async { @@ -243,217 +311,158 @@ impl Persister for IngesterData { repos.partitions().partition_info_by_id(partition_id).await }) .await - .expect("retry forever"); + .expect("retry forever").unwrap_or_else(|| panic!("partition {partition_id} in table {table_id} in namespace {namespace_id} in shard {shard_id} has no partition info in catalog")); - // lookup the state from the ingester data. If something isn't found, it's unexpected. Crash - // so someone can take a look. - let partition_info = partition_info - .unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id)); - let shard_data = self - .shards - .get(&partition_info.partition.shard_id) - .unwrap_or_else(|| { - panic!( - "shard state for {} not in ingester data", - partition_info.partition.shard_id - ) - }); //{ - let namespace = shard_data - .namespace(&partition_info.namespace_name) - .unwrap_or_else(|| { - panic!( - "namespace {} not in shard {} state", - partition_info.namespace_name, partition_info.partition.shard_id - ) - }); - debug!(?partition_id, ?partition_info, "persisting partition"); + // do the CPU intensive work of compaction, de-duplication and sorting + let CompactedStream { + stream: record_stream, + iox_metadata, + sort_key_update, + } = compact_persisting_batch( + Arc::new(SystemProvider::new()), + &self.exec, + namespace.namespace_id().get(), + &partition_info, + Arc::clone(&batch), + ) + .await + .expect("unable to compact persisting batch"); - // lookup column IDs from catalog - // TODO: this can be removed once the ingester uses column IDs internally as well - let table_schema = Backoff::new(&self.backoff_config) - .retry_all_errors("get table schema", || async { - let mut repos = self.catalog.repositories().await; - let table = repos - .tables() - .get_by_namespace_and_name(namespace.namespace_id(), &partition_info.table_name) - .await? - .expect("table not found in catalog"); - get_table_schema_by_id(table.id, repos.as_mut()).await - }) + // Save the compacted data to a parquet file in object storage. + // + // This call retries until it completes. + let (md, file_size) = self + .store + .upload(record_stream, &iox_metadata) .await - .expect("retry forever"); + .expect("unexpected fatal persist error"); - let persisting_batch = namespace - .snapshot_to_persisting( - &partition_info.table_name, - &partition_info.partition.partition_key, - ) - .await; - - if let Some(persisting_batch) = persisting_batch { - // do the CPU intensive work of compaction, de-duplication and sorting - let compacted_stream = match compact_persisting_batch( - Arc::new(SystemProvider::new()), - &self.exec, - namespace.namespace_id().get(), - &partition_info, - Arc::clone(&persisting_batch), - ) - .await - { - Err(e) => { - // this should never error out. if it does, we need to crash hard so - // someone can take a look. - panic!("unable to compact persisting batch with error: {:?}", e); - } - Ok(Some(r)) => r, - Ok(None) => { - warn!("persist called with no data"); - return; - } - }; - let CompactedStream { - stream: record_stream, - iox_metadata, - sort_key_update, - } = compacted_stream; - - // Save the compacted data to a parquet file in object storage. - // - // This call retries until it completes. - let (md, file_size) = self - .store - .upload(record_stream, &iox_metadata) - .await - .expect("unexpected fatal persist error"); - - // Update the sort key in the catalog if there are - // additional columns BEFORE adding parquet file to the - // catalog. If the order is reversed, the querier or - // compactor may see a parquet file with an inconsistent - // sort key. https://github.com/influxdata/influxdb_iox/issues/5090 - if let Some(new_sort_key) = sort_key_update { - let sort_key = new_sort_key.to_columns().collect::>(); - Backoff::new(&self.backoff_config) - .retry_all_errors("update_sort_key", || async { - let mut repos = self.catalog.repositories().await; - let _partition = repos - .partitions() - .update_sort_key(partition_id, &sort_key) - .await?; - // compiler insisted on getting told the type of the error :shrug: - Ok(()) as Result<(), iox_catalog::interface::Error> - }) - .await - .expect("retry forever"); - debug!( - ?partition_id, - table = partition_info.table_name, - ?new_sort_key, - "adjusted sort key during batch compact & persist" - ); - } - - // Add the parquet file to the catalog until succeed - let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| { - table_schema.columns.get(name).expect("Unknown column").id - }); - - // Assert partitions are persisted in-order. - // - // It is an invariant that partitions are persisted in order so that - // both the per-shard, and per-partition watermarks are correctly - // advanced and accurate. - if let Some(last_persist) = partition_info.partition.persisted_sequence_number { - assert!( - parquet_file.max_sequence_number > last_persist, - "out of order partition persistence, persisting {}, previously persisted {}", - parquet_file.max_sequence_number.get(), - last_persist.get(), - ); - } - - // Add the parquet file to the catalog. - // - // This has the effect of allowing the queriers to "discover" the - // parquet file by polling / querying the catalog. + // Update the sort key in the catalog if there are + // additional columns BEFORE adding parquet file to the + // catalog. If the order is reversed, the querier or + // compactor may see a parquet file with an inconsistent + // sort key. https://github.com/influxdata/influxdb_iox/issues/5090 + if let Some(new_sort_key) = sort_key_update { + let sort_key = new_sort_key.to_columns().collect::>(); Backoff::new(&self.backoff_config) - .retry_all_errors("add parquet file to catalog", || async { + .retry_all_errors("update_sort_key", || async { let mut repos = self.catalog.repositories().await; - let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?; - debug!( - ?partition_id, - table_id=?parquet_file.table_id, - parquet_file_id=?parquet_file.id, - table_name=%iox_metadata.table_name, - "parquet file written to catalog" - ); + let _partition = repos + .partitions() + .update_sort_key(partition_id, &sort_key) + .await?; // compiler insisted on getting told the type of the error :shrug: Ok(()) as Result<(), iox_catalog::interface::Error> }) .await .expect("retry forever"); - - // Update the per-partition persistence watermark, so that new - // ingester instances skip the just-persisted ops during replay. - // - // This could be transactional with the above parquet insert to - // maintain catalog consistency, though in practice it is an - // unnecessary overhead - the system can tolerate replaying the ops - // that lead to this parquet file being generated, and tolerate - // creating a parquet file containing duplicate data (remedied by - // compaction). - // - // This means it is possible to observe a parquet file with a - // max_persisted_sequence_number > - // partition.persisted_sequence_number, either in-between these - // catalog updates, or for however long it takes a crashed ingester - // to restart and replay the ops, and re-persist a file containing - // the same (or subset of) data. - // - // The above is also true of the per-shard persist marker that - // governs the ingester's replay start point, which is - // non-transactionally updated after all partitions have persisted. - Backoff::new(&self.backoff_config) - .retry_all_errors("set partition persist marker", || async { - self.catalog - .repositories() - .await - .partitions() - .update_persisted_sequence_number( - parquet_file.partition_id, - parquet_file.max_sequence_number, - ) - .await - }) - .await - .expect("retry forever"); - - // Record metrics - let attributes = Attributes::from([( - "shard_id", - format!("{}", partition_info.partition.shard_id).into(), - )]); - self.persisted_file_size_bytes - .recorder(attributes) - .record(file_size as u64); - - // and remove the persisted data from memory - namespace - .mark_persisted( - &partition_info.table_name, - &partition_info.partition.partition_key, - iox_metadata.max_sequence_number, - ) - .await; debug!( ?partition_id, - table_name=%partition_info.table_name, - partition_key=%partition_info.partition.partition_key, - max_sequence_number=%iox_metadata.max_sequence_number.get(), - "marked partition as persisted" + table = partition_info.table_name, + ?new_sort_key, + "adjusted sort key during batch compact & persist" ); } + + // Add the parquet file to the catalog until succeed + let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| { + table_schema.columns.get(name).expect("Unknown column").id + }); + + // Assert partitions are persisted in-order. + // + // It is an invariant that partitions are persisted in order so that + // both the per-shard, and per-partition watermarks are correctly + // advanced and accurate. + if let Some(last_persist) = partition_info.partition.persisted_sequence_number { + assert!( + parquet_file.max_sequence_number > last_persist, + "out of order partition persistence, persisting {}, previously persisted {}", + parquet_file.max_sequence_number.get(), + last_persist.get(), + ); + } + + // Add the parquet file to the catalog. + // + // This has the effect of allowing the queriers to "discover" the + // parquet file by polling / querying the catalog. + Backoff::new(&self.backoff_config) + .retry_all_errors("add parquet file to catalog", || async { + let mut repos = self.catalog.repositories().await; + let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?; + debug!( + ?partition_id, + table_id=?parquet_file.table_id, + parquet_file_id=?parquet_file.id, + table_name=%iox_metadata.table_name, + "parquet file written to catalog" + ); + // compiler insisted on getting told the type of the error :shrug: + Ok(()) as Result<(), iox_catalog::interface::Error> + }) + .await + .expect("retry forever"); + + // Update the per-partition persistence watermark, so that new + // ingester instances skip the just-persisted ops during replay. + // + // This could be transactional with the above parquet insert to + // maintain catalog consistency, though in practice it is an + // unnecessary overhead - the system can tolerate replaying the ops + // that lead to this parquet file being generated, and tolerate + // creating a parquet file containing duplicate data (remedied by + // compaction). + // + // This means it is possible to observe a parquet file with a + // max_persisted_sequence_number > + // partition.persisted_sequence_number, either in-between these + // catalog updates, or for however long it takes a crashed ingester + // to restart and replay the ops, and re-persist a file containing + // the same (or subset of) data. + // + // The above is also true of the per-shard persist marker that + // governs the ingester's replay start point, which is + // non-transactionally updated after all partitions have persisted. + Backoff::new(&self.backoff_config) + .retry_all_errors("set partition persist marker", || async { + self.catalog + .repositories() + .await + .partitions() + .update_persisted_sequence_number( + parquet_file.partition_id, + parquet_file.max_sequence_number, + ) + .await + }) + .await + .expect("retry forever"); + + // Record metrics + let attributes = Attributes::from([( + "shard_id", + format!("{}", partition_info.partition.shard_id).into(), + )]); + self.persisted_file_size_bytes + .recorder(attributes) + .record(file_size as u64); + + // and remove the persisted data from memory + namespace + .mark_persisted( + &partition_info.table_name, + &partition_info.partition.partition_key, + iox_metadata.max_sequence_number, + ) + .await; + debug!( + ?partition_id, + table_name=%partition_info.table_name, + partition_key=%partition_info.partition.partition_key, + max_sequence_number=%iox_metadata.max_sequence_number.get(), + "marked partition as persisted" + ); } async fn update_min_unpersisted_sequence_number( @@ -804,7 +813,7 @@ mod tests { // limits) assert!(!should_pause); - let partition_id = { + let (table_id, partition_id) = { let sd = data.shards.get(&shard1.id).unwrap(); let n = sd.namespace("foo").unwrap(); let mem_table = n.table_data("mem").unwrap(); @@ -813,10 +822,11 @@ mod tests { let p = mem_table .get_partition_by_key(&"1970-01-01".into()) .unwrap(); - p.partition_id() + (mem_table.table_id(), p.partition_id()) }; - data.persist(partition_id).await; + data.persist(shard1.id, namespace.id, table_id, partition_id) + .await; // verify that a file got put into object store let file_paths: Vec<_> = object_store @@ -953,12 +963,13 @@ mod tests { { let mem_table = n.table_data("mem").unwrap(); assert!(n.table_data("cpu").is_some()); + let mem_table = mem_table.write().await; + table_id = mem_table.table_id(); + let p = mem_table .get_partition_by_key(&"1970-01-01".into()) .unwrap(); - - table_id = mem_table.table_id(); partition_id = p.partition_id(); } { @@ -973,7 +984,8 @@ mod tests { assert!(partition_info.partition.sort_key.is_empty()); } - data.persist(partition_id).await; + data.persist(shard1.id, namespace.id, table_id, partition_id) + .await; // verify that a file got put into object store let file_paths: Vec<_> = object_store diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 6c0be9bc6b..987dad6c27 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -39,7 +39,6 @@ impl DoubleRef { self.by_name.get(name).map(Arc::clone) } - #[cfg(test)] fn by_id(&self, id: TableId) -> Option>> { self.by_id.get(&id).map(Arc::clone) } @@ -240,6 +239,7 @@ impl NamespaceData { /// Snapshots the mutable buffer for the partition, which clears it out and then moves all /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot /// or persist, None will be returned. + #[cfg(test)] // Only used in tests pub(crate) async fn snapshot_to_persisting( &self, table_name: &str, @@ -266,7 +266,6 @@ impl NamespaceData { } /// Return the table data by ID. - #[cfg(test)] pub(crate) fn table_id( &self, table_id: TableId, diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index ff32804520..3390b2aed8 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -37,7 +37,6 @@ impl DoubleRef { self.by_name.get(name).map(Arc::clone) } - #[cfg(test)] fn by_id(&self, id: NamespaceId) -> Option> { self.by_id.get(&id).map(Arc::clone) } @@ -120,7 +119,6 @@ impl ShardData { } /// Gets the namespace data out of the map - #[cfg(test)] pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option> { // TODO: this should be the default once IDs are pushed over the wire. // diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index b46b84dde7..01b9ff2f33 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -234,7 +234,7 @@ struct LifecycleStats { } /// The stats for a partition -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] struct PartitionLifecycleStats { /// The shard this partition is under shard_id: ShardId, @@ -469,6 +469,18 @@ impl LifecycleManager { let persist_tasks: Vec<_> = to_persist .into_iter() .map(|s| { + // BUG: TOCTOU: memory usage released may be incorrect. + // + // Here the amount of memory to be reduced is acquired, but this + // code does not prevent continued writes adding more data to + // the partition in another thread. + // + // This may lead to more actual data being persisted than the + // call below returns to the server pool - this would slowly + // starve the ingester of memory it thinks it has. + // + // See https://github.com/influxdata/influxdb_iox/issues/5777 + // Mark this partition as being persisted, and remember the // memory allocation it had accumulated. let partition_memory_usage = self @@ -483,7 +495,9 @@ impl LifecycleManager { let state = Arc::clone(&self.state); tokio::task::spawn(async move { - persister.persist(s.partition_id).await; + persister + .persist(s.shard_id, s.namespace_id, s.table_id, s.partition_id) + .await; // Now the data has been uploaded and the memory it was // using has been freed, released the memory capacity back // the ingester. @@ -602,7 +616,13 @@ mod tests { #[async_trait] impl Persister for TestPersister { - async fn persist(&self, partition_id: PartitionId) { + async fn persist( + &self, + _shard_id: ShardId, + _namespace_id: NamespaceId, + _table_id: TableId, + partition_id: PartitionId, + ) { let mut p = self.persist_called.lock(); p.insert(partition_id); } @@ -662,8 +682,16 @@ mod tests { #[async_trait] impl Persister for PausablePersister { - async fn persist(&self, partition_id: PartitionId) { - self.inner.persist(partition_id).await; + async fn persist( + &self, + shard_id: ShardId, + namespace_id: NamespaceId, + table_id: TableId, + partition_id: PartitionId, + ) { + self.inner + .persist(shard_id, namespace_id, table_id, partition_id) + .await; if let Some(event) = self.event(partition_id) { event.before.wait().await; event.after.wait().await; diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index a46200101b..6b2249dc20 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -752,7 +752,32 @@ impl MockIngester { .map(|f| f.id) .collect(); - self.ingester_data.persist(*partition_id).await; + let p = self + .catalog + .catalog + .repositories() + .await + .partitions() + .get_by_id(*partition_id) + .await + .unwrap() + .expect("partition not found"); + + let namespace_id = self + .catalog + .catalog + .repositories() + .await + .tables() + .get_by_id(p.table_id) + .await + .unwrap() + .expect("table does not exist") + .namespace_id; + + self.ingester_data + .persist(p.shard_id, namespace_id, p.table_id, *partition_id) + .await; result.extend( self.catalog