Merge branch 'main' into cn/rename-no-ng

pull/24376/head
kodiakhq[bot] 2022-05-13 13:47:48 +00:00 committed by GitHub
commit 542ec97b66
30 changed files with 1429 additions and 74 deletions


@@ -450,7 +450,7 @@ jobs:
RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
docker buildx build \
- --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console" \
+ --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console,pprof" \
--build-arg RUST_VERSION="$RUST_VERSION" \
--build-arg RUSTFLAGS="$RUSTFLAGS" \
--progress plain \

Cargo.lock (generated)

@@ -587,6 +587,22 @@ dependencies = [
"tokio",
]
+ [[package]]
+ name = "cache_system"
+ version = "0.1.0"
+ dependencies = [
+ "async-trait",
+ "criterion",
+ "futures",
+ "iox_time",
+ "observability_deps",
+ "parking_lot 0.12.0",
+ "proptest",
+ "rand",
+ "tokio",
+ "workspace-hack",
+ ]
[[package]]
name = "cast"
version = "0.2.7"
@@ -4097,8 +4113,8 @@ dependencies = [
"assert_matches",
"async-trait",
"backoff 0.1.0",
+ "cache_system",
"client_util",
- "criterion",
"data_types",
"datafusion 0.1.0",
"datafusion_util",
@@ -4116,7 +4132,6 @@ dependencies = [
"parquet_file",
"pin-project",
"predicate",
- "proptest",
"query",
"rand",
"schema",
@@ -6729,9 +6744,9 @@ dependencies = [
[[package]]
name = "zstd-safe"
- version = "5.0.1+zstd.1.5.2"
+ version = "5.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
+ checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
dependencies = [
"libc",
"zstd-sys",


@@ -3,6 +3,7 @@
members = [
"arrow_util",
"backoff",
+ "cache_system",
"clap_blocks",
"client_util",
"compactor",

cache_system/Cargo.toml (new file)

@@ -0,0 +1,22 @@
+ [package]
+ name = "cache_system"
+ version = "0.1.0"
+ edition = "2021"
+ [dependencies]
+ async-trait = "0.1.53"
+ futures = "0.3"
+ iox_time = { path = "../iox_time" }
+ observability_deps = { path = "../observability_deps" }
+ parking_lot = "0.12"
+ tokio = { version = "1.18", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
+ workspace-hack = { path = "../workspace-hack"}
+ [dev-dependencies]
+ criterion = "0.3"
+ proptest = { version = "1", default_features = false, features = ["std"] }
+ rand = "0.8.3"
+ [[bench]]
+ name = "addressable_heap"
+ harness = false
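
The `[[bench]]` entry above sets `harness = false`, which disables the default libtest harness so criterion can supply its own `main`. A minimal sketch of the matching bench-file shape (the benched closure is a placeholder, not the real workload):

use criterion::{criterion_group, criterion_main, Criterion};

fn bench_insert(c: &mut Criterion) {
    // Placeholder workload; the real bench exercises AddressableHeap.
    c.bench_function("insert", |b| b.iter(|| 1 + 1));
}

criterion_group!(benches, bench_insert);
criterion_main!(benches);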


@@ -1,10 +1,10 @@
use std::mem::size_of;
+ use cache_system::backend::addressable_heap::AddressableHeap;
use criterion::{
criterion_group, criterion_main, measurement::WallTime, AxisScale, BatchSize, BenchmarkGroup,
BenchmarkId, Criterion, PlotConfiguration, SamplingMode,
};
- use querier::AddressableHeap;
use rand::{prelude::SliceRandom, thread_rng, Rng};
/// Payload (`V`) for testing.


@@ -1,3 +1,4 @@
+ //! Implementation of an [`AddressableHeap`].
use std::{
collections::{HashMap, VecDeque},
hash::Hash,
@@ -44,6 +45,14 @@ where
}
}
+ /// Check if the heap is empty.
+ pub fn is_empty(&self) -> bool {
+ let res1 = self.key_to_order_and_value.is_empty();
+ let res2 = self.queue.is_empty();
+ assert_eq!(res1, res2, "data structures out of sync");
+ res1
+ }
/// Insert element.
///
/// If the element (compared by `K`) already exists, it will be returned.
@@ -396,6 +405,10 @@ mod tests {
Self { inner: Vec::new() }
}
+ fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
fn insert(&mut self, k: u8, v: String, o: i8) -> Option<(String, i8)> {
let res = self.remove(&k);
self.inner.push((k, v, o));
@@ -441,6 +454,7 @@ mod tests {
#[derive(Debug, Clone)]
enum Action {
+ IsEmpty,
Insert { k: u8, v: String, o: i8 },
Peek,
Pop,
@@ -451,6 +465,7 @@ mod tests {
// Use a hand-rolled strategy instead of `proptest-derive`, because the latter one is quite a heavy dependency.
fn action() -> impl Strategy<Value = Action> {
prop_oneof![
+ Just(Action::IsEmpty),
(any::<u8>(), ".*", any::<i8>()).prop_map(|(k, v, o)| Action::Insert { k, v, o }),
Just(Action::Peek),
Just(Action::Pop),
@@ -467,6 +482,11 @@ mod tests {
for action in actions {
match action {
+ Action::IsEmpty => {
+ let res1 = heap.is_empty();
+ let res2 = sim.is_empty();
+ assert_eq!(res1, res2);
+ }
Action::Insert{k, v, o} => {
let res1 = heap.insert(k, v.clone(), o);
let res2 = sim.insert(k, v, o);
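
Taken together, these hunks give `AddressableHeap` a keyed `insert` that returns the replaced `(value, order)` pair, plus `peek`, `pop`, and the new `is_empty` (which also asserts the map and queue stay in sync). A rough usage sketch, with the constructor name and smallest-order-first `pop` behavior assumed from context rather than shown in the diff:

use cache_system::backend::addressable_heap::AddressableHeap;

fn demo() {
    // K = key, V = payload, O = sort order; `new()` is assumed.
    let mut heap: AddressableHeap<u8, String, i8> = AddressableHeap::new();
    assert!(heap.is_empty());

    heap.insert(1, String::from("a"), 10);
    heap.insert(2, String::from("b"), 5);

    // Re-inserting an existing key returns the replaced (value, order) pair.
    assert_eq!(
        heap.insert(1, String::from("a2"), 1),
        Some((String::from("a"), 10))
    );

    // Assumed: `pop` yields the entry with the smallest order first.
    assert_eq!(heap.pop(), Some((1, String::from("a2"), 1)));
    assert!(!heap.is_empty());
}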


@@ -1,3 +1,4 @@
+ //! Cross-populates two caches.
use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc};
use parking_lot::Mutex;
@@ -310,7 +311,7 @@ mod tests {
#[test]
fn test_generic1() {
- use crate::cache_system::backend::test_util::test_generic;
+ use crate::backend::test_util::test_generic;
test_generic(|| {
let backend1 = Box::new(HashMap::<u8, String>::new());
@@ -328,7 +329,7 @@ mod tests {
#[test]
fn test_generic2() {
- use crate::cache_system::backend::test_util::test_generic;
+ use crate::backend::test_util::test_generic;
test_generic(|| {
let backend1 = Box::new(HashMap::<i8, String>::new());
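
The module doc describes a backend pair where a write to one cache also populates the other (in the querier this pairs lookups by ID with lookups by name). The diff only touches the test imports, so the following is a deliberately simplified, hypothetical illustration of the idea, not the crate's `dual_backends` API:

use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical sketch: two maps kept cross-populated, so a value inserted
// under key `K1` is also reachable under a second key `K2`.
struct Dual<K1, K2, V> {
    by_id: HashMap<K1, V>,
    by_name: HashMap<K2, V>,
}

impl<K1: Hash + Eq, K2: Hash + Eq, V: Clone> Dual<K1, K2, V> {
    fn set(&mut self, k1: K1, k2: K2, v: V) {
        // One logical write populates both indexes.
        self.by_id.insert(k1, v.clone());
        self.by_name.insert(k2, v);
    }
}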


@@ -1,3 +1,4 @@
+ //! Implements [`CacheBackend`] for [`HashMap`].
use std::{
any::Any,
collections::HashMap,
@@ -43,7 +44,7 @@ mod tests {
#[test]
fn test_generic() {
- use crate::cache_system::backend::test_util::test_generic;
+ use crate::backend::test_util::test_generic;
test_generic(HashMap::new);
}

File diff suppressed because it is too large


@@ -1,8 +1,11 @@
+ //! Storage backends to keep and manage cached entries.
use std::{any::Any, fmt::Debug, hash::Hash};
pub mod addressable_heap;
pub mod dual;
pub mod hash_map;
+ pub mod lru;
+ pub mod resource_consumption;
pub mod ttl;
#[cfg(test)]


@@ -0,0 +1,30 @@
+ //! Reasoning about resource consumption of cached data.
+ use std::{
+ fmt::Debug,
+ ops::{Add, Sub},
+ };
+ /// Strongly-typed resource consumption.
+ ///
+ /// Can be used to represent in-RAM memory as well as on-disc memory.
+ pub trait Resource:
+ Add<Output = Self> + Copy + Debug + PartialOrd + Send + Sub<Output = Self> + 'static
+ {
+ /// Create resource consumption of zero.
+ fn zero() -> Self;
+ }
+ /// An estimator of [`Resource`] consumption for a given key-value pair.
+ pub trait ResourceEstimator: Debug + Send + Sync + 'static {
+ /// Cache key.
+ type K;
+ /// Cached value.
+ type V;
+ /// Size that can be estimated.
+ type S: Resource;
+ /// Estimate size of given key-value pair.
+ fn consumption(&self, k: &Self::K, v: &Self::V) -> Self::S;
+ }
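
Since this file is new and fully shown, here is a small sketch of what an implementation could look like: a copyable RAM-size newtype for `Resource`, plus an estimator for `(u64, String)` entries. `RamSize` and `StringSizeEstimator` are names invented for this example.

use cache_system::backend::resource_consumption::{Resource, ResourceEstimator};
use std::ops::{Add, Sub};

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
struct RamSize(usize);

impl Add for RamSize {
    type Output = Self;
    fn add(self, rhs: Self) -> Self {
        Self(self.0 + rhs.0)
    }
}

impl Sub for RamSize {
    type Output = Self;
    fn sub(self, rhs: Self) -> Self {
        Self(self.0 - rhs.0)
    }
}

impl Resource for RamSize {
    fn zero() -> Self {
        Self(0)
    }
}

/// Estimates the in-RAM footprint of a `(u64, String)` cache entry.
#[derive(Debug)]
struct StringSizeEstimator;

impl ResourceEstimator for StringSizeEstimator {
    type K = u64;
    type V = String;
    type S = RamSize;

    fn consumption(&self, _k: &Self::K, v: &Self::V) -> Self::S {
        // Key size plus the string's heap allocation.
        RamSize(std::mem::size_of::<u64>() + v.capacity())
    }
}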


@@ -1,3 +1,4 @@
+ //! Time-to-live handling.
use std::{any::Any, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration};
use iox_time::{Time, TimeProvider};
@@ -552,7 +553,7 @@ mod tests {
#[test]
fn test_generic() {
- use crate::cache_system::backend::test_util::test_generic;
+ use crate::backend::test_util::test_generic;
test_generic(|| {
let ttl_provider = Arc::new(NeverTtlProvider::default());


@@ -1,3 +1,4 @@
+ //! Main data structure, see [`Cache`].
use std::{collections::HashMap, hash::Hash, sync::Arc};
use futures::{

cache_system/src/lib.rs (new file)

@@ -0,0 +1,14 @@
+ //! Flexible and modular cache system.
+ #![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
+ #![warn(
+ missing_copy_implementations,
+ missing_docs,
+ clippy::explicit_iter_loop,
+ clippy::future_not_send,
+ clippy::use_self,
+ clippy::clone_on_ref_ptr
+ )]
+ pub mod backend;
+ pub mod driver;
+ pub mod loader;


@@ -1,8 +1,9 @@
+ //! How to load new cache entries.
use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use std::future::Future;
- /// Loader for missing [`Cache`](crate::cache_system::driver::Cache) entries.
+ /// Loader for missing [`Cache`](crate::driver::Cache) entries.
#[async_trait]
pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
/// Cache key.
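
The hunk only shows the start of the `Loader` trait, so the `load` signature below is an assumption (an async fn from `Self::K` to `Self::V`); `SquareLoader` is a toy stand-in for the catalog-backed loaders used in the querier:

use async_trait::async_trait;
use cache_system::loader::Loader;

// Toy loader; in the querier, `load` would perform a catalog lookup.
#[derive(Debug)]
struct SquareLoader;

#[async_trait]
impl Loader for SquareLoader {
    type K = u64;
    type V = u64;

    // Signature assumed from the trait's doc comment, not shown in the diff.
    async fn load(&self, k: Self::K) -> Self::V {
        k * k
    }
}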


@@ -15,7 +15,10 @@ metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
observability_deps = { path = "../observability_deps" }
- pprof = { version = "0.9", default-features = false, features = ["flamegraph", "prost-codec"], optional = true }
+ # NOTE: we may not notice that we need the "backtrace-rs" feature if we also build with the heappy feature, which depends on backtrace-rs.
+ # (honestly I thought that cargo dependencies were isolated on a per crate basis so I'm a bit surprised that pprof accidentally builds
+ # successfully just because another crate happens to depend on backtrace-rs)
+ pprof = { version = "0.9", default-features = false, features = ["flamegraph", "prost-codec", "backtrace-rs"], optional = true }
predicate = { path = "../predicate" }
service_grpc_testing = { path = "../service_grpc_testing" }
trace = { path = "../trace" }


@@ -7,6 +7,7 @@ edition = "2021"
arrow = "13"
async-trait = "0.1.53"
backoff = { path = "../backoff" }
+ cache_system = { path = "../cache_system" }
client_util = { path = "../client_util" }
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
@@ -40,12 +41,6 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
assert_matches = "1.5"
- criterion = "0.3"
iox_tests = { path = "../iox_tests" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
- proptest = { version = "1", default_features = false, features = ["std"] }
test_helpers = { path = "../test_helpers" }
- [[bench]]
- name = "addressable_heap"
- harness = false


@@ -1,6 +1,7 @@
//! Namespace cache.
- use crate::cache_system::{
+ use backoff::{Backoff, BackoffConfig};
+ use cache_system::{
backend::{
dual::dual_backends,
ttl::{OptionalValueTtlProvider, TtlBackend},
@@ -8,7 +9,6 @@ use crate::cache_system::{
driver::Cache,
loader::FunctionLoader,
};
- use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, NamespaceSchema};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_time::TimeProvider;


@@ -1,7 +1,7 @@
//! Partition cache.
- use crate::cache_system::{driver::Cache, loader::FunctionLoader};
use backoff::{Backoff, BackoffConfig};
+ use cache_system::{driver::Cache, loader::FunctionLoader};
use data_types::{PartitionId, SequencerId};
use iox_catalog::interface::Catalog;
use schema::sort::SortKey;


@@ -1,11 +1,11 @@
//! Processed tombstone cache.
- use crate::cache_system::{
+ use backoff::{Backoff, BackoffConfig};
+ use cache_system::{
backend::ttl::{TtlBackend, TtlProvider},
driver::Cache,
loader::FunctionLoader,
};
- use backoff::{Backoff, BackoffConfig};
use data_types::{ParquetFileId, TombstoneId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;


@@ -1,6 +1,7 @@
//! Table cache.
- use crate::cache_system::{
+ use backoff::{Backoff, BackoffConfig};
+ use cache_system::{
backend::{
dual::dual_backends,
ttl::{OptionalValueTtlProvider, TtlBackend},
@@ -8,7 +9,6 @@ use crate::cache_system::{
driver::Cache,
loader::FunctionLoader,
};
- use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, Table, TableId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;


@@ -1,3 +0,0 @@
- pub mod backend;
- pub mod driver;
- pub mod loader;


@@ -1,11 +1,12 @@
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use async_trait::async_trait;
- use client_util::connection;
+ use client_util::connection::{self, Connection};
use generated_types::ingester::IngesterQueryRequest;
use influxdb_iox_client::flight::{self, generated_types::IngesterQueryResponseMetadata};
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use std::{
+ collections::HashMap,
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
@@ -45,14 +46,23 @@ pub trait FlightClient: Debug + Send + Sync + 'static {
/// Send query to given ingester.
async fn query(
&self,
- ingester_address: &str,
+ ingester_address: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn QueryData>, Error>;
}
- /// Default [`FlightClient`] implemenetation that uses a real client.
+ /// Default [`FlightClient`] implementation that uses a real connection
#[derive(Debug, Default)]
- pub struct FlightClientImpl {}
+ pub struct FlightClientImpl {
+ /// Cached connections
+ /// key: ingester_address (e.g. "http://ingester-1:8082")
+ /// value: CachedConnection
+ ///
+ /// Note: Use a sync (parking_lot) mutex because it is always held
+ /// for a very short period of time, and any actual connection (and
+ /// waiting) is done in CachedConnection
+ connections: parking_lot::Mutex<HashMap<String, CachedConnection>>,
+ }
impl FlightClientImpl {
/// Create new client.
@@ -61,28 +71,19 @@ impl FlightClientImpl {
}
/// Establish connection to given addr and perform handshake.
- async fn connect(
- &self,
- ingester_address: &str,
- ) -> Result<flight::Client<flight::generated_types::IngesterQueryRequest>, Error> {
- debug!(
- %ingester_address,
- "Connecting to ingester",
- );
- let connection = connection::Builder::new()
- .build(ingester_address)
- .await
- .context(ConnectingSnafu { ingester_address })?;
- let mut client =
- flight::Client::<flight::generated_types::IngesterQueryRequest>::new(connection);
- // make contact with the ingester
- client
- .handshake()
- .await
- .context(HandshakeSnafu { ingester_address })?;
- Ok(client)
+ async fn connect(&self, ingester_address: Arc<str>) -> Result<Connection, Error> {
+ let cached_connection = {
+ let mut connections = self.connections.lock();
+ if let Some(cached_connection) = connections.get(ingester_address.as_ref()) {
+ cached_connection.clone()
+ } else {
+ // need to make a new one
+ let cached_connection = CachedConnection::new(&ingester_address);
+ connections.insert(ingester_address.to_string(), cached_connection.clone());
+ cached_connection
+ }
+ };
+ cached_connection.connect().await
}
}
@@ -90,11 +91,13 @@ impl FlightClientImpl {
impl FlightClient for FlightClientImpl {
async fn query(
&self,
- ingester_addr: &str,
+ ingester_addr: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn QueryData>, Error> {
- // TODO maybe cache this connection
- let mut client = self.connect(ingester_addr).await?;
+ let connection = self.connect(ingester_addr).await?;
+ let mut client =
+ flight::Client::<flight::generated_types::IngesterQueryRequest>::new(connection);
debug!(?request, "Sending request to ingester");
let request: flight::generated_types::IngesterQueryRequest =
@@ -153,3 +156,53 @@ impl QueryData for PerformQuery<IngesterQueryResponseMetadata> {
self.schema()
}
}
+ #[derive(Debug, Clone)]
+ struct CachedConnection {
+ ingester_address: Arc<str>,
+ /// Real async mutex to
+ maybe_connection: Arc<tokio::sync::Mutex<Option<Connection>>>,
+ }
+ impl CachedConnection {
+ fn new(ingester_address: &Arc<str>) -> Self {
+ Self {
+ ingester_address: Arc::clone(ingester_address),
+ maybe_connection: Arc::new(tokio::sync::Mutex::new(None)),
+ }
+ }
+ /// Return the underlying connection, creating it if needed
+ async fn connect(&self) -> Result<Connection, Error> {
+ let mut maybe_connection = self.maybe_connection.lock().await;
+ let ingester_address = self.ingester_address.as_ref();
+ if let Some(connection) = maybe_connection.as_ref() {
+ debug!(%ingester_address, "Reusing connection to ingester");
+ Ok(connection.clone())
+ } else {
+ debug!(%ingester_address, "Connecting to ingester");
+ let connection = connection::Builder::new()
+ .build(ingester_address)
+ .await
+ .context(ConnectingSnafu { ingester_address })?;
+ // sanity check w/ a handshake
+ let mut client = flight::Client::<flight::generated_types::IngesterQueryRequest>::new(
+ connection.clone(),
+ );
+ // make contact with the ingester
+ client
+ .handshake()
+ .await
+ .context(HandshakeSnafu { ingester_address })?;
+ *maybe_connection = Some(connection.clone());
+ Ok(connection)
+ }
+ }
+ }
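
The pattern above is worth calling out: a cheap synchronous mutex guards only the map of `CachedConnection` handles, while each handle carries its own async mutex, so slow connection setup (build plus handshake) blocks only callers targeting the same address. A stripped-down, hypothetical sketch of the same two-level locking, with strings standing in for real connections:

use std::{collections::HashMap, sync::Arc};

// Illustrative names only; this is not the querier's type.
#[derive(Default)]
struct LazyPool {
    handles: parking_lot::Mutex<HashMap<String, Arc<tokio::sync::Mutex<Option<String>>>>>,
}

impl LazyPool {
    async fn get(&self, addr: &str) -> String {
        // Short critical section: clone-or-insert the per-address slot.
        let slot = Arc::clone(self.handles.lock().entry(addr.to_string()).or_default());
        // Long (possibly slow) section: initialize at most once per address,
        // without holding the map lock.
        let mut guard = slot.lock().await;
        guard
            .get_or_insert_with(|| format!("connection-to-{addr}"))
            .clone()
    }
}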


@@ -222,8 +222,6 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
expected_schema,
} = request;
- let ingester_address = ingester_address.as_ref();
let ingester_query_request = IngesterQueryRequest {
namespace: namespace_name.to_string(),
table: table_name.to_string(),
@@ -232,8 +230,10 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
};
let query_res = flight_client
- .query(ingester_address, ingester_query_request)
+ .query(Arc::clone(&ingester_address), ingester_query_request)
.await;
+ let ingester_address = ingester_address.as_ref();
if let Err(FlightClientError::Flight {
source: FlightError::GrpcError(status),
}) = &query_res
@@ -1202,13 +1202,13 @@ mod tests {
impl FlightClient for MockFlightClient {
async fn query(
&self,
- ingester_address: &str,
+ ingester_address: Arc<str>,
_request: IngesterQueryRequest,
) -> Result<Box<dyn QueryData>, FlightClientError> {
self.responses
.lock()
.await
- .remove(ingester_address)
+ .remove(ingester_address.as_ref())
.expect("Response not mocked")
.map(|query_data| Box::new(query_data) as _)
}


@@ -10,7 +10,6 @@
)]
pub mod cache;
- mod cache_system;
pub mod chunk;
mod database;
mod handler;
@@ -36,6 +35,3 @@ pub use ingester::{
};
pub use namespace::QuerierNamespace;
pub use server::QuerierServer;
- // for benchmarks
- pub use cache_system::backend::addressable_heap::AddressableHeap;


@@ -72,7 +72,7 @@ impl RecordBatchDeduplicator {
};
let mut dupe_ranges = self.compute_ranges(&batch)?;
- debug!("Finish computing range");
+ trace!("Finish computing range");
// The last partition may span batches so we can't emit it
// until we have seen the next batch (or we are at end of
@@ -80,7 +80,7 @@ impl RecordBatchDeduplicator {
let last_range = dupe_ranges.ranges.pop();
let output_record_batch = self.output_from_ranges(&batch, &dupe_ranges)?;
- debug!(
+ trace!(
num_rows = output_record_batch.num_rows(),
"Rows of ouput_record_batch"
);
@@ -91,7 +91,7 @@ impl RecordBatchDeduplicator {
let last_batch = Self::slice_record_batch(&batch, last_range.start, len)?;
self.last_batch = Some(last_batch);
}
- debug!("done pushing record batch into the indexer");
+ trace!("done pushing record batch into the indexer");
Ok(output_record_batch)
}


@@ -1,5 +1,5 @@
-- Test Setup: ManyFieldsSeveralChunks
- -- SQL: SELECT * from h2o order by temp, other_temp, time;
+ -- SQL: SELECT * from h2o;
-- Results After Sorting
+---------+------------+-------+------+--------------------------------+
| city | other_temp | state | temp | time |
@@ -29,7 +29,8 @@
| | IOxReadFilterNode: table_name=h2o, chunks=2 predicate=Predicate |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------+
- -- SQL: select temp, other_temp, time from h2o order by 1, 2;
+ -- SQL: select temp, other_temp, time from h2o;
+ -- Results After Sorting
+------+------------+--------------------------------+
| temp | other_temp | time |
+------+------------+--------------------------------+


@@ -2,7 +2,7 @@
-- validate we have access to information schema for listing system tables
-- IOX_COMPARE: sorted
- SELECT * from h2o order by temp, other_temp, time;
+ SELECT * from h2o;
-- Plan will look like:
-- . Two chunks (one parquet and one from ingester) neither overlap nor contain duplicate
-- --> scan in one scan node
@@ -10,5 +10,6 @@ SELECT * from h2o order by temp, other_temp, time;
EXPLAIN SELECT * from h2o;
-- Only selct fields and time
- select temp, other_temp, time from h2o order by 1, 2;
+ -- IOX_COMPARE: sorted
+ select temp, other_temp, time from h2o;
EXPLAIN select temp, other_temp, time from h2o;


@@ -877,7 +877,7 @@ impl Partitioner for ConstantPartitioner {
impl IngesterFlightClient for MockIngester {
async fn query(
&self,
- _ingester_address: &str,
+ _ingester_address: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates