diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 847cc872c0..1ad027dda1 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -451,7 +451,7 @@ pub struct TestPartition {
 
 impl TestPartition {
     /// Update sort key.
-    pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Self {
+    pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
         let partition = self
             .catalog
             .catalog
@@ -465,13 +465,13 @@
             .await
             .unwrap();
 
-        Self {
+        Arc::new(Self {
             catalog: Arc::clone(&self.catalog),
             namespace: Arc::clone(&self.namespace),
             table: Arc::clone(&self.table),
             sequencer: Arc::clone(&self.sequencer),
             partition,
-        }
+        })
     }
 
     /// Create a parquet for the partition
@@ -568,6 +568,13 @@
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: Some(sort_key.clone()),
         };
+        let column_set = ColumnSet::new(
+            record_batch
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone()),
+        );
         let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(
             ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
             &metadata,
@@ -590,7 +597,7 @@
             row_count: row_count as i64,
             created_at: Timestamp::new(creation_time),
             compaction_level: INITIAL_COMPACTION_LEVEL,
-            column_set: ColumnSet::new(["col1", "col2"]),
+            column_set,
         };
         let parquet_file = repos
             .parquet_files()
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index b0d2f2d038..a0838a3591 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -61,6 +61,13 @@
     /// An error reading the downloaded Parquet file.
     #[error("invalid parquet file: {0}")]
     Parquet(#[from] parquet::errors::ParquetError),
+
+    /// Schema mismatch
+    #[error("Schema mismatch (expected VS actual parquet file) for file: {path}")]
+    SchemaMismatch {
+        /// Path of the affected parquet file.
+        path: object_store::path::Path,
+    },
 }
 
 /// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
@@ -185,9 +192,16 @@
         // `download_and_scan_parquet` is sent back to the reader and
         // not silently ignored
         let object_store = Arc::clone(&self.object_store);
+        let schema_captured = Arc::clone(&schema);
         let handle = tokio::task::spawn(async move {
-            let download_result =
-                download_and_scan_parquet(projection, path, object_store, tx.clone()).await;
+            let download_result = download_and_scan_parquet(
+                projection,
+                schema_captured,
+                path,
+                object_store,
+                tx.clone(),
+            )
+            .await;
 
             // If there was an error returned from download_and_scan_parquet send it back to the receiver.
             if let Err(e) = download_result {
@@ -245,6 +259,7 @@ fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> {
 /// spilling it to disk while it is processed.
 async fn download_and_scan_parquet(
     projection: Vec<usize>,
+    expected_schema: SchemaRef,
     path: object_store::path::Path,
     object_store: Arc<DynObjectStore>,
     tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
@@ -276,6 +291,14 @@
     let file_reader = SerializedFileReader::new(Bytes::from(data))?;
     let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
 
+    // Check schema but ignore the metadata
+    let schema = arrow_reader
+        .get_schema()?
+        .with_metadata(expected_schema.metadata().clone());
+    if expected_schema.as_ref() != &schema {
+        return Err(ReadError::SchemaMismatch { path });
+    }
+
     let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), projection);
     let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?;
 
@@ -293,10 +316,13 @@
 }
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+
     use super::*;
-    use arrow::array::{ArrayRef, StringBuilder};
+    use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
     use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
+    use datafusion::common::DataFusionError;
     use iox_time::Time;
 
     #[tokio::test]
@@ -305,21 +331,7 @@ mod tests {
 
         let store = ParquetStorage::new(object_store);
 
-        let meta = IoxMetadata {
-            object_store_id: Default::default(),
-            creation_timestamp: Time::from_timestamp_nanos(42),
-            namespace_id: NamespaceId::new(1),
-            namespace_name: "bananas".into(),
-            sequencer_id: SequencerId::new(2),
-            table_id: TableId::new(3),
-            table_name: "platanos".into(),
-            partition_id: PartitionId::new(4),
-            partition_key: "potato".into(),
-            min_sequence_number: SequenceNumber::new(10),
-            max_sequence_number: SequenceNumber::new(11),
-            compaction_level: 1,
-            sort_key: None,
-        };
+        let meta = meta();
         let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
         let schema = batch.schema();
         let stream = futures::stream::iter([Ok(batch.clone())]);
@@ -356,6 +368,123 @@ mod tests {
         assert_eq!(got.pop().unwrap(), batch);
     }
 
+    #[tokio::test]
+    async fn test_schema_check_fail() {
+        let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+
+        let store = ParquetStorage::new(object_store);
+
+        let meta = meta();
+        let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
+        let other_batch = RecordBatch::try_from_iter([("a", to_int_array(&[1]))]).unwrap();
+        let schema = batch.schema();
+        let stream = futures::stream::iter([Ok(other_batch)]);
+
+        // Serialize & upload the record batches.
+        store
+            .upload(stream, &meta)
+            .await
+            .expect("should serialize and store sucessfully");
+
+        // Fetch the record batches and compare them to the input batches.
+        let path: ParquetFilePath = (&meta).into();
+        let rx = store
+            .read_filter(&Predicate::default(), Selection::All, schema, &path)
+            .expect("should read record batches from object store");
+
+        // Drain the retrieved record batch stream
+        let err = datafusion::physical_plan::common::collect(rx)
+            .await
+            .unwrap_err();
+
+        // And compare to the original input
+        if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err {
+            assert_eq!(
+                err.to_string(),
+                "Schema mismatch (expected VS actual parquet file) for file: 1/3/2/4/00000000-0000-0000-0000-000000000000.parquet",
+            );
+        } else {
+            panic!("Wrong error type: {err}");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schema_check_ignore_additional_metadata_in_mem() {
+        let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+
+        let store = ParquetStorage::new(object_store);
+
+        let meta = meta();
+        let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
+        let schema = batch.schema();
+        let stream = futures::stream::iter([Ok(batch)]);
+
+        // Serialize & upload the record batches.
+        store
+            .upload(stream, &meta)
+            .await
+            .expect("should serialize and store sucessfully");
+
+        // add metadata to reference schema
+        let schema = Arc::new(
+            schema
+                .as_ref()
+                .clone()
+                .with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
+        );
+
+        // Fetch the record batches and compare them to the input batches.
+        let path: ParquetFilePath = (&meta).into();
+        let rx = store
+            .read_filter(&Predicate::default(), Selection::All, schema, &path)
+            .expect("should read record batches from object store");
+
+        // Drain the retrieved record batch stream
+        datafusion::physical_plan::common::collect(rx)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_schema_check_ignore_additional_metadata_in_file() {
+        let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+
+        let store = ParquetStorage::new(object_store);
+
+        let meta = meta();
+        let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
+        let schema = batch.schema();
+        // add metadata to stored batch
+        let batch = RecordBatch::try_new(
+            Arc::new(
+                schema
+                    .as_ref()
+                    .clone()
+                    .with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
+            ),
+            batch.columns().to_vec(),
+        )
+        .unwrap();
+        let stream = futures::stream::iter([Ok(batch)]);
+
+        // Serialize & upload the record batches.
+        store
+            .upload(stream, &meta)
+            .await
+            .expect("should serialize and store sucessfully");
+
+        // Fetch the record batches and compare them to the input batches.
+        let path: ParquetFilePath = (&meta).into();
+        let rx = store
+            .read_filter(&Predicate::default(), Selection::All, schema, &path)
+            .expect("should read record batches from object store");
+
+        // Drain the retrieved record batch stream
+        datafusion::physical_plan::common::collect(rx)
+            .await
+            .unwrap();
+    }
+
     fn to_string_array(strs: &[&str]) -> ArrayRef {
         let mut builder = StringBuilder::new(strs.len());
         for s in strs {
@@ -363,4 +492,30 @@ mod tests {
         }
         Arc::new(builder.finish())
     }
+
+    fn to_int_array(vals: &[i64]) -> ArrayRef {
+        let mut builder = Int64Builder::new(vals.len());
+        for x in vals {
+            builder.append_value(*x).expect("appending string");
+        }
+        Arc::new(builder.finish())
+    }
+
+    fn meta() -> IoxMetadata {
+        IoxMetadata {
+            object_store_id: Default::default(),
+            creation_timestamp: Time::from_timestamp_nanos(42),
+            namespace_id: NamespaceId::new(1),
+            namespace_name: "bananas".into(),
+            sequencer_id: SequencerId::new(2),
+            table_id: TableId::new(3),
+            table_name: "platanos".into(),
+            partition_id: PartitionId::new(4),
+            partition_key: "potato".into(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(11),
+            compaction_level: 1,
+            sort_key: None,
+        }
+    }
 }
diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs
index 70fa5eaeb5..0d1cf28c7d 100644
--- a/querier/src/cache/namespace.rs
+++ b/querier/src/cache/namespace.rs
@@ -5,6 +5,7 @@ use cache_system::{
     backend::{
         lru::{LruBackend, ResourcePool},
         resource_consumption::FunctionEstimator,
+        shared::SharedBackend,
         ttl::{OptionalValueTtlProvider, TtlBackend},
     },
     cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
@@ -13,7 +14,12 @@
 use data_types::NamespaceSchema;
 use iox_catalog::interface::{get_schema_by_name, Catalog};
 use iox_time::TimeProvider;
-use std::{collections::HashMap, mem::size_of_val, sync::Arc, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    mem::size_of_val,
+    sync::Arc,
+    time::Duration,
+};
 
 use super::ram::RamSize;
 
@@ -31,6 +37,7 @@ type CacheT = Box<dyn Cache<K = Arc<str>, V = Option<Arc<CachedNamespace>>, Extr
 #[derive(Debug)]
 pub struct NamespaceCache {
     cache: CacheT,
+    backend: SharedBackend<Arc<str>, Option<Arc<CachedNamespace>>>,
 }
 
 impl NamespaceCache {
@@ -100,8 +107,9 @@ impl NamespaceCache {
                 },
             )),
         ));
+        let backend = SharedBackend::new(backend);
 
-        let cache = Box::new(CacheDriver::new(loader, backend));
+        let cache = Box::new(CacheDriver::new(loader, Box::new(backend.clone())));
         let cache = Box::new(CacheWithMetrics::new(
             cache,
             CACHE_ID,
@@ -109,7 +117,7 @@ impl NamespaceCache {
             metric_registry,
         ));
 
-        Self { cache }
+        Self { cache, backend }
     }
 
     /// Get namespace schema by name.
@@ -119,6 +127,29 @@ impl NamespaceCache {
             .await
             .map(|n| Arc::clone(&n.schema))
     }
+
+    /// Expire namespace if the cached schema does NOT cover the given set of columns.
+    pub fn expire_if_schema_does_not_cover(
+        &self,
+        namespace_name: Arc<str>,
+        table_name: &str,
+        columns: &HashSet<String>,
+    ) {
+        self.backend.remove_if(&namespace_name, |cached_namespace| {
+            if let Some(namespace) = cached_namespace.as_ref() {
+                if let Some(table) = namespace.schema.tables.get(table_name) {
+                    let covered: HashSet<_> = table.columns.keys().map(|s| s.as_str()).collect();
+                    columns.iter().any(|col| !covered.contains(col.as_str()))
+                } else {
+                    // table unknown => need to update
+                    true
+                }
+            } else {
+                // namespace unknown => need to update
+                true
+            }
+        });
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -294,4 +325,71 @@ mod tests {
         assert!(none.is_none());
         assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
     }
+
+    #[tokio::test]
+    async fn test_expiration() {
+        let catalog = TestCatalog::new();
+
+        let cache = NamespaceCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            catalog.time_provider(),
+            &catalog.metric_registry(),
+            test_ram_pool(),
+        );
+
+        // ========== namespace unknown ==========
+        assert!(cache.schema(Arc::from("ns1")).await.is_none());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_none());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
+
+        // ========== table unknown ==========
+        let ns1 = catalog.create_namespace("ns1").await;
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3);
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4);
+
+        // ========== no columns ==========
+        let t1 = ns1.create_table("t1").await;
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
+
+        // ========== some columns ==========
+        t1.create_column("c1", ColumnType::Bool).await;
+        t1.create_column("c2", ColumnType::Bool).await;
+
+        let columns = HashSet::from([]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
+
+        let columns = HashSet::from(["c1".into()]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
+
+        let columns = HashSet::from(["c2".into()]);
+        cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
+        assert!(cache.schema(Arc::from("ns1")).await.is_some());
+        assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
+    }
 }
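
Illustrative note (not part of the diff): the check added to `download_and_scan_parquet` compares the expected Arrow schema with the schema decoded from the parquet file while deliberately ignoring key/value metadata. Below is a minimal, self-contained sketch of that comparison, assuming only the `arrow` crate; the `expected` / `from_file` names are made up for the example.

```rust
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Expected schema, as the reader knows it (no extra metadata).
    let expected = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

    // Schema decoded from the parquet file; it may carry additional
    // key/value metadata (e.g. embedded IOx metadata).
    let from_file = Schema::new(vec![Field::new("a", DataType::Utf8, true)])
        .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));

    // Same trick as in the diff: overwrite the file schema's metadata with the
    // expected metadata before comparing, so only field names, types,
    // nullability and order are checked.
    let normalized = from_file.with_metadata(expected.metadata().clone());
    assert_eq!(expected.as_ref(), &normalized);

    // A genuine type mismatch is still caught.
    let other = Schema::new(vec![Field::new("a", DataType::Int64, true)])
        .with_metadata(expected.metadata().clone());
    assert_ne!(expected.as_ref(), &other);
}
```

This is why `test_schema_check_fail` surfaces a `SchemaMismatch` error while the two `ignore_additional_metadata` tests pass.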
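Similarly illustrative: the predicate that `expire_if_schema_does_not_cover` passes to `SharedBackend::remove_if` boils down to "expire when the namespace or table is unknown, or when any requested column is missing from the cached table schema". A standalone sketch of that decision using only `std` types; `needs_refresh` and `CachedColumns` are hypothetical stand-ins for the cached `NamespaceSchema`.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical stand-in for the cached table schema: column name -> column info.
type CachedColumns = BTreeMap<String, ()>;

/// Returns true when the cache entry should be expired, mirroring the closure
/// handed to `SharedBackend::remove_if` in the diff above.
fn needs_refresh(cached_table: Option<&CachedColumns>, requested: &HashSet<String>) -> bool {
    match cached_table {
        Some(columns) => {
            let covered: HashSet<_> = columns.keys().map(|s| s.as_str()).collect();
            // Expire if any requested column is not covered by the cached schema.
            requested.iter().any(|col| !covered.contains(col.as_str()))
        }
        // Table (or namespace) unknown => need to refresh.
        None => true,
    }
}

fn main() {
    let mut table = CachedColumns::new();
    table.insert("c1".to_string(), ());

    let known: HashSet<String> = HashSet::from(["c1".to_string()]);
    let missing: HashSet<String> = HashSet::from(["c2".to_string()]);

    assert!(!needs_refresh(Some(&table), &known)); // covered => keep the cached schema
    assert!(needs_refresh(Some(&table), &missing)); // missing column => expire
    assert!(needs_refresh(None, &known)); // unknown table => expire
}
```

This mirrors the staircase of assertions in `test_expiration`: requests covered by the cached schema do not trigger another `namespace_get_by_name` catalog query, while uncovered columns do.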