diff --git a/Cargo.lock b/Cargo.lock index fdb8277d56..64378ce720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2066,7 +2066,6 @@ dependencies = [ "futures", "generated_types", "hashbrown", - "heappy", "http", "humantime", "influxdb_iox_client", @@ -2392,6 +2391,7 @@ dependencies = [ "futures", "generated_types", "hashbrown", + "heappy", "http", "hyper", "log", diff --git a/docs/images/heappy_graph.png b/docs/images/heappy_graph.png new file mode 100644 index 0000000000..6ba62aa585 Binary files /dev/null and b/docs/images/heappy_graph.png differ diff --git a/docs/profiling.md b/docs/profiling.md index 477d688cc0..da913c3d02 100644 --- a/docs/profiling.md +++ b/docs/profiling.md @@ -54,3 +54,21 @@ For example, if you aim your browser at an IOx server with a URL such as http:// You will see a beautiful flame graph such as ![Flame Graph](images/flame_graph.png)
+
+
+# IOx — Heap Profiling
+
+IOx includes a memory heap profiling tool as well as a CPU profiler. The heap profiling tool is based on [heappy](https://github.com/mkmik/heappy).
+
+Support is not compiled in by default; it must be enabled via the `heappy` feature:
+
+```shell
+# Compile and run IOx with heap profiling enabled
+cargo run --no-default-features --features=heappy -- run all-in-one
+```
+
+Now aim your browser at an IOx server with a URL such as http://localhost:8080/debug/pprof/allocs?seconds=5
+
+You will see a green flame graph such as
+
+![Heappy Graph](images/heappy_graph.png)
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 5666054036..5b10ec8809 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -62,7 +62,6 @@ tonic = "0.7" uuid = { version = "1", features = ["v4"] } # jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] } -heappy = { git = "https://github.com/mkmik/heappy", rev = "5d47dff152b8430e1dc1aea5a54c91c0c3099219", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true } workspace-hack = { path = "../workspace-hack"} [dev-dependencies] @@ -82,6 +81,7 @@ azure = ["clap_blocks/azure"] # Optional Azure Object store support gcp = ["clap_blocks/gcp"] # Optional GCP object store support aws = ["clap_blocks/aws"] # Optional AWS / S3 object store support pprof = ["ioxd_common/pprof"] # Optional http://localhost:8080/debug/pprof/profile support +heappy = ["ioxd_common/heappy"] # Optional http://localhost:8080/debug/pprof/allocs support # Enable tokio_console support (https://github.com/tokio-rs/console) # diff --git a/influxdb_iox/tests/end_to_end_cases/mod.rs b/influxdb_iox/tests/end_to_end_cases/mod.rs index fb6ab23219..8628901fcb 100644 --- a/influxdb_iox/tests/end_to_end_cases/mod.rs +++ b/influxdb_iox/tests/end_to_end_cases/mod.rs @@ -1,4 +1,7 @@ mod all_in_one; +// The CLI tests error when run with heappy (heappy only works via `cargo run`): +// loading shared libraries: libjemalloc.so.2: cannot open shared object file: No such file or directory +#[cfg(not(feature = "heappy"))] mod cli; mod debug; mod error; diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 847cc872c0..1ad027dda1 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -451,7 +451,7 @@ pub struct TestPartition { impl TestPartition { /// Update sort key.
- pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Self { + pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> { let partition = self .catalog .catalog @@ -465,13 +465,13 @@ impl TestPartition { .await .unwrap(); - Self { + Arc::new(Self { catalog: Arc::clone(&self.catalog), namespace: Arc::clone(&self.namespace), table: Arc::clone(&self.table), sequencer: Arc::clone(&self.sequencer), partition, - } + }) } /// Create a parquet for the partition @@ -568,6 +568,13 @@ impl TestPartition { compaction_level: INITIAL_COMPACTION_LEVEL, sort_key: Some(sort_key.clone()), }; + let column_set = ColumnSet::new( + record_batch + .schema() + .fields() + .iter() + .map(|f| f.name().clone()), + ); let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file( ParquetStorage::new(Arc::clone(&self.catalog.object_store)), &metadata, @@ -590,7 +597,7 @@ row_count: row_count as i64, created_at: Timestamp::new(creation_time), compaction_level: INITIAL_COMPACTION_LEVEL, - column_set: ColumnSet::new(["col1", "col2"]), + column_set, }; let parquet_file = repos .parquet_files() diff --git a/ioxd_common/Cargo.toml b/ioxd_common/Cargo.toml index ea96b4eb03..70c9d1fdf7 100644 --- a/ioxd_common/Cargo.toml +++ b/ioxd_common/Cargo.toml @@ -11,6 +11,7 @@ clap_blocks = { path = "../clap_blocks" } data_types = { path = "../data_types" } dml = { path = "../dml" } generated_types = { path = "../generated_types" } +heappy = { git = "https://github.com/mkmik/heappy", rev = "5d47dff152b8430e1dc1aea5a54c91c0c3099219", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true } metric = { path = "../metric" } metric_exporters = { path = "../metric_exporters" } mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index b0d2f2d038..a0838a3591 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -61,6 +61,13 @@ pub enum ReadError { /// An error reading the downloaded Parquet file. #[error("invalid parquet file: {0}")] Parquet(#[from] parquet::errors::ParquetError), + + /// Schema mismatch + #[error("Schema mismatch (expected VS actual parquet file) for file: {path}")] + SchemaMismatch { + /// Path of the affected parquet file. + path: object_store::path::Path, + }, } /// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an @@ -185,9 +192,16 @@ impl ParquetStorage { // `download_and_scan_parquet` is sent back to the reader and // not silently ignored let object_store = Arc::clone(&self.object_store); + let schema_captured = Arc::clone(&schema); let handle = tokio::task::spawn(async move { - let download_result = - download_and_scan_parquet(projection, path, object_store, tx.clone()).await; + let download_result = download_and_scan_parquet( + projection, + schema_captured, + path, + object_store, + tx.clone(), + ) + .await; // If there was an error returned from download_and_scan_parquet send it back to the receiver. if let Err(e) = download_result { @@ -245,6 +259,7 @@ fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> { /// spilling it to disk while it is processed.
async fn download_and_scan_parquet( projection: Vec<usize>, + expected_schema: SchemaRef, path: object_store::path::Path, object_store: Arc<DynObjectStore>, tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>, @@ -276,6 +291,14 @@ async fn download_and_scan_parquet( let file_reader = SerializedFileReader::new(Bytes::from(data))?; let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + // Check schema but ignore the metadata + let schema = arrow_reader + .get_schema()? + .with_metadata(expected_schema.metadata().clone()); + if expected_schema.as_ref() != &schema { + return Err(ReadError::SchemaMismatch { path }); + } + let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), projection); let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?; @@ -293,10 +316,13 @@ #[cfg(test)] mod tests { + use std::collections::HashMap; + use super::*; - use arrow::array::{ArrayRef, StringBuilder}; + use arrow::array::{ArrayRef, Int64Builder, StringBuilder}; use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId}; + use datafusion::common::DataFusionError; use iox_time::Time; #[tokio::test] @@ -305,21 +331,7 @@ let store = ParquetStorage::new(object_store); - let meta = IoxMetadata { - object_store_id: Default::default(), - creation_timestamp: Time::from_timestamp_nanos(42), - namespace_id: NamespaceId::new(1), - namespace_name: "bananas".into(), - sequencer_id: SequencerId::new(2), - table_id: TableId::new(3), - table_name: "platanos".into(), - partition_id: PartitionId::new(4), - partition_key: "potato".into(), - min_sequence_number: SequenceNumber::new(10), - max_sequence_number: SequenceNumber::new(11), - compaction_level: 1, - sort_key: None, - }; + let meta = meta(); let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); let schema = batch.schema(); let stream = futures::stream::iter([Ok(batch.clone())]); @@ -356,6 +368,123 @@ assert_eq!(got.pop().unwrap(), batch); } + #[tokio::test] + async fn test_schema_check_fail() { + let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default()); + + let store = ParquetStorage::new(object_store); + + let meta = meta(); + let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); + let other_batch = RecordBatch::try_from_iter([("a", to_int_array(&[1]))]).unwrap(); + let schema = batch.schema(); + let stream = futures::stream::iter([Ok(other_batch)]); + + // Serialize & upload the record batches. + store + .upload(stream, &meta) + .await + .expect("should serialize and store successfully"); + + // Fetch the record batches and compare them to the input batches.
+ let path: ParquetFilePath = (&meta).into(); + let rx = store + .read_filter(&Predicate::default(), Selection::All, schema, &path) + .expect("should read record batches from object store"); + + // Drain the retrieved record batch stream + let err = datafusion::physical_plan::common::collect(rx) + .await + .unwrap_err(); + + // Check that we got the expected schema mismatch error + if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err { + assert_eq!( + err.to_string(), + "Schema mismatch (expected VS actual parquet file) for file: 1/3/2/4/00000000-0000-0000-0000-000000000000.parquet", + ); + } else { + panic!("Wrong error type: {err}"); + } + } + + #[tokio::test] + async fn test_schema_check_ignore_additional_metadata_in_mem() { + let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default()); + + let store = ParquetStorage::new(object_store); + + let meta = meta(); + let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); + let schema = batch.schema(); + let stream = futures::stream::iter([Ok(batch)]); + + // Serialize & upload the record batches. + store + .upload(stream, &meta) + .await + .expect("should serialize and store successfully"); + + // add metadata to reference schema + let schema = Arc::new( + schema + .as_ref() + .clone() + .with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])), + ); + + // Fetch the record batches and compare them to the input batches. + let path: ParquetFilePath = (&meta).into(); + let rx = store + .read_filter(&Predicate::default(), Selection::All, schema, &path) + .expect("should read record batches from object store"); + + // Drain the retrieved record batch stream + datafusion::physical_plan::common::collect(rx) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_schema_check_ignore_additional_metadata_in_file() { + let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default()); + + let store = ParquetStorage::new(object_store); + + let meta = meta(); + let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); + let schema = batch.schema(); + // add metadata to stored batch + let batch = RecordBatch::try_new( + Arc::new( + schema + .as_ref() + .clone() + .with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])), + ), + batch.columns().to_vec(), + ) + .unwrap(); + let stream = futures::stream::iter([Ok(batch)]); + + // Serialize & upload the record batches. + store + .upload(stream, &meta) + .await + .expect("should serialize and store successfully"); + + // Fetch the record batches and compare them to the input batches.
+ let path: ParquetFilePath = (&meta).into(); + let rx = store + .read_filter(&Predicate::default(), Selection::All, schema, &path) + .expect("should read record batches from object store"); + + // Drain the retrieved record batch stream + datafusion::physical_plan::common::collect(rx) + .await + .unwrap(); + } + fn to_string_array(strs: &[&str]) -> ArrayRef { let mut builder = StringBuilder::new(strs.len()); for s in strs { @@ -363,4 +492,30 @@ builder.append_value(s).expect("appending string"); } Arc::new(builder.finish()) } + + fn to_int_array(vals: &[i64]) -> ArrayRef { + let mut builder = Int64Builder::new(vals.len()); + for x in vals { + builder.append_value(*x).expect("appending value"); + } + Arc::new(builder.finish()) + } + + fn meta() -> IoxMetadata { + IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + compaction_level: 1, + sort_key: None, + } + } } diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index 70fa5eaeb5..0d1cf28c7d 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -5,6 +5,7 @@ use cache_system::{ backend::{ lru::{LruBackend, ResourcePool}, resource_consumption::FunctionEstimator, + shared::SharedBackend, ttl::{OptionalValueTtlProvider, TtlBackend}, }, cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache}, @@ -13,7 +14,12 @@ use data_types::NamespaceSchema; use iox_catalog::interface::{get_schema_by_name, Catalog}; use iox_time::TimeProvider; -use std::{collections::HashMap, mem::size_of_val, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + mem::size_of_val, + sync::Arc, + time::Duration, +}; use super::ram::RamSize; @@ -31,6 +37,7 @@ type CacheT = Box<dyn Cache<K = Arc<str>, V = Option<Arc<CachedNamespace>>, Extra = ()>> #[derive(Debug)] pub struct NamespaceCache { cache: CacheT, + backend: SharedBackend<Arc<str>, Option<Arc<CachedNamespace>>>, } impl NamespaceCache { @@ -100,8 +107,9 @@ impl NamespaceCache { }, )), )); + let backend = SharedBackend::new(backend); - let cache = Box::new(CacheDriver::new(loader, backend)); + let cache = Box::new(CacheDriver::new(loader, Box::new(backend.clone()))); let cache = Box::new(CacheWithMetrics::new( cache, CACHE_ID, @@ -109,7 +117,7 @@ metric_registry, )); - Self { cache } + Self { cache, backend } } /// Get namespace schema by name. @@ -119,6 +127,29 @@ .await .map(|n| Arc::clone(&n.schema)) } + + /// Expire namespace if the cached schema does NOT cover the given set of columns.
+ pub fn expire_if_schema_does_not_cover( + &self, + namespace_name: Arc<str>, + table_name: &str, + columns: &HashSet<String>, + ) { + self.backend.remove_if(&namespace_name, |cached_namespace| { + if let Some(namespace) = cached_namespace.as_ref() { + if let Some(table) = namespace.schema.tables.get(table_name) { + let covered: HashSet<_> = table.columns.keys().map(|s| s.as_str()).collect(); + columns.iter().any(|col| !covered.contains(col.as_str())) + } else { + // table unknown => need to update + true + } + } else { + // namespace unknown => need to update + true + } + }); + } } #[derive(Debug, Clone)] @@ -294,4 +325,71 @@ assert!(none.is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); } + + #[tokio::test] + async fn test_expiration() { + let catalog = TestCatalog::new(); + + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + &catalog.metric_registry(), + test_ram_pool(), + ); + + // ========== namespace unknown ========== + assert!(cache.schema(Arc::from("ns1")).await.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); + + // ========== table unknown ========== + let ns1 = catalog.create_namespace("ns1").await; + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3); + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4); + + // ========== no columns ========== + let t1 = ns1.create_table("t1").await; + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); + + // ========== some columns ========== + t1.create_column("c1", ColumnType::Bool).await; + t1.create_column("c2", ColumnType::Bool).await; + + let columns = HashSet::from([]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); + + let columns = HashSet::from(["c1".into()]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6); + + let columns = HashSet::from(["c2".into()]); + cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns); + assert!(cache.schema(Arc::from("ns1")).await.is_some()); +
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6); + } }
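
For quick reference alongside the `docs/profiling.md` changes above: the new doc describes viewing profiles in a browser, but the same endpoints can also be scripted. A minimal sketch, assuming the endpoints return the rendered SVG flame graphs described in the docs (the output filenames here are illustrative):

```shell
# CPU profile (requires the `pprof` feature): sample for 5 seconds and save the flame graph
curl -o cpu_flame.svg "http://localhost:8080/debug/pprof/profile?seconds=5"

# Heap profile (requires the `heappy` feature): record allocations for 5 seconds
curl -o heap_flame.svg "http://localhost:8080/debug/pprof/allocs?seconds=5"
```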