Merge branch 'main' into dom/ingester-streaming-compact

commit 6d8b93e129
@@ -1989,9 +1989,9 @@ dependencies = [
 
 [[package]]
 name = "http"
-version = "0.2.7"
+version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb"
+checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
 dependencies = [
  "bytes",
  "fnv",
@@ -5534,9 +5534,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
 
 [[package]]
 name = "tokio"
-version = "1.19.1"
+version = "1.19.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95eec79ea28c00a365f539f1961e9278fbcaf81c0ff6aaf0e93c181352446948"
+checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439"
 dependencies = [
  "bytes",
  "libc",
@@ -5741,9 +5741,9 @@ dependencies = [
 
 [[package]]
 name = "tower-http"
-version = "0.3.3"
+version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae"
+checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
 dependencies = [
  "bitflags",
  "bytes",
@@ -6,7 +6,7 @@ description = "Shared code for IOx clients"
 edition = "2021"
 
 [dependencies]
-http = "0.2.7"
+http = "0.2.8"
 thiserror = "1.0.31"
 tonic = { version = "0.7" }
 tower = "0.4"
@@ -43,7 +43,7 @@ console-subscriber = { version = "0.1.6", optional = true, features = ["parking_
 dotenv = "0.15.0"
 futures = "0.3"
 hashbrown = "0.12"
-http = "0.2.7"
+http = "0.2.8"
 humantime = "2.1.0"
 itertools = "0.10.1"
 libc = { version = "0.2" }
@@ -33,7 +33,7 @@ chrono = { version = "0.4", default-features = false }
 flate2 = "1.0"
 futures = "0.3"
 hashbrown = "0.12"
-http = "0.2.7"
+http = "0.2.8"
 hyper = "0.14"
 log = "0.4"
 parking_lot = "0.12"
@@ -115,7 +115,7 @@ impl CatalogCache {
         let read_buffer_cache = ReadBufferCache::new(
             backoff_config,
             Arc::clone(&time_provider),
-            &metric_registry,
+            metric_registry,
             Arc::clone(&ram_pool),
         );
 
@@ -16,7 +16,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use iox_time::TimeProvider;
 use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage};
-use read_buffer::RBChunk;
+use read_buffer::{ChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
 
@@ -43,12 +43,14 @@ impl ReadBufferCache {
     pub fn new(
         backoff_config: BackoffConfig,
         time_provider: Arc<dyn TimeProvider>,
-        metric_registry: &metric::Registry,
+        metric_registry: Arc<metric::Registry>,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
+        let metric_registry_captured = Arc::clone(&metric_registry);
         let loader = Box::new(FunctionLoader::new(
             move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| {
                 let backoff_config = backoff_config.clone();
+                let metric_registry = Arc::clone(&metric_registry_captured);
 
                 async move {
                     let rb_chunk = Backoff::new(&backoff_config)
@@ -63,6 +65,7 @@ impl ReadBufferCache {
                                 decoded_parquet_file_for_load,
                                 table_name_for_load,
                                 store_for_load,
+                                &metric_registry,
                             )
                             .await
                         }
@@ -79,7 +82,7 @@ impl ReadBufferCache {
             loader,
             CACHE_ID,
             Arc::clone(&time_provider),
-            metric_registry,
+            &metric_registry,
         ));
 
         // add to memory pool
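
Note: the change to ReadBufferCache::new above follows a common Rust idiom for sharing one value with a closure that is invoked many times: clone the Arc once outside the move closure, then clone it again inside the closure on each invocation, so every spawned load owns its own handle while the constructor keeps using the original. A minimal standalone sketch of that idiom, with illustrative names that are not taken from the IOx code:

    use std::sync::Arc;
    use std::thread;

    fn main() {
        let registry = Arc::new(String::from("metrics"));

        // Clone once so the closure owns its own Arc while the caller keeps the original.
        let registry_captured = Arc::clone(&registry);
        let loader = move || {
            // Clone again per invocation so each spawned unit of work owns a handle,
            // which lets the closure be called any number of times.
            let registry = Arc::clone(&registry_captured);
            thread::spawn(move || println!("loading with {registry}"))
        };

        loader().join().unwrap();
        loader().join().unwrap();

        // The original Arc is still usable here.
        println!("still have {registry}");
    }
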
@@ -137,10 +140,11 @@ async fn load_from_parquet_file(
     decoded_parquet_file: Arc<DecodedParquetFile>,
     table_name: Arc<str>,
     store: ParquetStorage,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, LoadError> {
     let record_batch_stream =
         record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
-    read_buffer_chunk_from_stream(table_name, record_batch_stream)
+    read_buffer_chunk_from_stream(table_name, record_batch_stream, metric_registry)
         .await
         .context(BuildingChunkSnafu)
 }
@@ -170,10 +174,15 @@ enum RBChunkError {
 async fn read_buffer_chunk_from_stream(
     table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, RBChunkError> {
     let schema = stream.schema();
 
-    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema);
+    // create "global" metric object, so that we don't blow up prometheus w/ too many metrics
+    let metrics = ChunkMetrics::new(metric_registry, "iox_shared");
+
+    let mut builder =
+        read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema).with_metrics(metrics);
 
     while let Some(record_batch) = stream.next().await {
         builder
@@ -192,7 +201,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use datafusion_util::stream_from_batches;
     use iox_tests::util::{TestCatalog, TestPartition};
-    use metric::{Attributes, Metric, U64Counter};
+    use metric::{Attributes, CumulativeGauge, Metric, U64Counter};
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use read_buffer::Predicate;
     use schema::selection::Selection;
@@ -202,7 +211,7 @@ mod tests {
         ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             test_ram_pool(),
         )
     }
@@ -304,7 +313,7 @@ mod tests {
         let cache = ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             ram_pool,
         );
 
@@ -407,7 +416,9 @@ mod tests {
 
         let stream = stream_from_batches(batches);
 
-        let rb = read_buffer_chunk_from_stream("cpu".into(), stream)
+        let metric_registry = metric::Registry::new();
+
+        let rb = read_buffer_chunk_from_stream("cpu".into(), stream, &metric_registry)
             .await
             .unwrap();
 
@@ -427,4 +438,32 @@ mod tests {
 
         assert_batches_eq!(expected, &rb_batches);
     }
+
+    #[tokio::test]
+    async fn test_rb_metrics() {
+        let (catalog, partition) = make_catalog().await;
+
+        let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
+        let parquet_file = test_parquet_file.parquet_file;
+        let decoded = Arc::new(DecodedParquetFile::new(parquet_file));
+        let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
+
+        let cache = make_cache(&catalog);
+
+        let _rb = cache
+            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
+            .await;
+
+        let g: Metric<CumulativeGauge> = catalog
+            .metric_registry
+            .get_instrument("read_buffer_row_group_total")
+            .unwrap();
+        let v = g
+            .get_observer(&Attributes::from(&[("db_name", "iox_shared")]))
+            .unwrap()
+            .fetch();
+
+        // Load is only called once
+        assert_eq!(v, 1);
+    }
 }
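
Note: the gauge lookup at the end of the new test is the read path for the IOx metric registry: fetch the instrument by name, select the observer for an attribute set, and read its value. A condensed sketch of that lookup as a helper, assuming get_instrument and get_observer return Options and that fetch() yields the u64 the test's assertion compares against; the instrument name and db_name attribute are the ones asserted above:

    use metric::{Attributes, CumulativeGauge, Metric};

    /// Returns the number of read buffer row groups recorded under the
    /// "iox_shared" db_name, or None if the instrument is not registered yet.
    fn row_groups_loaded(registry: &metric::Registry) -> Option<u64> {
        let gauge: Metric<CumulativeGauge> =
            registry.get_instrument("read_buffer_row_group_total")?;
        let observer = gauge.get_observer(&Attributes::from(&[("db_name", "iox_shared")]))?;
        Some(observer.fetch())
    }
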
@@ -13,7 +13,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 observability_deps = { path = "../observability_deps" }
 workspace-hack = { path = "../workspace-hack"}
 async-trait = { version = "0.1.56", optional = true }
-tokio = { version = "1.19.1", optional = true, default_features = false, features = ["time"] }
+tokio = { version = "1.19.2", optional = true, default_features = false, features = ["time"] }
 
 [features]
 default = []
@@ -10,7 +10,7 @@ assert_cmd = "2.0.2"
 bytes = "1.0"
 futures = "0.3"
 generated_types = { path = "../generated_types" }
-http = "0.2.7"
+http = "0.2.8"
 hyper = "0.14"
 influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
 nix = "0.24"