diff --git a/server/src/application.rs b/server/src/application.rs new file mode 100644 index 0000000000..3555f6235a --- /dev/null +++ b/server/src/application.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; + +use metrics::MetricRegistry; +use object_store::ObjectStore; +use observability_deps::tracing::info; +use query::exec::Executor; + +use crate::JobRegistry; + +/// A container for application-global resources +#[derive(Debug, Clone)] +pub struct ApplicationState { + object_store: Arc, + executor: Arc, + job_registry: Arc, + metric_registry: Arc, +} + +impl ApplicationState { + /// Creates a new `ApplicationState` + /// + /// Uses number of CPUs in the system if num_worker_threads is not set + pub fn new(object_store: Arc, num_worker_threads: Option) -> Self { + let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get); + info!(%num_threads, "using specified number of threads per thread pool"); + + Self { + object_store, + executor: Arc::new(Executor::new(num_threads)), + job_registry: Arc::new(JobRegistry::new()), + metric_registry: Arc::new(metrics::MetricRegistry::new()), + } + } + + pub fn object_store(&self) -> &Arc { + &self.object_store + } + + pub fn job_registry(&self) -> &Arc { + &self.job_registry + } + + pub fn metric_registry(&self) -> &Arc { + &self.metric_registry + } + + pub fn executor(&self) -> &Arc { + &self.executor + } +} diff --git a/server/src/config.rs b/server/src/config.rs index 09749dc26c..c8e9fcbd77 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -11,15 +11,13 @@ use metrics::MetricRegistry; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use parquet_file::catalog::PreservedCatalog; use persistence_windows::checkpoint::ReplayPlan; -use query::exec::Executor; use write_buffer::config::WriteBufferConfig; /// This module contains code for managing the configuration of the server. use crate::{ db::{catalog::Catalog, DatabaseToCommit, Db}, - DatabaseAlreadyExists, DatabaseNotFound, DatabaseReserved, Error, - InvalidDatabaseStateTransition, JobRegistry, Result, RulesDatabaseNameMismatch, - ServerShuttingDown, + ApplicationState, DatabaseAlreadyExists, DatabaseNotFound, DatabaseReserved, Error, + InvalidDatabaseStateTransition, Result, RulesDatabaseNameMismatch, ServerShuttingDown, }; use object_store::path::Path; use observability_deps::tracing::{self, error, info, warn, Instrument}; @@ -39,12 +37,8 @@ pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb"; /// run to completion if the tokio runtime is dropped #[derive(Debug)] pub(crate) struct Config { - jobs: Arc, - object_store: Arc, - exec: Arc, + application: Arc, server_id: ServerId, - metric_registry: Arc, - shutdown: CancellationToken, state: RwLock, } @@ -62,19 +56,10 @@ impl From for UpdateError { impl Config { /// Create new empty config. - pub(crate) fn new( - jobs: Arc, - object_store: Arc, - exec: Arc, - server_id: ServerId, - metric_registry: Arc, - ) -> Self { + pub(crate) fn new(application: Arc, server_id: ServerId) -> Self { Self { - jobs, - object_store, - exec, + application, server_id, - metric_registry, shutdown: Default::default(), state: Default::default(), } @@ -272,13 +257,17 @@ impl Config { } /// Metrics registry associated with this config and that should be used to create all databases. + /// + /// TODO: Remove Me pub fn metrics_registry(&self) -> Arc { - Arc::clone(&self.metric_registry) + Arc::clone(self.application.metric_registry()) } /// Returns the object store of this server + /// + /// TODO: Remove Me pub fn object_store(&self) -> Arc { - Arc::clone(&self.object_store) + Arc::clone(self.application.object_store()) } /// Returns the server id of this server @@ -289,7 +278,7 @@ impl Config { /// Base location in object store for this server. pub fn root_path(&self) -> Path { let id = self.server_id.get(); - let mut path = self.object_store.new_path(); + let mut path = self.application.object_store().new_path(); path.push_dir(format!("{}", id)); path } @@ -460,7 +449,7 @@ impl<'a> DatabaseHandle<'a> { /// Get object store. pub fn object_store(&self) -> Arc { - Arc::clone(&self.config.object_store) + self.config.object_store() } /// Get server ID. @@ -542,16 +531,22 @@ impl<'a> DatabaseHandle<'a> { ) -> Result<()> { match self.state().as_ref() { DatabaseState::RulesLoaded { rules } => { + let application = &self.config.application; + let database_to_commit = DatabaseToCommit { server_id: self.config.server_id, - object_store: Arc::clone(&self.config.object_store), - exec: Arc::clone(&self.config.exec), + object_store: Arc::clone(application.object_store()), + exec: Arc::clone(application.executor()), preserved_catalog, catalog, rules: Arc::clone(rules), write_buffer, }; - let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs))); + + let db = Arc::new(Db::new( + database_to_commit, + Arc::clone(application.job_registry()), + )); self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan })); @@ -641,14 +636,7 @@ mod test { fn make_config() -> Config { let store = Arc::new(ObjectStore::new_in_memory()); let server_id = ServerId::try_from(1).unwrap(); - let metric_registry = Arc::new(metrics::MetricRegistry::new()); - Config::new( - Arc::new(JobRegistry::new()), - Arc::clone(&store), - Arc::new(Executor::new(1)), - server_id, - Arc::clone(&metric_registry), - ) + Config::new(Arc::new(ApplicationState::new(store, Some(1))), server_id) } #[tokio::test] diff --git a/server/src/job.rs b/server/src/job.rs new file mode 100644 index 0000000000..57625ff984 --- /dev/null +++ b/server/src/job.rs @@ -0,0 +1,69 @@ +use data_types::job::Job; +use parking_lot::Mutex; +use std::convert::Infallible; +use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; + +const JOB_HISTORY_SIZE: usize = 1000; + +/// The global job registry +#[derive(Debug)] +pub struct JobRegistry { + inner: Mutex>, +} + +impl Default for JobRegistry { + fn default() -> Self { + Self { + inner: Mutex::new(TaskRegistryWithHistory::new(JOB_HISTORY_SIZE)), + } + } +} + +impl JobRegistry { + pub fn new() -> Self { + Default::default() + } + + pub fn register(&self, job: Job) -> (TaskTracker, TaskRegistration) { + self.inner.lock().register(job) + } + + /// Returns a list of recent Jobs, including some that are no + /// longer running + pub fn tracked(&self) -> Vec> { + self.inner.lock().tracked() + } + + pub fn get(&self, id: TaskId) -> Option> { + self.inner.lock().get(id) + } + + pub fn spawn_dummy_job(&self, nanos: Vec) -> TaskTracker { + let (tracker, registration) = self.register(Job::Dummy { + nanos: nanos.clone(), + }); + + for duration in nanos { + tokio::spawn( + async move { + tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await; + Ok::<_, Infallible>(()) + } + .track(registration.clone()), + ); + } + + tracker + } + + /// Reclaims jobs into the historical archive + /// + /// Returns the number of remaining jobs + /// + /// Should be called periodically + pub(crate) fn reclaim(&self) -> usize { + let mut lock = self.inner.lock(); + lock.reclaim(); + lock.tracked_len() + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index bebcf48e88..1e40c06f03 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -69,7 +69,7 @@ )] use std::collections::HashMap; -use std::convert::{Infallible, TryInto}; +use std::convert::TryInto; use std::sync::Arc; use async_trait::async_trait; @@ -91,25 +91,29 @@ use generated_types::database_rules::encode_database_rules; use generated_types::influxdata::transfer::column::v1 as pb; use influxdb_line_protocol::ParsedLine; use lifecycle::LockableChunk; -use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry}; -use object_store::{ObjectStore, ObjectStoreApi}; +use metrics::{KeyValue, MetricObserverBuilder}; +use object_store::ObjectStoreApi; use observability_deps::tracing::{error, info, warn}; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; +use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use query::{exec::Executor, DatabaseStore}; use rand::seq::SliceRandom; use resolver::Resolver; use snafu::{OptionExt, ResultExt, Snafu}; -use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; +use tracker::{TaskTracker, TrackedFutureExt}; use write_buffer::config::WriteBufferConfig; +pub use application::ApplicationState; pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; pub use db::Db; +pub use job::JobRegistry; pub use resolver::{GrpcConnectionString, RemoteTemplate}; +mod application; mod config; mod connection; pub mod db; mod init; +mod job; mod resolver; /// Utility modules used by benchmarks and tests @@ -255,86 +259,6 @@ pub enum Error { pub type Result = std::result::Result; -const JOB_HISTORY_SIZE: usize = 1000; - -/// The global job registry -#[derive(Debug)] -pub struct JobRegistry { - inner: Mutex>, -} - -impl Default for JobRegistry { - fn default() -> Self { - Self { - inner: Mutex::new(TaskRegistryWithHistory::new(JOB_HISTORY_SIZE)), - } - } -} - -impl JobRegistry { - pub fn new() -> Self { - Default::default() - } - - pub fn register(&self, job: Job) -> (TaskTracker, TaskRegistration) { - self.inner.lock().register(job) - } - - /// Returns a list of recent Jobs, including some that are no - /// longer running - pub fn tracked(&self) -> Vec> { - self.inner.lock().tracked() - } -} - -/// Used to configure a server instance -#[derive(Debug)] -pub struct ServerConfig { - // number of executor worker threads. If not specified, defaults - // to number of cores on the system. - num_worker_threads: Option, - - /// The `ObjectStore` instance to use for persistence - object_store: Arc, - - metric_registry: Arc, - - remote_template: Option, - - wipe_catalog_on_error: bool, - skip_replay_and_seek_instead: bool, -} - -impl ServerConfig { - /// Create a new config using the specified store. - pub fn new( - object_store: Arc, - metric_registry: Arc, - remote_template: Option, - skip_replay_and_seek_instead: bool, - ) -> Self { - Self { - num_worker_threads: None, - object_store, - metric_registry, - remote_template, - wipe_catalog_on_error: true, - skip_replay_and_seek_instead, - } - } - - /// Use `num` worker threads for running queries - pub fn with_num_worker_threads(mut self, num: usize) -> Self { - self.num_worker_threads = Some(num); - self - } - - /// return a reference to the object store in this configuration - pub fn store(&self) -> Arc { - Arc::clone(&self.object_store) - } -} - // A collection of metrics used to instrument the Server. #[derive(Debug)] pub struct ServerMetrics { @@ -418,21 +342,36 @@ impl ServerMetrics { } } +/// Configuration options for `Server` +#[derive(Debug)] +pub struct ServerConfig { + pub remote_template: Option, + + pub wipe_catalog_on_error: bool, + + pub skip_replay_and_seek_instead: bool, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + remote_template: None, + wipe_catalog_on_error: false, + skip_replay_and_seek_instead: false, + } + } +} + /// `Server` is the container struct for how servers store data internally, as /// well as how they communicate with other servers. Each server will have one /// of these structs, which keeps track of all replication and query rules. #[derive(Debug)] pub struct Server { connection_manager: Arc, - store: Arc, - exec: Arc, - jobs: Arc, - metrics: Arc, - /// The metrics registry associated with the server. This is needed not for - /// recording telemetry, but because the server hosts the /metric endpoint - /// and populates the endpoint with this data. - registry: Arc, + application: Arc, + + metrics: Arc, /// Resolver for mapping ServerId to gRPC connection strings resolver: RwLock, @@ -500,33 +439,23 @@ impl Server where M: ConnectionManager + Send + Sync, { - pub fn new(connection_manager: M, config: ServerConfig) -> Self { - let jobs = Arc::new(JobRegistry::new()); - - let ServerConfig { - num_worker_threads, - object_store, - // to test the metrics provide a different registry to the `ServerConfig`. - metric_registry, - remote_template, - wipe_catalog_on_error, - skip_replay_and_seek_instead, - } = config; - - let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get); - let exec = Arc::new(Executor::new(num_worker_threads)); + pub fn new( + connection_manager: M, + application: Arc, + config: ServerConfig, + ) -> Self { + let metrics = Arc::new(ServerMetrics::new(Arc::clone( + application.metric_registry(), + ))); Self { - store: object_store, + application, + metrics, connection_manager: Arc::new(connection_manager), - exec, - jobs, - metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), - registry: Arc::clone(&metric_registry), - resolver: RwLock::new(Resolver::new(remote_template)), + resolver: RwLock::new(Resolver::new(config.remote_template)), stage: Arc::new(RwLock::new(ServerStage::Startup { - wipe_catalog_on_error, - skip_replay_and_seek_instead, + wipe_catalog_on_error: config.wipe_catalog_on_error, + skip_replay_and_seek_instead: config.skip_replay_and_seek_instead, })), } } @@ -545,13 +474,7 @@ where *stage = ServerStage::InitReady { wipe_catalog_on_error: *wipe_catalog_on_error, skip_replay_and_seek_instead: *skip_replay_and_seek_instead, - config: Arc::new(Config::new( - Arc::clone(&self.jobs), - Arc::clone(&self.store), - Arc::clone(&self.exec), - id, - Arc::clone(&self.registry), - )), + config: Arc::new(Config::new(Arc::clone(&self.application), id)), last_error: None, }; Ok(()) @@ -560,11 +483,6 @@ where } } - /// Returns the metrics registry associated with this server - pub fn metrics_registry(&self) -> &Arc { - &self.registry - } - /// Return the metrics associated with this server pub fn metrics(&self) -> &Arc { &self.metrics @@ -657,9 +575,9 @@ where // load preserved catalog let (preserved_catalog, catalog, replay_plan) = create_preserved_catalog( rules.db_name(), - Arc::clone(&self.store), + Arc::clone(self.application.object_store()), config.server_id(), - config.metrics_registry(), + Arc::clone(self.application.metric_registry()), ) .await .map_err(|e| Box::new(e) as _) @@ -694,7 +612,8 @@ where let len = data.len(); let stream_data = std::io::Result::Ok(data.freeze()); - self.store + self.application + .object_store() .put( &location, futures::stream::once(async move { stream_data }), @@ -1051,24 +970,6 @@ where self.resolver.write().delete_remote(id) } - pub fn spawn_dummy_job(&self, nanos: Vec) -> TaskTracker { - let (tracker, registration) = self.jobs.register(Job::Dummy { - nanos: nanos.clone(), - }); - - for duration in nanos { - tokio::spawn( - async move { - tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await; - Ok::<_, Infallible>(()) - } - .track(registration.clone()), - ); - } - - tracker - } - /// Closes a chunk and starts moving its data to the read buffer, as a /// background job, dropping when complete. pub fn close_chunk( @@ -1135,9 +1036,12 @@ where } }; - let (tracker, registration) = self.jobs.register(Job::WipePreservedCatalog { - db_name: Arc::from(db_name.as_str()), - }); + let (tracker, registration) = + self.application + .job_registry() + .register(Job::WipePreservedCatalog { + db_name: Arc::from(db_name.as_str()), + }); let state = Arc::clone(&self.stage); let db_name = db_name.clone(); @@ -1169,25 +1073,18 @@ where Ok(tracker) } - /// Returns a list of all jobs tracked by this server - pub fn tracked_jobs(&self) -> Vec> { - self.jobs.inner.lock().tracked() - } - - /// Returns a specific job tracked by this server - pub fn get_job(&self, id: TaskId) -> Option> { - self.jobs.inner.lock().get(id) - } - /// Background worker function for the server pub async fn background_worker(&self, shutdown: tokio_util::sync::CancellationToken) { info!("started background worker"); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + // TODO: Move out of Server background loop + let job_registry = self.application.job_registry(); + while !shutdown.is_cancelled() { self.maybe_initialize_server().await; - self.jobs.inner.lock().reclaim(); + job_registry.reclaim(); tokio::select! { _ = interval.tick() => {}, @@ -1205,9 +1102,7 @@ where // Wait for any outstanding jobs to finish - frontend shutdown should be // sequenced before shutting down the background workers and so there // shouldn't be any - while self.jobs.inner.lock().tracked_len() != 0 { - self.jobs.inner.lock().reclaim(); - + while job_registry.reclaim() != 0 { interval.tick().await; } @@ -1262,14 +1157,14 @@ where /// Return a handle to the query executor fn executor(&self) -> Arc { - Arc::clone(&self.exec) + Arc::clone(self.application.executor()) } } #[cfg(test)] mod tests { use std::{ - convert::TryFrom, + convert::{Infallible, TryFrom}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -1280,7 +1175,6 @@ mod tests { use arrow::record_batch::RecordBatch; use bytes::Bytes; use futures::TryStreamExt; - use tempfile::TempDir; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -1293,8 +1187,8 @@ mod tests { use entry::test_helpers::lp_to_entry; use generated_types::database_rules::decode_database_rules; use influxdb_line_protocol::parse_lines; - use metrics::MetricRegistry; - use object_store::path::ObjectStorePath; + use metrics::TestMetricRegistry; + use object_store::{path::ObjectStorePath, ObjectStore}; use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog}; use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase}; use write_buffer::mock::MockBufferForWritingThatAlwaysErrors; @@ -1306,35 +1200,24 @@ mod tests { const ARBITRARY_DEFAULT_TIME: i64 = 456; - // TODO: perhaps switch to a builder pattern. - fn config_with_metric_registry_and_store( - object_store: ObjectStore, - ) -> (metrics::TestMetricRegistry, ServerConfig) { - let registry = Arc::new(metrics::MetricRegistry::new()); - let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry)); - ( - test_registry, - ServerConfig::new(Arc::new(object_store), registry, None, false) - .with_num_worker_threads(1), - ) + fn make_application() -> Arc { + Arc::new(ApplicationState::new( + Arc::new(ObjectStore::new_in_memory()), + None, + )) } - fn config_with_metric_registry() -> (metrics::TestMetricRegistry, ServerConfig) { - config_with_metric_registry_and_store(ObjectStore::new_in_memory()) - } - - fn config() -> ServerConfig { - config_with_metric_registry().1 - } - - fn config_with_store(object_store: ObjectStore) -> ServerConfig { - config_with_metric_registry_and_store(object_store).1 + fn make_server(application: Arc) -> Arc> { + Arc::new(Server::new( + TestConnectionManager::new(), + application, + Default::default(), + )) } #[tokio::test] async fn server_api_calls_return_error_with_no_id_set() { - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); let resp = server.config().unwrap_err(); assert!(matches!(resp, Error::IdNotSet)); @@ -1349,10 +1232,8 @@ mod tests { #[tokio::test] async fn create_database_persists_rules() { - let manager = TestConnectionManager::new(); - let config = config(); - let store = config.store(); - let server = Server::new(manager, config); + let application = make_application(); + let server = make_server(Arc::clone(&application)); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1378,12 +1259,12 @@ mod tests { .await .expect("failed to create database"); - let mut rules_path = server.store.new_path(); + let mut rules_path = application.object_store().new_path(); rules_path.push_all_dirs(&["1", name.as_str()]); rules_path.set_file_name("rules.pb"); - let read_data = server - .store + let read_data = application + .object_store() .get(&rules_path) .await .unwrap() @@ -1403,13 +1284,13 @@ mod tests { .await .expect("failed to create 2nd db"); - store.list_with_delimiter(&store.new_path()).await.unwrap(); + application + .object_store() + .list_with_delimiter(&application.object_store().new_path()) + .await + .unwrap(); - let manager = TestConnectionManager::new(); - let config2 = - ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None, false) - .with_num_worker_threads(1); - let server2 = Server::new(manager, config2); + let server2 = make_server(application); server2.set_id(ServerId::try_from(1).unwrap()).unwrap(); server2.maybe_initialize_server().await; @@ -1421,8 +1302,7 @@ mod tests { async fn duplicate_database_name_rejected() { // Covers #643 - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1471,28 +1351,23 @@ mod tests { #[tokio::test] async fn load_databases() { - let temp_dir = TempDir::new().unwrap(); + let application = make_application(); + let store = application.object_store(); - let store = ObjectStore::new_file(temp_dir.path()); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let server = make_server(Arc::clone(&application)); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; create_simple_database(&server, "bananas") .await .expect("failed to create database"); - let mut rules_path = server.store.new_path(); + let mut rules_path = store.new_path(); rules_path.push_all_dirs(&["1", "bananas"]); rules_path.set_file_name("rules.pb"); std::mem::drop(server); - let store = ObjectStore::new_file(temp_dir.path()); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let server = make_server(Arc::clone(&application)); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1504,15 +1379,12 @@ mod tests { std::mem::drop(server); - let store = ObjectStore::new_file(temp_dir.path()); store .delete(&rules_path) .await .expect("cannot delete rules file"); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let server = make_server(application); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1521,8 +1393,7 @@ mod tests { #[tokio::test] async fn db_names_sorted() { - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1542,8 +1413,7 @@ mod tests { #[tokio::test] async fn writes_local() { - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1576,9 +1446,9 @@ mod tests { #[tokio::test] async fn write_entry_local() { - let (metric_registry, config) = config_with_metric_registry(); - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config); + let application = make_application(); + let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry())); + let server = make_server(application); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1660,7 +1530,7 @@ mod tests { }), ); - let server = Server::new(manager, config()); + let server = Server::new(manager, make_application(), Default::default()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1737,8 +1607,7 @@ mod tests { #[tokio::test] async fn close_chunk() { test_helpers::maybe_start_logging(); - let manager = TestConnectionManager::new(); - let server = Arc::new(Server::new(manager, config())); + let server = make_server(make_application()); let cancel_token = CancellationToken::new(); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); @@ -1811,14 +1680,14 @@ mod tests { #[tokio::test] async fn background_task_cleans_jobs() { - let manager = TestConnectionManager::new(); - let server = Arc::new(Server::new(manager, config())); + let application = make_application(); + let server = make_server(Arc::clone(&application)); let cancel_token = CancellationToken::new(); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); let wait_nanos = 1000; - let job = server.spawn_dummy_job(vec![wait_nanos]); + let job = application.job_registry().spawn_dummy_job(vec![wait_nanos]); // Note: this will hang forever if the background task has not been started job.join().await; @@ -1843,8 +1712,7 @@ mod tests { #[tokio::test] async fn hard_buffer_limit() { - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1894,12 +1762,7 @@ mod tests { #[tokio::test] async fn cannot_create_db_until_server_is_initialized() { - let temp_dir = TempDir::new().unwrap(); - - let store = ObjectStore::new_file(temp_dir.path()); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let server = make_server(make_application()); // calling before serverID set leads to `IdNotSet` let err = create_simple_database(&server, "bananas") @@ -1919,8 +1782,7 @@ mod tests { #[tokio::test] async fn background_worker_eventually_inits_server() { - let manager = TestConnectionManager::new(); - let server = Arc::new(Server::new(manager, config())); + let server = make_server(make_application()); let cancel_token = CancellationToken::new(); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); @@ -1944,11 +1806,9 @@ mod tests { #[tokio::test] async fn init_error_generic() { // use an object store that will hopefully fail to read - let store = ObjectStore::new_failing_store().unwrap(); - - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let store = Arc::new(ObjectStore::new_failing_store().unwrap()); + let application = Arc::new(ApplicationState::new(store, None)); + let server = make_server(application); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; @@ -1957,15 +1817,11 @@ mod tests { #[tokio::test] async fn init_error_database() { - let store = ObjectStore::new_in_memory(); + let application = make_application(); + let store = Arc::clone(application.object_store()); let server_id = ServerId::try_from(1).unwrap(); - // Create temporary server to create single database - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let store = config.store(); - - let server = Server::new(manager, config); + let server = make_server(Arc::clone(&application)); server.set_id(server_id).unwrap(); server.maybe_initialize_server().await; @@ -1992,13 +1848,10 @@ mod tests { .await .unwrap(); - // start server - let store = Arc::try_unwrap(store).unwrap(); store.get(&path).await.unwrap(); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + // start server + let server = make_server(application); server.set_id(server_id).unwrap(); server.maybe_initialize_server().await; @@ -2052,14 +1905,12 @@ mod tests { let db_name_created = DatabaseName::new("db_created".to_string()).unwrap(); // setup - let store = ObjectStore::new_in_memory(); + let application = make_application(); + let store = Arc::clone(application.object_store()); let server_id = ServerId::try_from(1).unwrap(); // Create temporary server to create existing databases - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let store = config.store(); - let server = Server::new(manager, config); + let server = make_server(Arc::clone(&application)); server.set_id(server_id).unwrap(); server.maybe_initialize_server().await; @@ -2085,6 +1936,7 @@ mod tests { let path = object_store_path_for_database_config(&root, &db_name_rules_broken); let data = Bytes::from("x"); let len = data.len(); + store .put( &path, @@ -2093,6 +1945,7 @@ mod tests { ) .await .unwrap(); + let (preserved_catalog, _catalog) = PreservedCatalog::load::( Arc::clone(&store), server_id, @@ -2102,18 +1955,15 @@ mod tests { .await .unwrap() .unwrap(); + parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog) .await; drop(preserved_catalog); - // boot actual test server - let store = Arc::try_unwrap(store).unwrap(); store.get(&path).await.unwrap(); - let manager = TestConnectionManager::new(); - // need to disable auto-wipe for this test - let mut config = config_with_store(store); - config.wipe_catalog_on_error = false; - let server = Server::new(manager, config); + + // boot actual test server + let server = make_server(Arc::clone(&application)); // cannot wipe if server ID is not set assert_eq!( @@ -2135,16 +1985,18 @@ mod tests { .to_string(), "database already exists: db_existing" ); - assert!( - PreservedCatalog::exists(&server.store, server_id, db_name_existing.as_str()) - .await - .unwrap() - ); + assert!(PreservedCatalog::exists( + application.object_store().as_ref(), + server_id, + db_name_existing.as_str() + ) + .await + .unwrap()); // 2. wiping a non-existing DB just works, but won't bring DB into existence assert!(server.error_database(&db_name_non_existing).is_none()); PreservedCatalog::new_empty::( - Arc::clone(&server.store), + Arc::clone(application.object_store()), server_id, db_name_non_existing.to_string(), (), @@ -2161,7 +2013,7 @@ mod tests { assert_eq!(metadata, &expected_metadata); tracker.join().await; assert!(!PreservedCatalog::exists( - &server.store, + application.object_store().as_ref(), server_id, &db_name_non_existing.to_string() ) @@ -2182,7 +2034,7 @@ mod tests { assert_eq!(metadata, &expected_metadata); tracker.join().await; assert!(!PreservedCatalog::exists( - &server.store, + application.object_store().as_ref(), server_id, &db_name_rules_broken.to_string() ) @@ -2203,7 +2055,7 @@ mod tests { assert_eq!(metadata, &expected_metadata); tracker.join().await; assert!(PreservedCatalog::exists( - &server.store, + application.object_store().as_ref(), server_id, &db_name_catalog_broken.to_string() ) @@ -2230,22 +2082,24 @@ mod tests { .to_string(), "database already exists: db_created" ); - assert!( - PreservedCatalog::exists(&server.store, server_id, &db_name_created.to_string()) - .await - .unwrap() - ); + assert!(PreservedCatalog::exists( + application.object_store().as_ref(), + server_id, + &db_name_created.to_string() + ) + .await + .unwrap()); } #[tokio::test] async fn cannot_create_db_when_catalog_is_present() { - let store = Arc::new(ObjectStore::new_in_memory()); + let application = make_application(); let server_id = ServerId::try_from(1).unwrap(); let db_name = "my_db"; // create catalog PreservedCatalog::new_empty::( - Arc::clone(&store), + Arc::clone(application.object_store()), server_id, db_name.to_string(), (), @@ -2254,10 +2108,7 @@ mod tests { .unwrap(); // setup server - let store = Arc::try_unwrap(store).unwrap(); - let manager = TestConnectionManager::new(); - let config = config_with_store(store); - let server = Server::new(manager, config); + let server = make_server(application); server.set_id(server_id).unwrap(); server.maybe_initialize_server().await; @@ -2268,8 +2119,7 @@ mod tests { #[tokio::test] async fn write_buffer_errors_propagate() { - let manager = TestConnectionManager::new(); - let server = Server::new(manager, config()); + let server = make_server(make_application()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 3ff72f69f1..d63b75a347 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -5,8 +5,8 @@ use object_store::{self, path::ObjectStorePath, ObjectStore, ObjectStoreApi, Thr use observability_deps::tracing::{self, error, info, warn, Instrument}; use panic_logging::SendPanicsToTracing; use server::{ - ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer, - ServerConfig as AppServerConfig, + ApplicationState, ConnectionManagerImpl as ConnectionManager, RemoteTemplate, + Server as AppServer, ServerConfig, }; use snafu::{ResultExt, Snafu}; use std::{convert::TryFrom, fs, net::SocketAddr, path::PathBuf, sync::Arc}; @@ -136,23 +136,17 @@ pub async fn main(config: Config) -> Result<()> { let object_store = ObjectStore::try_from(&config)?; check_object_store(&object_store).await?; let object_storage = Arc::new(object_store); - let metric_registry = Arc::new(metrics::MetricRegistry::new()); - let remote_template = config.remote_template.map(RemoteTemplate::new); - let server_config = AppServerConfig::new( - object_storage, - metric_registry, - remote_template, - config.skip_replay_and_seek_instead, - ); - let server_config = if let Some(n) = config.num_worker_threads { - info!( - num_worker_threads = n, - "Using specified number of worker threads" - ); - server_config.with_num_worker_threads(n) - } else { - server_config + let application = Arc::new(ApplicationState::new( + object_storage, + config.num_worker_threads, + )); + + let server_config = ServerConfig { + remote_template: config.remote_template.map(RemoteTemplate::new), + // TODO: Don't wipe on error (#1522) + wipe_catalog_on_error: true, + skip_replay_and_seek_instead: config.skip_replay_and_seek_instead, }; if config.grpc_bind_address == config.http_bind_address { @@ -165,7 +159,11 @@ pub async fn main(config: Config) -> Result<()> { } let connection_manager = ConnectionManager::new(); - let app_server = Arc::new(AppServer::new(connection_manager, server_config)); + let app_server = Arc::new(AppServer::new( + connection_manager, + Arc::clone(&application), + server_config, + )); // if this ID isn't set the server won't be usable until this is set via an API // call @@ -189,6 +187,7 @@ pub async fn main(config: Config) -> Result<()> { let grpc_server = rpc::serve( socket, + Arc::clone(&application), Arc::clone(&app_server), frontend_shutdown.clone(), config.initial_serving_state.into(), @@ -204,6 +203,7 @@ pub async fn main(config: Config) -> Result<()> { let http_server = http::serve( addr, + Arc::clone(&application), Arc::clone(&app_server), frontend_shutdown.clone(), max_http_request_size, diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 86c53adbfa..9f622490bf 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -19,7 +19,7 @@ use data_types::{ use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; use query::{exec::ExecutorType, QueryDatabase}; -use server::{ConnectionManager, Server as AppServer}; +use server::{ApplicationState, ConnectionManager, Server as AppServer}; // External crates use bytes::{Bytes, BytesMut}; @@ -351,6 +351,7 @@ where } fn router( + application: Arc, app_server: Arc>, max_request_size: usize, ) -> Router @@ -365,6 +366,7 @@ where // Create a router and specify the the handlers. Router::builder() .data(server) + .data(application) .middleware(Middleware::pre(|mut req| async move { // we don't need the authorization header and we don't want to accidentally log it. req.headers_mut().remove("authorization"); @@ -703,9 +705,14 @@ async fn handle_metrics( .http_requests .observation() .ok_with_labels(&[metrics::KeyValue::new("path", path)]); - Ok(Response::new(Body::from( - server.metrics_registry().metrics_as_text(), - ))) + + let body = req + .data::>() + .expect("application state") + .metric_registry() + .metrics_as_text(); + + Ok(Response::new(Body::from(body))) } #[derive(Deserialize, Debug)] @@ -860,6 +867,7 @@ async fn pprof_profile( pub async fn serve( addr: AddrIncoming, + application: Arc, server: Arc>, shutdown: CancellationToken, max_request_size: usize, @@ -867,7 +875,7 @@ pub async fn serve( where M: ConnectionManager + Send + Sync + Debug + 'static, { - let router = router(server, max_request_size); + let router = router(application, server, max_request_size); let service = RouterService::new(router).unwrap(); hyper::Server::builder(addr) @@ -889,31 +897,32 @@ mod tests { use reqwest::{Client, Response}; use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName}; + use metrics::TestMetricRegistry; use object_store::ObjectStore; use serde::de::DeserializeOwned; - use server::{db::Db, ConnectionManagerImpl, ServerConfig as AppServerConfig}; + use server::{db::Db, ApplicationState, ConnectionManagerImpl}; use tokio_stream::wrappers::ReceiverStream; - fn config() -> (metrics::TestMetricRegistry, AppServerConfig) { - let registry = Arc::new(metrics::MetricRegistry::new()); - let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry)); - ( - test_registry, - AppServerConfig::new( - Arc::new(ObjectStore::new_in_memory()), - registry, - None, - false, - ) - .with_num_worker_threads(1), - ) + fn make_application() -> Arc { + Arc::new(ApplicationState::new( + Arc::new(ObjectStore::new_in_memory()), + None, + )) + } + + fn make_server(application: Arc) -> Arc> { + Arc::new(AppServer::new( + ConnectionManagerImpl::new(), + application, + Default::default(), + )) } #[tokio::test] async fn test_health() { - let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); - let server_url = test_server(Arc::clone(&app_server)); + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); let response = client.get(&format!("{}/health", server_url)).send().await; @@ -924,8 +933,8 @@ mod tests { #[tokio::test] async fn test_write() { - let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -934,7 +943,7 @@ mod tests { )) .await .unwrap(); - let server_url = test_server(Arc::clone(&app_server)); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -972,8 +981,10 @@ mod tests { #[tokio::test] async fn test_write_metrics() { - let (metrics_registry, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); + let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry())); + app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -982,7 +993,8 @@ mod tests { )) .await .unwrap(); - let server_url = test_server(Arc::clone(&app_server)); + + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -1002,7 +1014,7 @@ mod tests { .expect("sent data"); // The request completed successfully - metrics_registry + metric_registry .has_metric_family("http_request_duration_seconds") .with_labels(&[ ("bucket", "MetricsBucket"), @@ -1015,7 +1027,7 @@ mod tests { .unwrap(); // A single successful point landed - metrics_registry + metric_registry .has_metric_family("ingest_points_total") .with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")]) .counter() @@ -1023,7 +1035,7 @@ mod tests { .unwrap(); // Which consists of two fields - metrics_registry + metric_registry .has_metric_family("ingest_fields_total") .with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")]) .counter() @@ -1031,7 +1043,7 @@ mod tests { .unwrap(); // Bytes of data were written - metrics_registry + metric_registry .has_metric_family("ingest_points_bytes_total") .with_labels(&[("status", "ok"), ("db_name", "MetricsOrg_MetricsBucket")]) .counter() @@ -1050,7 +1062,7 @@ mod tests { .unwrap(); // A single point was rejected - metrics_registry + metric_registry .has_metric_family("ingest_points_total") .with_labels(&[("db_name", "NotMyOrg_NotMyBucket"), ("status", "error")]) .counter() @@ -1062,8 +1074,8 @@ mod tests { /// returns a client for communicating with the server, and the server /// endpoint async fn setup_test_data() -> (Client, String) { - let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -1072,7 +1084,7 @@ mod tests { )) .await .unwrap(); - let server_url = test_server(Arc::clone(&app_server)); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -1359,7 +1371,10 @@ mod tests { /// creates an instance of the http service backed by a in-memory /// testable database. Returns the url of the server - fn test_server(server: Arc>) -> String { + fn test_server( + application: Arc, + server: Arc>, + ) -> String { // NB: specify port 0 to let the OS pick the port. let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let addr = AddrIncoming::bind(&bind_addr).expect("failed to bind server"); @@ -1367,6 +1382,7 @@ mod tests { tokio::task::spawn(serve( addr, + application, server, CancellationToken::new(), TEST_MAX_REQUEST_SIZE, @@ -1391,8 +1407,8 @@ mod tests { /// return a test server and the url to contact it for `MyOrg_MyBucket` async fn setup_server() -> (Arc>, String) { - let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); + let application = make_application(); + let app_server = make_server(Arc::clone(&application)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -1401,7 +1417,7 @@ mod tests { )) .await .unwrap(); - let server_url = test_server(Arc::clone(&app_server)); + let server_url = test_server(application, Arc::clone(&app_server)); (app_server, server_url) } diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 8ea844e3d1..7ad26560bc 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; use tonic::transport::NamedService; use crate::influxdb_ioxd::serving_readiness::{ServingReadiness, ServingReadinessState}; -use server::{ConnectionManager, Server}; +use server::{ApplicationState, ConnectionManager, Server}; use tonic::{Interceptor, Status}; pub mod error; @@ -76,6 +76,7 @@ impl From for Interceptor { /// shutdown. pub async fn serve( socket: TcpListener, + application: Arc, server: Arc>, shutdown: CancellationToken, serving_readiness: ServingReadiness, @@ -104,14 +105,18 @@ where testing::make_server(), storage::make_server( Arc::clone(&server), - Arc::clone(server.metrics_registry()), + Arc::clone(application.metric_registry()), serving_gate.clone(), ), flight::make_server(Arc::clone(&server), serving_gate.clone()), write::make_server(Arc::clone(&server), serving_gate.clone()), write_pb::make_server(Arc::clone(&server), serving_gate.clone()), - management::make_server(Arc::clone(&server), serving_readiness.clone()), - operations::make_server(Arc::clone(&server)), + management::make_server( + Arc::clone(&application), + Arc::clone(&server), + serving_readiness.clone() + ), + operations::make_server(Arc::clone(application.job_registry())), ], ); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 31bedc284e..3be351e2d5 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -9,10 +9,11 @@ use generated_types::google::{ use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use observability_deps::tracing::info; use query::{DatabaseStore, QueryDatabase}; -use server::{ConnectionManager, Error, Server}; +use server::{ApplicationState, ConnectionManager, Error, Server}; use tonic::{Request, Response, Status}; struct ManagementService { + application: Arc, server: Arc>, serving_readiness: ServingReadiness, } @@ -189,7 +190,10 @@ where request: Request, ) -> Result, Status> { let request = request.into_inner(); - let tracker = self.server.spawn_dummy_job(request.nanos); + let tracker = self + .application + .job_registry() + .spawn_dummy_job(request.nanos); let operation = Some(super::operations::encode_tracker(tracker)?); Ok(Response::new(CreateDummyJobResponse { operation })) } @@ -476,6 +480,7 @@ where } pub fn make_server( + application: Arc, server: Arc>, serving_readiness: ServingReadiness, ) -> management_service_server::ManagementServiceServer< @@ -485,6 +490,7 @@ where M: ConnectionManager + Send + Sync + Debug + 'static, { management_service_server::ManagementServiceServer::new(ManagementService { + application, server, serving_readiness, }) diff --git a/src/influxdb_ioxd/rpc/operations.rs b/src/influxdb_ioxd/rpc/operations.rs index 79a8467499..86d7e6ca6a 100644 --- a/src/influxdb_ioxd/rpc/operations.rs +++ b/src/influxdb_ioxd/rpc/operations.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::convert::TryInto; use std::sync::Arc; use bytes::BytesMut; @@ -18,14 +18,12 @@ use generated_types::{ influxdata::iox::management::v1 as management, protobuf_type_url, }; +use server::JobRegistry; use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker}; -use server::{ConnectionManager, Server}; -use std::convert::TryInto; - /// Implementation of the write service -struct OperationsService { - server: Arc>, +struct OperationsService { + jobs: Arc, } pub fn encode_tracker(tracker: TaskTracker) -> Result { @@ -112,16 +110,13 @@ pub fn encode_tracker(tracker: TaskTracker) -> Result(server: &Server, tracker: String) -> Result, tonic::Status> -where - M: ConnectionManager + Send + Sync, -{ +fn get_tracker(jobs: &JobRegistry, tracker: String) -> Result, tonic::Status> { let tracker_id = tracker.parse::().map_err(|e| FieldViolation { field: "name".to_string(), description: e.to_string(), })?; - let tracker = server.get_job(tracker_id).ok_or(NotFound { + let tracker = jobs.get(tracker_id).ok_or(NotFound { resource_type: "job".to_string(), resource_name: tracker, ..Default::default() @@ -131,18 +126,15 @@ where } #[tonic::async_trait] -impl operations_server::Operations for OperationsService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl operations_server::Operations for OperationsService { async fn list_operations( &self, _request: tonic::Request, ) -> Result, tonic::Status> { // TODO: Support pagination let operations: Result, _> = self - .server - .tracked_jobs() + .jobs + .tracked() .into_iter() .map(encode_tracker) .collect(); @@ -158,7 +150,7 @@ where request: tonic::Request, ) -> Result, tonic::Status> { let request = request.into_inner(); - let tracker = get_tracker(self.server.as_ref(), request.name)?; + let tracker = get_tracker(self.jobs.as_ref(), request.name)?; Ok(Response::new(encode_tracker(tracker)?)) } @@ -178,7 +170,7 @@ where ) -> Result, tonic::Status> { let request = request.into_inner(); - let tracker = get_tracker(self.server.as_ref(), request.name)?; + let tracker = get_tracker(self.jobs.as_ref(), request.name)?; tracker.cancel(); Ok(Response::new(Empty {})) @@ -194,7 +186,7 @@ where let request = request.into_inner(); - let tracker = get_tracker(self.server.as_ref(), request.name)?; + let tracker = get_tracker(self.jobs.as_ref(), request.name)?; if let Some(timeout) = request.timeout { let timeout = timeout.try_into().field("timeout")?; @@ -209,11 +201,8 @@ where } /// Instantiate the write service -pub fn make_server( - server: Arc>, -) -> operations_server::OperationsServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - operations_server::OperationsServer::new(OperationsService { server }) +pub fn make_server( + jobs: Arc, +) -> operations_server::OperationsServer { + operations_server::OperationsServer::new(OperationsService { jobs }) }