Merge branch 'main' into dom/ingester-uses-partition-key

pull/24376/head
kodiakhq[bot] 2022-06-22 09:47:49 +00:00 committed by GitHub
commit aff5e6d69a
9 changed files with 309 additions and 27 deletions

Cargo.lock generated

@@ -2066,7 +2066,6 @@ dependencies = [
"futures",
"generated_types",
"hashbrown",
"heappy",
"http",
"humantime",
"influxdb_iox_client",
@@ -2392,6 +2391,7 @@ dependencies = [
"futures",
"generated_types",
"hashbrown",
"heappy",
"http",
"hyper",
"log",

Binary image file not shown (new image, 684 KiB).

@@ -54,3 +54,21 @@ For example, if you aim your browser at an IOx server with a URL such as http://
You will see a beautiful flame graph such as
![Flame Graph](images/flame_graph.png)
# IOx — Heap Profiling
IOx includes a memory heap profile tool as well as a CPU profiler. The memory usage tool is based on [heappy](https://github.com/mkmik/heappy).
Support is not compiled in by default; it must be enabled via the `heappy` feature:
```shell
# Compile and run IOx with heap profiling enabled
cargo run --no-default-features --features=heappy -- run all-in-one
```
Now aim your browser at an IOx server with a URL such as http://localhost:8080/debug/pprof/allocs?seconds=5
You will see a green flamegraph such as
![Heappy Graph](images/heappy_graph.png)


@@ -62,7 +62,6 @@ tonic = "0.7"
uuid = { version = "1", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
heappy = { git = "https://github.com/mkmik/heappy", rev = "5d47dff152b8430e1dc1aea5a54c91c0c3099219", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
@@ -82,6 +81,7 @@ azure = ["clap_blocks/azure"] # Optional Azure Object store support
gcp = ["clap_blocks/gcp"] # Optional GCP object store support
aws = ["clap_blocks/aws"] # Optional AWS / S3 object store support
pprof = ["ioxd_common/pprof"] # Optional http://localhost:8080/debug/pprof/profile support
heappy = ["ioxd_common/heappy"] # Optional http://localhost:8080/debug/pprof/allocs support
# Enable tokio_console support (https://github.com/tokio-rs/console)
#


@@ -1,4 +1,7 @@
mod all_in_one;
// CLI errors when run with heappy (only works via `cargo run`):
// loading shared libraries: libjemalloc.so.2: cannot open shared object file: No such file or directory"
#[cfg(not(feature = "heappy"))]
mod cli;
mod debug;
mod error;


@@ -451,7 +451,7 @@ pub struct TestPartition {
impl TestPartition {
/// Update sort key.
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Self {
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
let partition = self
.catalog
.catalog
@@ -465,13 +465,13 @@ impl TestPartition {
.await
.unwrap();
Self {
Arc::new(Self {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
table: Arc::clone(&self.table),
sequencer: Arc::clone(&self.sequencer),
partition,
}
})
}
/// Create a parquet for the partition
@@ -568,6 +568,13 @@ impl TestPartition {
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: Some(sort_key.clone()),
};
let column_set = ColumnSet::new(
record_batch
.schema()
.fields()
.iter()
.map(|f| f.name().clone()),
);
let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(
ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
&metadata,
@@ -590,7 +597,7 @@
row_count: row_count as i64,
created_at: Timestamp::new(creation_time),
compaction_level: INITIAL_COMPACTION_LEVEL,
column_set: ColumnSet::new(["col1", "col2"]),
column_set,
};
let parquet_file = repos
.parquet_files()


@@ -11,6 +11,7 @@ clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
heappy = { git = "https://github.com/mkmik/heappy", rev = "5d47dff152b8430e1dc1aea5a54c91c0c3099219", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch_lp = { path = "../mutable_batch_lp" }


@@ -61,6 +61,13 @@ pub enum ReadError {
/// An error reading the downloaded Parquet file.
#[error("invalid parquet file: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
/// Schema mismatch
#[error("Schema mismatch (expected VS actual parquet file) for file: {path}")]
SchemaMismatch {
/// Path of the affected parquet file.
path: object_store::path::Path,
},
}
/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
@@ -185,9 +192,16 @@ impl ParquetStorage {
// `download_and_scan_parquet` is sent back to the reader and
// not silently ignored
let object_store = Arc::clone(&self.object_store);
let schema_captured = Arc::clone(&schema);
let handle = tokio::task::spawn(async move {
let download_result =
download_and_scan_parquet(projection, path, object_store, tx.clone()).await;
let download_result = download_and_scan_parquet(
projection,
schema_captured,
path,
object_store,
tx.clone(),
)
.await;
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
@@ -245,6 +259,7 @@ fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> {
/// spilling it to disk while it is processed.
async fn download_and_scan_parquet(
projection: Vec<usize>,
expected_schema: SchemaRef,
path: object_store::path::Path,
object_store: Arc<DynObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
@@ -276,6 +291,14 @@ async fn download_and_scan_parquet(
let file_reader = SerializedFileReader::new(Bytes::from(data))?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
// Check schema but ignore the metadata
let schema = arrow_reader
.get_schema()?
.with_metadata(expected_schema.metadata().clone());
if expected_schema.as_ref() != &schema {
return Err(ReadError::SchemaMismatch { path });
}
let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), projection);
let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?;
@@ -293,10 +316,13 @@
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use arrow::array::{ArrayRef, StringBuilder};
use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use datafusion::common::DataFusionError;
use iox_time::Time;
#[tokio::test]
@@ -305,21 +331,7 @@ mod tests {
let store = ParquetStorage::new(object_store);
let meta = IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
compaction_level: 1,
sort_key: None,
};
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch.clone())]);
@@ -356,6 +368,123 @@ mod tests {
assert_eq!(got.pop().unwrap(), batch);
}
#[tokio::test]
async fn test_schema_check_fail() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let other_batch = RecordBatch::try_from_iter([("a", to_int_array(&[1]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(other_batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
let err = datafusion::physical_plan::common::collect(rx)
.await
.unwrap_err();
// And compare to the original input
if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err {
assert_eq!(
err.to_string(),
"Schema mismatch (expected VS actual parquet file) for file: 1/3/2/4/00000000-0000-0000-0000-000000000000.parquet",
);
} else {
panic!("Wrong error type: {err}");
}
}
#[tokio::test]
async fn test_schema_check_ignore_additional_metadata_in_mem() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// add metadata to reference schema
let schema = Arc::new(
schema
.as_ref()
.clone()
.with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
);
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
datafusion::physical_plan::common::collect(rx)
.await
.unwrap();
}
#[tokio::test]
async fn test_schema_check_ignore_additional_metadata_in_file() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = meta();
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
// add metadata to stored batch
let batch = RecordBatch::try_new(
Arc::new(
schema
.as_ref()
.clone()
.with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
),
batch.columns().to_vec(),
)
.unwrap();
let stream = futures::stream::iter([Ok(batch)]);
// Serialize & upload the record batches.
store
.upload(stream, &meta)
.await
.expect("should serialize and store sucessfully");
// Fetch the record batches and compare them to the input batches.
let path: ParquetFilePath = (&meta).into();
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &path)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
datafusion::physical_plan::common::collect(rx)
.await
.unwrap();
}
fn to_string_array(strs: &[&str]) -> ArrayRef {
let mut builder = StringBuilder::new(strs.len());
for s in strs {
@@ -363,4 +492,30 @@ mod tests {
}
Arc::new(builder.finish())
}
fn to_int_array(vals: &[i64]) -> ArrayRef {
let mut builder = Int64Builder::new(vals.len());
for x in vals {
builder.append_value(*x).expect("appending int");
}
Arc::new(builder.finish())
}
fn meta() -> IoxMetadata {
IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
compaction_level: 1,
sort_key: None,
}
}
}
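
The schema check added above ignores key/value metadata by overwriting the file schema's metadata with the expected schema's metadata before comparing. A minimal standalone sketch of that idea using arrow directly (the names here are illustrative, not part of the diff):

```rust
use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Expected schema, as the reader is handed it.
    let expected = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    // Schema as decoded from a parquet file: same fields, but with extra
    // key/value metadata attached by the writer.
    let file_schema = expected
        .as_ref()
        .clone()
        .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));

    // Overwrite the metadata with the expected one so that only the fields are
    // compared, mirroring the check in download_and_scan_parquet above.
    let normalized = file_schema.with_metadata(expected.metadata().clone());
    assert_eq!(expected.as_ref(), &normalized);
}
```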


@@ -5,6 +5,7 @@ use cache_system::{
backend::{
lru::{LruBackend, ResourcePool},
resource_consumption::FunctionEstimator,
shared::SharedBackend,
ttl::{OptionalValueTtlProvider, TtlBackend},
},
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
@@ -13,7 +14,12 @@ use cache_system::{
use data_types::NamespaceSchema;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_time::TimeProvider;
use std::{collections::HashMap, mem::size_of_val, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
mem::size_of_val,
sync::Arc,
time::Duration,
};
use super::ram::RamSize;
@@ -31,6 +37,7 @@ type CacheT = Box<dyn Cache<K = Arc<str>, V = Option<Arc<CachedNamespace>>, Extr
#[derive(Debug)]
pub struct NamespaceCache {
cache: CacheT,
backend: SharedBackend<Arc<str>, Option<Arc<CachedNamespace>>>,
}
impl NamespaceCache {
@@ -100,8 +107,9 @@ impl NamespaceCache {
},
)),
));
let backend = SharedBackend::new(backend);
let cache = Box::new(CacheDriver::new(loader, backend));
let cache = Box::new(CacheDriver::new(loader, Box::new(backend.clone())));
let cache = Box::new(CacheWithMetrics::new(
cache,
CACHE_ID,
@@ -109,7 +117,7 @@ impl NamespaceCache {
metric_registry,
));
Self { cache }
Self { cache, backend }
}
/// Get namespace schema by name.
@@ -119,6 +127,29 @@ impl NamespaceCache {
.await
.map(|n| Arc::clone(&n.schema))
}
/// Expire namespace if the cached schema does NOT cover the given set of columns.
pub fn expire_if_schema_does_not_cover(
&self,
namespace_name: Arc<str>,
table_name: &str,
columns: &HashSet<String>,
) {
self.backend.remove_if(&namespace_name, |cached_namespace| {
if let Some(namespace) = cached_namespace.as_ref() {
if let Some(table) = namespace.schema.tables.get(table_name) {
let covered: HashSet<_> = table.columns.keys().map(|s| s.as_str()).collect();
columns.iter().any(|col| !covered.contains(col.as_str()))
} else {
// table unknown => need to update
true
}
} else {
// namespace unknown => need to update
true
}
});
}
}
#[derive(Debug, Clone)]
@@ -294,4 +325,71 @@ mod tests {
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
}
#[tokio::test]
async fn test_expiration() {
let catalog = TestCatalog::new();
let cache = NamespaceCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
);
// ========== namespace unknown ==========
assert!(cache.schema(Arc::from("ns1")).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
// ========== table unknown ==========
let ns1 = catalog.create_namespace("ns1").await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4);
// ========== no columns ==========
let t1 = ns1.create_table("t1").await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
// ========== some columns ==========
t1.create_column("c1", ColumnType::Bool).await;
t1.create_column("c2", ColumnType::Bool).await;
let columns = HashSet::from([]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
let columns = HashSet::from(["c1".into()]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
let columns = HashSet::from(["c2".into()]);
cache.expire_if_schema_does_not_cover(Arc::from("ns1"), "t1", &columns);
assert!(cache.schema(Arc::from("ns1")).await.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
}
}
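
The expiry predicate added in `expire_if_schema_does_not_cover` above boils down to a set-coverage test: expire the cached namespace when at least one requested column is missing from the cached table schema. A minimal standalone sketch of just that predicate (`needs_refresh` is a hypothetical name, not part of the diff):

```rust
use std::collections::HashSet;

/// True when the cached columns do NOT cover every requested column,
/// i.e. the cached namespace schema is stale for this request.
fn needs_refresh(cached: &HashSet<String>, requested: &HashSet<String>) -> bool {
    requested.iter().any(|col| !cached.contains(col))
}

fn main() {
    let cached: HashSet<String> = ["c1", "c2"].iter().map(|s| s.to_string()).collect();

    // Fully covered request: no expiry needed.
    assert!(!needs_refresh(&cached, &HashSet::from(["c1".to_string()])));

    // A new column appears: the cached schema must be refreshed.
    assert!(needs_refresh(&cached, &HashSet::from(["c3".to_string()])));
}
```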