diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 040a1c983c..ce516ffe85 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -86,11 +86,8 @@ pub(crate) async fn compact_persisting_batch(
     namespace_id: i64,
     partition_info: &PartitionInfo,
     batch: Arc<PersistingBatch>,
-) -> Result<Option<CompactedStream>> {
-    // Nothing to compact
-    if batch.data.data.is_empty() {
-        return Ok(None);
-    }
+) -> Result<CompactedStream> {
+    assert!(!batch.data.data.is_empty());
 
     let namespace_name = &partition_info.namespace_name;
     let table_name = &partition_info.table_name;
@@ -141,11 +138,11 @@ pub(crate) async fn compact_persisting_batch(
         sort_key: Some(metadata_sort_key),
     };
 
-    Ok(Some(CompactedStream {
+    Ok(CompactedStream {
         stream,
         iox_metadata,
         sort_key_update,
-    }))
+    })
 }
 
 /// Compact a given Queryable Batch
@@ -254,7 +251,6 @@ mod tests {
         let CompactedStream { stream, .. } =
             compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
-                .unwrap()
                 .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
@@ -328,7 +324,6 @@ mod tests {
             sort_key_update,
         } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
         .await
-        .unwrap()
         .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
@@ -426,7 +421,6 @@ mod tests {
             sort_key_update,
         } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
         .await
-        .unwrap()
         .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
@@ -527,7 +521,6 @@ mod tests {
             sort_key_update,
         } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
         .await
-        .unwrap()
         .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
@@ -629,7 +622,6 @@ mod tests {
             sort_key_update,
        } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
         .await
-        .unwrap()
         .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
@@ -739,7 +731,6 @@ mod tests {
             sort_key_update,
         } = compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
         .await
-        .unwrap()
         .unwrap();
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 9c26a09a20..89486ed2df 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -6,7 +6,7 @@ use arrow::{error::ArrowError, record_batch::RecordBatch};
 use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
-use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex};
+use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use dml::DmlOperation;
 use futures::{Stream, StreamExt};
@@ -220,7 +220,13 @@ impl IngesterData {
 #[async_trait]
 pub trait Persister: Send + Sync + 'static {
     /// Persits the partition ID. Will retry forever until it succeeds.
-    async fn persist(&self, partition_id: PartitionId);
+    async fn persist(
+        &self,
+        shard_id: ShardId,
+        namespace_id: NamespaceId,
+        table_id: TableId,
+        partition_id: PartitionId,
+    );
 
     /// Updates the shard's `min_unpersisted_sequence_number` in the catalog.
     /// This number represents the minimum that might be unpersisted, which is the
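The `Persister` trait now receives every level of the id hierarchy rather than a bare `PartitionId`, so an implementation can walk shard → namespace → table → partition entirely within the ingester's in-memory state instead of reverse-resolving the partition through the catalog. A minimal sketch of the new shape, using simplified stand-in ID types (not the real `data_types` newtypes) and assuming the `async-trait` crate:

```rust
use async_trait::async_trait;

// Stand-ins for the crate's ID newtypes; illustration only.
#[derive(Debug, Clone, Copy)]
pub struct ShardId(pub i64);
#[derive(Debug, Clone, Copy)]
pub struct NamespaceId(pub i64);
#[derive(Debug, Clone, Copy)]
pub struct TableId(pub i64);
#[derive(Debug, Clone, Copy)]
pub struct PartitionId(pub i64);

#[async_trait]
pub trait Persister: Send + Sync + 'static {
    /// The caller supplies the full path to the partition, so an
    /// implementation can index straight into its own state:
    /// shards -> namespaces -> tables -> partitions.
    async fn persist(
        &self,
        shard_id: ShardId,
        namespace_id: NamespaceId,
        table_id: TableId,
        partition_id: PartitionId,
    );
}
```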
@@ -235,7 +241,69 @@ pub trait Persister: Send + Sync + 'static {
 
 #[async_trait]
 impl Persister for IngesterData {
-    async fn persist(&self, partition_id: PartitionId) {
+    async fn persist(
+        &self,
+        shard_id: ShardId,
+        namespace_id: NamespaceId,
+        table_id: TableId,
+        partition_id: PartitionId,
+    ) {
+        // lookup the state from the ingester data. If something isn't found,
+        // it's unexpected. Crash so someone can take a look.
+        let shard_data = self
+            .shards
+            .get(&shard_id)
+            .unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data"));
+        let namespace = shard_data
+            .namespace_by_id(namespace_id)
+            .unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
+
+        let partition_key;
+        let batch;
+        {
+            let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
+                panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
+            });
+
+            let mut guard = table_data.write().await;
+            let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
+                panic!(
+                    "partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state"
+                )
+            });
+
+            partition_key = partition.partition_key().clone();
+            batch = partition.snapshot_to_persisting_batch();
+        };
+
+        debug!(%shard_id, %namespace_id, %table_id, %partition_id, %partition_key, "persisting partition");
+
+        // Check if there is any data to persist.
+        let batch = match batch {
+            Some(v) if !v.data.data.is_empty() => v,
+            _ => {
+                warn!(
+                    %shard_id,
+                    %namespace_id,
+                    %table_id,
+                    %partition_id,
+                    %partition_key,
+                    "partition marked for persistence contains no data"
+                );
+                return;
+            }
+        };
+
+        // lookup column IDs from catalog
+        // TODO: this can be removed once the ingester uses column IDs internally as well
+        let table_schema = Backoff::new(&self.backoff_config)
+            .retry_all_errors("get table schema", || async {
+                let mut repos = self.catalog.repositories().await;
+                get_table_schema_by_id(table_id, repos.as_mut()).await
+            })
+            .await
+            .expect("retry forever");
+
         // lookup the partition_info from the catalog
         let partition_info = Backoff::new(&self.backoff_config)
             .retry_all_errors("get partition_info_by_id", || async {
@@ -243,217 +311,158 @@ impl Persister for IngesterData {
                 repos.partitions().partition_info_by_id(partition_id).await
             })
             .await
-            .expect("retry forever");
+            .expect("retry forever").unwrap_or_else(|| panic!("partition {partition_id} in table {table_id} in namespace {namespace_id} in shard {shard_id} has no partition info in catalog"));
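Note how the new code takes the table's write guard only long enough to snapshot the partition into a persisting batch, then releases it before any slow compaction or upload work. A small illustrative sketch of that lock-scoping pattern, using tokio's `RwLock` and made-up types rather than the ingester's own:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical stand-in for a table's buffered data.
#[derive(Default)]
struct TableBuffer {
    rows: Vec<String>,
}

async fn persist(table: Arc<RwLock<TableBuffer>>) {
    let snapshot = {
        // The write guard lives only for this block.
        let mut guard = table.write().await;
        std::mem::take(&mut guard.rows)
    }; // guard dropped here, so concurrent writers are not blocked...

    // ...while the slow work runs without holding the lock.
    upload(&snapshot).await;
}

async fn upload(_rows: &[String]) {
    // Placeholder for the real object-store upload.
}
```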
-        // lookup the state from the ingester data. If something isn't found, it's unexpected. Crash
-        // so someone can take a look.
-        let partition_info = partition_info
-            .unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id));
-        let shard_data = self
-            .shards
-            .get(&partition_info.partition.shard_id)
-            .unwrap_or_else(|| {
-                panic!(
-                    "shard state for {} not in ingester data",
-                    partition_info.partition.shard_id
-                )
-            }); //{
-        let namespace = shard_data
-            .namespace(&partition_info.namespace_name)
-            .unwrap_or_else(|| {
-                panic!(
-                    "namespace {} not in shard {} state",
-                    partition_info.namespace_name, partition_info.partition.shard_id
-                )
-            });
-        debug!(?partition_id, ?partition_info, "persisting partition");
+        // do the CPU intensive work of compaction, de-duplication and sorting
+        let CompactedStream {
+            stream: record_stream,
+            iox_metadata,
+            sort_key_update,
+        } = compact_persisting_batch(
+            Arc::new(SystemProvider::new()),
+            &self.exec,
+            namespace.namespace_id().get(),
+            &partition_info,
+            Arc::clone(&batch),
+        )
+        .await
+        .expect("unable to compact persisting batch");
 
-        // lookup column IDs from catalog
-        // TODO: this can be removed once the ingester uses column IDs internally as well
-        let table_schema = Backoff::new(&self.backoff_config)
-            .retry_all_errors("get table schema", || async {
-                let mut repos = self.catalog.repositories().await;
-                let table = repos
-                    .tables()
-                    .get_by_namespace_and_name(namespace.namespace_id(), &partition_info.table_name)
-                    .await?
-                    .expect("table not found in catalog");
-                get_table_schema_by_id(table.id, repos.as_mut()).await
-            })
+        // Save the compacted data to a parquet file in object storage.
+        //
+        // This call retries until it completes.
+        let (md, file_size) = self
+            .store
+            .upload(record_stream, &iox_metadata)
             .await
-            .expect("retry forever");
+            .expect("unexpected fatal persist error");
 
-        let persisting_batch = namespace
-            .snapshot_to_persisting(
-                &partition_info.table_name,
-                &partition_info.partition.partition_key,
-            )
-            .await;
-
-        if let Some(persisting_batch) = persisting_batch {
-            // do the CPU intensive work of compaction, de-duplication and sorting
-            let compacted_stream = match compact_persisting_batch(
-                Arc::new(SystemProvider::new()),
-                &self.exec,
-                namespace.namespace_id().get(),
-                &partition_info,
-                Arc::clone(&persisting_batch),
-            )
-            .await
-            {
-                Err(e) => {
-                    // this should never error out. if it does, we need to crash hard so
-                    // someone can take a look.
-                    panic!("unable to compact persisting batch with error: {:?}", e);
-                }
-                Ok(Some(r)) => r,
-                Ok(None) => {
-                    warn!("persist called with no data");
-                    return;
-                }
-            };
-            let CompactedStream {
-                stream: record_stream,
-                iox_metadata,
-                sort_key_update,
-            } = compacted_stream;
-
-            // Save the compacted data to a parquet file in object storage.
-            //
-            // This call retries until it completes.
-            let (md, file_size) = self
-                .store
-                .upload(record_stream, &iox_metadata)
-                .await
-                .expect("unexpected fatal persist error");
-
-            // Update the sort key in the catalog if there are
-            // additional columns BEFORE adding parquet file to the
-            // catalog. If the order is reversed, the querier or
-            // compactor may see a parquet file with an inconsistent
-            // sort key. https://github.com/influxdata/influxdb_iox/issues/5090
-            if let Some(new_sort_key) = sort_key_update {
-                let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
-                Backoff::new(&self.backoff_config)
-                    .retry_all_errors("update_sort_key", || async {
-                        let mut repos = self.catalog.repositories().await;
-                        let _partition = repos
-                            .partitions()
-                            .update_sort_key(partition_id, &sort_key)
-                            .await?;
-                        // compiler insisted on getting told the type of the error :shrug:
-                        Ok(()) as Result<(), iox_catalog::interface::Error>
-                    })
-                    .await
-                    .expect("retry forever");
-                debug!(
-                    ?partition_id,
-                    table = partition_info.table_name,
-                    ?new_sort_key,
-                    "adjusted sort key during batch compact & persist"
-                );
-            }
-
-            // Add the parquet file to the catalog until succeed
-            let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
-                table_schema.columns.get(name).expect("Unknown column").id
-            });
-
-            // Assert partitions are persisted in-order.
-            //
-            // It is an invariant that partitions are persisted in order so that
-            // both the per-shard, and per-partition watermarks are correctly
-            // advanced and accurate.
-            if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
-                assert!(
-                    parquet_file.max_sequence_number > last_persist,
-                    "out of order partition persistence, persisting {}, previously persisted {}",
-                    parquet_file.max_sequence_number.get(),
-                    last_persist.get(),
-                );
-            }
-
-            // Add the parquet file to the catalog.
-            //
-            // This has the effect of allowing the queriers to "discover" the
-            // parquet file by polling / querying the catalog.
+        // Update the sort key in the catalog if there are
+        // additional columns BEFORE adding parquet file to the
+        // catalog. If the order is reversed, the querier or
+        // compactor may see a parquet file with an inconsistent
+        // sort key. https://github.com/influxdata/influxdb_iox/issues/5090
+        if let Some(new_sort_key) = sort_key_update {
+            let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
             Backoff::new(&self.backoff_config)
-                .retry_all_errors("add parquet file to catalog", || async {
+                .retry_all_errors("update_sort_key", || async {
                     let mut repos = self.catalog.repositories().await;
-                    let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
-                    debug!(
-                        ?partition_id,
-                        table_id=?parquet_file.table_id,
-                        parquet_file_id=?parquet_file.id,
-                        table_name=%iox_metadata.table_name,
-                        "parquet file written to catalog"
-                    );
+                    let _partition = repos
+                        .partitions()
+                        .update_sort_key(partition_id, &sort_key)
+                        .await?;
                     // compiler insisted on getting told the type of the error :shrug:
                     Ok(()) as Result<(), iox_catalog::interface::Error>
                 })
                 .await
                 .expect("retry forever");
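The `Ok(()) as Result<(), iox_catalog::interface::Error>` line in these retry closures exists because a retry helper that is generic over the error type cannot infer which error an async block that only uses `?` should produce. A hedged, self-contained sketch of the same situation with a made-up retry helper and `std::io::Error`:

```rust
use std::fmt::Debug;
use std::future::Future;

// Hypothetical retry helper, generic over the operation's error type.
async fn retry<T, E, Fut, F>(mut op: F) -> T
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: Debug,
{
    loop {
        match op().await {
            Ok(v) => return v,
            Err(e) => eprintln!("retrying after error: {e:?}"),
        }
    }
}

async fn fallible_io() -> Result<(), std::io::Error> {
    Ok(())
}

async fn example() {
    retry(|| async {
        fallible_io().await?;
        // Without this cast the compiler cannot name the block's error
        // type, because `?` alone leaves it ambiguous.
        Ok(()) as Result<(), std::io::Error>
    })
    .await;
}
```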
-
-            // Update the per-partition persistence watermark, so that new
-            // ingester instances skip the just-persisted ops during replay.
-            //
-            // This could be transactional with the above parquet insert to
-            // maintain catalog consistency, though in practice it is an
-            // unnecessary overhead - the system can tolerate replaying the ops
-            // that lead to this parquet file being generated, and tolerate
-            // creating a parquet file containing duplicate data (remedied by
-            // compaction).
-            //
-            // This means it is possible to observe a parquet file with a
-            // max_persisted_sequence_number >
-            // partition.persisted_sequence_number, either in-between these
-            // catalog updates, or for however long it takes a crashed ingester
-            // to restart and replay the ops, and re-persist a file containing
-            // the same (or subset of) data.
-            //
-            // The above is also true of the per-shard persist marker that
-            // governs the ingester's replay start point, which is
-            // non-transactionally updated after all partitions have persisted.
-            Backoff::new(&self.backoff_config)
-                .retry_all_errors("set partition persist marker", || async {
-                    self.catalog
-                        .repositories()
-                        .await
-                        .partitions()
-                        .update_persisted_sequence_number(
-                            parquet_file.partition_id,
-                            parquet_file.max_sequence_number,
-                        )
-                        .await
-                })
-                .await
-                .expect("retry forever");
-
-            // Record metrics
-            let attributes = Attributes::from([(
-                "shard_id",
-                format!("{}", partition_info.partition.shard_id).into(),
-            )]);
-            self.persisted_file_size_bytes
-                .recorder(attributes)
-                .record(file_size as u64);
-
-            // and remove the persisted data from memory
-            namespace
-                .mark_persisted(
-                    &partition_info.table_name,
-                    &partition_info.partition.partition_key,
-                    iox_metadata.max_sequence_number,
-                )
-                .await;
             debug!(
                 ?partition_id,
-                table_name=%partition_info.table_name,
-                partition_key=%partition_info.partition.partition_key,
-                max_sequence_number=%iox_metadata.max_sequence_number.get(),
-                "marked partition as persisted"
+                table = partition_info.table_name,
+                ?new_sort_key,
+                "adjusted sort key during batch compact & persist"
             );
         }
+
+        // Add the parquet file to the catalog until succeed
+        let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
+            table_schema.columns.get(name).expect("Unknown column").id
+        });
+
+        // Assert partitions are persisted in-order.
+        //
+        // It is an invariant that partitions are persisted in order so that
+        // both the per-shard, and per-partition watermarks are correctly
+        // advanced and accurate.
+        if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
+            assert!(
+                parquet_file.max_sequence_number > last_persist,
+                "out of order partition persistence, persisting {}, previously persisted {}",
+                parquet_file.max_sequence_number.get(),
+                last_persist.get(),
+            );
+        }
+
+        // Add the parquet file to the catalog.
+        //
+        // This has the effect of allowing the queriers to "discover" the
+        // parquet file by polling / querying the catalog.
+        Backoff::new(&self.backoff_config)
+            .retry_all_errors("add parquet file to catalog", || async {
+                let mut repos = self.catalog.repositories().await;
+                let parquet_file = repos.parquet_files().create(parquet_file.clone()).await?;
+                debug!(
+                    ?partition_id,
+                    table_id=?parquet_file.table_id,
+                    parquet_file_id=?parquet_file.id,
+                    table_name=%iox_metadata.table_name,
+                    "parquet file written to catalog"
+                );
+                // compiler insisted on getting told the type of the error :shrug:
+                Ok(()) as Result<(), iox_catalog::interface::Error>
+            })
+            .await
+            .expect("retry forever");
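`to_parquet_file` is handed a closure that maps column names to catalog column IDs, and any column missing from the catalog schema is treated as a hard error. A tiny illustrative sketch of that lookup shape, with a plain `HashMap` standing in for `table_schema.columns`:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical catalog schema: column name -> column ID.
    let mut columns: HashMap<&str, i64> = HashMap::new();
    columns.insert("time", 1);
    columns.insert("host", 2);

    // Mirrors the `|name| ... .expect("Unknown column").id` closure above:
    // a column present in the data but absent from the schema is a bug.
    let id_for = |name: &str| *columns.get(name).expect("Unknown column");

    assert_eq!(id_for("host"), 2);
}
```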
+
+        // Update the per-partition persistence watermark, so that new
+        // ingester instances skip the just-persisted ops during replay.
+        //
+        // This could be transactional with the above parquet insert to
+        // maintain catalog consistency, though in practice it is an
+        // unnecessary overhead - the system can tolerate replaying the ops
+        // that lead to this parquet file being generated, and tolerate
+        // creating a parquet file containing duplicate data (remedied by
+        // compaction).
+        //
+        // This means it is possible to observe a parquet file with a
+        // max_persisted_sequence_number >
+        // partition.persisted_sequence_number, either in-between these
+        // catalog updates, or for however long it takes a crashed ingester
+        // to restart and replay the ops, and re-persist a file containing
+        // the same (or subset of) data.
+        //
+        // The above is also true of the per-shard persist marker that
+        // governs the ingester's replay start point, which is
+        // non-transactionally updated after all partitions have persisted.
+        Backoff::new(&self.backoff_config)
+            .retry_all_errors("set partition persist marker", || async {
+                self.catalog
+                    .repositories()
+                    .await
+                    .partitions()
+                    .update_persisted_sequence_number(
+                        parquet_file.partition_id,
+                        parquet_file.max_sequence_number,
+                    )
+                    .await
+            })
+            .await
+            .expect("retry forever");
+
+        // Record metrics
+        let attributes = Attributes::from([(
+            "shard_id",
+            format!("{}", partition_info.partition.shard_id).into(),
+        )]);
+        self.persisted_file_size_bytes
+            .recorder(attributes)
+            .record(file_size as u64);
+
+        // and remove the persisted data from memory
+        namespace
+            .mark_persisted(
+                &partition_info.table_name,
+                &partition_info.partition.partition_key,
+                iox_metadata.max_sequence_number,
+            )
+            .await;
+        debug!(
+            ?partition_id,
+            table_name=%partition_info.table_name,
+            partition_key=%partition_info.partition.partition_key,
+            max_sequence_number=%iox_metadata.max_sequence_number.get(),
+            "marked partition as persisted"
+        );
     }
 
     async fn update_min_unpersisted_sequence_number(
@@ -804,7 +813,7 @@ mod tests {
         // limits)
         assert!(!should_pause);
 
-        let partition_id = {
+        let (table_id, partition_id) = {
             let sd = data.shards.get(&shard1.id).unwrap();
             let n = sd.namespace("foo").unwrap();
             let mem_table = n.table_data("mem").unwrap();
@@ -813,10 +822,11 @@ mod tests {
             let p = mem_table
                 .get_partition_by_key(&"1970-01-01".into())
                 .unwrap();
-            p.partition_id()
+            (mem_table.table_id(), p.partition_id())
         };
 
-        data.persist(partition_id).await;
+        data.persist(shard1.id, namespace.id, table_id, partition_id)
+            .await;
 
         // verify that a file got put into object store
         let file_paths: Vec<_> = object_store
@@ -953,12 +963,13 @@ mod tests {
         {
             let mem_table = n.table_data("mem").unwrap();
             assert!(n.table_data("cpu").is_some());
+            let mem_table = mem_table.write().await;
+            table_id = mem_table.table_id();
+
             let p = mem_table
                 .get_partition_by_key(&"1970-01-01".into())
                 .unwrap();
-
-            table_id = mem_table.table_id();
             partition_id = p.partition_id();
         }
         {
@@ -973,7 +984,8 @@ mod tests {
             assert!(partition_info.partition.sort_key.is_empty());
         }
 
-        data.persist(partition_id).await;
+        data.persist(shard1.id, namespace.id, table_id, partition_id)
+            .await;
 
         // verify that a file got put into object store
         let file_paths: Vec<_> = object_store
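Because the per-partition watermark is advanced non-transactionally after the parquet file is inserted, a restarted ingester decides what to replay purely from the watermark it can see, accepting that some of those ops may already be covered by a persisted file. An illustrative sketch of that decision, using a made-up function and plain `i64` sequence numbers rather than the crate's types:

```rust
/// Illustrative only: which ops a restarting ingester replays for a
/// partition, given the persisted-sequence-number watermark recorded in
/// the catalog (if any).
fn needs_replay(op_sequence_number: i64, persisted_watermark: Option<i64>) -> bool {
    match persisted_watermark {
        // No watermark recorded yet: replay everything for the partition.
        None => true,
        // Replay anything newer than the watermark. If the watermark lags
        // an already-written parquet file, some replayed ops produce
        // duplicate rows, which compaction later removes.
        Some(watermark) => op_sequence_number > watermark,
    }
}

fn main() {
    assert!(needs_replay(5, None));
    assert!(!needs_replay(5, Some(8)));
    assert!(needs_replay(9, Some(8)));
}
```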
diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs
index 6c0be9bc6b..987dad6c27 100644
--- a/ingester/src/data/namespace.rs
+++ b/ingester/src/data/namespace.rs
@@ -39,7 +39,6 @@ impl DoubleRef {
         self.by_name.get(name).map(Arc::clone)
     }
 
-    #[cfg(test)]
     fn by_id(&self, id: TableId) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
         self.by_id.get(&id).map(Arc::clone)
     }
@@ -240,6 +239,7 @@ impl NamespaceData {
     /// Snapshots the mutable buffer for the partition, which clears it out and then moves all
     /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
     /// or persist, None will be returned.
+    #[cfg(test)] // Only used in tests
     pub(crate) async fn snapshot_to_persisting(
         &self,
         table_name: &str,
@@ -266,7 +266,6 @@ impl NamespaceData {
     }
 
     /// Return the table data by ID.
-    #[cfg(test)]
     pub(crate) fn table_id(
         &self,
         table_id: TableId,
diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs
index ff32804520..3390b2aed8 100644
--- a/ingester/src/data/shard.rs
+++ b/ingester/src/data/shard.rs
@@ -37,7 +37,6 @@ impl DoubleRef {
         self.by_name.get(name).map(Arc::clone)
     }
 
-    #[cfg(test)]
     fn by_id(&self, id: NamespaceId) -> Option<Arc<NamespaceData>> {
         self.by_id.get(&id).map(Arc::clone)
     }
@@ -120,7 +119,6 @@ impl ShardData {
     }
 
     /// Gets the namespace data out of the map
-    #[cfg(test)]
     pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
         // TODO: this should be the default once IDs are pushed over the wire.
         //
diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index b46b84dde7..01b9ff2f33 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -234,7 +234,7 @@ struct LifecycleStats {
 }
 
 /// The stats for a partition
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 struct PartitionLifecycleStats {
     /// The shard this partition is under
     shard_id: ShardId,
@@ -469,6 +469,18 @@ impl LifecycleManager {
         let persist_tasks: Vec<_> = to_persist
             .into_iter()
             .map(|s| {
+                // BUG: TOCTOU: memory usage released may be incorrect.
+                //
+                // Here the amount of memory to be reduced is acquired, but this
+                // code does not prevent continued writes adding more data to
+                // the partition in another thread.
+                //
+                // This may lead to more actual data being persisted than the
+                // call below returns to the server pool - this would slowly
+                // starve the ingester of memory it thinks it has.
+                //
+                // See https://github.com/influxdata/influxdb_iox/issues/5777
+
                 // Mark this partition as being persisted, and remember the
                 // memory allocation it had accumulated.
                 let partition_memory_usage = self
@@ -483,7 +495,9 @@ impl LifecycleManager {
                 let state = Arc::clone(&self.state);
 
                 tokio::task::spawn(async move {
-                    persister.persist(s.partition_id).await;
+                    persister
+                        .persist(s.shard_id, s.namespace_id, s.table_id, s.partition_id)
+                        .await;
                     // Now the data has been uploaded and the memory it was
                     // using has been freed, released the memory capacity back
                     // the ingester.
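The TOCTOU note above describes an accounting race rather than a data-loss bug: the byte count captured when the partition is selected can lag what is actually buffered by the time persistence completes. A minimal, purely illustrative reproduction of the arithmetic, with an atomic counter standing in for the lifecycle manager's memory pool:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for the pool of "memory in use" tracked by the lifecycle manager.
static POOL_USED: AtomicUsize = AtomicUsize::new(0);

fn main() {
    // 100 bytes are buffered when the partition is picked for persistence,
    // and that figure is captured for later release.
    POOL_USED.fetch_add(100, Ordering::SeqCst);
    let captured = 100;

    // A concurrent write lands another 20 bytes before the snapshot is taken.
    POOL_USED.fetch_add(20, Ordering::SeqCst);

    // Persistence finishes and releases only the captured amount...
    POOL_USED.fetch_sub(captured, Ordering::SeqCst);

    // ...so 20 bytes stay marked as used even though the partition's data
    // was persisted and dropped - the slow starvation described above.
    assert_eq!(POOL_USED.load(Ordering::SeqCst), 20);
}
```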
@@ -602,7 +616,13 @@ mod tests {
 
     #[async_trait]
     impl Persister for TestPersister {
-        async fn persist(&self, partition_id: PartitionId) {
+        async fn persist(
+            &self,
+            _shard_id: ShardId,
+            _namespace_id: NamespaceId,
+            _table_id: TableId,
+            partition_id: PartitionId,
+        ) {
             let mut p = self.persist_called.lock();
             p.insert(partition_id);
         }
@@ -662,8 +682,16 @@ mod tests {
 
     #[async_trait]
     impl Persister for PausablePersister {
-        async fn persist(&self, partition_id: PartitionId) {
-            self.inner.persist(partition_id).await;
+        async fn persist(
+            &self,
+            shard_id: ShardId,
+            namespace_id: NamespaceId,
+            table_id: TableId,
+            partition_id: PartitionId,
+        ) {
+            self.inner
+                .persist(shard_id, namespace_id, table_id, partition_id)
+                .await;
             if let Some(event) = self.event(partition_id) {
                 event.before.wait().await;
                 event.after.wait().await;
diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs
index a46200101b..6b2249dc20 100644
--- a/query_tests/src/scenarios/util.rs
+++ b/query_tests/src/scenarios/util.rs
@@ -752,7 +752,32 @@ impl MockIngester {
             .map(|f| f.id)
             .collect();
 
-        self.ingester_data.persist(*partition_id).await;
+        let p = self
+            .catalog
+            .catalog
+            .repositories()
+            .await
+            .partitions()
+            .get_by_id(*partition_id)
+            .await
+            .unwrap()
+            .expect("partition not found");
+
+        let namespace_id = self
+            .catalog
+            .catalog
+            .repositories()
+            .await
+            .tables()
+            .get_by_id(p.table_id)
+            .await
+            .unwrap()
+            .expect("table does not exist")
+            .namespace_id;
+
+        self.ingester_data
+            .persist(p.shard_id, namespace_id, p.table_id, *partition_id)
+            .await;
 
         result.extend(
             self.catalog
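`TestPersister` above simply records which partitions it was asked to persist. A sketch of that recording-double pattern against the simplified trait and ID types from the first sketch (again assuming the `async-trait` crate; not the crate's real test helper):

```rust
use std::collections::HashSet;
use std::sync::Mutex;

use async_trait::async_trait;

#[derive(Debug, Default)]
struct RecordingPersister {
    persist_called: Mutex<HashSet<i64>>,
}

#[async_trait]
impl Persister for RecordingPersister {
    async fn persist(
        &self,
        _shard_id: ShardId,
        _namespace_id: NamespaceId,
        _table_id: TableId,
        partition_id: PartitionId,
    ) {
        // Only the partition matters for the test assertions; the other IDs
        // are accepted solely to satisfy the widened signature.
        self.persist_called.lock().unwrap().insert(partition_id.0);
    }
}
```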