From 1fa4ef9baccae41f130851b15fb59641a724d904 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 7 Jun 2022 11:03:00 +0000 Subject: [PATCH 1/4] chore(deps): Bump tower-http from 0.3.3 to 0.3.4 (#4793) Bumps [tower-http](https://github.com/tower-rs/tower-http) from 0.3.3 to 0.3.4. - [Release notes](https://github.com/tower-rs/tower-http/releases) - [Commits](https://github.com/tower-rs/tower-http/compare/tower-http-0.3.3...tower-http-0.3.4) --- updated-dependencies: - dependency-name: tower-http dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f884fedb29..dfddd58262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5741,9 +5741,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "bitflags", "bytes", From 5d98988c9f0f1cf048c63165325ffb0cea9d943e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 7 Jun 2022 12:37:55 +0000 Subject: [PATCH 2/4] chore(deps): Bump tokio from 1.19.1 to 1.19.2 (#4795) Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.19.1 to 1.19.2. - [Release notes](https://github.com/tokio-rs/tokio/releases) - [Commits](https://github.com/tokio-rs/tokio/commits) --- updated-dependencies: - dependency-name: tokio dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- test_helpers/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfddd58262..77a6256aea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5534,9 +5534,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.19.1" +version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95eec79ea28c00a365f539f1961e9278fbcaf81c0ff6aaf0e93c181352446948" +checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ "bytes", "libc", diff --git a/test_helpers/Cargo.toml b/test_helpers/Cargo.toml index ba5c7615ae..98dac25d5a 100644 --- a/test_helpers/Cargo.toml +++ b/test_helpers/Cargo.toml @@ -13,7 +13,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } observability_deps = { path = "../observability_deps" } workspace-hack = { path = "../workspace-hack"} async-trait = { version = "0.1.56", optional = true } -tokio = { version = "1.19.1", optional = true, default_features = false, features = ["time"] } +tokio = { version = "1.19.2", optional = true, default_features = false, features = ["time"] } [features] default = [] From 4509e3db57096587abb27108495d5595c61afb3c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 7 Jun 2022 15:31:49 +0200 Subject: [PATCH 3/4] feat: wire up RB metrics for querier chunks --- querier/src/cache/mod.rs | 2 +- querier/src/cache/read_buffer.rs | 57 +++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs index fa70536f9f..7d0129cc2e 100644 --- a/querier/src/cache/mod.rs +++ b/querier/src/cache/mod.rs @@ -115,7 +115,7 @@ impl CatalogCache { let read_buffer_cache = ReadBufferCache::new( backoff_config, Arc::clone(&time_provider), - &metric_registry, + metric_registry, Arc::clone(&ram_pool), ); diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs index e95d795cd5..72c5203c04 100644 --- a/querier/src/cache/read_buffer.rs +++ b/querier/src/cache/read_buffer.rs @@ -16,7 +16,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use iox_time::TimeProvider; use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage}; -use read_buffer::RBChunk; +use read_buffer::{ChunkMetrics, RBChunk}; use snafu::{ResultExt, Snafu}; use std::{collections::HashMap, mem, sync::Arc}; @@ -43,12 +43,14 @@ impl ReadBufferCache { pub fn new( backoff_config: BackoffConfig, time_provider: Arc, - metric_registry: &metric::Registry, + metric_registry: Arc, ram_pool: Arc>, ) -> Self { + let metric_registry_captured = Arc::clone(&metric_registry); let loader = Box::new(FunctionLoader::new( move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| { let backoff_config = backoff_config.clone(); + let metric_registry = Arc::clone(&metric_registry_captured); async move { let rb_chunk = Backoff::new(&backoff_config) @@ -63,6 +65,7 @@ impl ReadBufferCache { decoded_parquet_file_for_load, table_name_for_load, store_for_load, + &metric_registry, ) .await } @@ -79,7 +82,7 @@ impl ReadBufferCache { loader, CACHE_ID, Arc::clone(&time_provider), - metric_registry, + &metric_registry, )); // add to memory pool @@ -137,10 +140,11 @@ async fn load_from_parquet_file( decoded_parquet_file: Arc, table_name: Arc, store: ParquetStorage, + metric_registry: &metric::Registry, ) -> Result { let record_batch_stream = record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?; - read_buffer_chunk_from_stream(table_name, record_batch_stream) + read_buffer_chunk_from_stream(table_name, record_batch_stream, metric_registry) .await .context(BuildingChunkSnafu) } @@ -170,10 +174,15 @@ enum RBChunkError { async fn read_buffer_chunk_from_stream( table_name: Arc, mut stream: SendableRecordBatchStream, + metric_registry: &metric::Registry, ) -> Result { let schema = stream.schema(); - let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema); + // create "global" metric object, so that we don't blow up prometheus w/ too many metrics + let metrics = ChunkMetrics::new(metric_registry, "iox_shared"); + + let mut builder = + read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema).with_metrics(metrics); while let Some(record_batch) = stream.next().await { builder @@ -192,7 +201,7 @@ mod tests { use arrow_util::assert_batches_eq; use datafusion_util::stream_from_batches; use iox_tests::util::{TestCatalog, TestPartition}; - use metric::{Attributes, Metric, U64Counter}; + use metric::{Attributes, CumulativeGauge, Metric, U64Counter}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use read_buffer::Predicate; use schema::selection::Selection; @@ -202,7 +211,7 @@ mod tests { ReadBufferCache::new( BackoffConfig::default(), catalog.time_provider(), - &catalog.metric_registry(), + catalog.metric_registry(), test_ram_pool(), ) } @@ -304,7 +313,7 @@ mod tests { let cache = ReadBufferCache::new( BackoffConfig::default(), catalog.time_provider(), - &catalog.metric_registry(), + catalog.metric_registry(), ram_pool, ); @@ -407,7 +416,9 @@ mod tests { let stream = stream_from_batches(batches); - let rb = read_buffer_chunk_from_stream("cpu".into(), stream) + let metric_registry = metric::Registry::new(); + + let rb = read_buffer_chunk_from_stream("cpu".into(), stream, &metric_registry) .await .unwrap(); @@ -427,4 +438,32 @@ mod tests { assert_batches_eq!(expected, &rb_batches); } + + #[tokio::test] + async fn test_rb_metrics() { + let (catalog, partition) = make_catalog().await; + + let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await; + let parquet_file = test_parquet_file.parquet_file; + let decoded = Arc::new(DecodedParquetFile::new(parquet_file)); + let storage = ParquetStorage::new(Arc::clone(&catalog.object_store)); + + let cache = make_cache(&catalog); + + let _rb = cache + .get(Arc::clone(&decoded), "table1".into(), storage.clone()) + .await; + + let g: Metric = catalog + .metric_registry + .get_instrument("read_buffer_row_group_total") + .unwrap(); + let v = g + .get_observer(&Attributes::from(&[("db_name", "iox_shared")])) + .unwrap() + .fetch(); + + // Load is only called once + assert_eq!(v, 1); + } } From 3ecb1ee056884265b677d7f75717dc9b98695b53 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 7 Jun 2022 13:35:01 +0000 Subject: [PATCH 4/4] chore(deps): Bump http from 0.2.7 to 0.2.8 (#4796) Bumps [http](https://github.com/hyperium/http) from 0.2.7 to 0.2.8. - [Release notes](https://github.com/hyperium/http/releases) - [Changelog](https://github.com/hyperium/http/blob/master/CHANGELOG.md) - [Commits](https://github.com/hyperium/http/compare/v0.2.7...v0.2.8) --- updated-dependencies: - dependency-name: http dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- client_util/Cargo.toml | 2 +- influxdb_iox/Cargo.toml | 2 +- ioxd_common/Cargo.toml | 2 +- test_helpers_end_to_end/Cargo.toml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 77a6256aea..6699e91525 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1989,9 +1989,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index 24bc56fba8..97473c0ab0 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -6,7 +6,7 @@ description = "Shared code for IOx clients" edition = "2021" [dependencies] -http = "0.2.7" +http = "0.2.8" thiserror = "1.0.31" tonic = { version = "0.7" } tower = "0.4" diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 1b33b8603d..052dde496b 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -43,7 +43,7 @@ console-subscriber = { version = "0.1.6", optional = true, features = ["parking_ dotenv = "0.15.0" futures = "0.3" hashbrown = "0.12" -http = "0.2.7" +http = "0.2.8" humantime = "2.1.0" itertools = "0.10.1" libc = { version = "0.2" } diff --git a/ioxd_common/Cargo.toml b/ioxd_common/Cargo.toml index 2ba676ad8a..ea96b4eb03 100644 --- a/ioxd_common/Cargo.toml +++ b/ioxd_common/Cargo.toml @@ -33,7 +33,7 @@ chrono = { version = "0.4", default-features = false } flate2 = "1.0" futures = "0.3" hashbrown = "0.12" -http = "0.2.7" +http = "0.2.8" hyper = "0.14" log = "0.4" parking_lot = "0.12" diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index 745a8003b6..17f6e951c1 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -10,7 +10,7 @@ assert_cmd = "2.0.2" bytes = "1.0" futures = "0.3" generated_types = { path = "../generated_types" } -http = "0.2.7" +http = "0.2.8" hyper = "0.14" influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] } nix = "0.24"