Merge branch 'main' into crepererum/rub_shrink_rle

pull/24376/head
kodiakhq[bot] 2021-08-25 08:58:22 +00:00 committed by GitHub
commit c98723e3b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 51 additions and 14 deletions

View File

@ -273,10 +273,12 @@ jobs:
- cache_restore
- run:
name: Print rustc target CPU options
command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu
# disable jemalloc/heappy and use system alloctor
command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger" --bin print_cpu
- run:
name: Cargo release build with target arch set for CRoaring
command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy"
# disable jemalloc/heappy and use system alloctor
command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger"
- run: |
echo sha256sum after build is
sha256sum target/release/influxdb_iox

View File

@ -81,13 +81,17 @@ impl Executor {
}
}
/// Return a new execution config, suitable for executing a new query or system task
/// Return a new execution config, suitable for executing a new query or system task.
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_execution_config(&self, executor_type: ExecutorType) -> IOxExecutionConfig {
let exec = self.executor(executor_type).clone();
IOxExecutionConfig::new(exec).with_concurrency(self.config.concurrency)
}
/// Create a new execution context, suitable for executing a new query or system task
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext {
self.new_execution_config(executor_type).build()
}
@ -99,6 +103,23 @@ impl Executor {
ExecutorType::Reorg => &self.reorg_exec,
}
}
/// Stops all subsequent task executions, and waits for the worker
/// thread to complete. Note this will shutdown all created contexts.
///
/// Only the first all to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
pub fn join(&self) {
self.query_exec.join();
self.reorg_exec.join();
}
}
impl Drop for Executor {
fn drop(&mut self) {
self.join();
}
}
/// Create a SchemaPivot node which an arbitrary input like
@ -176,7 +197,8 @@ mod tests {
let expected_strings = to_set(&["Foo", "Bar"]);
let plan = StringSetPlan::Known(Arc::clone(&expected_strings));
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let result_strings = ctx.to_string_set(plan).await.unwrap();
assert_eq!(result_strings, expected_strings);
}
@ -188,7 +210,8 @@ mod tests {
let scan = make_plan(schema, vec![]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, StringSetRef::new(StringSet::new()));
@ -203,7 +226,8 @@ mod tests {
let scan = make_plan(batch.schema(), vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -222,7 +246,8 @@ mod tests {
let scan = make_plan(schema, vec![batch1, batch2]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -245,7 +270,8 @@ mod tests {
let plan: StringSetPlan = vec![scan1, scan2].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -265,7 +291,8 @@ mod tests {
let scan = make_plan(schema, vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await;
let actual_error = match results {
@ -290,7 +317,8 @@ mod tests {
let scan = make_plan(batch.schema(), vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await;
let actual_error = match results {
@ -321,7 +349,8 @@ mod tests {
let pivot = make_schema_pivot(scan);
let plan = vec![pivot].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.expect("Executed plan");
assert_eq!(results, to_set(&["f1", "f2"]));

View File

@ -141,7 +141,6 @@ impl DedicatedExecutor {
}
/// signals shutdown of this executor and any Clones
#[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372
pub fn shutdown(&self) {
// hang up the channel which will cause the dedicated thread
// to quit
@ -156,7 +155,6 @@ impl DedicatedExecutor {
/// Only the first all to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
#[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372
pub fn join(&self) {
self.shutdown();

View File

@ -218,7 +218,8 @@ mod tests {
let expected_ss = to_string_set(&["foo", "bar", "baz", "from_a_plan"]).into();
assert!(matches!(plan, StringSetPlan::Plan(_)));
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let ss = ctx.to_string_set(plan).await.unwrap();
assert_eq!(ss, expected_ss);
}

View File

@ -76,4 +76,8 @@ impl ApplicationState {
pub fn executor(&self) -> &Arc<Executor> {
&self.executor
}
pub fn join(&self) {
self.executor.join()
}
}

View File

@ -341,6 +341,9 @@ async fn serve(
info!("server completed shutting down");
application.join();
info!("shared application state completed shutting down");
res
}