Merge pull request #3803 from influxdata/feat/new-compactor-command-and-crate

feat: adding compactor CLI command and crate
pull/24376/head
kodiakhq[bot] 2022-02-22 11:27:41 +00:00 committed by GitHub
commit 3d8f0246c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 523 additions and 0 deletions

27
Cargo.lock generated
View File

@ -630,6 +630,32 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "compactor"
version = "0.1.0"
dependencies = [
"async-trait",
"data_types",
"futures",
"generated_types",
"hyper",
"iox_catalog",
"iox_object_store",
"metric",
"object_store",
"observability_deps",
"parking_lot 0.12.0",
"predicate",
"prost",
"test_helpers",
"thiserror",
"tokio",
"tokio-util 0.7.0",
"tonic",
"trace",
"workspace-hack",
]
[[package]]
name = "core-foundation"
version = "0.9.2"
@ -1859,6 +1885,7 @@ dependencies = [
"chrono",
"clap 3.0.14",
"comfy-table",
"compactor",
"csv",
"data_types",
"datafusion 0.1.0",

View File

@ -3,6 +3,7 @@
members = [
"arrow_util",
"client_util",
"compactor",
"data_types",
"datafusion",
"datafusion_util",

29
compactor/Cargo.toml Normal file
View File

@ -0,0 +1,29 @@
[package]
name = "compactor"
version = "0.1.0"
authors = ["Luke Bond <luke.n.bond@gmail.com>"]
edition = "2021"
[dependencies]
async-trait = "0.1.42"
data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
predicate = { path = "../predicate" }
prost = "0.9"
thiserror = "1.0"
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.0" }
tonic = { version = "0.6" }
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
test_helpers = { path = "../test_helpers" }

82
compactor/src/handler.rs Normal file
View File

@ -0,0 +1,82 @@
//! Compactor handler
use async_trait::async_trait;
use iox_catalog::interface::Catalog;
use object_store::ObjectStore;
use observability_deps::tracing::warn;
use std::{fmt::Formatter, sync::Arc};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {}
/// The [`CompactorHandler`] does nothing at this point
#[async_trait]
pub trait CompactorHandler: Send + Sync {
/// Wait until the handler finished to shutdown.
///
/// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
async fn join(&self);
/// Shut down background workers.
fn shutdown(&self);
}
/// Implementation of the `CompactorHandler` trait (that currently does nothing)
pub struct CompactorHandlerImpl {
/// The global catalog for schema, parquet files and tombstones
catalog: Arc<dyn Catalog>,
/// Object store for persistence of parquet files
object_store: Arc<ObjectStore>,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
}
// LB: copied this over from the ingester handler; doesn't seem to be needed there either
impl std::fmt::Debug for CompactorHandlerImpl {
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
impl CompactorHandlerImpl {
/// Initialize the Compactor
pub fn new(
catalog: Arc<dyn Catalog>,
object_store: Arc<ObjectStore>,
_registry: &metric::Registry,
) -> Self {
let shutdown = CancellationToken::new();
// TODO: initialise compactor threads here
Self {
catalog,
object_store,
shutdown,
}
}
}
#[async_trait]
impl CompactorHandler for CompactorHandlerImpl {
async fn join(&self) {
// join compactor threads here
todo!();
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}
impl Drop for CompactorHandlerImpl {
fn drop(&mut self) {
if !self.shutdown.is_cancelled() {
warn!("CompactorHandlerImpl dropped without calling shutdown()");
self.shutdown.cancel();
}
}
}

16
compactor/src/lib.rs Normal file
View File

@ -0,0 +1,16 @@
//! IOx compactor implementation.
//!
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#![allow(dead_code)]
pub mod handler;
pub mod server;

65
compactor/src/server.rs Normal file
View File

@ -0,0 +1,65 @@
//! Compactor server entrypoint.
use std::sync::Arc;
use self::{grpc::GrpcDelegate, http::HttpDelegate};
use crate::handler::CompactorHandler;
use std::fmt::Debug;
pub mod grpc;
pub mod http;
/// The [`CompactorServer`] manages the lifecycle and contains all state for a
/// `compactor` server instance.
#[derive(Debug, Default)]
pub struct CompactorServer<C: CompactorHandler> {
metrics: Arc<metric::Registry>,
http: HttpDelegate<C>,
grpc: GrpcDelegate<C>,
handler: Arc<C>,
}
impl<C: CompactorHandler> CompactorServer<C> {
/// Initialise a new [`CompactorServer`] using the provided HTTP and gRPC
/// handlers.
pub fn new(
metrics: Arc<metric::Registry>,
http: HttpDelegate<C>,
grpc: GrpcDelegate<C>,
handler: Arc<C>,
) -> Self {
Self {
metrics,
http,
grpc,
handler,
}
}
/// Return the [`metric::Registry`] used by the router.
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metrics)
}
/// Join shutdown worker.
pub async fn join(&self) {
self.handler.join().await;
}
/// Shutdown background worker.
pub fn shutdown(&self) {
self.handler.shutdown();
}
/// Get a reference to the router http delegate.
pub fn http(&self) -> &HttpDelegate<C> {
&self.http
}
/// Get a reference to the router grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<C> {
&self.grpc
}
}

View File

@ -0,0 +1,55 @@
//! gRPC service implementations for `compactor`.
use crate::handler::CompactorHandler;
use std::sync::Arc;
use thiserror::Error;
/// This type is responsible for managing all gRPC services exposed by
/// `compactor`.
#[derive(Debug, Default)]
pub struct GrpcDelegate<C: CompactorHandler> {
compactor_handler: Arc<C>,
}
impl<C: CompactorHandler + Send + Sync + 'static> GrpcDelegate<C> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the
/// specified `compactor_handler`.
pub fn new(compactor_handler: Arc<C>) -> Self {
Self { compactor_handler }
}
// TODO: add grpc service here, and register it with a add_(gated_)service call in
// server_type/compactor.rs:server_grpc().
}
/// Errors returnable by the Compactor gRPC service
#[derive(Clone, Copy, Debug, Error)]
pub enum Error {}
impl From<Error> for tonic::Status {
/// Logs and converts a result from the business logic into the appropriate tonic status
fn from(err: Error) -> Self {
// An explicit match on the Error enum will ensure appropriate logging is handled for any
// new error variants.
// TODO see equivalent in ingester grpc server once we have some error types
//match err {
// _ => {
// todo!();
// }
//}
err.to_status()
}
}
impl Error {
/// Converts a result from the business logic into the appropriate tonic status
// LB: this allow is to satisfy clippy; unsure why the ingester doesn't get this clippy error
// but since this is clearly a placeholder, this note should suffice as a reminder to remove
// this when this function is implemented for the compactor
#[allow(clippy::wrong_self_convention)]
fn to_status(&self) -> tonic::Status {
use tonic::Status;
// TODO see equivalent in ingester grpc server once we have some error types
Status::internal(self.to_string())
}
}

View File

@ -0,0 +1,50 @@
//! HTTP service implementations for `compactor`.
use crate::handler::CompactorHandler;
use hyper::{Body, Request, Response, StatusCode};
use std::sync::Arc;
use thiserror::Error;
/// Errors returned by the HTTP request handler.
#[derive(Debug, Error, Copy, Clone)]
pub enum Error {
/// The requested path has no registered handler.
#[error("not found")]
NotFound,
}
impl Error {
/// Convert the error into an appropriate [`StatusCode`] to be returned to
/// the end user.
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
}
}
}
/// This type is responsible for servicing requests to the `compactor` HTTP
/// endpoint.
///
/// Requests to some paths may be handled externally by the caller - the IOx
/// server runner framework takes care of implementing the heath endpoint,
/// metrics, pprof, etc.
#[derive(Debug, Default)]
pub struct HttpDelegate<I: CompactorHandler> {
#[allow(dead_code)]
compactor_handler: Arc<I>,
}
impl<I: CompactorHandler> HttpDelegate<I> {
/// Initialise a new [`HttpDelegate`] passing valid requests to the
/// specified `compactor_handler`.
pub fn new(compactor_handler: Arc<I>) -> Self {
Self { compactor_handler }
}
/// Routes `req` to the appropriate handler, if any, returning the handler
/// response.
pub fn route(&self, _req: Request<Body>) -> Result<Response<Body>, Error> {
unimplemented!()
}
}

View File

@ -7,6 +7,7 @@ default-run = "influxdb_iox"
[dependencies]
# Workspace dependencies, in alphabetical order
compactor = { path = "../compactor" }
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
db = { path = "../db" }

View File

@ -0,0 +1,87 @@
//! Implementation of command line option for running the compactor
use compactor::{
handler::CompactorHandlerImpl,
server::{grpc::GrpcDelegate, http::HttpDelegate, CompactorServer},
};
use object_store::ObjectStore;
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
use crate::{
clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig},
influxdb_ioxd::{
self,
server_type::common_state::{CommonServerState, CommonServerStateError},
server_type::compactor::CompactorServerType,
},
};
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] influxdb_ioxd::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] crate::clap_blocks::catalog_dsn::Error),
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] crate::clap_blocks::object_store::ParseError),
}
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in compactor mode",
long_about = "Run the IOx compactor server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
}
pub async fn command(config: Config) -> Result<(), Error> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let catalog = config.catalog_dsn.get_catalog("compactor").await?;
let metric_registry: Arc<metric::Registry> = Default::default();
let object_store = Arc::new(
ObjectStore::try_from(&config.run_config.object_store_config)
.map_err(Error::ObjectStoreParsing)?,
);
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
catalog,
object_store,
&metric_registry,
));
let http = HttpDelegate::new(Arc::clone(&compactor_handler));
let grpc = GrpcDelegate::new(Arc::clone(&compactor_handler));
let compactor = CompactorServer::new(metric_registry, http, grpc, compactor_handler);
let server_type = Arc::new(CompactorServerType::new(compactor, &common_state));
info!("starting compactor");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
}

View File

@ -2,6 +2,7 @@ use snafu::{ResultExt, Snafu};
use crate::clap_blocks::run_config::RunConfig;
pub mod compactor;
pub mod database;
pub mod ingester;
pub mod router;
@ -11,6 +12,9 @@ pub mod test;
#[derive(Debug, Snafu)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("Error in compactor subcommand: {}", source))]
CompactorError { source: compactor::Error },
#[snafu(display("Error in database subcommand: {}", source))]
DatabaseError { source: database::Error },
@ -44,6 +48,7 @@ impl Config {
pub fn run_config(&self) -> &RunConfig {
match &self.command {
None => &self.database_config.run_config,
Some(Command::Compactor(config)) => &config.run_config,
Some(Command::Database(config)) => &config.run_config,
Some(Command::Router(config)) => &config.run_config,
Some(Command::Router2(config)) => &config.run_config,
@ -55,6 +60,9 @@ impl Config {
#[derive(Debug, clap::Parser)]
enum Command {
/// Run the server in compactor mode
Compactor(compactor::Config),
/// Run the server in database mode
Database(database::Config),
@ -81,6 +89,9 @@ pub async fn command(config: Config) -> Result<()> {
.await
.context(DatabaseSnafu)
}
Some(Command::Compactor(config)) => {
compactor::command(config).await.context(CompactorSnafu)
}
Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),

View File

@ -0,0 +1,98 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use compactor::{handler::CompactorHandler, server::CompactorServer};
use hyper::{Body, Request, Response};
use metric::Registry;
use trace::TraceCollector;
use crate::influxdb_ioxd::{
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
};
#[derive(Debug)]
pub struct CompactorServerType<C: CompactorHandler> {
server: CompactorServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: CompactorHandler> CompactorServerType<C> {
pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
Self {
server,
trace_collector: common_state.trace_collector(),
}
}
}
#[async_trait]
impl<C: CompactorHandler + std::fmt::Debug + 'static> ServerType for CompactorServerType<C> {
type RouteError = IoxHttpError;
/// Return the [`metric::Registry`] used by the compactor.
fn metric_registry(&self) -> Arc<Registry> {
self.server.metric_registry()
}
/// Returns the trace collector for compactor traces.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.trace_collector.as_ref().map(Arc::clone)
}
/// Just return "not found".
async fn route_http_request(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Self::RouteError> {
Err(IoxHttpError::NotFound)
}
/// Provide a placeholder gRPC service.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
serve_builder!(builder);
Ok(())
}
async fn join(self: Arc<Self>) {
self.server.join().await;
}
fn shutdown(&self) {
self.server.shutdown();
}
}
/// Simple error struct, we're not really providing an HTTP interface for the compactor.
#[derive(Debug)]
pub enum IoxHttpError {
NotFound,
}
impl IoxHttpError {
fn status_code(&self) -> HttpApiErrorCode {
match self {
IoxHttpError::NotFound => HttpApiErrorCode::NotFound,
}
}
}
impl Display for IoxHttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for IoxHttpError {}
impl HttpApiErrorSource for IoxHttpError {
fn to_http_api_error(&self) -> HttpApiError {
HttpApiError::new(self.status_code(), self.to_string())
}
}

View File

@ -9,6 +9,7 @@ use trace::TraceCollector;
use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput};
pub mod common_state;
pub mod compactor;
pub mod database;
pub mod ingester;
pub mod router;