diff --git a/.circleci/config.yml b/.circleci/config.yml index 4ca737bbc3..d37e87a990 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -273,10 +273,12 @@ jobs: - cache_restore - run: name: Print rustc target CPU options - command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu + # disable jemalloc/heappy and use system alloctor + command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger" --bin print_cpu - run: name: Cargo release build with target arch set for CRoaring - command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" + # disable jemalloc/heappy and use system alloctor + command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger" - run: | echo sha256sum after build is sha256sum target/release/influxdb_iox diff --git a/query/src/exec.rs b/query/src/exec.rs index 240b740d52..4b0199ec0a 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -81,13 +81,17 @@ impl Executor { } } - /// Return a new execution config, suitable for executing a new query or system task + /// Return a new execution config, suitable for executing a new query or system task. + /// + /// Note that this context (and all its clones) will be shut down once `Executor` is dropped. pub fn new_execution_config(&self, executor_type: ExecutorType) -> IOxExecutionConfig { let exec = self.executor(executor_type).clone(); IOxExecutionConfig::new(exec).with_concurrency(self.config.concurrency) } /// Create a new execution context, suitable for executing a new query or system task + /// + /// Note that this context (and all its clones) will be shut down once `Executor` is dropped. pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext { self.new_execution_config(executor_type).build() } @@ -99,6 +103,23 @@ impl Executor { ExecutorType::Reorg => &self.reorg_exec, } } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all created contexts. + /// + /// Only the first all to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + pub fn join(&self) { + self.query_exec.join(); + self.reorg_exec.join(); + } +} + +impl Drop for Executor { + fn drop(&mut self) { + self.join(); + } } /// Create a SchemaPivot node which an arbitrary input like @@ -176,7 +197,8 @@ mod tests { let expected_strings = to_set(&["Foo", "Bar"]); let plan = StringSetPlan::Known(Arc::clone(&expected_strings)); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let result_strings = ctx.to_string_set(plan).await.unwrap(); assert_eq!(result_strings, expected_strings); } @@ -188,7 +210,8 @@ mod tests { let scan = make_plan(schema, vec![]); let plan: StringSetPlan = vec![scan].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, StringSetRef::new(StringSet::new())); @@ -203,7 +226,8 @@ mod tests { let scan = make_plan(batch.schema(), vec![batch]); let plan: StringSetPlan = vec![scan].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); @@ -222,7 +246,8 @@ mod tests { let scan = make_plan(schema, vec![batch1, batch2]); let plan: StringSetPlan = vec![scan].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); @@ -245,7 +270,8 @@ mod tests { let plan: StringSetPlan = vec![scan1, scan2].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); @@ -265,7 +291,8 @@ mod tests { let scan = make_plan(schema, vec![batch]); let plan: StringSetPlan = vec![scan].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await; let actual_error = match results { @@ -290,7 +317,8 @@ mod tests { let scan = make_plan(batch.schema(), vec![batch]); let plan: StringSetPlan = vec![scan].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await; let actual_error = match results { @@ -321,7 +349,8 @@ mod tests { let pivot = make_schema_pivot(scan); let plan = vec![pivot].into(); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let results = ctx.to_string_set(plan).await.expect("Executed plan"); assert_eq!(results, to_set(&["f1", "f2"])); diff --git a/query/src/exec/task.rs b/query/src/exec/task.rs index 8dbf7a6b1f..069f2693ba 100644 --- a/query/src/exec/task.rs +++ b/query/src/exec/task.rs @@ -141,7 +141,6 @@ impl DedicatedExecutor { } /// signals shutdown of this executor and any Clones - #[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372 pub fn shutdown(&self) { // hang up the channel which will cause the dedicated thread // to quit @@ -156,7 +155,6 @@ impl DedicatedExecutor { /// Only the first all to `join` will actually wait for the /// executing thread to complete. All other calls to join will /// complete immediately. - #[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372 pub fn join(&self) { self.shutdown(); diff --git a/query/src/plan/stringset.rs b/query/src/plan/stringset.rs index 18c63c18e2..4e7b96a2c4 100644 --- a/query/src/plan/stringset.rs +++ b/query/src/plan/stringset.rs @@ -218,7 +218,8 @@ mod tests { let expected_ss = to_string_set(&["foo", "bar", "baz", "from_a_plan"]).into(); assert!(matches!(plan, StringSetPlan::Plan(_))); - let ctx = Executor::new(1).new_context(ExecutorType::Query); + let exec = Executor::new(1); + let ctx = exec.new_context(ExecutorType::Query); let ss = ctx.to_string_set(plan).await.unwrap(); assert_eq!(ss, expected_ss); } diff --git a/server/src/application.rs b/server/src/application.rs index 5ca312b7d8..605c51c8e7 100644 --- a/server/src/application.rs +++ b/server/src/application.rs @@ -76,4 +76,8 @@ impl ApplicationState { pub fn executor(&self) -> &Arc { &self.executor } + + pub fn join(&self) { + self.executor.join() + } } diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 7cb9d21c9c..ddb6ddd657 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -341,6 +341,9 @@ async fn serve( info!("server completed shutting down"); + application.join(); + info!("shared application state completed shutting down"); + res }