From 4f5321d19bc8740502494d946936d1359ac4ee26 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 23 Mar 2022 10:11:47 -0400 Subject: [PATCH] feat: add compactor configuration for kafka topic and sequencers (#4107) --- clap_blocks/src/compactor.rs | 26 +++++++++++ clap_blocks/src/lib.rs | 1 + influxdb_iox/src/commands/run/all_in_one.rs | 15 +++++-- influxdb_iox/src/commands/run/compactor.rs | 22 +++++---- influxdb_ioxd/src/server_type/compactor.rs | 49 +++++++++++++++++++-- 5 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 clap_blocks/src/compactor.rs diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs new file mode 100644 index 0000000000..cd9e5179f1 --- /dev/null +++ b/clap_blocks/src/compactor.rs @@ -0,0 +1,26 @@ +/// CLI config for compactor +#[derive(Debug, Clone, clap::Parser)] +pub struct CompactorConfig { + /// Write buffer topic/database that the compactor will be compacting files for. It won't + /// connect to Kafka, but uses this to get the sequencers out of the catalog. + #[clap( + long = "--write-buffer-topic", + env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC", + default_value = "iox-shared" + )] + pub topic: String, + + /// Write buffer partition number to start (inclusive) range with + #[clap( + long = "--write-buffer-partition-range-start", + env = "INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START" + )] + pub write_buffer_partition_range_start: i32, + + /// Write buffer partition number to end (inclusive) range with + #[clap( + long = "--write-buffer-partition-range-end", + env = "INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END" + )] + pub write_buffer_partition_range_end: i32, +} diff --git a/clap_blocks/src/lib.rs b/clap_blocks/src/lib.rs index 9ef11a1b2c..d1b8633e1d 100644 --- a/clap_blocks/src/lib.rs +++ b/clap_blocks/src/lib.rs @@ -2,6 +2,7 @@ //! //! They can easily be re-used using `#[clap(flatten)]`. 
pub mod catalog_dsn; +pub mod compactor; pub mod ingester; pub mod object_store; pub mod run_config; diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 8d1f154b3c..4d811a274a 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -1,6 +1,7 @@ //! Implementation of command line option for running all in one mode use std::{num::NonZeroU32, sync::Arc}; +use clap_blocks::compactor::CompactorConfig; use clap_blocks::{ catalog_dsn::CatalogDsnConfig, ingester::IngesterConfig, @@ -60,6 +61,9 @@ pub enum Error { #[error("Ingester error: {0}")] Ingester(#[from] influxdb_ioxd::server_type::ingester::Error), + #[error("error initializing compactor: {0}")] + Compactor(#[from] influxdb_ioxd::server_type::compactor::Error), + #[error("Invalid config: {0}")] InvalidConfig(#[from] CommonServerStateError), } @@ -148,6 +152,9 @@ pub struct Config { #[clap(flatten)] pub(crate) ingester_config: IngesterConfig, + #[clap(flatten)] + pub(crate) compactor_config: CompactorConfig, + /// The address on which IOx will serve Router HTTP API requests #[clap( long = "--router-http-bind", @@ -230,6 +237,7 @@ impl Config { catalog_dsn: self.catalog_dsn, write_buffer_config: self.write_buffer_config, ingester_config: self.ingester_config, + compactor_config: self.compactor_config, } } } @@ -245,6 +253,7 @@ struct SpecializedConfig { catalog_dsn: CatalogDsnConfig, write_buffer_config: WriteBufferConfig, ingester_config: IngesterConfig, + compactor_config: CompactorConfig, } pub async fn command(config: Config) -> Result<()> { @@ -256,6 +265,7 @@ pub async fn command(config: Config) -> Result<()> { catalog_dsn, mut write_buffer_config, ingester_config, + compactor_config, } = config.specialize(); // Ensure at least one topic is automatically created in all in one mode @@ -332,7 +342,6 @@ pub async fn command(config: Config) -> Result<()> { .await?; info!("starting compactor"); - let 
sequencers = vec![]; // TODO sequencers let compactor = create_compactor_server_type( &common_state, Arc::clone(&metrics), @@ -340,9 +349,9 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&object_store), Arc::clone(&exec), Arc::clone(&time_provider), - sequencers, + compactor_config, ) - .await; + .await?; info!("starting querier"); let querier = create_querier_server_type( diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs index 5e3153571e..04fed35c6d 100644 --- a/influxdb_iox/src/commands/run/compactor.rs +++ b/influxdb_iox/src/commands/run/compactor.rs @@ -1,6 +1,5 @@ //! Implementation of command line option for running the compactor -use data_types2::SequencerId; use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl}; use observability_deps::tracing::*; use query::exec::Executor; @@ -8,7 +7,9 @@ use std::sync::Arc; use thiserror::Error; use time::SystemProvider; -use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig}; +use clap_blocks::{ + catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, run_config::RunConfig, +}; use influxdb_ioxd::{ self, server_type::{ @@ -34,6 +35,9 @@ pub enum Error { #[error("Cannot parse object store config: {0}")] ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), + + #[error("error initializing compactor: {0}")] + Compactor(#[from] influxdb_ioxd::server_type::compactor::Error), } #[derive(Debug, clap::Parser)] @@ -58,13 +62,16 @@ pub struct Config { #[clap(flatten)] pub(crate) catalog_dsn: CatalogDsnConfig, + #[clap(flatten)] + pub(crate) compactor_config: CompactorConfig, + /// Number of threads to use for the compactor query execution, compaction and persistence. 
#[clap( long = "--query-exec-thread-count", env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT", default_value = "4" )] - pub query_exect_thread_count: usize, + pub query_exec_thread_count: usize, } pub async fn command(config: Config) -> Result<(), Error> { @@ -82,12 +89,9 @@ pub async fn command(config: Config) -> Result<(), Error> { let object_store: Arc = Arc::new(ObjectStoreMetrics::new(object_store, &*metric_registry)); - let exec = Arc::new(Executor::new(config.query_exect_thread_count)); + let exec = Arc::new(Executor::new(config.query_exec_thread_count)); let time_provider = Arc::new(SystemProvider::new()); - // TODO: modify config to let us get assigned sequence numbers - let sequencers: Vec = vec![]; - let server_type = create_compactor_server_type( &common_state, metric_registry, @@ -95,9 +99,9 @@ pub async fn command(config: Config) -> Result<(), Error> { object_store, exec, time_provider, - sequencers, + config.compactor_config, ) - .await; + .await?; info!("starting compactor"); diff --git a/influxdb_ioxd/src/server_type/compactor.rs b/influxdb_ioxd/src/server_type/compactor.rs index 063721cd61..1c5346aca6 100644 --- a/influxdb_ioxd/src/server_type/compactor.rs +++ b/influxdb_ioxd/src/server_type/compactor.rs @@ -8,7 +8,7 @@ use compactor::{ handler::{CompactorHandler, CompactorHandlerImpl}, server::CompactorServer, }; -use data_types2::SequencerId; +use data_types2::KafkaPartition; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use metric::Registry; @@ -23,6 +23,22 @@ use crate::{ rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput}, server_type::{common_state::CommonServerState, RpcError, ServerType}, }; +use clap_blocks::compactor::CompactorConfig; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + + #[error("Kafka topic {0} not found in the catalog")] + KafkaTopicNotFound(String), + + 
#[error("write_buffer_partition_range_start must be <= write_buffer_partition_range_end")] + KafkaRange, +} + +pub type Result = std::result::Result; #[derive(Debug)] pub struct CompactorServerType { @@ -115,8 +131,33 @@ pub async fn create_compactor_server_type( object_store: Arc, exec: Arc, time_provider: Arc, - sequencers: Vec, -) -> Arc { + compactor_config: CompactorConfig, +) -> Result> { + if compactor_config.write_buffer_partition_range_start + > compactor_config.write_buffer_partition_range_end + { + return Err(Error::KafkaRange); + } + + let mut txn = catalog.start_transaction().await?; + let kafka_topic = txn + .kafka_topics() + .get_by_name(&compactor_config.topic) + .await? + .ok_or(Error::KafkaTopicNotFound(compactor_config.topic))?; + + let kafka_partitions: Vec<_> = (compactor_config.write_buffer_partition_range_start + ..=compactor_config.write_buffer_partition_range_end) + .map(KafkaPartition::new) + .collect(); + + let mut sequencers = Vec::with_capacity(kafka_partitions.len()); + for k in kafka_partitions { + let s = txn.sequencers().create_or_get(&kafka_topic, k).await?; + sequencers.push(s.id); + } + txn.commit().await?; + let compactor_handler = Arc::new(CompactorHandlerImpl::new( sequencers, catalog, @@ -127,5 +168,5 @@ )); let compactor = CompactorServer::new(metric_registry, compactor_handler); - Arc::new(CompactorServerType::new(compactor, common_state)) + Ok(Arc::new(CompactorServerType::new(compactor, common_state))) }