Merge branch 'main' into jg/snafu-driveby

pull/24376/head
kodiakhq[bot] 2021-07-19 20:20:30 +00:00 committed by GitHub
commit 5bf68c4a57
55 changed files with 1244 additions and 793 deletions

Cargo.lock (generated)

@ -126,14 +126,14 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.4.0"
source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06d2bd50fddbdcecd832742b388228eec02eb3aabd33144ef46f6bc5420bf662"
dependencies = [
"bitflags",
"chrono",
"csv",
"flatbuffers",
"getrandom 0.2.3",
"hex",
"indexmap",
"lazy_static",
@ -150,8 +150,9 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "4.4.0"
source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "628b4735031e1d5e41c7c1f210ad233ab942adcf155edcba05c3e6d762062349"
dependencies = [
"arrow",
"base64 0.13.0",
@ -823,6 +824,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"influxdb_line_protocol",
"num_cpus",
"observability_deps",
"percent-encoding",
"regex",
@ -841,7 +843,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df_2#d201ebf323a532ac858fe33083639df4a8d321ee"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=3fb600df48ab1e53903b1a9bb12ebde33ad0856b#3fb600df48ab1e53903b1a9bb12ebde33ad0856b"
dependencies = [
"ahash 0.7.4",
"arrow",
@ -1277,6 +1279,7 @@ dependencies = [
"data_types",
"futures",
"google_types",
"num_cpus",
"observability_deps",
"proc-macro2",
"prost",
@ -2593,8 +2596,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.4.0"
source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9297cb17ef7287f6105685d230abbb2b37247657edf2b4a99271088e7d5b0ddd"
dependencies = [
"arrow",
"base64 0.13.0",
@ -4980,9 +4984,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "zeroize"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
checksum = "eeafe61337cb2c879d328b74aa6cd9d794592c82da6be559fdf11493f02a2d18"
[[package]]
name = "zstd"


@ -70,8 +70,8 @@ tracker = { path = "tracker" }
trogging = { path = "trogging", features = ["structopt"] }
# Crates.io dependencies, in alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow-flight = "4.0"
arrow = { version = "5.0", features = ["prettyprint"] }
arrow-flight = "5.0"
byteorder = "1.3.4"
bytes = "1.0"
chrono = "0.4"
@ -88,7 +88,7 @@ opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
opentelemetry-otlp = "0.6"
parking_lot = "0.11.1"
itertools = "0.10.1"
parquet = "4.0"
parquet = "5.0"
# used by arrow/datafusion anyway
prettytable-rs = "0.8"
pprof = { version = "^0.4", default-features = false, features = ["flamegraph", "protobuf"] }
@ -138,9 +138,3 @@ tempfile = "3.1.0"
azure = ["object_store/azure"]
gcp = ["object_store/gcp"]
aws = ["object_store/aws"]
[patch.crates-io]
arrow = { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" }
parquet = { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" }
arrow-flight= { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" }


@ -7,7 +7,7 @@ description = "Apache Arrow utilities"
[dependencies]
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
ahash = "0.7.2"
num-traits = "0.2"
snafu = "0.6"


@ -9,6 +9,7 @@ readme = "README.md"
[dependencies] # In alphabetical order
chrono = { version = "0.4", features = ["serde"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
num_cpus = "1.13.0"
percent-encoding = "2.1.0"
regex = "1.4"
serde = { version = "1.0", features = ["rc", "derive"] }


@ -144,6 +144,11 @@ pub struct LifecycleRules {
/// will sleep for this many milliseconds before looking again
pub worker_backoff_millis: NonZeroU64,
/// The maximum number of compactions permitted to run concurrently.
/// It is not currently possible to set a limit that disables compactions
/// entirely, nor to set an "unlimited" value.
pub max_active_compactions: NonZeroU32,
/// After how many transactions should IOx write a new checkpoint?
pub catalog_transactions_until_checkpoint: NonZeroU64,
@ -179,6 +184,7 @@ impl Default for LifecycleRules {
persist: false,
immutable: false,
worker_backoff_millis: NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap(),
max_active_compactions: NonZeroU32::new(num_cpus::get() as u32).unwrap(), // defaults to number of CPU threads
catalog_transactions_until_checkpoint: NonZeroU64::new(
DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
)
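
The new `max_active_compactions` field defaults to one slot per CPU thread, but it can be overridden when constructing the rules. A minimal sketch, assuming `LifecycleRules` and its `Default` impl from `data_types::database_rules` are in scope:

use std::num::NonZeroU32;

fn example_rules() -> LifecycleRules {
    LifecycleRules {
        // Cap concurrent compactions at 4 instead of one per CPU thread.
        max_active_compactions: NonZeroU32::new(4).unwrap(),
        // Keep every other lifecycle setting at its default.
        ..Default::default()
    }
}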


@ -115,16 +115,11 @@ impl Job {
pub enum OperationStatus {
/// A task associated with the operation is running
Running,
/// All tasks associated with the operation have finished
///
/// Note: This does not indicate success or failure only that
/// no tasks associated with the operation are running
Complete,
/// All tasks associated with the operation have finished successfully
Success,
/// The operation was cancelled and no associated tasks are running
Cancelled,
/// An operation error was returned
///
/// Note: The tracker system currently will never return this
Errored,
}
@ -135,10 +130,18 @@ pub enum OperationStatus {
pub struct Operation {
/// ID of the running operation
pub id: usize,
/// Number of subtasks for this operation
pub task_count: u64,
/// Number of pending tasks for this operation
/// The total number of created tasks
pub total_count: u64,
/// The number of pending tasks
pub pending_count: u64,
/// The number of tasks that completed successfully
pub success_count: u64,
/// The number of tasks that returned an error
pub error_count: u64,
/// The number of tasks that were cancelled
pub cancelled_count: u64,
/// The number of tasks that did not run to completion (e.g. panic)
pub dropped_count: u64,
/// Wall time spent executing this operation
pub wall_time: std::time::Duration,
/// CPU time spent executing this operation
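
Taken together, the new counters account for every task an operation has created. A plausible invariant, shown here as an illustrative assumption (the diff itself does not assert it):

// Sketch only: once no new tasks are being created, the finished counts
// plus the pending count should add back up to total_count.
fn finished_tasks(op: &Operation) -> u64 {
    op.success_count + op.error_count + op.cancelled_count + op.dropped_count
}
// Expected relationship: finished_tasks(op) + op.pending_count == op.total_count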


@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (function packages)
upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df_2", default-features = false, package = "datafusion" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="3fb600df48ab1e53903b1a9bb12ebde33ad0856b", default-features = false, package = "datafusion" }


@ -13,6 +13,7 @@ data_types = { path = "../data_types" }
futures = "0.3"
google_types = { path = "../google_types" }
observability_deps = { path = "../observability_deps" }
num_cpus = "1.13.0"
prost = "0.7"
prost-types = "0.7"
regex = "1.4"


@ -76,6 +76,12 @@ message LifecycleRules {
// Maximum number of rows to buffer in a MUB chunk before compacting it
uint64 mub_row_threshold = 15;
// The maximum number of concurrent active compactions that can run.
//
// If 0, the default limit is used (currently the number of CPU
// threads; see data_types::database_rules::LifecycleRules).
uint32 max_active_compactions = 16;
}
message DatabaseRules {
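
Because a protobuf scalar field cannot distinguish "unset" from zero, the wire value 0 is reserved to mean "use the default". A sketch of the decoding convention, mirroring the `TryFrom<management::LifecycleRules>` conversion shown further below (the `num_cpus` default is taken from that code):

use std::num::NonZeroU32;

// 0 on the wire -> NonZeroU32::new(0) is None -> fall back to the default.
fn decode_max_active_compactions(wire: u32) -> NonZeroU32 {
    NonZeroU32::new(wire)
        .unwrap_or_else(|| NonZeroU32::new(num_cpus::get() as u32).unwrap())
}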


@ -8,12 +8,24 @@ message OperationMetadata {
// How many nanoseconds has it been since the job was submitted
uint64 wall_nanos = 2;
// How many total tasks does this job have currently
uint64 task_count = 3;
// The total number of created tasks
uint64 total_count = 3;
// How many tasks for this job are still pending
// The number of pending tasks
uint64 pending_count = 4;
// The number of tasks that completed successfully
uint64 success_count = 13;
// The number of tasks that returned an error
uint64 error_count = 14;
// The number of tasks that were cancelled
uint64 cancelled_count = 15;
// The number of tasks that did not run to completion (e.g. panic)
uint64 dropped_count = 16;
// What kind of job is it?
oneof job {
Dummy dummy = 5;


@ -27,6 +27,7 @@ impl From<LifecycleRules> for management::LifecycleRules {
persist: config.persist,
immutable: config.immutable,
worker_backoff_millis: config.worker_backoff_millis.get(),
max_active_compactions: config.max_active_compactions.get(),
catalog_transactions_until_checkpoint: config
.catalog_transactions_until_checkpoint
.get(),
@ -50,6 +51,8 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
immutable: proto.immutable,
worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis)
.unwrap_or_else(|| NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap()),
max_active_compactions: NonZeroU32::new(proto.max_active_compactions)
.unwrap_or_else(|| NonZeroU32::new(num_cpus::get() as u32).unwrap()), // default to num CPU threads
catalog_transactions_until_checkpoint: NonZeroU64::new(
proto.catalog_transactions_until_checkpoint,
)
@ -84,6 +87,7 @@ mod tests {
persist: true,
immutable: true,
worker_backoff_millis: 1000,
max_active_compactions: 8,
catalog_transactions_until_checkpoint: 10,
late_arrive_window_seconds: 23,
persist_row_threshold: 57,
@ -110,6 +114,7 @@ mod tests {
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
assert_eq!(back.immutable, protobuf.immutable);
assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
assert_eq!(back.max_active_compactions, protobuf.max_active_compactions);
assert_eq!(
back.late_arrive_window_seconds,
protobuf.late_arrive_window_seconds


@ -157,7 +157,7 @@ impl TryFrom<longrunning::Operation> for data_types::job::Operation {
let status = match &operation.result {
None => OperationStatus::Running,
Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete,
Some(longrunning::operation::Result::Response(_)) => OperationStatus::Success,
Some(longrunning::operation::Result::Error(status)) => {
if status.code == tonic::Code::Cancelled as i32 {
OperationStatus::Cancelled
@ -169,8 +169,12 @@ impl TryFrom<longrunning::Operation> for data_types::job::Operation {
Ok(Self {
id: operation.name.parse().field("name")?,
task_count: meta.task_count,
total_count: meta.total_count,
pending_count: meta.pending_count,
success_count: meta.success_count,
error_count: meta.error_count,
cancelled_count: meta.cancelled_count,
dropped_count: meta.dropped_count,
wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
job: meta.job.map(Into::into),


@ -10,8 +10,8 @@ format = ["arrow"]
[dependencies]
# Workspace dependencies, in alphabetical order
arrow = { version = "4.0", optional = true }
arrow-flight = { version = "4.0", optional = true}
arrow = { version = "5.0", optional = true }
arrow-flight = { version = "5.0", optional = true}
generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order


@ -7,7 +7,7 @@ description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md"
[dependencies]
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
hashbrown = "0.11"
indexmap = "1.6"
itertools = "0.10.1"


@ -30,6 +30,9 @@ where
/// The `LifecycleDb` this policy is automating
db: M,
/// The current number of active compactions.
active_compactions: usize,
/// Background tasks spawned by this `LifecyclePolicy`
trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
}
@ -42,6 +45,7 @@ where
Self {
db,
trackers: vec![],
active_compactions: 0,
}
}
@ -250,6 +254,9 @@ where
}
if to_compact.len() >= 2 || has_mub_snapshot {
// The caller is responsible for ensuring a compaction slot is
// free before scheduling work here (asserted below).
assert!(self.active_compactions < rules.max_active_compactions.get() as usize);
// Upgrade partition first
let partition = partition.upgrade();
let chunks = to_compact
@ -261,6 +268,7 @@ where
.expect("failed to compact chunks")
.with_metadata(ChunkLifecycleAction::Compacting);
self.active_compactions += 1;
self.trackers.push(tracker);
}
}
@ -475,17 +483,39 @@ where
// if the criteria for persistence have been satisfied,
// but persistence cannot proceed because of in-progress
// compactions
let stall_compaction = if rules.persist {
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant)
let stall_compaction_persisting = if rules.persist {
let persisting =
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
if persisting {
debug!(%db_name, %partition, reason="persisting", "stalling compaction");
}
persisting
} else {
false
};
if !stall_compaction {
self.maybe_compact_chunks(partition, &rules, now);
} else {
debug!(%db_name, %partition, "stalling compaction to allow persist");
// Until we have a more sophisticated compaction policy that can
// allocate resources appropriately, we limit the number of
// compactions that may run concurrently. The limit is always at
// least one: compactions cannot currently be disabled entirely,
// nor can the limiter be turned off (see
// `LifecycleRules::max_active_compactions`).
let stall_compaction_no_slots = {
let max_compactions = self.db.rules().max_active_compactions.get();
let slots_full = self.active_compactions >= max_compactions as usize;
if slots_full {
debug!(%db_name, %partition, ?max_compactions, reason="slots_full", "stalling compaction");
}
slots_full
};
// If either stall condition holds, schedule no compaction for this partition.
if stall_compaction_persisting || stall_compaction_no_slots {
continue;
}
// possibly do a compaction
self.maybe_compact_chunks(partition, &rules, now);
}
if let Some(soft_limit) = rules.buffer_size_soft {
@ -498,7 +528,25 @@ where
}
// Clear out completed tasks
self.trackers.retain(|x| !x.is_complete());
let mut completed_compactions = 0;
self.trackers.retain(|x| {
let completed = x.is_complete();
if completed && matches!(x.metadata(), ChunkLifecycleAction::Compacting) {
// free up slot for another compaction
completed_compactions += 1;
}
!completed
});
// update active compactions
if completed_compactions > 0 {
debug!(?completed_compactions, active_compactions=?self.active_compactions,
max_compactions=?self.db.rules().max_active_compactions, "releasing compaction slots")
}
assert!(completed_compactions <= self.active_compactions);
self.active_compactions -= completed_compactions;
let tracker_fut = match self.trackers.is_empty() {
false => futures::future::Either::Left(futures::future::select_all(
@ -1437,6 +1485,52 @@ mod tests {
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![17, 18])]);
}
#[test]
fn test_compaction_limiter() {
let rules = LifecycleRules {
max_active_compactions: 2.try_into().unwrap(),
..Default::default()
};
let now = from_secs(50);
let partitions = vec![
TestPartition::new(vec![
// closed => can compact
TestChunk::new(0, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(10, Some(0), Some(30), ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(12, Some(0), Some(40), ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(1, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(200, Some(0), Some(10), ChunkStorage::ClosedMutableBuffer),
]),
];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now, Instant::now());
assert_eq!(
*db.events.read(),
vec![
MoverEvents::Compact(vec![0, 10, 12]),
MoverEvents::Compact(vec![1]),
],
);
db.events.write().clear();
// Compaction slots freed up, other partition can now compact.
lifecycle.check_for_work(now, Instant::now());
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![200]),],);
}
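
The limiter exercised by this test reduces to three moves: acquire a slot when a compaction is spawned, stall when all slots are full, and release slots as trackers complete. A standalone sketch of that accounting, using plain counters rather than the actual `LifecyclePolicy` API:

// Simplified model of the active_compactions bookkeeping above.
struct CompactionSlots {
    active: usize,
    max: usize,
}

impl CompactionSlots {
    // Corresponds to `self.active_compactions += 1` when a tracker is pushed.
    fn try_acquire(&mut self) -> bool {
        if self.active < self.max {
            self.active += 1;
            true
        } else {
            false // slots full: stall compaction this pass
        }
    }

    // Corresponds to the retain() loop that counts completed compactions.
    fn release(&mut self, completed: usize) {
        assert!(completed <= self.active);
        self.active -= completed;
    }
}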
#[test]
fn test_persist() {
let rules = LifecycleRules {


@ -5,7 +5,7 @@ authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
chrono = "0.4"
croaring = "0.5"
crossbeam = "0.8"
@ -14,7 +14,7 @@ human_format = "1.0.3"
packers = { path = "../packers" }
snafu = "0.6.8"
observability_deps = { path = "../observability_deps" }
parquet = "4.0"
parquet = "5.0"
[dev-dependencies] # In alphabetical order
criterion = "0.3"


@ -14,7 +14,7 @@ edition = "2018"
# 2. Keep change/compile/link time down during development when working on just this crate
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
chrono = "0.4"


@ -5,13 +5,13 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
human_format = "1.0.3"
influxdb_tsm = { path = "../influxdb_tsm" }
internal_types = { path = "../internal_types" }
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
parquet = "4.0"
parquet = "5.0"
[dev-dependencies] # In alphabetical order
rand = "0.8.3"


@ -5,7 +5,7 @@ authors = ["Nga Tran <nga-tran@live.com>"]
edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
@ -20,7 +20,7 @@ object_store = {path = "../object_store"}
observability_deps = { path = "../observability_deps" }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = "4.0"
parquet = "5.0"
parquet-format = "2.6"
parking_lot = "0.11.1"
persistence_windows = { path = "../persistence_windows" }


@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor"
# 2. Allow for query logic testing without bringing in all the storage systems.
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
chrono = "0.4"


@ -39,6 +39,7 @@ use crate::plan::{
};
use self::{
context::IOxExecutionConfig,
split::StreamSplitNode,
task::{DedicatedExecutor, Error as ExecutorError},
};
@ -111,6 +112,9 @@ pub struct Executor {
/// Executor for running system/reorganization tasks such as
/// compact
reorg_exec: DedicatedExecutor,
/// The default configuration options with which to create contexts
config: IOxExecutionConfig,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -128,12 +132,25 @@ impl Executor {
let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", num_threads);
let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", num_threads);
let config = IOxExecutionConfig::new();
Self {
query_exec,
reorg_exec,
config,
}
}
/// returns the config of this executor
pub fn config(&self) -> &IOxExecutionConfig {
&self.config
}
/// returns a mutable reference to this executor's config
pub fn config_mut(&mut self) -> &mut IOxExecutionConfig {
&mut self.config
}
/// Executes this plan on the query pool, and returns the
/// resulting set of strings
pub async fn to_string_set(&self, plan: StringSetPlan) -> Result<StringSetRef> {
@ -289,7 +306,7 @@ impl Executor {
pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext {
let executor = self.executor(executor_type).clone();
IOxExecutionContext::new(executor)
IOxExecutionContext::new(executor, self.config.clone())
}
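
With the config now owned by the `Executor`, concurrency can be tuned once and inherited by every context it creates. A usage sketch, assuming `Executor::new(num_threads)` as suggested by the constructor above:

fn example_tuned_context() -> IOxExecutionContext {
    let mut executor = Executor::new(4);
    // Tune the DataFusion concurrency before handing out contexts.
    executor.config_mut().set_concurrency(8);
    // Every context created from here on inherits the tuned config.
    executor.new_context(ExecutorType::Query)
}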
/// Return the execution pool of the specified type


@ -5,6 +5,7 @@ use std::{fmt, sync::Arc};
use arrow::record_batch::RecordBatch;
use datafusion::{
catalog::catalog::CatalogProvider,
execution::context::{ExecutionContextState, QueryPlanner},
logical_plan::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
@ -105,6 +106,46 @@ impl ExtensionPlanner for IOxExtensionPlanner {
}
}
/// Configuration for an IOx execution context
#[derive(Clone)]
pub struct IOxExecutionConfig {
/// Configuration options to pass to DataFusion
inner: ExecutionConfig,
}
impl Default for IOxExecutionConfig {
fn default() -> Self {
const BATCH_SIZE: usize = 1000;
// Setup default configuration
let inner = ExecutionConfig::new()
.with_batch_size(BATCH_SIZE)
.create_default_catalog_and_schema(true)
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)
.with_query_planner(Arc::new(IOxQueryPlanner {}));
Self { inner }
}
}
impl fmt::Debug for IOxExecutionConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "IOxExecutionConfig ...")
}
}
impl IOxExecutionConfig {
pub fn new() -> Self {
Default::default()
}
/// Set execution concurrency
pub fn set_concurrency(&mut self, concurrency: usize) {
self.inner.concurrency = concurrency;
}
}
/// This is an execution context for planning in IOx. It wraps a
/// DataFusion execution context with the information needed for planning.
///
@ -136,21 +177,8 @@ impl fmt::Debug for IOxExecutionContext {
impl IOxExecutionContext {
/// Create an ExecutionContext suitable for executing DataFusion plans
///
/// The config is created with a default catalog and schema, but this
/// can be overridden at a later date
pub fn new(exec: DedicatedExecutor) -> Self {
const BATCH_SIZE: usize = 1000;
// TBD: Should we be reusing an execution context across all executions?
let config = ExecutionConfig::new()
.with_batch_size(BATCH_SIZE)
.create_default_catalog_and_schema(true)
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)
.with_query_planner(Arc::new(IOxQueryPlanner {}));
let inner = ExecutionContext::with_config(config);
pub fn new(exec: DedicatedExecutor, config: IOxExecutionConfig) -> Self {
let inner = ExecutionContext::with_config(config.inner);
Self { inner, exec }
}
@ -160,11 +188,13 @@ impl IOxExecutionContext {
&self.inner
}
/// returns a mutable reference to the inner datafusion execution context
pub fn inner_mut(&mut self) -> &mut ExecutionContext {
&mut self.inner
/// registers a catalog with the inner context
pub fn register_catalog(&mut self, name: impl Into<String>, catalog: Arc<dyn CatalogProvider>) {
self.inner.register_catalog(name, catalog);
}
///
/// Prepare a SQL statement for execution. This assumes that any
/// tables referenced in the SQL have been registered with this context
pub fn prepare_sql(&mut self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {


@ -9,6 +9,7 @@ use tracker::{TaskRegistration, TrackedFutureExt};
use futures::Future;
use observability_deps::tracing::warn;
use std::convert::Infallible;
/// The type of thing that the dedicated executor runs
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
@ -83,7 +84,13 @@ impl DedicatedExecutor {
let registration = TaskRegistration::new();
while let Ok(task) = rx.recv() {
tokio::task::spawn(task.track(registration.clone()));
tokio::task::spawn(
async move {
task.await;
Ok::<_, Infallible>(())
}
.track(registration.clone()),
);
}
// Wait for all tasks to finish


@ -87,7 +87,7 @@ impl SqlQueryPlanner {
executor: &Executor,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx = executor.new_context(ExecutorType::Query);
ctx.inner_mut().register_catalog(DEFAULT_CATALOG, database);
ctx.register_catalog(DEFAULT_CATALOG, database);
ctx.prepare_sql(query).context(Preparing)
}
}


@ -15,7 +15,7 @@ query = { path = "../query" }
server = { path = "../server" }
[dev-dependencies]
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }


@ -1,86 +1,87 @@
-- Test Setup: OneMeasurementThreeChunksWithDuplicates
-- SQL: explain verbose select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: #h2o.time ASC NULLS FIRST, #h2o.state ASC NULLS FIRST, #h2o.city ASC NULLS FIRST |
| | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=None |
| logical_plan after projection_push_down | Sort: #h2o.time ASC NULLS FIRST, #h2o.state ASC NULLS FIRST, #h2o.city ASC NULLS FIRST |
| | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| logical_plan after simplify_expressions | Sort: #h2o.time ASC NULLS FIRST, #h2o.state ASC NULLS FIRST, #h2o.city ASC NULLS FIRST |
| | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| physical_plan | SortExec: [time@0 ASC,state@1 ASC,city@2 ASC] |
| | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | ExecutionPlan(PlaceHolder) |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: explain verbose select time, state, city, min_temp, max_temp, area from h2o;
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=None |
| logical_plan after projection_push_down | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| logical_plan after simplify_expressions | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | ExecutionPlan(PlaceHolder) |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE select state as name from h2o UNION ALL select city as name from h2o;
+-----------------------------------------+-------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+-------------------------------------------------------------------------------+
| logical_plan | Union |
| | Projection: #h2o.state AS name |
| | TableScan: h2o projection=None |
| | Projection: #h2o.city AS name |
| | TableScan: h2o projection=None |
| logical_plan after projection_push_down | Union |
| | Projection: #h2o.state AS name |
| | TableScan: h2o projection=Some([4]) |
| | Projection: #h2o.city AS name |
| | TableScan: h2o projection=Some([1]) |
| logical_plan after simplify_expressions | Union |
| | Projection: #h2o.state AS name |
| | TableScan: h2o projection=Some([4]) |
| | Projection: #h2o.city AS name |
| | TableScan: h2o projection=Some([1]) |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[state@0 as name] |
| | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[state@1 as state] |
| | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | ProjectionExec: expr=[city@0 as name] |
| | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[city@0 as city] |
| | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+-----------------------------------------+-------------------------------------------------------------------------------+
-- SQL: explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: #h2o.time ASC NULLS FIRST, #h2o.state ASC NULLS FIRST, #h2o.city ASC NULLS FIRST |
| | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| physical_plan | SortExec: [time@0 ASC,state@1 ASC,city@2 ASC] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #h2o.time, #h2o.state, #h2o.city, #h2o.min_temp, #h2o.max_temp, #h2o.area |
| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o;
+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | Union |
| | Projection: #h2o.state AS name |
| | TableScan: h2o projection=Some([4]) |
| | Projection: #h2o.city AS name |
| | TableScan: h2o projection=Some([1]) |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[state@0 as name] |
| | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[state@1 as state] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | ProjectionExec: expr=[city@0 as name] |
| | ExecutionPlan(PlaceHolder) |
| | ProjectionExec: expr=[city@0 as city] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] |
| | ExecutionPlan(PlaceHolder) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate |
+---------------+-----------------------------------------------------------------------------------+


@ -2,11 +2,11 @@
-- IOX_SETUP: OneMeasurementThreeChunksWithDuplicates
-- Plan with order by
explain verbose select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
-- plan without order by
explain verbose select time, state, city, min_temp, max_temp, area from h2o;
EXPLAIN select time, state, city, min_temp, max_temp, area from h2o;
-- Union plan
EXPLAIN VERBOSE select state as name from h2o UNION ALL select city as name from h2o;
EXPLAIN select state as name from h2o UNION ALL select city as name from h2o;


@ -1,218 +1,167 @@
-- Test Setup: TwoMeasurementsPredicatePushDown
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Float64(200) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Float64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Float64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: system@1 > 4 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury';
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
+-----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence AND CAST(count@0 AS Int64) < 40000 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and count < 40000;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0 and system < 7.0;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and system < 7.0;
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.town NotEq Utf8("tewsbury") And Float64(7) Gt #restaurant.system |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.town NotEq Utf8("tewsbury") And Float64(7) Gt #restaurant.system |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.town NotEq Utf8("tewsbury") And Float64(7) Gt #restaurant.system |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: system@1 > 5 AND CAST(town@3 AS Utf8) != tewsbury AND 7 > system@1 |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And Utf8("tewsbury") NotEq #restaurant.town And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And Utf8("tewsbury") NotEq #restaurant.town And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And Utf8("tewsbury") NotEq #restaurant.town And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: system@1 > 5 AND tewsbury != CAST(town@3 AS Utf8) AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN VERBOSE SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) Lt #restaurant.system And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") And #restaurant.time Gt totimestamp(Utf8("1970-01-01T00:00:00.000000130+00:00")) |
| | TableScan: restaurant projection=None |
| logical_plan after projection_push_down | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) Lt #restaurant.system And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") And #restaurant.time Gt totimestamp(Utf8("1970-01-01T00:00:00.000000130+00:00")) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| logical_plan after simplify_expressions | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) Lt #restaurant.system And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") And #restaurant.time Gt totimestamp(Utf8("1970-01-01T00:00:00.000000130+00:00")) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > totimestamp(1970-01-01T00:00:00.000000130+00:00) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200.0;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Float64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury';
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and count < 40000;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0 and system < 7.0;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and system < 7.0;
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
+---------------+-----------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.town NotEq Utf8("tewsbury") And Float64(7) Gt #restaurant.system |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND CAST(town@3 AS Utf8) != tewsbury AND 7 > system@1 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+-----------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And Utf8("tewsbury") NotEq #restaurant.town And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND tewsbury != CAST(town@3 AS Utf8) AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) Lt #restaurant.system And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") And #restaurant.time Gt totimestamp(Utf8("1970-01-01T00:00:00.000000130+00:00")) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > totimestamp(1970-01-01T00:00:00.000000130+00:00) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -2,44 +2,44 @@
-- IOX_SETUP: TwoMeasurementsPredicatePushDown
-- Test 1: Select everything
EXPLAIN VERBOSE SELECT * from restaurant;
EXPLAIN SELECT * from restaurant;
-- Test 2: One push-down expression: count > 200
-- TODO: Show push-down predicates in explain verbose. Ticket #1538
EXPLAIN VERBOSE SELECT * from restaurant where count > 200;
EXPLAIN SELECT * from restaurant where count > 200;
-- Test 2.2: One push-down expression: count > 200.0
EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0;
EXPLAIN SELECT * from restaurant where count > 200.0;
-- Test 2.3: One push-down expression: system > 4.0
EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0;
EXPLAIN SELECT * from restaurant where system > 4.0;
-- Test 3: Two push-down expressions: count > 200 and town != 'tewsbury'
EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury';
EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury';
-- Test 4: Still two push-down expressions: count > 200 and town != 'tewsbury'
-- even though the results are different
EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
-- Test 5: three push-down expressions: count > 200 and town != 'tewsbury' and count < 40000
EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
-- Test 6: two push-down expressions: count > 200 and count < 40000
EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and count < 40000;
EXPLAIN SELECT * from restaurant where count > 200 and count < 40000;
-- Test 7: two push-down expressions on float: system > 4.0 and system < 7.0
EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0 and system < 7.0;
EXPLAIN SELECT * from restaurant where system > 4.0 and system < 7.0;
-- Test 8: two push-down expressions on float: system > 5.0 and system < 7.0
EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and system < 7.0;
EXPLAIN SELECT * from restaurant where system > 5.0 and system < 7.0;
-- Test 9: three push-down expressions: system > 5.0 and town != 'tewsbury' and system < 7.0
EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
EXPLAIN SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
-- Test 10: three push-down expressions: system > 5.0 and town != 'tewsbury' and system < 7.0
EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
EXPLAIN SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
-- Test 11: four push-down expressions: system > 5.0 and town != 'tewsbury' and system < 7.0 and
-- time > to_timestamp('1970-01-01T00:00:00.000000130+00:00') rewritten to time GT INT(130)
EXPLAIN VERBOSE SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');

View File

@ -1,13 +1,7 @@
//! Tests for the Influx gRPC queries
use std::{convert::TryFrom, num::NonZeroU32};
use crate::scenarios::*;
use data_types::database_rules::LifecycleRules;
use server::{
db::test_helpers::write_lp,
utils::{make_db, TestDb},
};
use server::{db::test_helpers::write_lp, utils::make_db};
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
@ -194,14 +188,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
db,
};
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data).await;
// roll over and load chunks into both RUB and OS

View File

@ -4,12 +4,16 @@ mod parse;
mod setup;
use arrow::record_batch::RecordBatch;
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner};
use query::{
exec::{Executor, ExecutorType},
frontend::sql::SqlQueryPlanner,
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
io::LineWriter,
io::Write,
path::{Path, PathBuf},
sync::Arc,
};
use self::{parse::TestQueries, setup::TestSetup};
@ -261,7 +265,13 @@ impl<W: Write> Runner<W> {
writeln!(self.log, "Running scenario '{}'", scenario_name)?;
writeln!(self.log, "SQL: '{:#?}'", sql)?;
let planner = SqlQueryPlanner::default();
let executor = db.executor();
let num_threads = 1;
let mut executor = Executor::new(num_threads);
// hardcode concurrency in tests as by default it is the
// number of cores, which varies across machines
executor.config_mut().set_concurrency(4);
let executor = Arc::new(executor);
let physical_plan = planner
.query(db, &sql, executor.as_ref())
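Pinning the executor's concurrency is what makes the EXPLAIN fixtures above reproducible: the plan concurrency otherwise defaults to the machine's core count, which is presumably why the new physical plans consistently show RepartitionExec: partitioning=RoundRobinBatch(4). A minimal sketch of the pattern, assuming the query crate's Executor API exactly as it appears in this hunk:

use std::sync::Arc;
use query::exec::Executor; // IOx query crate, as imported above

fn fixed_concurrency_executor() -> Arc<Executor> {
    // One worker thread plus a hardcoded plan concurrency of 4 keeps
    // repartitioning (and thus the EXPLAIN output) stable across machines.
    let mut executor = Executor::new(1);
    executor.config_mut().set_concurrency(4);
    Arc::new(executor)
}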

View File

@ -1,12 +1,9 @@
//! This module contains testing scenarios for Db
use std::collections::HashMap;
use std::convert::TryFrom;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::{Duration, Instant};
use data_types::database_rules::LifecycleRules;
use once_cell::sync::OnceCell;
#[allow(unused_imports, dead_code, unused_macros)]
@ -16,7 +13,6 @@ use async_trait::async_trait;
use server::utils::{
count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
TestDb,
};
use server::{db::test_helpers::write_lp, Db};
@ -138,14 +134,7 @@ impl DbSetup for NoData {
// Scenario 4: the database has had data loaded into RB & Object Store and then deleted
//
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data).await;
// move data out of open chunk
@ -564,14 +553,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
write_lp(
&db,
@ -771,14 +753,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 4: One closed chunk in both RUB and OS
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
@ -802,14 +777,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 5: One closed chunk in OS only
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
@ -914,14 +882,7 @@ pub async fn make_two_chunk_scenarios(
};
// in 2 read buffer chunks that also loaded into object store
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
@ -960,14 +921,7 @@ pub async fn make_two_chunk_scenarios(
};
// Scenario 6: Two closed chunk in OS only
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
@ -1077,14 +1031,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
};
// Scenario 2: One closed chunk in Parquet only
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)

View File

@ -11,7 +11,7 @@ edition = "2018"
# 2. Keep change/compile/link time down during development when working on just this crate
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
croaring = "0.5"
data_types = { path = "../data_types" }

View File

@ -5,7 +5,7 @@ authors = ["pauldix <paul@pauldix.net>"]
edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
bytes = { version = "1.0" }

View File

@ -210,12 +210,12 @@ impl Config {
names
}
/// Update datbase rules of a fully initialized database.
/// Update database rules of a fully initialized database.
pub(crate) fn update_db_rules<F, E>(
&self,
db_name: &DatabaseName<'static>,
update: F,
) -> std::result::Result<DatabaseRules, UpdateError<E>>
) -> std::result::Result<Arc<DatabaseRules>, UpdateError<E>>
where
F: FnOnce(DatabaseRules) -> std::result::Result<DatabaseRules, E>,
{
@ -224,9 +224,7 @@ impl Config {
.db_initialized(db_name)
.context(DatabaseNotFound { db_name })?;
let mut rules = db.rules.write();
*rules = update(rules.clone()).map_err(UpdateError::Closure)?;
Ok(rules.clone())
db.update_db_rules(update).map_err(UpdateError::Closure)
}
/// Get all registered remote servers.
@ -400,7 +398,7 @@ enum DatabaseState {
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
rules: DatabaseRules,
rules: Arc<DatabaseRules>,
},
/// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
@ -454,8 +452,8 @@ impl DatabaseState {
match self {
DatabaseState::Known { db_name, .. } => db_name.clone(),
DatabaseState::RulesLoaded { rules, .. } => rules.name.clone(),
DatabaseState::Replay { db, .. } => db.rules.read().name.clone(),
DatabaseState::Initialized { db, .. } => db.rules.read().name.clone(),
DatabaseState::Replay { db, .. } => db.rules().name.clone(),
DatabaseState::Initialized { db, .. } => db.rules().name.clone(),
}
}
@ -477,12 +475,12 @@ impl DatabaseState {
}
}
fn rules(&self) -> Option<DatabaseRules> {
fn rules(&self) -> Option<Arc<DatabaseRules>> {
match self {
DatabaseState::Known { .. } => None,
DatabaseState::RulesLoaded { rules, .. } => Some(rules.clone()),
DatabaseState::Replay { db, .. } => Some(db.rules.read().clone()),
DatabaseState::Initialized { db, .. } => Some(db.rules.read().clone()),
DatabaseState::RulesLoaded { rules, .. } => Some(Arc::clone(&rules)),
DatabaseState::Replay { db, .. } => Some(db.rules()),
DatabaseState::Initialized { db, .. } => Some(db.rules()),
}
}
}
@ -556,7 +554,7 @@ impl<'a> DatabaseHandle<'a> {
}
/// Get rules, if already known in the current state.
pub fn rules(&self) -> Option<DatabaseRules> {
pub fn rules(&self) -> Option<Arc<DatabaseRules>> {
self.state().rules()
}
@ -604,7 +602,7 @@ impl<'a> DatabaseHandle<'a> {
object_store: Arc::clone(&object_store),
exec: Arc::clone(&exec),
server_id: *server_id,
rules,
rules: Arc::new(rules),
}));
Ok(())
@ -637,7 +635,7 @@ impl<'a> DatabaseHandle<'a> {
exec: Arc::clone(&exec),
preserved_catalog,
catalog,
rules: rules.clone(),
rules: Arc::clone(&rules),
write_buffer,
};
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));
@ -666,12 +664,12 @@ impl<'a> DatabaseHandle<'a> {
let shutdown = self.config.shutdown.child_token();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let name_captured = db.rules.read().name.clone();
let rules = db.rules();
let handle = Some(tokio::spawn(async move {
db_captured
.background_worker(shutdown_captured)
.instrument(tracing::info_span!("db_worker", database=%name_captured))
.instrument(tracing::info_span!("db_worker", database=%rules.name))
.await
}));

View File

@ -201,7 +201,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// outside of the Db
#[derive(Debug)]
pub struct Db {
pub rules: RwLock<DatabaseRules>,
rules: RwLock<Arc<DatabaseRules>>,
pub server_id: ServerId, // this is also the Query Server ID
@ -270,7 +270,7 @@ pub(crate) struct DatabaseToCommit {
pub(crate) exec: Arc<Executor>,
pub(crate) preserved_catalog: PreservedCatalog,
pub(crate) catalog: Catalog,
pub(crate) rules: DatabaseRules,
pub(crate) rules: Arc<DatabaseRules>,
pub(crate) write_buffer: Option<WriteBufferConfig>,
}
@ -327,6 +327,22 @@ impl Db {
Arc::clone(&self.exec)
}
/// Return the current database rules
pub fn rules(&self) -> Arc<DatabaseRules> {
Arc::clone(&*self.rules.read())
}
/// Updates the database rules
pub fn update_db_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E>
where
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>,
{
let mut rules = self.rules.write();
let new_rules = Arc::new(update(rules.as_ref().clone())?);
*rules = Arc::clone(&new_rules);
Ok(new_rules)
}
/// Rolls over the active chunk in the database's specified
/// partition. Returns the previously open (now closed) Chunk if
/// there was any.
@ -640,14 +656,15 @@ impl Db {
// streaming from the write buffer loop
async {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
let wb = Arc::clone(write_buffer);
while !shutdown.is_cancelled() {
tokio::select! {
_ = {
self.stream_in_sequenced_entries(wb.stream())
} => {},
_ = shutdown.cancelled() => break,
}
let mut futures = vec![];
for (_sequencer_id, stream) in write_buffer.streams() {
let fut = self.stream_in_sequenced_entries(stream);
futures.push(fut);
}
tokio::select! {
_ = futures::future::join_all(futures) => {},
_ = shutdown.cancelled() => {},
}
}
},
@ -1093,11 +1110,22 @@ mod tests {
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = TestError> = std::result::Result<T, E>;
async fn immutable_db() -> Arc<Db> {
TestDb::builder()
.lifecycle_rules(LifecycleRules {
immutable: true,
..Default::default()
})
.build()
.await
.db
}
#[tokio::test]
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
let db = make_db().await.db;
db.rules.write().lifecycle_rules.immutable = true;
let db = immutable_db().await;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry).await;
assert_contains!(
@ -1114,12 +1142,14 @@ mod tests {
let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone()));
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.lifecycle_rules(LifecycleRules {
immutable: true,
..Default::default()
})
.build()
.await
.db;
test_db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry).await.unwrap();
@ -1286,8 +1316,7 @@ mod tests {
#[tokio::test]
async fn cant_write_when_reading_from_write_buffer() {
// Validate that writes are rejected if this database is reading from the write buffer
let db = make_db().await.db;
db.rules.write().lifecycle_rules.immutable = true;
let db = immutable_db().await;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry).await;
assert_contains!(
@ -1392,13 +1421,7 @@ mod tests {
#[tokio::test]
async fn metrics_during_rollover() {
let test_db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await;
let test_db = make_db().await;
let db = test_db.db;
write_lp(db.as_ref(), "cpu bar=1 10").await;
@ -2454,14 +2477,7 @@ mod tests {
#[tokio::test]
async fn chunk_summaries() {
// Test that chunk id listing is hooked up
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
// get three chunks: one open, one closed in mb and one closed in rb
write_lp(&db, "cpu bar=1 1").await;
@ -2502,7 +2518,7 @@ mod tests {
ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
3236,
1528,
1479,
2,
),
ChunkSummary::new_without_timestamps(
@ -2541,14 +2557,7 @@ mod tests {
#[tokio::test]
async fn partition_summaries() {
// Test that partition summary listing is hooked up
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
write_lp(&db, "cpu bar=1 1").await;
let chunk_id = db
@ -2741,14 +2750,7 @@ mod tests {
#[tokio::test]
async fn write_chunk_to_object_store_in_background() {
// Test that data can be written to object store using a background task
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
.db;
let db = make_db().await.db;
// create MB partition
write_lp(db.as_ref(), "cpu bar=1 10").await;
@ -2785,8 +2787,14 @@ mod tests {
#[tokio::test]
async fn write_hard_limit() {
let db = Arc::new(make_db().await.db);
db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());
let db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10).unwrap()),
..Default::default()
})
.build()
.await
.db;
// inserting first line does not trigger hard buffer limit
write_lp(db.as_ref(), "cpu bar=1 10").await;
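The switch from a public RwLock<DatabaseRules> to a private RwLock<Arc<DatabaseRules>> is a copy-on-write pattern: readers clone the Arc cheaply and keep a consistent snapshot with no lock held, while updates clone the inner value, mutate the copy, and swap the Arc under the write lock. A self-contained sketch of the same idea, with a simplified rules type and std locks instead of the parking_lot ones used here:

use std::sync::{Arc, RwLock};

#[derive(Debug, Clone)]
struct Rules {
    name: String,
}

struct Db {
    rules: RwLock<Arc<Rules>>,
}

impl Db {
    /// Cheap snapshot: the caller holds no lock after this returns.
    fn rules(&self) -> Arc<Rules> {
        Arc::clone(&self.rules.read().unwrap())
    }

    /// Copy-on-write update: clone the inner value, mutate, swap the Arc.
    fn update_rules<F, E>(&self, update: F) -> Result<Arc<Rules>, E>
    where
        F: FnOnce(Rules) -> Result<Rules, E>,
    {
        let mut guard = self.rules.write().unwrap();
        let new_rules = Arc::new(update(guard.as_ref().clone())?);
        *guard = Arc::clone(&new_rules);
        Ok(new_rules)
    }
}

Callers that previously mutated db.rules.write() in place now go through the closure, which is why the tests further down wrap their edits in update_db_rules(|mut rules| { ...; Ok::<_, Infallible>(rules) }).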

View File

@ -104,6 +104,7 @@ pub fn persist_chunks(
let remainder_rows = remainder.rows();
let persist_fut = {
let partition = LockableCatalogPartition::new(Arc::clone(&db), partition);
let mut partition_write = partition.write();
for id in chunk_ids {
partition_write.force_drop_chunk(id)
@ -117,18 +118,12 @@ pub fn persist_chunks(
assert!(to_persist.rows() > 0);
let to_persist = LockableCatalogChunk {
db: Arc::clone(&db),
db,
chunk: partition_write.create_rub_chunk(to_persist, schema),
};
let to_persist = to_persist.write();
// Drop partition lock guard after locking chunk
std::mem::drop(partition_write);
let partition = LockableCatalogPartition::new(db, partition);
let partition = partition.write();
write_chunk_to_object_store(partition, to_persist, flush_handle)?.1
write_chunk_to_object_store(partition_write, to_persist, flush_handle)?.1
};
// Wait for write operation to complete
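This hunk also keeps a single partition write guard alive for the whole operation instead of dropping it and re-locking the partition: the new chunk is created and locked while the partition guard is still held, closing a window in which another task could observe the partition without the chunk. A rough sketch of that guard-ordering idea, using stand-in types rather than the real catalog API:

use std::sync::{Arc, Mutex, MutexGuard};

struct Chunk;
struct Partition {
    chunks: Vec<Arc<Mutex<Chunk>>>,
}

// Lock the partition first, create and lock the new chunk while the
// partition guard is still held, then hand both guards onward together.
fn create_and_lock(partition: &Mutex<Partition>) {
    let mut partition_guard: MutexGuard<'_, Partition> = partition.lock().unwrap();
    let chunk = Arc::new(Mutex::new(Chunk));
    partition_guard.chunks.push(Arc::clone(&chunk));
    let _chunk_guard = chunk.lock().unwrap();
    // Both locks are now held in a fixed order: partition, then chunk.
}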

View File

@ -530,11 +530,11 @@ impl InitStatus {
let rules = handle
.rules()
.expect("in this state rules should be loaded");
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules).context(
CreateWriteBuffer {
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules)
.await
.context(CreateWriteBuffer {
config: rules.write_buffer_connection.clone(),
},
)?;
})?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
handle
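WriteBufferConfig::new is now async, so the snafu context attachment moves after the .await; ResultExt::context works the same on an awaited Result as on a synchronous one. A minimal sketch, assuming snafu 0.6-style context selectors and a hypothetical connect function standing in for the real constructor:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("cannot create write buffer for {}: {}", config, source))]
    CreateWriteBuffer {
        config: String,
        source: std::io::Error,
    },
}

// Hypothetical async constructor standing in for WriteBufferConfig::new.
async fn connect(config: &str) -> Result<(), std::io::Error> {
    let _ = config;
    Ok(())
}

async fn init(config: String) -> Result<(), Error> {
    // Await first, then attach the snafu context to the resulting Result.
    connect(&config)
        .await
        .context(CreateWriteBuffer { config: config.clone() })?;
    Ok(())
}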

View File

@ -68,7 +68,7 @@
clippy::future_not_send
)]
use std::convert::TryInto;
use std::convert::{Infallible, TryInto};
use std::sync::Arc;
use async_trait::async_trait;
@ -555,8 +555,9 @@ where
.map_err(|e| Box::new(e) as _)
.context(CannotCreatePreservedCatalog)?;
let write_buffer =
WriteBufferConfig::new(server_id, &rules).map_err(|e| Error::CreatingWriteBuffer {
let write_buffer = WriteBufferConfig::new(server_id, &rules)
.await
.map_err(|e| Error::CreatingWriteBuffer {
config: rules.write_buffer_connection.clone(),
source: e,
})?;
@ -649,7 +650,7 @@ where
// need to split this in two blocks because we cannot hold a lock across an async call.
let routing_config_target = {
let rules = db.rules.read();
let rules = db.rules();
if let Some(RoutingRules::RoutingConfig(routing_config)) = &rules.routing_rules {
let sharded_entries = lines_to_sharded_entries(
lines,
@ -679,7 +680,7 @@ where
// config is updated, hence it's safe to use after we release the shard config
// lock.
let (sharded_entries, shards) = {
let rules = db.rules.read();
let rules = db.rules();
let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
RoutingRules::RoutingConfig(_) => todo!("routing config"),
@ -827,10 +828,8 @@ where
self.config.db_initialized(name)
}
pub fn db_rules(&self, name: &DatabaseName<'_>) -> Option<DatabaseRules> {
self.config
.db_initialized(name)
.map(|d| d.rules.read().clone())
pub fn db_rules(&self, name: &DatabaseName<'_>) -> Option<Arc<DatabaseRules>> {
self.config.db_initialized(name).map(|d| d.rules())
}
// Update database rules and save on success.
@ -838,7 +837,7 @@ where
&self,
db_name: &DatabaseName<'static>,
update: F,
) -> std::result::Result<DatabaseRules, UpdateError<E>>
) -> std::result::Result<Arc<DatabaseRules>, UpdateError<E>>
where
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E> + Send,
{
@ -849,7 +848,9 @@ where
crate::config::UpdateError::Closure(e) => UpdateError::Closure(e),
crate::config::UpdateError::Update(e) => UpdateError::Update(e),
})?;
self.persist_database_rules(rules.clone()).await?;
// TODO: Move into DB (#2053)
self.persist_database_rules(rules.as_ref().clone()).await?;
Ok(rules)
}
@ -872,8 +873,11 @@ where
for duration in nanos {
tokio::spawn(
tokio::time::sleep(tokio::time::Duration::from_nanos(duration))
.track(registration.clone()),
async move {
tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await;
Ok::<_, Infallible>(())
}
.track(registration.clone()),
);
}
@ -1473,6 +1477,7 @@ mod tests {
let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).unwrap();
let rules = db.rules();
let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
@ -1480,7 +1485,7 @@ mod tests {
&lines,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
rules.as_ref(),
)
.expect("sharded entries");
@ -1554,8 +1559,7 @@ mod tests {
let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
let db = server.db(&db_name).unwrap();
{
let mut rules = db.rules.write();
db.update_db_rules(|mut rules| {
let shard_config = ShardConfig {
hash_ring: Some(HashRing {
shards: vec![TEST_SHARD_ID].into(),
@ -1569,7 +1573,9 @@ mod tests {
..Default::default()
};
rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
}
Ok::<_, Infallible>(rules)
})
.unwrap();
let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
@ -1789,19 +1795,20 @@ mod tests {
let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).unwrap();
db.rules.write().lifecycle_rules.buffer_size_hard =
Some(std::num::NonZeroUsize::new(10).unwrap());
let rules = db
.update_db_rules(|mut rules| {
rules.lifecycle_rules.buffer_size_hard =
Some(std::num::NonZeroUsize::new(10).unwrap());
Ok::<_, Infallible>(rules)
})
.unwrap();
// inserting first line does not trigger hard buffer limit
let line_1 = "cpu bar=1 10";
let lines_1: Vec<_> = parse_lines(line_1).map(|l| l.unwrap()).collect();
let sharded_entries_1 = lines_to_sharded_entries(
&lines_1,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
)
.expect("first sharded entries");
let sharded_entries_1 =
lines_to_sharded_entries(&lines_1, ARBITRARY_DEFAULT_TIME, NO_SHARD_CONFIG, &*rules)
.expect("first sharded entries");
let entry_1 = &sharded_entries_1[0].entry;
server
@ -1816,7 +1823,7 @@ mod tests {
&lines_2,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
rules.as_ref(),
)
.expect("second sharded entries");
let entry_2 = &sharded_entries_2[0].entry;
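Two patterns recur in this file's changes: rules access goes through the copy-on-write db.rules() / update_db_rules API sketched earlier, and futures handed to the tracker now yield a Result so completions can be counted as successes or errors. Plain futures such as tokio::time::sleep produce (), hence the Ok::<_, Infallible>(()) wrapper; a minimal sketch, assuming a tracker that inspects the Result output:

use std::convert::Infallible;
use std::time::Duration;

// Wrap an infallible future so its output is Result-shaped; Infallible
// documents that the error arm can never actually be taken.
async fn tracked_sleep(duration: Duration) -> Result<(), Infallible> {
    tokio::time::sleep(duration).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    // In the real code this future would be chained with `.track(registration)`.
    tracked_sleep(Duration::from_nanos(100)).await.unwrap();
}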

View File

@ -1,4 +1,4 @@
use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration};
use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
use data_types::{
chunk_metadata::{ChunkStorage, ChunkSummary},
@ -77,7 +77,11 @@ impl TestDbBuilder {
.worker_cleanup_avg_sleep
.unwrap_or_else(|| Duration::from_secs(1));
rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_default();
// default to quick lifecycle rules for faster tests
rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_else(|| LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
});
// set partition template
if let Some(partition_template) = self.partition_template {
@ -90,7 +94,7 @@ impl TestDbBuilder {
}
let database_to_commit = DatabaseToCommit {
rules,
rules: Arc::new(rules),
server_id,
object_store,
preserved_catalog,
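Defaulting test databases to a one-second late-arrive window is what let so many call sites above collapse from the TestDb::builder().lifecycle_rules(...) boilerplate down to a bare make_db(). The defaulting idiom is Option::unwrap_or_else plus struct-update syntax; a small sketch with a simplified rules type (the field values here are illustrative, not the real defaults):

use std::num::NonZeroU32;

#[derive(Debug)]
struct LifecycleRules {
    late_arrive_window_seconds: NonZeroU32,
    immutable: bool,
}

impl Default for LifecycleRules {
    fn default() -> Self {
        Self {
            late_arrive_window_seconds: NonZeroU32::new(300).unwrap(),
            immutable: false,
        }
    }
}

fn effective_rules(overridden: Option<LifecycleRules>) -> LifecycleRules {
    // Tests that pass nothing get fast-cycling defaults; explicit
    // overrides still win.
    overridden.unwrap_or_else(|| LifecycleRules {
        late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
        ..Default::default()
    })
}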

View File

@ -185,6 +185,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
persist: command.persist,
immutable: command.immutable,
worker_backoff_millis: Default::default(),
max_active_compactions: Default::default(),
catalog_transactions_until_checkpoint: command
.catalog_transactions_until_checkpoint
.get(),

View File

@ -98,7 +98,7 @@ where
match self.server.db_rules(&name) {
Some(rules) => Ok(Response::new(GetDatabaseResponse {
rules: Some(rules.into()),
rules: Some(rules.as_ref().clone().into()),
})),
None => {
return Err(NotFound {
@ -150,7 +150,7 @@ where
.map_err(UpdateError::from)?;
Ok(Response::new(UpdateDatabaseResponse {
rules: Some(updated_rules.into()),
rules: Some(updated_rules.as_ref().clone().into()),
}))
}

View File

@ -18,7 +18,7 @@ use generated_types::{
influxdata::iox::management::v1 as management,
protobuf_type_url,
};
use tracker::{TaskId, TaskStatus, TaskTracker};
use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker};
use server::{ConnectionManager, Server};
use std::convert::TryInto;
@ -30,48 +30,44 @@ struct OperationsService<M: ConnectionManager> {
pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> {
let id = tracker.id();
let is_cancelled = tracker.is_cancelled();
let status = tracker.get_status();
let result = status.result();
let (operation_metadata, is_complete) = match status {
TaskStatus::Creating => {
let metadata = management::OperationMetadata {
job: Some(tracker.metadata().clone().into()),
..Default::default()
};
(metadata, false)
}
let operation_metadata = match status {
TaskStatus::Creating => management::OperationMetadata {
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
TaskStatus::Running {
total_count,
pending_count,
cpu_nanos,
} => {
let metadata = management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
task_count: total_count as _,
pending_count: pending_count as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
};
(metadata, false)
}
} => management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
total_count: total_count as _,
pending_count: pending_count as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
TaskStatus::Complete {
total_count,
success_count,
error_count,
cancelled_count,
dropped_count,
cpu_nanos,
wall_nanos,
} => {
let metadata = management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
task_count: total_count as _,
wall_nanos: wall_nanos as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
};
(metadata, true)
}
} => management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
total_count: total_count as _,
success_count: success_count as _,
error_count: error_count as _,
cancelled_count: cancelled_count as _,
dropped_count: dropped_count as _,
wall_nanos: wall_nanos as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
};
let mut buffer = BytesMut::new();
@ -85,25 +81,33 @@ pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Sta
value: buffer.freeze(),
};
let result = match (is_complete, is_cancelled) {
(true, true) => Some(operation::Result::Error(Status {
let result = match result {
Some(TaskResult::Success) => Some(operation::Result::Response(Any {
type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
value: Default::default(),
})),
Some(TaskResult::Cancelled) => Some(operation::Result::Error(Status {
code: tonic::Code::Cancelled as _,
message: "Job cancelled".to_string(),
details: vec![],
})),
(true, false) => Some(operation::Result::Response(Any {
type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
value: Default::default(), // TODO: Verify this is correct
Some(TaskResult::Dropped) => Some(operation::Result::Error(Status {
code: tonic::Code::Internal as _,
message: "Job did not run to completion, possible panic".to_string(),
details: vec![],
})),
_ => None,
Some(TaskResult::Error) => Some(operation::Result::Error(Status {
code: tonic::Code::Internal as _,
message: "Job returned an error".to_string(),
details: vec![],
})),
None => None,
};
Ok(Operation {
name: id.to_string(),
metadata: Some(metadata),
done: is_complete,
done: result.is_some(),
result,
})
}
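Deriving done from result.is_some() rather than from a separate is_complete flag keeps the two fields consistent by construction: an operation is done exactly when a terminal result exists. A condensed sketch of the mapping, with local stand-ins for the tonic/protobuf types (the numeric codes follow gRPC's Cancelled = 1 and Internal = 13):

#[derive(Debug, Clone, Copy)]
enum TaskResult {
    Success,
    Cancelled,
    Dropped,
    Error,
}

#[derive(Debug)]
enum OpResult {
    Response,                 // stand-in for operation::Result::Response(Any)
    Error(i32, &'static str), // stand-in for operation::Result::Error(Status)
}

fn encode(result: Option<TaskResult>) -> (Option<OpResult>, bool) {
    let result = match result {
        Some(TaskResult::Success) => Some(OpResult::Response),
        Some(TaskResult::Cancelled) => Some(OpResult::Error(1, "Job cancelled")),
        Some(TaskResult::Dropped) => {
            Some(OpResult::Error(13, "Job did not run to completion, possible panic"))
        }
        Some(TaskResult::Error) => Some(OpResult::Error(13, "Job returned an error")),
        None => None,
    };
    let done = result.is_some();
    (result, done)
}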

View File

@ -140,7 +140,7 @@ fn main() -> Result<(), std::io::Error> {
// load all environment variables from .env before doing anything
load_dotenv();
let config = Config::from_args();
let config: Config = StructOpt::from_args();
let tokio_runtime = get_runtime(config.num_threads)?;
tokio_runtime.block_on(async move {

View File

@ -218,6 +218,7 @@ async fn test_create_get_update_database() {
catalog_transactions_until_checkpoint: 13,
late_arrive_window_seconds: 423,
worker_backoff_millis: 15,
max_active_compactions: 8,
persist_row_threshold: 342,
persist_age_threshold_seconds: 700,
mub_row_threshold: 1343,

View File

@ -40,7 +40,7 @@ async fn test_operations() {
let job = meta.job.expect("expected a job");
assert_eq!(meta.task_count, 2);
assert_eq!(meta.total_count, 2);
assert_eq!(meta.pending_count, 1);
assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos }));
@ -76,7 +76,8 @@ async fn test_operations() {
assert!(meta.wall_nanos > 0);
assert!(meta.cpu_nanos > 0);
assert_eq!(meta.pending_count, 0);
assert_eq!(meta.task_count, 2);
assert_eq!(meta.total_count, 2);
assert_eq!(meta.cancelled_count, 1);
match waited.result {
Some(operations::generated_types::operation::Result::Error(status)) => {

View File

@ -24,7 +24,7 @@ async fn test_start_stop() {
)
.expect("expected JSON output");
assert_eq!(stdout.task_count, 1);
assert_eq!(stdout.total_count, 1);
match stdout.job {
Some(Job::Dummy { nanos }) => assert_eq!(nanos, vec![duration]),
_ => panic!("expected dummy job got {:?}", stdout.job),
@ -82,7 +82,8 @@ async fn test_start_stop() {
.expect("expected JSON output");
assert_eq!(completed.pending_count, 0);
assert_eq!(completed.task_count, 1);
assert_eq!(completed.total_count, 1);
assert_eq!(completed.cancelled_count, 1);
assert_eq!(completed.status, OperationStatus::Cancelled);
assert_eq!(&completed.job, &operations[0].job)
}

View File

@ -99,23 +99,23 @@ async fn reads_come_from_kafka() {
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection::Reading(kafka_connection.to_string());
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection)
.build(server.grpc_channel())
.await;
// Common Kafka config
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", kafka_connection);
cfg.set("message.timeout.ms", "5000");
// Create a partition with 2 topics in Kafka
// Create a topic with 2 partitions in Kafka BEFORE creating the DB
let num_partitions = 2;
let admin: AdminClient<DefaultClientContext> = cfg.clone().create().unwrap();
let topic = NewTopic::new(&db_name, num_partitions, TopicReplication::Fixed(1));
let opts = AdminOptions::default();
admin.create_topics(&[topic], &opts).await.unwrap();
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection)
.build(server.grpc_channel())
.await;
// put some points in Kafka
let producer: FutureProducer = cfg.create().unwrap();
@ -150,10 +150,18 @@ async fn reads_come_from_kafka() {
if let Ok(mut results) = query_results {
let mut batches = Vec::new();
let mut num_rows = 0;
while let Some(data) = results.next().await.unwrap() {
num_rows += data.num_rows();
batches.push(data);
}
// Since data is streamed using two partitions, only a subset of the data might be present. If that's
// the case, ignore that record batch and try again.
if num_rows < 4 {
continue;
}
let expected = vec![
"+--------+-------------------------------+------+",
"| region | time | user |",

View File

@ -111,11 +111,28 @@ struct TrackerState {
pending_futures: AtomicUsize,
pending_registrations: AtomicUsize,
ok_futures: AtomicUsize,
err_futures: AtomicUsize,
cancelled_futures: AtomicUsize,
notify: Notify,
}
/// Returns a summary of the task execution
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TaskResult {
/// All futures completed successfully
Success,
/// Some futures were cancelled, and none were dropped or errored
Cancelled,
/// Some futures were dropped, and none errored
Dropped,
/// Some futures returned an error
Error,
}
/// The status of the tracked task
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum TaskStatus {
/// More futures can be registered
Creating,
@ -141,6 +158,14 @@ pub enum TaskStatus {
Complete {
/// The number of created futures
total_count: usize,
/// The number of futures that completed successfully
success_count: usize,
/// The number of futures that returned an error
error_count: usize,
/// The number of futures that were aborted
cancelled_count: usize,
/// The number of futures that were dropped without running to completion (e.g. panic)
dropped_count: usize,
/// The total amount of CPU time spent executing the futures
cpu_nanos: usize,
/// The number of nanoseconds between tracker registration and
@ -169,7 +194,7 @@ impl TaskStatus {
}
}
/// If the job has competed, returns the total amount of wall clock time
/// If the job has completed, returns the total amount of wall clock time
/// spent executing futures
pub fn wall_nanos(&self) -> Option<usize> {
match self {
@ -178,6 +203,34 @@ impl TaskStatus {
Self::Complete { wall_nanos, .. } => Some(*wall_nanos),
}
}
/// Returns the result of the job if it has completed, otherwise None
pub fn result(&self) -> Option<TaskResult> {
match self {
TaskStatus::Creating => None,
TaskStatus::Running { .. } => None,
TaskStatus::Complete {
total_count,
success_count,
error_count,
dropped_count,
cancelled_count,
..
} => {
if *error_count != 0 {
Some(TaskResult::Error)
} else if *dropped_count != 0 {
Some(TaskResult::Dropped)
} else if *cancelled_count != 0 {
Some(TaskResult::Cancelled)
} else {
// Sanity check
assert_eq!(total_count, success_count);
Some(TaskResult::Success)
}
}
}
}
}
/// A Tracker can be used to monitor/cancel/wait for a set of associated futures
@ -279,11 +332,29 @@ where
pending_count: self.state.pending_futures.load(Ordering::Relaxed),
cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
},
(true, true) => TaskStatus::Complete {
total_count: self.state.created_futures.load(Ordering::Relaxed),
cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed),
},
(true, true) => {
let total_count = self.state.created_futures.load(Ordering::Relaxed);
let success_count = self.state.ok_futures.load(Ordering::Relaxed);
let error_count = self.state.err_futures.load(Ordering::Relaxed);
let cancelled_count = self.state.cancelled_futures.load(Ordering::Relaxed);
// Failure of this would imply a future reported its completion status multiple
// times or a future was created without incrementing created_futures.
// Both of these should be impossible
let dropped_count = total_count
.checked_sub(success_count + error_count + cancelled_count)
.expect("invalid tracker state");
TaskStatus::Complete {
total_count,
success_count,
error_count,
cancelled_count,
dropped_count,
cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed),
}
}
}
}
@ -352,6 +423,9 @@ impl Default for TaskRegistration {
created_futures: AtomicUsize::new(0),
pending_futures: AtomicUsize::new(0),
pending_registrations: AtomicUsize::new(1),
ok_futures: AtomicUsize::new(0),
err_futures: AtomicUsize::new(0),
cancelled_futures: AtomicUsize::new(0),
notify: Notify::new(),
});
@ -401,8 +475,18 @@ mod tests {
use std::time::Duration;
use super::*;
use futures::FutureExt;
use std::convert::Infallible;
use tokio::sync::oneshot;
fn pending() -> futures::future::Pending<Result<(), Infallible>> {
futures::future::pending()
}
fn ready_ok() -> futures::future::Ready<Result<(), Infallible>> {
futures::future::ready(Ok(()))
}
#[tokio::test]
async fn test_lifecycle() {
let (sender, receive) = oneshot::channel();
@ -451,7 +535,7 @@ mod tests {
let (_, registration) = registry.register(());
{
let f = futures::future::pending::<()>().track(registration);
let f = pending().track(registration);
assert_eq!(registry.running().len(), 1);
@ -467,9 +551,9 @@ mod tests {
let (_, registration) = registry.register(());
{
let f = futures::future::pending::<()>().track(registration.clone());
let f = pending().track(registration.clone());
{
let f = futures::future::pending::<()>().track(registration);
let f = pending().track(registration);
assert_eq!(registry.running().len(), 1);
std::mem::drop(f);
}
@ -485,7 +569,7 @@ mod tests {
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task = tokio::spawn(futures::future::pending::<()>().track(registration));
let task = tokio::spawn(pending().track(registration));
let tracked = registry.running();
assert_eq!(tracked.len(), 1);
@ -503,7 +587,7 @@ mod tests {
let (tracker, registration) = registry.register(());
tracker.cancel();
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration));
let task1 = tokio::spawn(pending().track(registration));
let result1 = task1.await.unwrap();
assert!(result1.is_err());
@ -515,8 +599,8 @@ mod tests {
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone()));
let task2 = tokio::spawn(futures::future::pending::<()>().track(registration));
let task1 = tokio::spawn(pending().track(registration.clone()));
let task2 = tokio::spawn(pending().track(registration));
let tracked = registry.running();
assert_eq!(tracked.len(), 1);
@ -539,11 +623,11 @@ mod tests {
let (_, registration2) = registry.register(2);
let (_, registration3) = registry.register(3);
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration1.clone()));
let task2 = tokio::spawn(futures::future::pending::<()>().track(registration1));
let task3 = tokio::spawn(futures::future::ready(()).track(registration2.clone()));
let task4 = tokio::spawn(futures::future::pending::<()>().track(registration2));
let task5 = tokio::spawn(futures::future::pending::<()>().track(registration3));
let task1 = tokio::spawn(pending().track(registration1.clone()));
let task2 = tokio::spawn(pending().track(registration1));
let task3 = tokio::spawn(ready_ok().track(registration2.clone()));
let task4 = tokio::spawn(pending().track(registration2));
let task5 = tokio::spawn(pending().track(registration3));
let running = sorted(registry.running());
let tracked = sorted(registry.tracked());
@ -637,25 +721,25 @@ mod tests {
let (tracker2, registration2) = registry.register(2);
let (tracker3, registration3) = registry.register(3);
let task1 =
tokio::spawn(tokio::time::sleep(Duration::from_millis(100)).track(registration1));
let task2 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration2),
);
let async_task = || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
Ok::<_, Infallible>(())
};
let task3 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }
.track(registration3.clone()),
);
let blocking_task = || async move {
std::thread::sleep(Duration::from_millis(100));
Ok::<_, Infallible>(())
};
let task4 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration3),
);
let task1 = tokio::spawn(async_task().track(registration1));
let task2 = tokio::spawn(blocking_task().track(registration2));
let task3 = tokio::spawn(blocking_task().track(registration3.clone()));
let task4 = tokio::spawn(blocking_task().track(registration3));
task1.await.unwrap().unwrap();
task2.await.unwrap().unwrap();
task3.await.unwrap().unwrap();
task4.await.unwrap().unwrap();
task1.await.unwrap().unwrap().unwrap();
task2.await.unwrap().unwrap().unwrap();
task3.await.unwrap().unwrap().unwrap();
task4.await.unwrap().unwrap().unwrap();
let assert_fuzzy = |actual: usize, expected: std::time::Duration| {
// Number of milliseconds of tolerance
@ -710,8 +794,8 @@ mod tests {
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone()));
task1.await.unwrap().unwrap();
let task1 = tokio::spawn(ready_ok().track(registration.clone()));
task1.await.unwrap().unwrap().unwrap();
let tracked = registry.tracked();
assert_eq!(tracked.len(), 1);
@ -721,13 +805,138 @@ mod tests {
let reclaimed: Vec<_> = registry.reclaim().collect();
assert_eq!(reclaimed.len(), 0);
let task2 = tokio::spawn(futures::future::ready(()).track(registration));
task2.await.unwrap().unwrap();
let task2 = tokio::spawn(ready_ok().track(registration));
task2.await.unwrap().unwrap().unwrap();
let reclaimed: Vec<_> = registry.reclaim().collect();
assert_eq!(reclaimed.len(), 1);
}
#[tokio::test]
async fn test_failure() {
let mut registry = TaskRegistry::new();
let zero_clocks = |mut status: TaskStatus| {
match &mut status {
TaskStatus::Creating => {}
TaskStatus::Running { cpu_nanos, .. } => {
*cpu_nanos = 0;
}
TaskStatus::Complete {
wall_nanos,
cpu_nanos,
..
} => {
*wall_nanos = 0;
*cpu_nanos = 0;
}
}
status
};
let (task, registration) = registry.register(());
let (sender, receiver) = oneshot::channel();
let handle = tokio::spawn(receiver.track(registration));
sender.send(()).unwrap();
handle.await.unwrap().unwrap().unwrap();
assert_eq!(task.get_status().result(), Some(TaskResult::Success));
assert_eq!(
zero_clocks(task.get_status()),
TaskStatus::Complete {
total_count: 1,
success_count: 1,
error_count: 0,
cancelled_count: 0,
dropped_count: 0,
cpu_nanos: 0,
wall_nanos: 0
}
);
let (task, registration) = registry.register(());
let (sender, receiver) = oneshot::channel::<()>();
let handle = tokio::spawn(receiver.track(registration));
std::mem::drop(sender);
handle.await.unwrap().unwrap().expect_err("expected error");
assert_eq!(task.get_status().result(), Some(TaskResult::Error));
assert_eq!(
zero_clocks(task.get_status()),
TaskStatus::Complete {
total_count: 1,
success_count: 0,
error_count: 1,
cancelled_count: 0,
dropped_count: 0,
cpu_nanos: 0,
wall_nanos: 0
}
);
let (task, registration) = registry.register(());
let handle = tokio::spawn(pending().track(registration));
task.cancel();
handle.await.unwrap().expect_err("expected aborted");
assert_eq!(task.get_status().result(), Some(TaskResult::Cancelled));
assert_eq!(
zero_clocks(task.get_status()),
TaskStatus::Complete {
total_count: 1,
success_count: 0,
error_count: 0,
cancelled_count: 1,
dropped_count: 0,
cpu_nanos: 0,
wall_nanos: 0
}
);
let (task, registration) = registry.register(());
std::mem::drop(pending().track(registration));
assert_eq!(task.get_status().result(), Some(TaskResult::Dropped));
assert_eq!(
zero_clocks(task.get_status()),
TaskStatus::Complete {
total_count: 1,
success_count: 0,
error_count: 0,
cancelled_count: 0,
dropped_count: 1,
cpu_nanos: 0,
wall_nanos: 0
}
);
let (task, registration) = registry.register(());
let handle = tokio::spawn(
async move {
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
panic!("test");
}
.inspect(|_output: &Result<(), Infallible>| {})
.track(registration),
);
handle.await.unwrap_err();
assert_eq!(task.get_status().result(), Some(TaskResult::Dropped));
assert_eq!(
zero_clocks(task.get_status()),
TaskStatus::Complete {
total_count: 1,
success_count: 0,
error_count: 0,
cancelled_count: 0,
dropped_count: 1,
cpu_nanos: 0,
wall_nanos: 0
}
);
}
#[tokio::test]
async fn test_join() {
use std::future::Future;
@ -737,20 +946,10 @@ mod tests {
let (tracker, registration) = registry.register(());
let (s1, r1) = oneshot::channel();
let task1 = tokio::spawn(
async move {
r1.await.unwrap();
}
.track(registration.clone()),
);
let task1 = tokio::spawn(r1.track(registration.clone()));
let (s2, r2) = oneshot::channel();
let task2 = tokio::spawn(
async move {
r2.await.unwrap();
}
.track(registration.clone()),
);
let task2 = tokio::spawn(r2.track(registration.clone()));
// This executor goop is necessary to get a future into
// a state where it is waiting on the Notify resource
@ -767,7 +966,7 @@ mod tests {
assert!(matches!(tracker.get_status(), TaskStatus::Creating));
s1.send(()).unwrap();
task1.await.unwrap().unwrap();
task1.await.unwrap().unwrap().unwrap();
assert!(matches!(tracker.get_status(), TaskStatus::Creating));
@ -775,7 +974,7 @@ mod tests {
assert_eq!(poll, Poll::Pending);
s2.send(()).unwrap();
task2.await.unwrap().unwrap();
task2.await.unwrap().unwrap().unwrap();
assert!(matches!(tracker.get_status(), TaskStatus::Creating));
View File
@ -11,7 +11,7 @@ use std::sync::Arc;
/// An extension trait that provides `self.track(registration)` allowing
/// associating this future with a `TaskRegistration`
pub trait TrackedFutureExt: Future {
pub trait TrackedFutureExt: TryFuture {
fn track(self, registration: TaskRegistration) -> TrackedFuture<Self>
where
Self: Sized,
@ -28,17 +28,18 @@ pub trait TrackedFutureExt: Future {
// The future returned by CancellationToken::cancelled borrows the token
// In order to ensure we get a future with a static lifetime
// we box them up together and let async work its magic
let abort = Box::pin(async move { token.cancelled().await });
let cancel = Box::pin(async move { token.cancelled().await });
TrackedFuture {
inner: self,
abort,
cancel,
tracker,
complete: false,
}
}
}
impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
impl<T: ?Sized> TrackedFutureExt for T where T: TryFuture {}
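A hedged usage sketch (test-style code, not part of the diff): with the bound changed from `Future` to `TryFuture`, any future whose output is a `Result` can be tracked, and the tracked future resolves to a nested `Result` whose outer layer reports cancellation and whose inner layer is the future's own outcome.

let mut registry = TaskRegistry::new();
let (tracker, registration) = registry.register(());
let handle = tokio::spawn(async { Ok::<_, std::io::Error>(42) }.track(registration));
// Outer unwrap: JoinError; middle: future::Aborted; inner: the future's own error.
let value = handle.await.unwrap().unwrap().unwrap();
assert_eq!(value, 42);
assert_eq!(tracker.get_status().result(), Some(TaskResult::Success));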
/// The `Future` returned by `TrackedFutureExt::track()`
/// Unregisters the future from the registered `TaskRegistry` on drop
@ -46,36 +47,53 @@ impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
/// `TrackerRegistry::terminate`
#[pin_project(PinnedDrop)]
#[allow(missing_debug_implementations)]
pub struct TrackedFuture<F: Future> {
pub struct TrackedFuture<F: TryFuture> {
#[pin]
inner: F,
#[pin]
abort: BoxFuture<'static, ()>,
cancel: BoxFuture<'static, ()>,
tracker: Arc<TrackerState>,
complete: bool,
}
impl<F: Future> Future for TrackedFuture<F> {
type Output = Result<F::Output, future::Aborted>;
impl<F: TryFuture> Future for TrackedFuture<F> {
type Output = Result<Result<F::Ok, F::Error>, future::Aborted>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.as_mut().project().abort.poll(cx).is_ready() {
assert!(!self.complete, "It is illegal to poll a completed future");
if self.as_mut().project().cancel.poll(cx).is_ready() {
*self.as_mut().project().complete = true;
self.tracker
.cancelled_futures
.fetch_add(1, Ordering::Relaxed);
return Poll::Ready(Err(future::Aborted {}));
}
let start = Instant::now();
let poll = self.as_mut().project().inner.poll(cx);
let poll = self.as_mut().project().inner.try_poll(cx);
let delta = start.elapsed().as_nanos() as usize;
self.tracker.cpu_nanos.fetch_add(delta, Ordering::Relaxed);
poll.map(Ok)
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => {
match v.is_ok() {
true => self.tracker.ok_futures.fetch_add(1, Ordering::Relaxed),
false => self.tracker.err_futures.fetch_add(1, Ordering::Relaxed),
};
*self.as_mut().project().complete = true;
Poll::Ready(Ok(v))
}
}
}
}
#[pinned_drop]
impl<F: Future> PinnedDrop for TrackedFuture<F> {
impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) {
let state = &self.project().tracker;
let state: &TrackerState = &self.project().tracker;
let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;
View File
@ -8,11 +8,11 @@ async-trait = "0.1"
data_types = { path = "../data_types" }
entry = { path = "../entry" }
futures = "0.3"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.1"
rdkafka = "0.26.0"
observability_deps = { path = "../observability_deps" }
tokio = { version = "1.0", features = ["macros", "fs"] }
[dev-dependencies]
dotenv = "0.15.0"
tokio = { version = "1.0", features = ["macros", "fs"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
View File
@ -17,7 +17,7 @@ pub enum WriteBufferConfig {
}
impl WriteBufferConfig {
pub fn new(
pub async fn new(
server_id: ServerId,
rules: &DatabaseRules,
) -> Result<Option<Self>, WriteBufferError> {
@ -34,7 +34,7 @@ impl WriteBufferConfig {
Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
}
Some(WriteBufferConnection::Reading(conn)) => {
let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;
let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name).await?;
Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
}
View File
@ -7,7 +7,7 @@ use futures::stream::BoxStream;
/// The dynamic boxing makes it easier to deal with error from different implementations.
pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading
/// Writing to a Write Buffer takes an [`Entry`] and returns [`Sequence`] data that facilitates reading
/// entries from the Write Buffer at a later time.
#[async_trait]
pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
@ -21,12 +21,13 @@ pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
) -> Result<Sequence, WriteBufferError>;
}
/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using
/// `Db::stream_in_sequenced_entries`.
/// Output stream of [`WriteBufferReading`].
pub type EntryStream<'a> = BoxStream<'a, Result<SequencedEntry, WriteBufferError>>;
/// Produce streams (one per sequencer) of [`SequencedEntry`]s.
pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
fn stream<'life0, 'async_trait>(
&'life0 self,
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
/// Returns a stream per sequencer.
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
where
'life0: 'async_trait,
Self: 'async_trait;
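A hedged consumption sketch (hypothetical helper, not part of this diff; assumes `futures::StreamExt` is in scope): each returned pair is `(sequencer_id, stream)`, and every stream can be polled independently of the others.

async fn drain_first(reader: &impl WriteBufferReading) {
    for (sequencer_id, mut stream) in reader.streams() {
        // Each stream stays pending until its sequencer has new entries.
        if let Some(result) = stream.next().await {
            let _sequenced_entry = result.expect("write buffer error");
            println!("got an entry from sequencer {}", sequencer_id);
        }
    }
}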
@ -46,13 +47,14 @@ pub mod test_utils {
async fn new_context(&self, n_sequencers: u32) -> Self::Context;
}
#[async_trait]
pub trait TestContext: Send + Sync {
type Writing: WriteBufferWriting;
type Reading: WriteBufferReading;
fn writing(&self) -> Self::Writing;
fn reading(&self) -> Self::Reading;
async fn reading(&self) -> Self::Reading;
}
pub async fn perform_generic_tests<T>(adapter: T)
@ -61,6 +63,7 @@ pub mod test_utils {
{
test_single_stream_io(&adapter).await;
test_multi_stream_io(&adapter).await;
test_multi_sequencer_io(&adapter).await;
test_multi_writer_multi_reader(&adapter).await;
}
@ -75,9 +78,12 @@ pub mod test_utils {
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let reader = context.reading();
let reader = context.reading().await;
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
let (sequencer_id, mut stream) = streams.pop().unwrap();
let mut stream = reader.stream();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
@ -85,15 +91,15 @@ pub mod test_utils {
assert!(stream.poll_next_unpin(&mut cx).is_pending());
// adding content allows us to get results
writer.store_entry(&entry_1, 0).await.unwrap();
writer.store_entry(&entry_1, sequencer_id).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_1);
// stream is pending again
assert!(stream.poll_next_unpin(&mut cx).is_pending());
// adding more data unblocks the stream
writer.store_entry(&entry_2, 0).await.unwrap();
writer.store_entry(&entry_3, 0).await.unwrap();
writer.store_entry(&entry_2, sequencer_id).await.unwrap();
writer.store_entry(&entry_3, sequencer_id).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_2);
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_3);
@ -101,6 +107,50 @@ pub mod test_utils {
assert!(stream.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_sequencer_io<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(2).await;
let entry_1 = lp_to_entry("upc user=1 100");
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let reader = context.reading().await;
let mut streams = reader.streams();
assert_eq!(streams.len(), 2);
let (sequencer_id_1, mut stream_1) = streams.pop().unwrap();
let (sequencer_id_2, mut stream_2) = streams.pop().unwrap();
assert_ne!(sequencer_id_1, sequencer_id_2);
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
// empty streams are pending
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
// entries arrive at the right target stream
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
writer.store_entry(&entry_2, sequencer_id_2).await.unwrap();
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
// streams are pending again
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_stream_io<T>(adapter: &T)
where
T: TestAdapter,
@ -112,10 +162,16 @@ pub mod test_utils {
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let reader = context.reading();
let reader = context.reading().await;
let mut streams_1 = reader.streams();
let mut streams_2 = reader.streams();
assert_eq!(streams_1.len(), 1);
assert_eq!(streams_2.len(), 1);
let (sequencer_id_1, mut stream_1) = streams_1.pop().unwrap();
let (sequencer_id_2, mut stream_2) = streams_2.pop().unwrap();
assert_eq!(sequencer_id_1, sequencer_id_2);
let mut stream_1 = reader.stream();
let mut stream_2 = reader.stream();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
@ -124,9 +180,9 @@ pub mod test_utils {
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
// streams poll from same source
writer.store_entry(&entry_1, 0).await.unwrap();
writer.store_entry(&entry_2, 0).await.unwrap();
writer.store_entry(&entry_3, 0).await.unwrap();
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
writer.store_entry(&entry_2, sequencer_id_1).await.unwrap();
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
@ -148,34 +204,51 @@ pub mod test_utils {
let writer_1 = context.writing();
let writer_2 = context.writing();
let reader_1 = context.reading();
let reader_2 = context.reading();
let reader_1 = context.reading().await;
let reader_2 = context.reading().await;
// TODO: do not hard-code sequencer IDs here but provide a proper interface
writer_1.store_entry(&entry_east_1, 0).await.unwrap();
writer_1.store_entry(&entry_west_1, 1).await.unwrap();
writer_2.store_entry(&entry_east_2, 0).await.unwrap();
assert_reader_content(reader_1, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
assert_reader_content(reader_2, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
assert_reader_content(
reader_1,
&[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
)
.await;
assert_reader_content(
reader_2,
&[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
)
.await;
}
async fn assert_reader_content<R>(reader: R, expected: &[&Entry])
async fn assert_reader_content<R>(reader: R, expected: &[(u32, &[&Entry])])
where
R: WriteBufferReading,
{
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
let mut results: Vec<_> = reader
.stream()
.take(expected.len())
.try_collect()
.await
.unwrap();
results.sort_by_key(|entry| {
let sequence = entry.sequence().unwrap();
(sequence.id, sequence.number)
});
let actual: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
assert_eq!(&actual[..], expected);
let mut streams = reader.streams();
assert_eq!(streams.len(), expected.len());
streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id);
for ((actual_sequencer_id, actual_stream), (expected_sequencer_id, expected_entries)) in
streams.into_iter().zip(expected.iter())
{
assert_eq!(actual_sequencer_id, *expected_sequencer_id);
// we need to limit the stream to `expected_entries.len()` elements, otherwise it might be pending forever
let mut results: Vec<_> = actual_stream
.take(expected_entries.len())
.try_collect()
.await
.unwrap();
results.sort_by_key(|entry| {
let sequence = entry.sequence().unwrap();
(sequence.id, sequence.number)
});
let actual_entries: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
assert_eq!(&&actual_entries[..], expected_entries);
}
}
}
View File
@ -1,19 +1,22 @@
use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
time::Duration,
};
use async_trait::async_trait;
use data_types::server_id::ServerId;
use entry::{Entry, Sequence, SequencedEntry};
use futures::{stream::BoxStream, StreamExt};
use observability_deps::tracing::debug;
use futures::StreamExt;
use observability_deps::tracing::{debug, info};
use rdkafka::{
consumer::{Consumer, StreamConsumer},
consumer::{BaseConsumer, Consumer, StreamConsumer},
error::KafkaError,
producer::{FutureProducer, FutureRecord},
util::Timeout,
ClientConfig, Message,
ClientConfig, Message, TopicPartitionList,
};
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
pub struct KafkaBufferProducer {
conn: String,
@ -91,7 +94,7 @@ impl KafkaBufferProducer {
pub struct KafkaBufferConsumer {
conn: String,
database_name: String,
consumer: StreamConsumer,
consumers: Vec<(u32, StreamConsumer)>,
}
// Needed because rdkafka's StreamConsumer doesn't impl Debug
@ -105,34 +108,38 @@ impl std::fmt::Debug for KafkaBufferConsumer {
}
impl WriteBufferReading for KafkaBufferConsumer {
fn stream<'life0, 'async_trait>(
&'life0 self,
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
where
'life0: 'async_trait,
Self: 'async_trait,
{
self.consumer
.stream()
.map(|message| {
let message = message?;
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
let sequence = Sequence {
id: message.partition().try_into()?,
number: message.offset().try_into()?,
};
self.consumers
.iter()
.map(|(sequencer_id, consumer)| {
let stream = consumer
.stream()
.map(|message| {
let message = message?;
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
let sequence = Sequence {
id: message.partition().try_into()?,
number: message.offset().try_into()?,
};
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
})
.boxed();
(*sequencer_id, stream)
})
.boxed()
.collect()
}
}
impl KafkaBufferConsumer {
pub fn new(
conn: impl Into<String>,
pub async fn new(
conn: impl Into<String> + Send + Sync,
server_id: ServerId,
database_name: impl Into<String>,
database_name: impl Into<String> + Send + Sync,
) -> Result<Self, KafkaError> {
let conn = conn.into();
let database_name = database_name.into();
@ -149,17 +156,55 @@ impl KafkaBufferConsumer {
// When subscribing without a partition offset, start from the smallest offset available.
cfg.set("auto.offset.reset", "smallest");
let consumer: StreamConsumer = cfg.create()?;
// Figure out which partitions exist.
let partitions = Self::get_partitions(&database_name, &cfg).await?;
info!(%database_name, ?partitions, "found Kafka partitions");
// Subscribe to all partitions of this database's topic.
consumer.subscribe(&[&database_name]).unwrap();
// Set up a single consumer per partition, at least until https://github.com/fede1024/rust-rdkafka/pull/351 is
// merged
let consumers = partitions
.into_iter()
.map(|partition| {
let consumer: StreamConsumer = cfg.create()?;
let mut assignment = TopicPartitionList::new();
assignment.add_partition(&database_name, partition as i32);
consumer.assign(&assignment)?;
Ok((partition, consumer))
})
.collect::<Result<Vec<(u32, StreamConsumer)>, KafkaError>>()?;
Ok(Self {
conn,
database_name,
consumer,
consumers,
})
}
async fn get_partitions(
database_name: &str,
cfg: &ClientConfig,
) -> Result<Vec<u32>, KafkaError> {
let database_name = database_name.to_string();
let probe_consumer: BaseConsumer = cfg.create()?;
let metadata = tokio::task::spawn_blocking(move || {
probe_consumer.fetch_metadata(Some(&database_name), Duration::from_secs(60))
})
.await
.expect("subtask failed")?;
let topic_metadata = metadata.topics().get(0).expect("requested a single topic");
let mut partitions: Vec<_> = topic_metadata
.partitions()
.iter()
.map(|partition_metadata| partition_metadata.id().try_into().unwrap())
.collect();
partitions.sort_unstable();
Ok(partitions)
}
}
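`fetch_metadata` is a blocking librdkafka call, hence the `spawn_blocking` hop above; the same pattern fits other blocking probes. A hypothetical sketch (`fetch_watermarks_blocking` is not part of this diff; it assumes the imports already present in this file):

async fn fetch_watermarks_blocking(
    cfg: &ClientConfig,
    topic: String,
    partition: i32,
) -> Result<(i64, i64), KafkaError> {
    let consumer: BaseConsumer = cfg.create()?;
    // Move the blocking call off the async runtime.
    tokio::task::spawn_blocking(move || {
        consumer.fetch_watermarks(&topic, partition, Duration::from_secs(60))
    })
    .await
    .expect("blocking task panicked")
}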
pub mod test_utils {
@ -273,6 +318,7 @@ mod tests {
server_id_counter: AtomicU32,
}
#[async_trait]
impl TestContext for KafkaTestContext {
type Writing = KafkaBufferProducer;
@ -282,10 +328,12 @@ mod tests {
KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
}
fn reading(&self) -> Self::Reading {
async fn reading(&self) -> Self::Reading {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name).unwrap()
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name)
.await
.unwrap()
}
}
View File
@ -2,13 +2,10 @@ use std::{collections::BTreeMap, sync::Arc, task::Poll};
use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry};
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use futures::{stream, StreamExt};
use parking_lot::Mutex;
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
type EntryResVec = Vec<Result<SequencedEntry, WriteBufferError>>;
@ -182,22 +179,28 @@ impl std::fmt::Debug for MockBufferForReading {
}
impl WriteBufferReading for MockBufferForReading {
fn stream<'life0, 'async_trait>(
&'life0 self,
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
where
'life0: 'async_trait,
Self: 'async_trait,
{
let state = self.state.clone();
let positions = Arc::clone(&self.positions);
let sequencer_ids: Vec<_> = {
let positions = self.positions.lock();
positions.keys().copied().collect()
};
stream::poll_fn(move |_ctx| {
let entries = state.entries.lock();
let mut positions = positions.lock();
let mut streams = vec![];
for sequencer_id in sequencer_ids {
let state = self.state.clone();
let positions = Arc::clone(&self.positions);
let stream = stream::poll_fn(move |_ctx| {
let entries = state.entries.lock();
let mut positions = positions.lock();
let entry_vec = entries.get(&sequencer_id).unwrap();
let position = positions.get_mut(&sequencer_id).unwrap();
for (sequencer_id, position) in positions.iter_mut() {
let entry_vec = entries.get(sequencer_id).unwrap();
if entry_vec.len() > *position {
let entry = match &entry_vec[*position] {
Ok(entry) => Ok(entry.clone()),
@ -206,11 +209,14 @@ impl WriteBufferReading for MockBufferForReading {
*position += 1;
return Poll::Ready(Some(entry));
}
}
Poll::Pending
})
.boxed()
Poll::Pending
})
.boxed();
streams.push((sequencer_id, stream));
}
streams
}
}
@ -239,6 +245,7 @@ mod tests {
state: MockBufferSharedState,
}
#[async_trait]
impl TestContext for MockTestContext {
type Writing = MockBufferForWriting;
@ -248,7 +255,7 @@ mod tests {
MockBufferForWriting::new(self.state.clone())
}
fn reading(&self) -> Self::Reading {
async fn reading(&self) -> Self::Reading {
MockBufferForReading::new(self.state.clone())
}
}