fix: `--compaction-process-once` should exit (#6886)

- do not wait for a non-empty partition result (this doesn't make sense
  if we are not running endlessly)
- modify entry point to allow the compactor to exit on its own (this is
  normally not allowed for other server types)
pull/24376/head
Marco Neumann 2023-02-07 14:52:17 +01:00 committed by GitHub
parent e179887ede
commit 997cca67a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 14 deletions

View File

@ -58,10 +58,8 @@ impl Compactor2 {
_ = async {
compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await;
// the main entry point does not allow servers to shut down themselves, so we just wait forever
info!("comapctor done");
futures::future::pending::<()>().await;
} => unreachable!(),
} => {}
}
});
let worker = shared_handle(worker);

View File

@ -13,7 +13,7 @@ mod tests {
},
config::AlgoVersion,
driver::compact,
test_util::{list_object_store, AssertFutureExt, TestSetup},
test_util::{list_object_store, TestSetup},
};
#[tokio::test]
@ -27,10 +27,7 @@ mod tests {
assert!(files.is_empty());
// compact
// This wil wait for files forever.
let fut = run_compact(&setup);
tokio::pin!(fut);
fut.assert_pending().await;
run_compact(&setup).await;
// verify catalog is still empty
let files = setup.list_by_table_not_to_delete().await;

View File

@ -162,14 +162,21 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
// Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work even when there
// is not data.
let partitions_source = NotEmptyPartitionsSourceWrapper::new(
let partitions_source =
LoggingPartitionsSourceWrapper::new(MetricsPartitionsSourceWrapper::new(
RandomizeOrderPartitionsSourcesWrapper::new(partitions_source, 1234),
&config.metric_registry,
)),
Duration::from_secs(5),
Arc::clone(&config.time_provider),
);
));
let partitions_source: Arc<dyn PartitionsSource> = if config.process_once {
// do not wrap into the "not empty" filter because we do NOT wanna throttle in this case but just exit early
Arc::new(partitions_source)
} else {
Arc::new(NotEmptyPartitionsSourceWrapper::new(
partitions_source,
Duration::from_secs(5),
Arc::clone(&config.time_provider),
))
};
let partition_stream: Arc<dyn PartitionStream> = if config.process_once {
Arc::new(OncePartititionStream::new(partitions_source))

View File

@ -112,6 +112,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
}));
let time_provider = Arc::new(SystemProvider::new());
let process_once = config.compactor_config.process_once;
let server_type = create_compactor2_server_type(
&common_state,
Arc::clone(&metric_registry),
@ -127,5 +128,14 @@ pub async fn command(config: Config) -> Result<(), Error> {
info!("starting compactor");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
let res = main::main(common_state, services, metric_registry).await;
match res {
Ok(()) => Ok(()),
// compactor2 is allowed to shut itself down
Err(main::Error::Wrapper {
source: _source @ ioxd_common::Error::LostServer,
}) if process_once => Ok(()),
Err(e) => Err(e.into()),
}
}