refactor: extract ApplicationState from Server (#2167)

* refactor: extract Application from Server

* chore: review feedback
pull/24376/head
Raphael Taylor-Davies 2021-08-03 10:36:55 +01:00 committed by GitHub
parent e9bcc01d5c
commit ffb36cd50c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 397 additions and 424 deletions

50
server/src/application.rs Normal file
View File

@ -0,0 +1,50 @@
use std::sync::Arc;
use metrics::MetricRegistry;
use object_store::ObjectStore;
use observability_deps::tracing::info;
use query::exec::Executor;
use crate::JobRegistry;
/// A container for application-global resources
///
/// Holds the process-wide object store, query executor, job registry
/// and metrics registry. Every field is an `Arc`, so cloning this
/// struct is cheap and all clones share the same underlying resources.
#[derive(Debug, Clone)]
pub struct ApplicationState {
    // Object store used for all persistence
    object_store: Arc<ObjectStore>,
    // Shared query executor; thread-pool size fixed at construction
    executor: Arc<Executor>,
    // Registry tracking background jobs
    job_registry: Arc<JobRegistry>,
    // Process-wide metrics registry
    metric_registry: Arc<MetricRegistry>,
}
impl ApplicationState {
/// Creates a new `ApplicationState`
///
/// Uses number of CPUs in the system if num_worker_threads is not set
pub fn new(object_store: Arc<ObjectStore>, num_worker_threads: Option<usize>) -> Self {
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
Self {
object_store,
executor: Arc::new(Executor::new(num_threads)),
job_registry: Arc::new(JobRegistry::new()),
metric_registry: Arc::new(metrics::MetricRegistry::new()),
}
}
pub fn object_store(&self) -> &Arc<ObjectStore> {
&self.object_store
}
pub fn job_registry(&self) -> &Arc<JobRegistry> {
&self.job_registry
}
pub fn metric_registry(&self) -> &Arc<MetricRegistry> {
&self.metric_registry
}
pub fn executor(&self) -> &Arc<Executor> {
&self.executor
}
}

View File

@ -11,15 +11,13 @@ use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog; use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan; use persistence_windows::checkpoint::ReplayPlan;
use query::exec::Executor;
use write_buffer::config::WriteBufferConfig; use write_buffer::config::WriteBufferConfig;
/// This module contains code for managing the configuration of the server. /// This module contains code for managing the configuration of the server.
use crate::{ use crate::{
db::{catalog::Catalog, DatabaseToCommit, Db}, db::{catalog::Catalog, DatabaseToCommit, Db},
DatabaseAlreadyExists, DatabaseNotFound, DatabaseReserved, Error, ApplicationState, DatabaseAlreadyExists, DatabaseNotFound, DatabaseReserved, Error,
InvalidDatabaseStateTransition, JobRegistry, Result, RulesDatabaseNameMismatch, InvalidDatabaseStateTransition, Result, RulesDatabaseNameMismatch, ServerShuttingDown,
ServerShuttingDown,
}; };
use object_store::path::Path; use object_store::path::Path;
use observability_deps::tracing::{self, error, info, warn, Instrument}; use observability_deps::tracing::{self, error, info, warn, Instrument};
@ -39,12 +37,8 @@ pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
/// run to completion if the tokio runtime is dropped /// run to completion if the tokio runtime is dropped
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Config { pub(crate) struct Config {
jobs: Arc<JobRegistry>, application: Arc<ApplicationState>,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId, server_id: ServerId,
metric_registry: Arc<MetricRegistry>,
shutdown: CancellationToken, shutdown: CancellationToken,
state: RwLock<ConfigState>, state: RwLock<ConfigState>,
} }
@ -62,19 +56,10 @@ impl<E> From<Error> for UpdateError<E> {
impl Config { impl Config {
/// Create new empty config. /// Create new empty config.
pub(crate) fn new( pub(crate) fn new(application: Arc<ApplicationState>, server_id: ServerId) -> Self {
jobs: Arc<JobRegistry>,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
metric_registry: Arc<MetricRegistry>,
) -> Self {
Self { Self {
jobs, application,
object_store,
exec,
server_id, server_id,
metric_registry,
shutdown: Default::default(), shutdown: Default::default(),
state: Default::default(), state: Default::default(),
} }
@ -272,13 +257,17 @@ impl Config {
} }
/// Metrics registry associated with this config and that should be used to create all databases. /// Metrics registry associated with this config and that should be used to create all databases.
///
/// TODO: Remove Me
pub fn metrics_registry(&self) -> Arc<MetricRegistry> { pub fn metrics_registry(&self) -> Arc<MetricRegistry> {
Arc::clone(&self.metric_registry) Arc::clone(self.application.metric_registry())
} }
/// Returns the object store of this server /// Returns the object store of this server
///
/// TODO: Remove Me
pub fn object_store(&self) -> Arc<ObjectStore> { pub fn object_store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.object_store) Arc::clone(self.application.object_store())
} }
/// Returns the server id of this server /// Returns the server id of this server
@ -289,7 +278,7 @@ impl Config {
/// Base location in object store for this server. /// Base location in object store for this server.
pub fn root_path(&self) -> Path { pub fn root_path(&self) -> Path {
let id = self.server_id.get(); let id = self.server_id.get();
let mut path = self.object_store.new_path(); let mut path = self.application.object_store().new_path();
path.push_dir(format!("{}", id)); path.push_dir(format!("{}", id));
path path
} }
@ -460,7 +449,7 @@ impl<'a> DatabaseHandle<'a> {
/// Get object store. /// Get object store.
pub fn object_store(&self) -> Arc<ObjectStore> { pub fn object_store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.config.object_store) self.config.object_store()
} }
/// Get server ID. /// Get server ID.
@ -542,16 +531,22 @@ impl<'a> DatabaseHandle<'a> {
) -> Result<()> { ) -> Result<()> {
match self.state().as_ref() { match self.state().as_ref() {
DatabaseState::RulesLoaded { rules } => { DatabaseState::RulesLoaded { rules } => {
let application = &self.config.application;
let database_to_commit = DatabaseToCommit { let database_to_commit = DatabaseToCommit {
server_id: self.config.server_id, server_id: self.config.server_id,
object_store: Arc::clone(&self.config.object_store), object_store: Arc::clone(application.object_store()),
exec: Arc::clone(&self.config.exec), exec: Arc::clone(application.executor()),
preserved_catalog, preserved_catalog,
catalog, catalog,
rules: Arc::clone(rules), rules: Arc::clone(rules),
write_buffer, write_buffer,
}; };
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));
let db = Arc::new(Db::new(
database_to_commit,
Arc::clone(application.job_registry()),
));
self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan })); self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan }));
@ -641,14 +636,7 @@ mod test {
fn make_config() -> Config { fn make_config() -> Config {
let store = Arc::new(ObjectStore::new_in_memory()); let store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new()); Config::new(Arc::new(ApplicationState::new(store, Some(1))), server_id)
Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&store),
Arc::new(Executor::new(1)),
server_id,
Arc::clone(&metric_registry),
)
} }
#[tokio::test] #[tokio::test]

69
server/src/job.rs Normal file
View File

@ -0,0 +1,69 @@
use data_types::job::Job;
use parking_lot::Mutex;
use std::convert::Infallible;
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
// History size handed to `TaskRegistryWithHistory` — presumably the
// number of finished jobs retained for inspection; confirm against the
// tracker crate's documentation.
const JOB_HISTORY_SIZE: usize = 1000;

/// The global job registry
#[derive(Debug)]
pub struct JobRegistry {
    // Mutex-guarded task registry; all public methods lock it briefly
    // per call rather than holding the lock across operations.
    inner: Mutex<TaskRegistryWithHistory<Job>>,
}
impl Default for JobRegistry {
    /// Builds an empty registry whose history capacity is
    /// `JOB_HISTORY_SIZE`.
    fn default() -> Self {
        let registry = TaskRegistryWithHistory::new(JOB_HISTORY_SIZE);
        Self {
            inner: Mutex::new(registry),
        }
    }
}
impl JobRegistry {
    /// Creates an empty registry (same as [`Default`])
    pub fn new() -> Self {
        Default::default()
    }

    /// Registers `job`, returning its tracker along with the
    /// registration used to associate futures with the job
    pub fn register(&self, job: Job) -> (TaskTracker<Job>, TaskRegistration) {
        self.inner.lock().register(job)
    }

    /// Returns a list of recent Jobs, including some that are no
    /// longer running
    pub fn tracked(&self) -> Vec<TaskTracker<Job>> {
        self.inner.lock().tracked()
    }

    /// Looks up a single tracked job by its task id
    pub fn get(&self, id: TaskId) -> Option<TaskTracker<Job>> {
        self.inner.lock().get(id)
    }

    /// Spawns one sleeping task per entry of `nanos`, all tracked by a
    /// single `Job::Dummy`, and returns that job's tracker
    pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> TaskTracker<Job> {
        let dummy = Job::Dummy {
            nanos: nanos.clone(),
        };
        let (tracker, registration) = self.register(dummy);

        for sleep_nanos in nanos {
            let task = async move {
                tokio::time::sleep(tokio::time::Duration::from_nanos(sleep_nanos)).await;
                Ok::<_, Infallible>(())
            };
            tokio::spawn(task.track(registration.clone()));
        }

        tracker
    }

    /// Reclaims jobs into the historical archive
    ///
    /// Returns the number of remaining jobs
    ///
    /// Should be called periodically
    pub(crate) fn reclaim(&self) -> usize {
        let mut registry = self.inner.lock();
        registry.reclaim();
        registry.tracked_len()
    }
}

View File

@ -69,7 +69,7 @@
)] )]
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::{Infallible, TryInto}; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -91,25 +91,29 @@ use generated_types::database_rules::encode_database_rules;
use generated_types::influxdata::transfer::column::v1 as pb; use generated_types::influxdata::transfer::column::v1 as pb;
use influxdb_line_protocol::ParsedLine; use influxdb_line_protocol::ParsedLine;
use lifecycle::LockableChunk; use lifecycle::LockableChunk;
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry}; use metrics::{KeyValue, MetricObserverBuilder};
use object_store::{ObjectStore, ObjectStoreApi}; use object_store::ObjectStoreApi;
use observability_deps::tracing::{error, info, warn}; use observability_deps::tracing::{error, info, warn};
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use query::{exec::Executor, DatabaseStore}; use query::{exec::Executor, DatabaseStore};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use resolver::Resolver; use resolver::Resolver;
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; use tracker::{TaskTracker, TrackedFutureExt};
use write_buffer::config::WriteBufferConfig; use write_buffer::config::WriteBufferConfig;
pub use application::ApplicationState;
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
pub use db::Db; pub use db::Db;
pub use job::JobRegistry;
pub use resolver::{GrpcConnectionString, RemoteTemplate}; pub use resolver::{GrpcConnectionString, RemoteTemplate};
mod application;
mod config; mod config;
mod connection; mod connection;
pub mod db; pub mod db;
mod init; mod init;
mod job;
mod resolver; mod resolver;
/// Utility modules used by benchmarks and tests /// Utility modules used by benchmarks and tests
@ -255,86 +259,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
const JOB_HISTORY_SIZE: usize = 1000;
/// The global job registry
#[derive(Debug)]
pub struct JobRegistry {
inner: Mutex<TaskRegistryWithHistory<Job>>,
}
impl Default for JobRegistry {
fn default() -> Self {
Self {
inner: Mutex::new(TaskRegistryWithHistory::new(JOB_HISTORY_SIZE)),
}
}
}
impl JobRegistry {
pub fn new() -> Self {
Default::default()
}
pub fn register(&self, job: Job) -> (TaskTracker<Job>, TaskRegistration) {
self.inner.lock().register(job)
}
/// Returns a list of recent Jobs, including some that are no
/// longer running
pub fn tracked(&self) -> Vec<TaskTracker<Job>> {
self.inner.lock().tracked()
}
}
/// Used to configure a server instance
#[derive(Debug)]
pub struct ServerConfig {
// number of executor worker threads. If not specified, defaults
// to number of cores on the system.
num_worker_threads: Option<usize>,
/// The `ObjectStore` instance to use for persistence
object_store: Arc<ObjectStore>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
wipe_catalog_on_error: bool,
skip_replay_and_seek_instead: bool,
}
impl ServerConfig {
/// Create a new config using the specified store.
pub fn new(
object_store: Arc<ObjectStore>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
skip_replay_and_seek_instead: bool,
) -> Self {
Self {
num_worker_threads: None,
object_store,
metric_registry,
remote_template,
wipe_catalog_on_error: true,
skip_replay_and_seek_instead,
}
}
/// Use `num` worker threads for running queries
pub fn with_num_worker_threads(mut self, num: usize) -> Self {
self.num_worker_threads = Some(num);
self
}
/// return a reference to the object store in this configuration
pub fn store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.object_store)
}
}
// A collection of metrics used to instrument the Server. // A collection of metrics used to instrument the Server.
#[derive(Debug)] #[derive(Debug)]
pub struct ServerMetrics { pub struct ServerMetrics {
@ -418,21 +342,36 @@ impl ServerMetrics {
} }
} }
/// Configuration options for `Server`
#[derive(Debug)]
pub struct ServerConfig {
pub remote_template: Option<RemoteTemplate>,
pub wipe_catalog_on_error: bool,
pub skip_replay_and_seek_instead: bool,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
remote_template: None,
wipe_catalog_on_error: false,
skip_replay_and_seek_instead: false,
}
}
}
/// `Server` is the container struct for how servers store data internally, as /// `Server` is the container struct for how servers store data internally, as
/// well as how they communicate with other servers. Each server will have one /// well as how they communicate with other servers. Each server will have one
/// of these structs, which keeps track of all replication and query rules. /// of these structs, which keeps track of all replication and query rules.
#[derive(Debug)] #[derive(Debug)]
pub struct Server<M: ConnectionManager> { pub struct Server<M: ConnectionManager> {
connection_manager: Arc<M>, connection_manager: Arc<M>,
store: Arc<ObjectStore>,
exec: Arc<Executor>,
jobs: Arc<JobRegistry>,
metrics: Arc<ServerMetrics>,
/// The metrics registry associated with the server. This is needed not for application: Arc<ApplicationState>,
/// recording telemetry, but because the server hosts the /metric endpoint
/// and populates the endpoint with this data. metrics: Arc<ServerMetrics>,
registry: Arc<metrics::MetricRegistry>,
/// Resolver for mapping ServerId to gRPC connection strings /// Resolver for mapping ServerId to gRPC connection strings
resolver: RwLock<Resolver>, resolver: RwLock<Resolver>,
@ -500,33 +439,23 @@ impl<M> Server<M>
where where
M: ConnectionManager + Send + Sync, M: ConnectionManager + Send + Sync,
{ {
pub fn new(connection_manager: M, config: ServerConfig) -> Self { pub fn new(
let jobs = Arc::new(JobRegistry::new()); connection_manager: M,
application: Arc<ApplicationState>,
let ServerConfig { config: ServerConfig,
num_worker_threads, ) -> Self {
object_store, let metrics = Arc::new(ServerMetrics::new(Arc::clone(
// to test the metrics provide a different registry to the `ServerConfig`. application.metric_registry(),
metric_registry, )));
remote_template,
wipe_catalog_on_error,
skip_replay_and_seek_instead,
} = config;
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
let exec = Arc::new(Executor::new(num_worker_threads));
Self { Self {
store: object_store, application,
metrics,
connection_manager: Arc::new(connection_manager), connection_manager: Arc::new(connection_manager),
exec, resolver: RwLock::new(Resolver::new(config.remote_template)),
jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry),
resolver: RwLock::new(Resolver::new(remote_template)),
stage: Arc::new(RwLock::new(ServerStage::Startup { stage: Arc::new(RwLock::new(ServerStage::Startup {
wipe_catalog_on_error, wipe_catalog_on_error: config.wipe_catalog_on_error,
skip_replay_and_seek_instead, skip_replay_and_seek_instead: config.skip_replay_and_seek_instead,
})), })),
} }
} }
@ -545,13 +474,7 @@ where
*stage = ServerStage::InitReady { *stage = ServerStage::InitReady {
wipe_catalog_on_error: *wipe_catalog_on_error, wipe_catalog_on_error: *wipe_catalog_on_error,
skip_replay_and_seek_instead: *skip_replay_and_seek_instead, skip_replay_and_seek_instead: *skip_replay_and_seek_instead,
config: Arc::new(Config::new( config: Arc::new(Config::new(Arc::clone(&self.application), id)),
Arc::clone(&self.jobs),
Arc::clone(&self.store),
Arc::clone(&self.exec),
id,
Arc::clone(&self.registry),
)),
last_error: None, last_error: None,
}; };
Ok(()) Ok(())
@ -560,11 +483,6 @@ where
} }
} }
/// Returns the metrics registry associated with this server
pub fn metrics_registry(&self) -> &Arc<MetricRegistry> {
&self.registry
}
/// Return the metrics associated with this server /// Return the metrics associated with this server
pub fn metrics(&self) -> &Arc<ServerMetrics> { pub fn metrics(&self) -> &Arc<ServerMetrics> {
&self.metrics &self.metrics
@ -657,9 +575,9 @@ where
// load preserved catalog // load preserved catalog
let (preserved_catalog, catalog, replay_plan) = create_preserved_catalog( let (preserved_catalog, catalog, replay_plan) = create_preserved_catalog(
rules.db_name(), rules.db_name(),
Arc::clone(&self.store), Arc::clone(self.application.object_store()),
config.server_id(), config.server_id(),
config.metrics_registry(), Arc::clone(self.application.metric_registry()),
) )
.await .await
.map_err(|e| Box::new(e) as _) .map_err(|e| Box::new(e) as _)
@ -694,7 +612,8 @@ where
let len = data.len(); let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze()); let stream_data = std::io::Result::Ok(data.freeze());
self.store self.application
.object_store()
.put( .put(
&location, &location,
futures::stream::once(async move { stream_data }), futures::stream::once(async move { stream_data }),
@ -1051,24 +970,6 @@ where
self.resolver.write().delete_remote(id) self.resolver.write().delete_remote(id)
} }
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> TaskTracker<Job> {
let (tracker, registration) = self.jobs.register(Job::Dummy {
nanos: nanos.clone(),
});
for duration in nanos {
tokio::spawn(
async move {
tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await;
Ok::<_, Infallible>(())
}
.track(registration.clone()),
);
}
tracker
}
/// Closes a chunk and starts moving its data to the read buffer, as a /// Closes a chunk and starts moving its data to the read buffer, as a
/// background job, dropping when complete. /// background job, dropping when complete.
pub fn close_chunk( pub fn close_chunk(
@ -1135,9 +1036,12 @@ where
} }
}; };
let (tracker, registration) = self.jobs.register(Job::WipePreservedCatalog { let (tracker, registration) =
db_name: Arc::from(db_name.as_str()), self.application
}); .job_registry()
.register(Job::WipePreservedCatalog {
db_name: Arc::from(db_name.as_str()),
});
let state = Arc::clone(&self.stage); let state = Arc::clone(&self.stage);
let db_name = db_name.clone(); let db_name = db_name.clone();
@ -1169,25 +1073,18 @@ where
Ok(tracker) Ok(tracker)
} }
/// Returns a list of all jobs tracked by this server
pub fn tracked_jobs(&self) -> Vec<TaskTracker<Job>> {
self.jobs.inner.lock().tracked()
}
/// Returns a specific job tracked by this server
pub fn get_job(&self, id: TaskId) -> Option<TaskTracker<Job>> {
self.jobs.inner.lock().get(id)
}
/// Background worker function for the server /// Background worker function for the server
pub async fn background_worker(&self, shutdown: tokio_util::sync::CancellationToken) { pub async fn background_worker(&self, shutdown: tokio_util::sync::CancellationToken) {
info!("started background worker"); info!("started background worker");
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
// TODO: Move out of Server background loop
let job_registry = self.application.job_registry();
while !shutdown.is_cancelled() { while !shutdown.is_cancelled() {
self.maybe_initialize_server().await; self.maybe_initialize_server().await;
self.jobs.inner.lock().reclaim(); job_registry.reclaim();
tokio::select! { tokio::select! {
_ = interval.tick() => {}, _ = interval.tick() => {},
@ -1205,9 +1102,7 @@ where
// Wait for any outstanding jobs to finish - frontend shutdown should be // Wait for any outstanding jobs to finish - frontend shutdown should be
// sequenced before shutting down the background workers and so there // sequenced before shutting down the background workers and so there
// shouldn't be any // shouldn't be any
while self.jobs.inner.lock().tracked_len() != 0 { while job_registry.reclaim() != 0 {
self.jobs.inner.lock().reclaim();
interval.tick().await; interval.tick().await;
} }
@ -1262,14 +1157,14 @@ where
/// Return a handle to the query executor /// Return a handle to the query executor
fn executor(&self) -> Arc<Executor> { fn executor(&self) -> Arc<Executor> {
Arc::clone(&self.exec) Arc::clone(self.application.executor())
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
convert::TryFrom, convert::{Infallible, TryFrom},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
@ -1280,7 +1175,6 @@ mod tests {
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use bytes::Bytes; use bytes::Bytes;
use futures::TryStreamExt; use futures::TryStreamExt;
use tempfile::TempDir;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -1293,8 +1187,8 @@ mod tests {
use entry::test_helpers::lp_to_entry; use entry::test_helpers::lp_to_entry;
use generated_types::database_rules::decode_database_rules; use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines; use influxdb_line_protocol::parse_lines;
use metrics::MetricRegistry; use metrics::TestMetricRegistry;
use object_store::path::ObjectStorePath; use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog}; use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase}; use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use write_buffer::mock::MockBufferForWritingThatAlwaysErrors; use write_buffer::mock::MockBufferForWritingThatAlwaysErrors;
@ -1306,35 +1200,24 @@ mod tests {
const ARBITRARY_DEFAULT_TIME: i64 = 456; const ARBITRARY_DEFAULT_TIME: i64 = 456;
// TODO: perhaps switch to a builder pattern. fn make_application() -> Arc<ApplicationState> {
fn config_with_metric_registry_and_store( Arc::new(ApplicationState::new(
object_store: ObjectStore, Arc::new(ObjectStore::new_in_memory()),
) -> (metrics::TestMetricRegistry, ServerConfig) { None,
let registry = Arc::new(metrics::MetricRegistry::new()); ))
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(&registry));
(
test_registry,
ServerConfig::new(Arc::new(object_store), registry, None, false)
.with_num_worker_threads(1),
)
} }
fn config_with_metric_registry() -> (metrics::TestMetricRegistry, ServerConfig) { fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
config_with_metric_registry_and_store(ObjectStore::new_in_memory()) Arc::new(Server::new(
} TestConnectionManager::new(),
application,
fn config() -> ServerConfig { Default::default(),
config_with_metric_registry().1 ))
}
fn config_with_store(object_store: ObjectStore) -> ServerConfig {
config_with_metric_registry_and_store(object_store).1
} }
#[tokio::test] #[tokio::test]
async fn server_api_calls_return_error_with_no_id_set() { async fn server_api_calls_return_error_with_no_id_set() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
let resp = server.config().unwrap_err(); let resp = server.config().unwrap_err();
assert!(matches!(resp, Error::IdNotSet)); assert!(matches!(resp, Error::IdNotSet));
@ -1349,10 +1232,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn create_database_persists_rules() { async fn create_database_persists_rules() {
let manager = TestConnectionManager::new(); let application = make_application();
let config = config(); let server = make_server(Arc::clone(&application));
let store = config.store();
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1378,12 +1259,12 @@ mod tests {
.await .await
.expect("failed to create database"); .expect("failed to create database");
let mut rules_path = server.store.new_path(); let mut rules_path = application.object_store().new_path();
rules_path.push_all_dirs(&["1", name.as_str()]); rules_path.push_all_dirs(&["1", name.as_str()]);
rules_path.set_file_name("rules.pb"); rules_path.set_file_name("rules.pb");
let read_data = server let read_data = application
.store .object_store()
.get(&rules_path) .get(&rules_path)
.await .await
.unwrap() .unwrap()
@ -1403,13 +1284,13 @@ mod tests {
.await .await
.expect("failed to create 2nd db"); .expect("failed to create 2nd db");
store.list_with_delimiter(&store.new_path()).await.unwrap(); application
.object_store()
.list_with_delimiter(&application.object_store().new_path())
.await
.unwrap();
let manager = TestConnectionManager::new(); let server2 = make_server(application);
let config2 =
ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None, false)
.with_num_worker_threads(1);
let server2 = Server::new(manager, config2);
server2.set_id(ServerId::try_from(1).unwrap()).unwrap(); server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
server2.maybe_initialize_server().await; server2.maybe_initialize_server().await;
@ -1421,8 +1302,7 @@ mod tests {
async fn duplicate_database_name_rejected() { async fn duplicate_database_name_rejected() {
// Covers #643 // Covers #643
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1471,28 +1351,23 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn load_databases() { async fn load_databases() {
let temp_dir = TempDir::new().unwrap(); let application = make_application();
let store = application.object_store();
let store = ObjectStore::new_file(temp_dir.path()); let server = make_server(Arc::clone(&application));
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
create_simple_database(&server, "bananas") create_simple_database(&server, "bananas")
.await .await
.expect("failed to create database"); .expect("failed to create database");
let mut rules_path = server.store.new_path(); let mut rules_path = store.new_path();
rules_path.push_all_dirs(&["1", "bananas"]); rules_path.push_all_dirs(&["1", "bananas"]);
rules_path.set_file_name("rules.pb"); rules_path.set_file_name("rules.pb");
std::mem::drop(server); std::mem::drop(server);
let store = ObjectStore::new_file(temp_dir.path()); let server = make_server(Arc::clone(&application));
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1504,15 +1379,12 @@ mod tests {
std::mem::drop(server); std::mem::drop(server);
let store = ObjectStore::new_file(temp_dir.path());
store store
.delete(&rules_path) .delete(&rules_path)
.await .await
.expect("cannot delete rules file"); .expect("cannot delete rules file");
let manager = TestConnectionManager::new(); let server = make_server(application);
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1521,8 +1393,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn db_names_sorted() { async fn db_names_sorted() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1542,8 +1413,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn writes_local() { async fn writes_local() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1576,9 +1446,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn write_entry_local() { async fn write_entry_local() {
let (metric_registry, config) = config_with_metric_registry(); let application = make_application();
let manager = TestConnectionManager::new(); let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry()));
let server = Server::new(manager, config); let server = make_server(application);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1660,7 +1530,7 @@ mod tests {
}), }),
); );
let server = Server::new(manager, config()); let server = Server::new(manager, make_application(), Default::default());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1737,8 +1607,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn close_chunk() { async fn close_chunk() {
test_helpers::maybe_start_logging(); test_helpers::maybe_start_logging();
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Arc::new(Server::new(manager, config()));
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
@ -1811,14 +1680,14 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn background_task_cleans_jobs() { async fn background_task_cleans_jobs() {
let manager = TestConnectionManager::new(); let application = make_application();
let server = Arc::new(Server::new(manager, config())); let server = make_server(Arc::clone(&application));
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
let wait_nanos = 1000; let wait_nanos = 1000;
let job = server.spawn_dummy_job(vec![wait_nanos]); let job = application.job_registry().spawn_dummy_job(vec![wait_nanos]);
// Note: this will hang forever if the background task has not been started // Note: this will hang forever if the background task has not been started
job.join().await; job.join().await;
@ -1843,8 +1712,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn hard_buffer_limit() { async fn hard_buffer_limit() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1894,12 +1762,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn cannot_create_db_until_server_is_initialized() { async fn cannot_create_db_until_server_is_initialized() {
let temp_dir = TempDir::new().unwrap(); let server = make_server(make_application());
let store = ObjectStore::new_file(temp_dir.path());
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config);
// calling before serverID set leads to `IdNotSet` // calling before serverID set leads to `IdNotSet`
let err = create_simple_database(&server, "bananas") let err = create_simple_database(&server, "bananas")
@ -1919,8 +1782,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn background_worker_eventually_inits_server() { async fn background_worker_eventually_inits_server() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Arc::new(Server::new(manager, config()));
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
@ -1944,11 +1806,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn init_error_generic() { async fn init_error_generic() {
// use an object store that will hopefully fail to read // use an object store that will hopefully fail to read
let store = ObjectStore::new_failing_store().unwrap(); let store = Arc::new(ObjectStore::new_failing_store().unwrap());
let application = Arc::new(ApplicationState::new(store, None));
let manager = TestConnectionManager::new(); let server = make_server(application);
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1957,15 +1817,11 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn init_error_database() { async fn init_error_database() {
let store = ObjectStore::new_in_memory(); let application = make_application();
let store = Arc::clone(application.object_store());
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
// Create temporary server to create single database let server = make_server(Arc::clone(&application));
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let store = config.store();
let server = Server::new(manager, config);
server.set_id(server_id).unwrap(); server.set_id(server_id).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -1992,13 +1848,10 @@ mod tests {
.await .await
.unwrap(); .unwrap();
// start server
let store = Arc::try_unwrap(store).unwrap();
store.get(&path).await.unwrap(); store.get(&path).await.unwrap();
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config); // start server
let server = make_server(application);
server.set_id(server_id).unwrap(); server.set_id(server_id).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -2052,14 +1905,12 @@ mod tests {
let db_name_created = DatabaseName::new("db_created".to_string()).unwrap(); let db_name_created = DatabaseName::new("db_created".to_string()).unwrap();
// setup // setup
let store = ObjectStore::new_in_memory(); let application = make_application();
let store = Arc::clone(application.object_store());
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
// Create temporary server to create existing databases // Create temporary server to create existing databases
let manager = TestConnectionManager::new(); let server = make_server(Arc::clone(&application));
let config = config_with_store(store);
let store = config.store();
let server = Server::new(manager, config);
server.set_id(server_id).unwrap(); server.set_id(server_id).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -2085,6 +1936,7 @@ mod tests {
let path = object_store_path_for_database_config(&root, &db_name_rules_broken); let path = object_store_path_for_database_config(&root, &db_name_rules_broken);
let data = Bytes::from("x"); let data = Bytes::from("x");
let len = data.len(); let len = data.len();
store store
.put( .put(
&path, &path,
@ -2093,6 +1945,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let (preserved_catalog, _catalog) = PreservedCatalog::load::<TestCatalogState>( let (preserved_catalog, _catalog) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -2102,18 +1955,15 @@ mod tests {
.await .await
.unwrap() .unwrap()
.unwrap(); .unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog) parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await; .await;
drop(preserved_catalog); drop(preserved_catalog);
// boot actual test server
let store = Arc::try_unwrap(store).unwrap();
store.get(&path).await.unwrap(); store.get(&path).await.unwrap();
let manager = TestConnectionManager::new();
// need to disable auto-wipe for this test // boot actual test server
let mut config = config_with_store(store); let server = make_server(Arc::clone(&application));
config.wipe_catalog_on_error = false;
let server = Server::new(manager, config);
// cannot wipe if server ID is not set // cannot wipe if server ID is not set
assert_eq!( assert_eq!(
@ -2135,16 +1985,18 @@ mod tests {
.to_string(), .to_string(),
"database already exists: db_existing" "database already exists: db_existing"
); );
assert!( assert!(PreservedCatalog::exists(
PreservedCatalog::exists(&server.store, server_id, db_name_existing.as_str()) application.object_store().as_ref(),
.await server_id,
.unwrap() db_name_existing.as_str()
); )
.await
.unwrap());
// 2. wiping a non-existing DB just works, but won't bring DB into existence // 2. wiping a non-existing DB just works, but won't bring DB into existence
assert!(server.error_database(&db_name_non_existing).is_none()); assert!(server.error_database(&db_name_non_existing).is_none());
PreservedCatalog::new_empty::<TestCatalogState>( PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&server.store), Arc::clone(application.object_store()),
server_id, server_id,
db_name_non_existing.to_string(), db_name_non_existing.to_string(),
(), (),
@ -2161,7 +2013,7 @@ mod tests {
assert_eq!(metadata, &expected_metadata); assert_eq!(metadata, &expected_metadata);
tracker.join().await; tracker.join().await;
assert!(!PreservedCatalog::exists( assert!(!PreservedCatalog::exists(
&server.store, application.object_store().as_ref(),
server_id, server_id,
&db_name_non_existing.to_string() &db_name_non_existing.to_string()
) )
@ -2182,7 +2034,7 @@ mod tests {
assert_eq!(metadata, &expected_metadata); assert_eq!(metadata, &expected_metadata);
tracker.join().await; tracker.join().await;
assert!(!PreservedCatalog::exists( assert!(!PreservedCatalog::exists(
&server.store, application.object_store().as_ref(),
server_id, server_id,
&db_name_rules_broken.to_string() &db_name_rules_broken.to_string()
) )
@ -2203,7 +2055,7 @@ mod tests {
assert_eq!(metadata, &expected_metadata); assert_eq!(metadata, &expected_metadata);
tracker.join().await; tracker.join().await;
assert!(PreservedCatalog::exists( assert!(PreservedCatalog::exists(
&server.store, application.object_store().as_ref(),
server_id, server_id,
&db_name_catalog_broken.to_string() &db_name_catalog_broken.to_string()
) )
@ -2230,22 +2082,24 @@ mod tests {
.to_string(), .to_string(),
"database already exists: db_created" "database already exists: db_created"
); );
assert!( assert!(PreservedCatalog::exists(
PreservedCatalog::exists(&server.store, server_id, &db_name_created.to_string()) application.object_store().as_ref(),
.await server_id,
.unwrap() &db_name_created.to_string()
); )
.await
.unwrap());
} }
#[tokio::test] #[tokio::test]
async fn cannot_create_db_when_catalog_is_present() { async fn cannot_create_db_when_catalog_is_present() {
let store = Arc::new(ObjectStore::new_in_memory()); let application = make_application();
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let db_name = "my_db"; let db_name = "my_db";
// create catalog // create catalog
PreservedCatalog::new_empty::<TestCatalogState>( PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&store), Arc::clone(application.object_store()),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
(), (),
@ -2254,10 +2108,7 @@ mod tests {
.unwrap(); .unwrap();
// setup server // setup server
let store = Arc::try_unwrap(store).unwrap(); let server = make_server(application);
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(server_id).unwrap(); server.set_id(server_id).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;
@ -2268,8 +2119,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn write_buffer_errors_propagate() { async fn write_buffer_errors_propagate() {
let manager = TestConnectionManager::new(); let server = make_server(make_application());
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await; server.maybe_initialize_server().await;

View File

@ -5,8 +5,8 @@ use object_store::{self, path::ObjectStorePath, ObjectStore, ObjectStoreApi, Thr
use observability_deps::tracing::{self, error, info, warn, Instrument}; use observability_deps::tracing::{self, error, info, warn, Instrument};
use panic_logging::SendPanicsToTracing; use panic_logging::SendPanicsToTracing;
use server::{ use server::{
ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer, ApplicationState, ConnectionManagerImpl as ConnectionManager, RemoteTemplate,
ServerConfig as AppServerConfig, Server as AppServer, ServerConfig,
}; };
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, fs, net::SocketAddr, path::PathBuf, sync::Arc}; use std::{convert::TryFrom, fs, net::SocketAddr, path::PathBuf, sync::Arc};
@ -136,23 +136,17 @@ pub async fn main(config: Config) -> Result<()> {
let object_store = ObjectStore::try_from(&config)?; let object_store = ObjectStore::try_from(&config)?;
check_object_store(&object_store).await?; check_object_store(&object_store).await?;
let object_storage = Arc::new(object_store); let object_storage = Arc::new(object_store);
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let remote_template = config.remote_template.map(RemoteTemplate::new);
let server_config = AppServerConfig::new(
object_storage,
metric_registry,
remote_template,
config.skip_replay_and_seek_instead,
);
let server_config = if let Some(n) = config.num_worker_threads { let application = Arc::new(ApplicationState::new(
info!( object_storage,
num_worker_threads = n, config.num_worker_threads,
"Using specified number of worker threads" ));
);
server_config.with_num_worker_threads(n) let server_config = ServerConfig {
} else { remote_template: config.remote_template.map(RemoteTemplate::new),
server_config // TODO: Don't wipe on error (#1522)
wipe_catalog_on_error: true,
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead,
}; };
if config.grpc_bind_address == config.http_bind_address { if config.grpc_bind_address == config.http_bind_address {
@ -165,7 +159,11 @@ pub async fn main(config: Config) -> Result<()> {
} }
let connection_manager = ConnectionManager::new(); let connection_manager = ConnectionManager::new();
let app_server = Arc::new(AppServer::new(connection_manager, server_config)); let app_server = Arc::new(AppServer::new(
connection_manager,
Arc::clone(&application),
server_config,
));
// if this ID isn't set the server won't be usable until this is set via an API // if this ID isn't set the server won't be usable until this is set via an API
// call // call
@ -189,6 +187,7 @@ pub async fn main(config: Config) -> Result<()> {
let grpc_server = rpc::serve( let grpc_server = rpc::serve(
socket, socket,
Arc::clone(&application),
Arc::clone(&app_server), Arc::clone(&app_server),
frontend_shutdown.clone(), frontend_shutdown.clone(),
config.initial_serving_state.into(), config.initial_serving_state.into(),
@ -204,6 +203,7 @@ pub async fn main(config: Config) -> Result<()> {
let http_server = http::serve( let http_server = http::serve(
addr, addr,
Arc::clone(&application),
Arc::clone(&app_server), Arc::clone(&app_server),
frontend_shutdown.clone(), frontend_shutdown.clone(),
max_http_request_size, max_http_request_size,

View File

@ -19,7 +19,7 @@ use data_types::{
use influxdb_iox_client::format::QueryOutputFormat; use influxdb_iox_client::format::QueryOutputFormat;
use influxdb_line_protocol::parse_lines; use influxdb_line_protocol::parse_lines;
use query::{exec::ExecutorType, QueryDatabase}; use query::{exec::ExecutorType, QueryDatabase};
use server::{ConnectionManager, Server as AppServer}; use server::{ApplicationState, ConnectionManager, Server as AppServer};
// External crates // External crates
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
@ -351,6 +351,7 @@ where
} }
fn router<M>( fn router<M>(
application: Arc<ApplicationState>,
app_server: Arc<AppServer<M>>, app_server: Arc<AppServer<M>>,
max_request_size: usize, max_request_size: usize,
) -> Router<Body, ApplicationError> ) -> Router<Body, ApplicationError>
@ -365,6 +366,7 @@ where
// Create a router and specify the the handlers. // Create a router and specify the the handlers.
Router::builder() Router::builder()
.data(server) .data(server)
.data(application)
.middleware(Middleware::pre(|mut req| async move { .middleware(Middleware::pre(|mut req| async move {
// we don't need the authorization header and we don't want to accidentally log it. // we don't need the authorization header and we don't want to accidentally log it.
req.headers_mut().remove("authorization"); req.headers_mut().remove("authorization");
@ -703,9 +705,14 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
.http_requests .http_requests
.observation() .observation()
.ok_with_labels(&[metrics::KeyValue::new("path", path)]); .ok_with_labels(&[metrics::KeyValue::new("path", path)]);
Ok(Response::new(Body::from(
server.metrics_registry().metrics_as_text(), let body = req
))) .data::<Arc<ApplicationState>>()
.expect("application state")
.metric_registry()
.metrics_as_text();
Ok(Response::new(Body::from(body)))
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -860,6 +867,7 @@ async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
pub async fn serve<M>( pub async fn serve<M>(
addr: AddrIncoming, addr: AddrIncoming,
application: Arc<ApplicationState>,
server: Arc<AppServer<M>>, server: Arc<AppServer<M>>,
shutdown: CancellationToken, shutdown: CancellationToken,
max_request_size: usize, max_request_size: usize,
@ -867,7 +875,7 @@ pub async fn serve<M>(
where where
M: ConnectionManager + Send + Sync + Debug + 'static, M: ConnectionManager + Send + Sync + Debug + 'static,
{ {
let router = router(server, max_request_size); let router = router(application, server, max_request_size);
let service = RouterService::new(router).unwrap(); let service = RouterService::new(router).unwrap();
hyper::Server::builder(addr) hyper::Server::builder(addr)
@ -889,31 +897,32 @@ mod tests {
use reqwest::{Client, Response}; use reqwest::{Client, Response};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName}; use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use metrics::TestMetricRegistry;
use object_store::ObjectStore; use object_store::ObjectStore;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use server::{db::Db, ConnectionManagerImpl, ServerConfig as AppServerConfig}; use server::{db::Db, ApplicationState, ConnectionManagerImpl};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
fn config() -> (metrics::TestMetricRegistry, AppServerConfig) { fn make_application() -> Arc<ApplicationState> {
let registry = Arc::new(metrics::MetricRegistry::new()); Arc::new(ApplicationState::new(
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(&registry)); Arc::new(ObjectStore::new_in_memory()),
( None,
test_registry, ))
AppServerConfig::new( }
Arc::new(ObjectStore::new_in_memory()),
registry, fn make_server(application: Arc<ApplicationState>) -> Arc<AppServer<ConnectionManagerImpl>> {
None, Arc::new(AppServer::new(
false, ConnectionManagerImpl::new(),
) application,
.with_num_worker_threads(1), Default::default(),
) ))
} }
#[tokio::test] #[tokio::test]
async fn test_health() { async fn test_health() {
let (_, config) = config(); let application = make_application();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let app_server = make_server(Arc::clone(&application));
let server_url = test_server(Arc::clone(&app_server)); let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new(); let client = Client::new();
let response = client.get(&format!("{}/health", server_url)).send().await; let response = client.get(&format!("{}/health", server_url)).send().await;
@ -924,8 +933,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_write() { async fn test_write() {
let (_, config) = config(); let application = make_application();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let app_server = make_server(Arc::clone(&application));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await; app_server.maybe_initialize_server().await;
app_server app_server
@ -934,7 +943,7 @@ mod tests {
)) ))
.await .await
.unwrap(); .unwrap();
let server_url = test_server(Arc::clone(&app_server)); let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new(); let client = Client::new();
@ -972,8 +981,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_write_metrics() { async fn test_write_metrics() {
let (metrics_registry, config) = config(); let application = make_application();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let app_server = make_server(Arc::clone(&application));
let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry()));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await; app_server.maybe_initialize_server().await;
app_server app_server
@ -982,7 +993,8 @@ mod tests {
)) ))
.await .await
.unwrap(); .unwrap();
let server_url = test_server(Arc::clone(&app_server));
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new(); let client = Client::new();
@ -1002,7 +1014,7 @@ mod tests {
.expect("sent data"); .expect("sent data");
// The request completed successfully // The request completed successfully
metrics_registry metric_registry
.has_metric_family("http_request_duration_seconds") .has_metric_family("http_request_duration_seconds")
.with_labels(&[ .with_labels(&[
("bucket", "MetricsBucket"), ("bucket", "MetricsBucket"),
@ -1015,7 +1027,7 @@ mod tests {
.unwrap(); .unwrap();
// A single successful point landed // A single successful point landed
metrics_registry metric_registry
.has_metric_family("ingest_points_total") .has_metric_family("ingest_points_total")
.with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")]) .with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")])
.counter() .counter()
@ -1023,7 +1035,7 @@ mod tests {
.unwrap(); .unwrap();
// Which consists of two fields // Which consists of two fields
metrics_registry metric_registry
.has_metric_family("ingest_fields_total") .has_metric_family("ingest_fields_total")
.with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")]) .with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")])
.counter() .counter()
@ -1031,7 +1043,7 @@ mod tests {
.unwrap(); .unwrap();
// Bytes of data were written // Bytes of data were written
metrics_registry metric_registry
.has_metric_family("ingest_points_bytes_total") .has_metric_family("ingest_points_bytes_total")
.with_labels(&[("status", "ok"), ("db_name", "MetricsOrg_MetricsBucket")]) .with_labels(&[("status", "ok"), ("db_name", "MetricsOrg_MetricsBucket")])
.counter() .counter()
@ -1050,7 +1062,7 @@ mod tests {
.unwrap(); .unwrap();
// A single point was rejected // A single point was rejected
metrics_registry metric_registry
.has_metric_family("ingest_points_total") .has_metric_family("ingest_points_total")
.with_labels(&[("db_name", "NotMyOrg_NotMyBucket"), ("status", "error")]) .with_labels(&[("db_name", "NotMyOrg_NotMyBucket"), ("status", "error")])
.counter() .counter()
@ -1062,8 +1074,8 @@ mod tests {
/// returns a client for communicating with the server, and the server /// returns a client for communicating with the server, and the server
/// endpoint /// endpoint
async fn setup_test_data() -> (Client, String) { async fn setup_test_data() -> (Client, String) {
let (_, config) = config(); let application = make_application();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let app_server = make_server(Arc::clone(&application));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await; app_server.maybe_initialize_server().await;
app_server app_server
@ -1072,7 +1084,7 @@ mod tests {
)) ))
.await .await
.unwrap(); .unwrap();
let server_url = test_server(Arc::clone(&app_server)); let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new(); let client = Client::new();
@ -1359,7 +1371,10 @@ mod tests {
/// creates an instance of the http service backed by a in-memory /// creates an instance of the http service backed by a in-memory
/// testable database. Returns the url of the server /// testable database. Returns the url of the server
fn test_server(server: Arc<AppServer<ConnectionManagerImpl>>) -> String { fn test_server(
application: Arc<ApplicationState>,
server: Arc<AppServer<ConnectionManagerImpl>>,
) -> String {
// NB: specify port 0 to let the OS pick the port. // NB: specify port 0 to let the OS pick the port.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let addr = AddrIncoming::bind(&bind_addr).expect("failed to bind server"); let addr = AddrIncoming::bind(&bind_addr).expect("failed to bind server");
@ -1367,6 +1382,7 @@ mod tests {
tokio::task::spawn(serve( tokio::task::spawn(serve(
addr, addr,
application,
server, server,
CancellationToken::new(), CancellationToken::new(),
TEST_MAX_REQUEST_SIZE, TEST_MAX_REQUEST_SIZE,
@ -1391,8 +1407,8 @@ mod tests {
/// return a test server and the url to contact it for `MyOrg_MyBucket` /// return a test server and the url to contact it for `MyOrg_MyBucket`
async fn setup_server() -> (Arc<AppServer<ConnectionManagerImpl>>, String) { async fn setup_server() -> (Arc<AppServer<ConnectionManagerImpl>>, String) {
let (_, config) = config(); let application = make_application();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let app_server = make_server(Arc::clone(&application));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await; app_server.maybe_initialize_server().await;
app_server app_server
@ -1401,7 +1417,7 @@ mod tests {
)) ))
.await .await
.unwrap(); .unwrap();
let server_url = test_server(Arc::clone(&app_server)); let server_url = test_server(application, Arc::clone(&app_server));
(app_server, server_url) (app_server, server_url)
} }

View File

@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken;
use tonic::transport::NamedService; use tonic::transport::NamedService;
use crate::influxdb_ioxd::serving_readiness::{ServingReadiness, ServingReadinessState}; use crate::influxdb_ioxd::serving_readiness::{ServingReadiness, ServingReadinessState};
use server::{ConnectionManager, Server}; use server::{ApplicationState, ConnectionManager, Server};
use tonic::{Interceptor, Status}; use tonic::{Interceptor, Status};
pub mod error; pub mod error;
@ -76,6 +76,7 @@ impl From<ServingReadinessInterceptor> for Interceptor {
/// shutdown. /// shutdown.
pub async fn serve<M>( pub async fn serve<M>(
socket: TcpListener, socket: TcpListener,
application: Arc<ApplicationState>,
server: Arc<Server<M>>, server: Arc<Server<M>>,
shutdown: CancellationToken, shutdown: CancellationToken,
serving_readiness: ServingReadiness, serving_readiness: ServingReadiness,
@ -104,14 +105,18 @@ where
testing::make_server(), testing::make_server(),
storage::make_server( storage::make_server(
Arc::clone(&server), Arc::clone(&server),
Arc::clone(server.metrics_registry()), Arc::clone(application.metric_registry()),
serving_gate.clone(), serving_gate.clone(),
), ),
flight::make_server(Arc::clone(&server), serving_gate.clone()), flight::make_server(Arc::clone(&server), serving_gate.clone()),
write::make_server(Arc::clone(&server), serving_gate.clone()), write::make_server(Arc::clone(&server), serving_gate.clone()),
write_pb::make_server(Arc::clone(&server), serving_gate.clone()), write_pb::make_server(Arc::clone(&server), serving_gate.clone()),
management::make_server(Arc::clone(&server), serving_readiness.clone()), management::make_server(
operations::make_server(Arc::clone(&server)), Arc::clone(&application),
Arc::clone(&server),
serving_readiness.clone()
),
operations::make_server(Arc::clone(application.job_registry())),
], ],
); );

View File

@ -9,10 +9,11 @@ use generated_types::google::{
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use observability_deps::tracing::info; use observability_deps::tracing::info;
use query::{DatabaseStore, QueryDatabase}; use query::{DatabaseStore, QueryDatabase};
use server::{ConnectionManager, Error, Server}; use server::{ApplicationState, ConnectionManager, Error, Server};
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
struct ManagementService<M: ConnectionManager> { struct ManagementService<M: ConnectionManager> {
application: Arc<ApplicationState>,
server: Arc<Server<M>>, server: Arc<Server<M>>,
serving_readiness: ServingReadiness, serving_readiness: ServingReadiness,
} }
@ -189,7 +190,10 @@ where
request: Request<CreateDummyJobRequest>, request: Request<CreateDummyJobRequest>,
) -> Result<Response<CreateDummyJobResponse>, Status> { ) -> Result<Response<CreateDummyJobResponse>, Status> {
let request = request.into_inner(); let request = request.into_inner();
let tracker = self.server.spawn_dummy_job(request.nanos); let tracker = self
.application
.job_registry()
.spawn_dummy_job(request.nanos);
let operation = Some(super::operations::encode_tracker(tracker)?); let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CreateDummyJobResponse { operation })) Ok(Response::new(CreateDummyJobResponse { operation }))
} }
@ -476,6 +480,7 @@ where
} }
pub fn make_server<M>( pub fn make_server<M>(
application: Arc<ApplicationState>,
server: Arc<Server<M>>, server: Arc<Server<M>>,
serving_readiness: ServingReadiness, serving_readiness: ServingReadiness,
) -> management_service_server::ManagementServiceServer< ) -> management_service_server::ManagementServiceServer<
@ -485,6 +490,7 @@ where
M: ConnectionManager + Send + Sync + Debug + 'static, M: ConnectionManager + Send + Sync + Debug + 'static,
{ {
management_service_server::ManagementServiceServer::new(ManagementService { management_service_server::ManagementServiceServer::new(ManagementService {
application,
server, server,
serving_readiness, serving_readiness,
}) })

View File

@ -1,4 +1,4 @@
use std::fmt::Debug; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use bytes::BytesMut; use bytes::BytesMut;
@ -18,14 +18,12 @@ use generated_types::{
influxdata::iox::management::v1 as management, influxdata::iox::management::v1 as management,
protobuf_type_url, protobuf_type_url,
}; };
use server::JobRegistry;
use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker}; use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker};
use server::{ConnectionManager, Server};
use std::convert::TryInto;
/// Implementation of the write service /// Implementation of the write service
struct OperationsService<M: ConnectionManager> { struct OperationsService {
server: Arc<Server<M>>, jobs: Arc<JobRegistry>,
} }
pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> { pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> {
@ -112,16 +110,13 @@ pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Sta
}) })
} }
fn get_tracker<M>(server: &Server<M>, tracker: String) -> Result<TaskTracker<Job>, tonic::Status> fn get_tracker(jobs: &JobRegistry, tracker: String) -> Result<TaskTracker<Job>, tonic::Status> {
where
M: ConnectionManager + Send + Sync,
{
let tracker_id = tracker.parse::<TaskId>().map_err(|e| FieldViolation { let tracker_id = tracker.parse::<TaskId>().map_err(|e| FieldViolation {
field: "name".to_string(), field: "name".to_string(),
description: e.to_string(), description: e.to_string(),
})?; })?;
let tracker = server.get_job(tracker_id).ok_or(NotFound { let tracker = jobs.get(tracker_id).ok_or(NotFound {
resource_type: "job".to_string(), resource_type: "job".to_string(),
resource_name: tracker, resource_name: tracker,
..Default::default() ..Default::default()
@ -131,18 +126,15 @@ where
} }
#[tonic::async_trait] #[tonic::async_trait]
impl<M> operations_server::Operations for OperationsService<M> impl operations_server::Operations for OperationsService {
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
async fn list_operations( async fn list_operations(
&self, &self,
_request: tonic::Request<ListOperationsRequest>, _request: tonic::Request<ListOperationsRequest>,
) -> Result<tonic::Response<ListOperationsResponse>, tonic::Status> { ) -> Result<tonic::Response<ListOperationsResponse>, tonic::Status> {
// TODO: Support pagination // TODO: Support pagination
let operations: Result<Vec<_>, _> = self let operations: Result<Vec<_>, _> = self
.server .jobs
.tracked_jobs() .tracked()
.into_iter() .into_iter()
.map(encode_tracker) .map(encode_tracker)
.collect(); .collect();
@ -158,7 +150,7 @@ where
request: tonic::Request<GetOperationRequest>, request: tonic::Request<GetOperationRequest>,
) -> Result<tonic::Response<Operation>, tonic::Status> { ) -> Result<tonic::Response<Operation>, tonic::Status> {
let request = request.into_inner(); let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?; let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
Ok(Response::new(encode_tracker(tracker)?)) Ok(Response::new(encode_tracker(tracker)?))
} }
@ -178,7 +170,7 @@ where
) -> Result<tonic::Response<Empty>, tonic::Status> { ) -> Result<tonic::Response<Empty>, tonic::Status> {
let request = request.into_inner(); let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?; let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
tracker.cancel(); tracker.cancel();
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
@ -194,7 +186,7 @@ where
let request = request.into_inner(); let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?; let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
if let Some(timeout) = request.timeout { if let Some(timeout) = request.timeout {
let timeout = timeout.try_into().field("timeout")?; let timeout = timeout.try_into().field("timeout")?;
@ -209,11 +201,8 @@ where
} }
/// Instantiate the write service /// Instantiate the write service
pub fn make_server<M>( pub fn make_server(
server: Arc<Server<M>>, jobs: Arc<JobRegistry>,
) -> operations_server::OperationsServer<impl operations_server::Operations> ) -> operations_server::OperationsServer<impl operations_server::Operations> {
where operations_server::OperationsServer::new(OperationsService { jobs })
M: ConnectionManager + Send + Sync + Debug + 'static,
{
operations_server::OperationsServer::new(OperationsService { server })
} }