refactor: assorted fixes and prep work for #4124 (#4912)

* refactor: `TestPartition::update_sort_key` should return an `Arc`

The whole test framework is built around `Arc`s, so let's fix this
consistency issue.

* fix: actually calculate correct column set in test framework

* feat: check expected parquet file schema

While working on the querier, I made some mistakes regarding schemas, and
such a check would have greatly improved the debugging experience.

* feat: namespace cache expiration

* fix: improve parquet schema check

* fix: remove clone


@ -451,7 +451,7 @@ pub struct TestPartition {
impl TestPartition {
/// Update sort key.
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Self {
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
let partition = self
.catalog
.catalog
@ -465,13 +465,13 @@ impl TestPartition {
.await
.unwrap();
Self {
Arc::new(Self {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
table: Arc::clone(&self.table),
sequencer: Arc::clone(&self.sequencer),
partition,
}
})
}
/// Create a Parquet file for the partition
@ -568,6 +568,13 @@ impl TestPartition {
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: Some(sort_key.clone()),
};
let column_set = ColumnSet::new(
record_batch
.schema()
.fields()
.iter()
.map(|f| f.name().clone()),
);
let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(
ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
&metadata,
@ -590,7 +597,7 @@ impl TestPartition {
row_count: row_count as i64,
created_at: Timestamp::new(creation_time),
compaction_level: INITIAL_COMPACTION_LEVEL,
column_set: ColumnSet::new(["col1", "col2"]),
column_set,
};
let parquet_file = repos
.parquet_files()
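
As a usage note for the `Arc`-returning `update_sort_key` above, here is a minimal sketch of a call site. `partition` is assumed to be an `Arc<TestPartition>` produced by the usual test builders, and `SortKey::from_columns` is assumed from the IOx schema crate; neither is part of this diff.

// Hypothetical call site: the update now hands back another Arc, consistent
// with the rest of the Arc-based test framework.
let partition = partition
    .update_sort_key(SortKey::from_columns(["tag1", "time"]))
    .await;
// The updated partition can be shared cheaply with further test helpers.
let shared = Arc::clone(&partition);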


@ -61,6 +61,13 @@ pub enum ReadError {
/// An error reading the downloaded Parquet file.
#[error("invalid parquet file: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
/// Schema mismatch
#[error("Schema mismatch (expected VS actual parquet file) for file: {path}")]
SchemaMismatch {
/// Path of the affected parquet file.
path: object_store::path::Path,
},
}
/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
@ -185,9 +192,16 @@ impl ParquetStorage {
// `download_and_scan_parquet` is sent back to the reader and
// not silently ignored
let object_store = Arc::clone(&self.object_store);
let schema_captured = Arc::clone(&schema);
let handle = tokio::task::spawn(async move {
let download_result =
download_and_scan_parquet(projection, path, object_store, tx.clone()).await;
let download_result = download_and_scan_parquet(
projection,
schema_captured,
path,
object_store,
tx.clone(),
)
.await;
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
@ -245,6 +259,7 @@ fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> {
/// spilling it to disk while it is processed.
async fn download_and_scan_parquet(
projection: Vec<usize>,
expected_schema: SchemaRef,
path: object_store::path::Path,
object_store: Arc<DynObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
@ -276,6 +291,14 @@ async fn download_and_scan_parquet(
let file_reader = SerializedFileReader::new(Bytes::from(data))?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
// Check schema but ignore the metadata
let schema = arrow_reader
.get_schema()?
.with_metadata(expected_schema.metadata().clone());
if expected_schema.as_ref() != &schema {
return Err(ReadError::SchemaMismatch { path });
}
let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), projection);
let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?;
@ -293,10 +316,13 @@ async fn download_and_scan_parquet(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use arrow::array::{ArrayRef, StringBuilder};
use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use datafusion::common::DataFusionError;
use iox_time::Time;
#[tokio::test]
@ -305,21 +331,7 @@ mod tests {
let store = ParquetStorage::new(object_store);
let meta = IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
compaction_level: 1,
sort_key: None,
};
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch.clone())]);
@ -356,6 +368,123 @@ mod tests {
assert_eq!(got.pop().unwrap(), batch);
}
#[tokio::test]
async fn test_schema_check_fail() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let other_batch = RecordBatch::try_from_iter([("a", to_int_array(&[1]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(other_batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
let err = datafusion::physical_plan::common::collect(rx)
.await
.unwrap_err();
// And check that the schema mismatch surfaces as the expected error
if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err {
assert_eq!(
err.to_string(),
"Schema mismatch (expected VS actual parquet file) for file: 1/3/2/4/00000000-0000-0000-0000-000000000000.parquet",
);
} else {
panic!("Wrong error type: {err}");
}
}
#[tokio::test]
async fn test_schema_check_ignore_additional_metadata_in_mem() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// add metadata to reference schema
let schema = Arc::new(
schema
.as_ref()
.clone()
.with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
);
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
datafusion::physical_plan::common::collect(rx)
.await
.unwrap();
}
#[tokio::test]
async fn test_schema_check_ignore_additional_metadata_in_file() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
// add metadata to stored batch
let batch = RecordBatch::try_new(
Arc::new(
schema
.as_ref()
.clone()
.with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
),
batch.columns().to_vec(),
)
.unwrap();
let stream = futures::stream::iter([Ok(batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
datafusion::physical_plan::common::collect(rx)
.await
.unwrap();
}
fn to_string_array(strs: &[&str]) -> ArrayRef {
let mut builder = StringBuilder::new(strs.len());
for s in strs {
@ -363,4 +492,30 @@ mod tests {
}
Arc::new(builder.finish())
}
fn to_int_array(vals: &[i64]) -> ArrayRef {
let mut builder = Int64Builder::new(vals.len());
for x in vals {
builder.append_value(*x).expect("appending string");
}
Arc::new(builder.finish())
}
fn meta() -> IoxMetadata {
IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
compaction_level: 1,
sort_key: None,
}
}
}
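
A note on the "Check schema but ignore the metadata" step above: Arrow's `Schema` equality also compares the key/value metadata, which is why the file schema's metadata is overwritten with the expected metadata before comparing. A minimal, self-contained sketch (not taken from this diff) illustrating that behaviour:

use arrow::datatypes::{DataType, Field, Schema};
use std::collections::HashMap;

fn main() {
    let expected = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let from_file = Schema::new(vec![Field::new("a", DataType::Utf8, true)])
        .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));

    // Identical fields but different metadata: `Schema`'s equality reports a mismatch ...
    assert_ne!(expected, from_file);

    // ... so the check first carries the expected metadata over, then compares.
    let from_file = from_file.with_metadata(expected.metadata().clone());
    assert_eq!(expected, from_file);
}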


@ -5,6 +5,7 @@ use cache_system::{
backend::{
lru::{LruBackend, ResourcePool},
resource_consumption::FunctionEstimator,
shared::SharedBackend,
ttl::{OptionalValueTtlProvider, TtlBackend},
},
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
@ -13,7 +14,12 @@ use cache_system::{
use data_types::NamespaceSchema;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_time::TimeProvider;
use std::{collections::HashMap, mem::size_of_val, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
mem::size_of_val,
sync::Arc,
time::Duration,
};
use super::ram::RamSize;
@ -31,6 +37,7 @@ type CacheT = Box<dyn Cache<K = Arc<str>, V = Option<Arc<CachedNamespace>>, Extr
#[derive(Debug)]
pub struct NamespaceCache {
cache: CacheT,
backend: SharedBackend<Arc<str>, Option<Arc<CachedNamespace>>>,
}
impl NamespaceCache {
@ -100,8 +107,9 @@ impl NamespaceCache {
},
)),
));
let backend = SharedBackend::new(backend);
let cache = Box::new(CacheDriver::new(loader, backend));
let cache = Box::new(CacheDriver::new(loader, Box::new(backend.clone())));
let cache = Box::new(CacheWithMetrics::new(
cache,
CACHE_ID,
@ -109,7 +117,7 @@ impl NamespaceCache {
metric_registry,
));
Self { cache }
Self { cache, backend }
}
/// Get namespace schema by name.
@ -119,6 +127,29 @@ impl NamespaceCache {
.await
.map(|n| Arc::clone(&n.schema))
}
/// Expire namespace if the cached schema does NOT cover the given set of columns.
pub fn expire_if_schema_does_not_cover(
&self,
namespace_name: Arc<str>,
table_name: &str,
columns: &HashSet<String>,
) {
self.backend.remove_if(&namespace_name, |cached_namespace| {
if let Some(namespace) = cached_namespace.as_ref() {
if let Some(table) = namespace.schema.tables.get(table_name) {
let covered: HashSet<_> = table.columns.keys().map(|s| s.as_str()).collect();
columns.iter().any(|col| !covered.contains(col.as_str()))
} else {
// table unknown => need to update
true
}
} else {
// namespace unknown => need to update
true
}
});
}
}
#[derive(Debug, Clone)]
@ -294,4 +325,71 @@ mod tests {
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
}
#[tokio::test]
async fn test_expiration() {
let catalog = TestCatalog::new();
let cache = NamespaceCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
);
// ========== namespace unknown ==========
assert!(cache.schema(Arc::from("ns1")).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
// ========== table unknown ==========
let ns1 = catalog.create_namespace("ns1").await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4);
// ========== no columns ==========
let t1 = ns1.create_table("t1").await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
// ========== some columns ==========
t1.create_column("c1", ColumnType::Bool).await;
t1.create_column("c2", ColumnType::Bool).await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
let columns = HashSet::from(["c1".into()]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
let columns = HashSet::from(["c2".into()]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
}
}
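
To close out the cache change, a hedged sketch of how a caller on the querier side might use the new expiration hook; the function name, namespace/table names, and column values are hypothetical, only `expire_if_schema_does_not_cover` and `schema` come from this diff:

use std::{collections::HashSet, sync::Arc};

// Hypothetical querier-side helper; `NamespaceCache` is the type changed above.
async fn refresh_if_needed(namespace_cache: &NamespaceCache) {
    // Columns referenced by an incoming request (hypothetical values).
    let required: HashSet<String> = ["host", "usage_user"].iter().map(|s| s.to_string()).collect();

    // If the cached schema for table "cpu" in namespace "my_ns" does not cover these
    // columns (or the namespace/table is unknown at all), the cached entry is dropped ...
    namespace_cache.expire_if_schema_does_not_cover(Arc::from("my_ns"), "cpu", &required);

    // ... so this lookup loads a fresh schema from the catalog instead of serving a stale one.
    let _schema = namespace_cache.schema(Arc::from("my_ns")).await;
}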