diff --git a/Cargo.lock b/Cargo.lock
index e02e0818b9..13e0fc112b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -630,6 +630,32 @@ dependencies = [
  "unicode-width",
 ]
 
+[[package]]
+name = "compactor"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "data_types",
+ "futures",
+ "generated_types",
+ "hyper",
+ "iox_catalog",
+ "iox_object_store",
+ "metric",
+ "object_store",
+ "observability_deps",
+ "parking_lot 0.12.0",
+ "predicate",
+ "prost",
+ "test_helpers",
+ "thiserror",
+ "tokio",
+ "tokio-util 0.7.0",
+ "tonic",
+ "trace",
+ "workspace-hack",
+]
+
 [[package]]
 name = "core-foundation"
 version = "0.9.2"
@@ -1859,6 +1885,7 @@ dependencies = [
  "chrono",
  "clap 3.0.14",
  "comfy-table",
+ "compactor",
  "csv",
  "data_types",
  "datafusion 0.1.0",
diff --git a/Cargo.toml b/Cargo.toml
index 894265ec8f..25019a0e77 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@
 members = [
     "arrow_util",
     "client_util",
+    "compactor",
     "data_types",
     "datafusion",
     "datafusion_util",
diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml
new file mode 100644
index 0000000000..2b14cd49cf
--- /dev/null
+++ b/compactor/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "compactor"
+version = "0.1.0"
+authors = ["Luke Bond "]
+edition = "2021"
+
+[dependencies]
+async-trait = "0.1.42"
+data_types = { path = "../data_types" }
+futures = "0.3"
+generated_types = { path = "../generated_types" }
+hyper = "0.14"
+iox_catalog = { path = "../iox_catalog" }
+iox_object_store = { path = "../iox_object_store" }
+metric = { path = "../metric" }
+object_store = { path = "../object_store" }
+observability_deps = { path = "../observability_deps" }
+parking_lot = "0.12"
+predicate = { path = "../predicate" }
+prost = "0.9"
+thiserror = "1.0"
+tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
+tokio-util = { version = "0.7.0" }
+tonic = { version = "0.6" }
+trace = { path = "../trace" }
+workspace-hack = { path = "../workspace-hack"}
+
+[dev-dependencies]
+test_helpers = { path = "../test_helpers" }
diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
new file mode 100644
index 0000000000..584d8cc088
--- /dev/null
+++ b/compactor/src/handler.rs
@@ -0,0 +1,82 @@
+//! Compactor handler
+
+use async_trait::async_trait;
+use iox_catalog::interface::Catalog;
+use object_store::ObjectStore;
+use observability_deps::tracing::warn;
+use std::{fmt::Formatter, sync::Arc};
+use thiserror::Error;
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, Error)]
+#[allow(missing_copy_implementations, missing_docs)]
+pub enum Error {}
+
+/// The [`CompactorHandler`] does nothing at this point
+#[async_trait]
+pub trait CompactorHandler: Send + Sync {
+    /// Wait until the handler has finished shutting down.
+    ///
+    /// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
+    async fn join(&self);
+
+    /// Shut down background workers.
+    fn shutdown(&self);
+}
+
+/// Implementation of the `CompactorHandler` trait (that currently does nothing)
+pub struct CompactorHandlerImpl {
+    /// The global catalog for schema, parquet files and tombstones
+    catalog: Arc<dyn Catalog>,
+    /// Object store for persistence of parquet files
+    object_store: Arc<ObjectStore>,
+    /// A token that is used to trigger shutdown of the background worker
+    shutdown: CancellationToken,
+}
+
+// Manual impl: prints only the type name, so formatting the handler can never panic.
+impl std::fmt::Debug for CompactorHandlerImpl {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CompactorHandlerImpl").finish_non_exhaustive()
+    }
+}
+
+impl CompactorHandlerImpl {
+    /// Initialize the Compactor
+    pub fn new(
+        catalog: Arc<dyn Catalog>,
+        object_store: Arc<ObjectStore>,
+        _registry: &metric::Registry,
+    ) -> Self {
+        let shutdown = CancellationToken::new();
+
+        // TODO: initialise compactor threads here
+
+        Self {
+            catalog,
+            object_store,
+            shutdown,
+        }
+    }
+}
+
+#[async_trait]
+impl CompactorHandler for CompactorHandlerImpl {
+    async fn join(&self) {
+        // No compactor threads exist yet, so simply wait for the shutdown signal.
+        self.shutdown.cancelled().await;
+    }
+
+    fn shutdown(&self) {
+        self.shutdown.cancel();
+    }
+}
+
+impl Drop for CompactorHandlerImpl {
+    fn drop(&mut self) {
+        if !self.shutdown.is_cancelled() {
+            warn!("CompactorHandlerImpl dropped without calling shutdown()");
+            self.shutdown.cancel();
+        }
+    }
+}
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
new file mode 100644
index 0000000000..56b96dd1ce
--- /dev/null
+++ b/compactor/src/lib.rs
@@ -0,0 +1,16 @@
+//! IOx compactor implementation.
+//!
+
+#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
+#![warn(
+    missing_copy_implementations,
+    missing_docs,
+    clippy::explicit_iter_loop,
+    clippy::future_not_send,
+    clippy::use_self,
+    clippy::clone_on_ref_ptr
+)]
+#![allow(dead_code)]
+
+pub mod handler;
+pub mod server;
diff --git a/compactor/src/server.rs b/compactor/src/server.rs
new file mode 100644
index 0000000000..dd3aaa7319
--- /dev/null
+++ b/compactor/src/server.rs
@@ -0,0 +1,65 @@
+//! Compactor server entrypoint.
+
+use std::sync::Arc;
+
+use self::{grpc::GrpcDelegate, http::HttpDelegate};
+use crate::handler::CompactorHandler;
+use std::fmt::Debug;
+
+pub mod grpc;
+pub mod http;
+
+/// The [`CompactorServer`] manages the lifecycle and contains all state for a
+/// `compactor` server instance.
+#[derive(Debug, Default)]
+pub struct CompactorServer<C: CompactorHandler> {
+    metrics: Arc<metric::Registry>,
+
+    http: HttpDelegate<C>,
+    grpc: GrpcDelegate<C>,
+
+    handler: Arc<C>,
+}
+
+impl<C: CompactorHandler + Debug> CompactorServer<C> {
+    /// Initialise a new [`CompactorServer`] using the provided HTTP and gRPC
+    /// handlers.
+    pub fn new(
+        metrics: Arc<metric::Registry>,
+        http: HttpDelegate<C>,
+        grpc: GrpcDelegate<C>,
+        handler: Arc<C>,
+    ) -> Self {
+        Self {
+            metrics,
+            http,
+            grpc,
+            handler,
+        }
+    }
+
+    /// Return the [`metric::Registry`] used by the compactor.
+    pub fn metric_registry(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
+    }
+
+    /// Wait for the handler's background workers to finish shutting down.
+    pub async fn join(&self) {
+        self.handler.join().await;
+    }
+
+    /// Trigger shutdown of the handler's background workers.
+    pub fn shutdown(&self) {
+        self.handler.shutdown();
+    }
+
+    /// Get a reference to the compactor http delegate.
+    pub fn http(&self) -> &HttpDelegate<C> {
+        &self.http
+    }
+
+    /// Get a reference to the compactor grpc delegate.
+    pub fn grpc(&self) -> &GrpcDelegate<C> {
+        &self.grpc
+    }
+}
diff --git a/compactor/src/server/grpc.rs b/compactor/src/server/grpc.rs
new file mode 100644
index 0000000000..f93312a37c
--- /dev/null
+++ b/compactor/src/server/grpc.rs
@@ -0,0 +1,55 @@
+//! gRPC service implementations for `compactor`.
+
+use crate::handler::CompactorHandler;
+use std::sync::Arc;
+use thiserror::Error;
+
+/// This type is responsible for managing all gRPC services exposed by
+/// `compactor`.
+#[derive(Debug, Default)]
+pub struct GrpcDelegate<C: CompactorHandler> {
+    compactor_handler: Arc<C>,
+}
+
+impl<C: CompactorHandler> GrpcDelegate<C> {
+    /// Initialise a new [`GrpcDelegate`] passing valid requests to the
+    /// specified `compactor_handler`.
+    pub fn new(compactor_handler: Arc<C>) -> Self {
+        Self { compactor_handler }
+    }
+
+    // TODO: add grpc service here, and register it with an add_(gated_)service call in
+    // server_type/compactor.rs:server_grpc().
+}
+
+/// Errors returnable by the Compactor gRPC service
+#[derive(Clone, Copy, Debug, Error)]
+pub enum Error {}
+
+impl From<Error> for tonic::Status {
+    /// Logs and converts a result from the business logic into the appropriate tonic status
+    fn from(err: Error) -> Self {
+        // An explicit match on the Error enum will ensure appropriate logging is handled for any
+        // new error variants.
+        // TODO see equivalent in ingester grpc server once we have some error types
+        //match err {
+        //    _ => {
+        //        todo!();
+        //    }
+        //}
+        err.to_status()
+    }
+}
+
+impl Error {
+    /// Converts a result from the business logic into the appropriate tonic status
+    // LB: this allow is to satisfy clippy; unsure why the ingester doesn't get this clippy error
+    // but since this is clearly a placeholder, this note should suffice as a reminder to remove
+    // this when this function is implemented for the compactor
+    #[allow(clippy::wrong_self_convention)]
+    fn to_status(&self) -> tonic::Status {
+        use tonic::Status;
+        // TODO see equivalent in ingester grpc server once we have some error types
+        Status::internal(self.to_string())
+    }
+}
diff --git a/compactor/src/server/http.rs b/compactor/src/server/http.rs
new file mode 100644
index 0000000000..ff2566c196
--- /dev/null
+++ b/compactor/src/server/http.rs
@@ -0,0 +1,50 @@
+//! HTTP service implementations for `compactor`.
+
+use crate::handler::CompactorHandler;
+use hyper::{Body, Request, Response, StatusCode};
+use std::sync::Arc;
+use thiserror::Error;
+
+/// Errors returned by the HTTP request handler.
+#[derive(Debug, Error, Copy, Clone)]
+pub enum Error {
+    /// The requested path has no registered handler.
+    #[error("not found")]
+    NotFound,
+}
+
+impl Error {
+    /// Convert the error into an appropriate [`StatusCode`] to be returned to
+    /// the end user.
+    pub fn as_status_code(&self) -> StatusCode {
+        match self {
+            Self::NotFound => StatusCode::NOT_FOUND,
+        }
+    }
+}
+
+/// This type is responsible for servicing requests to the `compactor` HTTP
+/// endpoint.
+///
+/// Requests to some paths may be handled externally by the caller - the IOx
+/// server runner framework takes care of implementing the health endpoint,
+/// metrics, pprof, etc.
+#[derive(Debug, Default)]
+pub struct HttpDelegate<C: CompactorHandler> {
+    #[allow(dead_code)]
+    compactor_handler: Arc<C>,
+}
+
+impl<C: CompactorHandler> HttpDelegate<C> {
+    /// Initialise a new [`HttpDelegate`] passing valid requests to the
+    /// specified `compactor_handler`.
+    pub fn new(compactor_handler: Arc<C>) -> Self {
+        Self { compactor_handler }
+    }
+
+    /// Routes `req` to the appropriate handler, if any, returning the handler
+    /// response. No routes are registered yet, so this always yields [`Error::NotFound`].
+    pub fn route(&self, _req: Request<Body>) -> Result<Response<Body>, Error> {
+        Err(Error::NotFound)
+    }
+}
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml
index dde32fec3d..d25c8b3aa4 100644
--- a/influxdb_iox/Cargo.toml
+++ b/influxdb_iox/Cargo.toml
@@ -7,6 +7,7 @@ default-run = "influxdb_iox"
 
 [dependencies]
 # Workspace dependencies, in alphabetical order
+compactor = { path = "../compactor" }
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
 db = { path = "../db" }
diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs
new file mode 100644
index 0000000000..6a5dda5ad1
--- /dev/null
+++ b/influxdb_iox/src/commands/run/compactor.rs
@@ -0,0 +1,87 @@
+//! Implementation of command line option for running the compactor
+
+use compactor::{
+    handler::CompactorHandlerImpl,
+    server::{grpc::GrpcDelegate, http::HttpDelegate, CompactorServer},
+};
+use object_store::ObjectStore;
+use observability_deps::tracing::*;
+use std::sync::Arc;
+use thiserror::Error;
+
+use crate::{
+    clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig},
+    influxdb_ioxd::{
+        self,
+        server_type::common_state::{CommonServerState, CommonServerStateError},
+        server_type::compactor::CompactorServerType,
+    },
+};
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Run: {0}")]
+    Run(#[from] influxdb_ioxd::Error),
+
+    #[error("Invalid config: {0}")]
+    InvalidConfig(#[from] CommonServerStateError),
+
+    #[error("Catalog error: {0}")]
+    Catalog(#[from] iox_catalog::interface::Error),
+
+    #[error("Catalog DSN error: {0}")]
+    CatalogDsn(#[from] crate::clap_blocks::catalog_dsn::Error),
+
+    #[error("Cannot parse object store config: {0}")]
+    ObjectStoreParsing(#[from] crate::clap_blocks::object_store::ParseError),
+}
+
+#[derive(Debug, clap::Parser)]
+#[clap(
+    name = "run",
+    about = "Runs in compactor mode",
+    long_about = "Run the IOx compactor server.\n\nThe configuration options below can be \
+    set either with the command line flags or with the specified environment \
+    variable. If there is a file named '.env' in the current working directory, \
+    it is sourced before loading the configuration.
+
+Configuration is loaded from the following sources (highest precedence first):
+        - command line arguments
+        - user set environment variables
+        - .env file contents
+        - pre-configured default values"
+)]
+pub struct Config {
+    #[clap(flatten)]
+    pub(crate) run_config: RunConfig,
+
+    #[clap(flatten)]
+    pub(crate) catalog_dsn: CatalogDsnConfig,
+}
+
+pub async fn command(config: Config) -> Result<(), Error> {
+    let common_state = CommonServerState::from_config(config.run_config.clone())?;
+
+    let catalog = config.catalog_dsn.get_catalog("compactor").await?;
+
+    let metric_registry: Arc<metric::Registry> = Default::default();
+
+    let object_store = Arc::new(
+        ObjectStore::try_from(&config.run_config.object_store_config)
+            .map_err(Error::ObjectStoreParsing)?,
+    );
+    let compactor_handler = Arc::new(CompactorHandlerImpl::new(
+        catalog,
+        object_store,
+        &metric_registry,
+    ));
+    let http = HttpDelegate::new(Arc::clone(&compactor_handler));
+    let grpc = GrpcDelegate::new(Arc::clone(&compactor_handler));
+
+    let compactor = CompactorServer::new(metric_registry, http, grpc, compactor_handler);
+    let server_type = Arc::new(CompactorServerType::new(compactor, &common_state));
+
+    info!("starting compactor");
+
+    Ok(influxdb_ioxd::main(common_state, server_type).await?)
+}
diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs
index 68882ce6d9..cfaf51022e 100644
--- a/influxdb_iox/src/commands/run/mod.rs
+++ b/influxdb_iox/src/commands/run/mod.rs
@@ -2,6 +2,7 @@ use snafu::{ResultExt, Snafu};
 
 use crate::clap_blocks::run_config::RunConfig;
 
+pub mod compactor;
 pub mod database;
 pub mod ingester;
 pub mod router;
@@ -11,6 +12,9 @@ pub mod test;
 #[derive(Debug, Snafu)]
 #[allow(clippy::enum_variant_names)]
 pub enum Error {
+    #[snafu(display("Error in compactor subcommand: {}", source))]
+    CompactorError { source: compactor::Error },
+
     #[snafu(display("Error in database subcommand: {}", source))]
     DatabaseError { source: database::Error },
 
@@ -44,6 +48,7 @@ impl Config {
     pub fn run_config(&self) -> &RunConfig {
         match &self.command {
             None => &self.database_config.run_config,
+            Some(Command::Compactor(config)) => &config.run_config,
             Some(Command::Database(config)) => &config.run_config,
             Some(Command::Router(config)) => &config.run_config,
             Some(Command::Router2(config)) => &config.run_config,
@@ -55,6 +60,9 @@ impl Config {
 
 #[derive(Debug, clap::Parser)]
 enum Command {
+    /// Run the server in compactor mode
+    Compactor(compactor::Config),
+
     /// Run the server in database mode
     Database(database::Config),
 
@@ -81,6 +89,9 @@ pub async fn command(config: Config) -> Result<()> {
                 .await
                 .context(DatabaseSnafu)
         }
+        Some(Command::Compactor(config)) => {
+            compactor::command(config).await.context(CompactorSnafu)
+        }
         Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
         Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
         Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/compactor.rs b/influxdb_iox/src/influxdb_ioxd/server_type/compactor.rs
new file mode 100644
index 0000000000..7eb15ce7f5
--- /dev/null
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/compactor.rs
@@ -0,0 +1,98 @@
+use std::{
+    fmt::{Debug, Display},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+use compactor::{handler::CompactorHandler, server::CompactorServer};
+use hyper::{Body, Request, Response};
+use metric::Registry;
+use trace::TraceCollector;
+
+use crate::influxdb_ioxd::{
+    http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
+    rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
+    server_type::{common_state::CommonServerState, RpcError, ServerType},
+};
+
+#[derive(Debug)]
+pub struct CompactorServerType<C: CompactorHandler> {
+    server: CompactorServer<C>,
+    trace_collector: Option<Arc<dyn TraceCollector>>,
+}
+
+impl<C: CompactorHandler> CompactorServerType<C> {
+    pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
+        Self {
+            server,
+            trace_collector: common_state.trace_collector(),
+        }
+    }
+}
+
+#[async_trait]
+impl<C: CompactorHandler + Debug + 'static> ServerType for CompactorServerType<C> {
+    type RouteError = IoxHttpError;
+
+    /// Return the [`metric::Registry`] used by the compactor.
+    fn metric_registry(&self) -> Arc<Registry> {
+        self.server.metric_registry()
+    }
+
+    /// Returns the trace collector for compactor traces.
+    fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
+        self.trace_collector.as_ref().map(Arc::clone)
+    }
+
+    /// Just return "not found".
+    async fn route_http_request(
+        &self,
+        _req: Request<Body>,
+    ) -> Result<Response<Body>, Self::RouteError> {
+        Err(IoxHttpError::NotFound)
+    }
+
+    /// Provide a placeholder gRPC service.
+    async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
+        let builder = setup_builder!(builder_input, self);
+        serve_builder!(builder);
+
+        Ok(())
+    }
+
+    async fn join(self: Arc<Self>) {
+        self.server.join().await;
+    }
+
+    fn shutdown(&self) {
+        self.server.shutdown();
+    }
+}
+
+/// Simple error type, we're not really providing an HTTP interface for the compactor.
+#[derive(Debug)]
+pub enum IoxHttpError {
+    NotFound,
+}
+
+impl IoxHttpError {
+    fn status_code(&self) -> HttpApiErrorCode {
+        match self {
+            IoxHttpError::NotFound => HttpApiErrorCode::NotFound,
+        }
+    }
+}
+
+impl Display for IoxHttpError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl std::error::Error for IoxHttpError {}
+
+impl HttpApiErrorSource for IoxHttpError {
+    fn to_http_api_error(&self) -> HttpApiError {
+        HttpApiError::new(self.status_code(), self.to_string())
+    }
+}
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs
index 59da6bf504..b3f0cee1e7 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs
@@ -9,6 +9,7 @@ use trace::TraceCollector;
 use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput};
 
 pub mod common_state;
+pub mod compactor;
 pub mod database;
 pub mod ingester;
 pub mod router;