Merge branch 'main' into cn/get-tombstones

pull/24376/head
kodiakhq[bot] 2022-03-23 14:18:41 +00:00 committed by GitHub
commit 58bfab5a8c
5 changed files with 97 additions and 16 deletions

View File

@@ -0,0 +1,26 @@
+/// CLI config for compactor
+#[derive(Debug, Clone, clap::Parser)]
+pub struct CompactorConfig {
+    /// Write buffer topic/database that the compactor will be compacting files for. It won't
+    /// connect to Kafka, but uses this to get the sequencers out of the catalog.
+    #[clap(
+        long = "--write-buffer-topic",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
+        default_value = "iox-shared"
+    )]
+    pub topic: String,
+
+    /// Write buffer partition number to start (inclusive) range with
+    #[clap(
+        long = "--write-buffer-partition-range-start",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START"
+    )]
+    pub write_buffer_partition_range_start: i32,
+
+    /// Write buffer partition number to end (inclusive) range with
+    #[clap(
+        long = "--write-buffer-partition-range-end",
+        env = "INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END"
+    )]
+    pub write_buffer_partition_range_end: i32,
+}
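These flags resolve in the usual clap order: command-line flag first, then the environment variable, then the default (only `topic` has one). A minimal sketch of how the struct parses, assuming the crate layout above; the binary name and values are illustrative:

use clap::Parser;
use clap_blocks::compactor::CompactorConfig;

fn main() {
    // The partition range flags have no default, so they must be supplied.
    let config = CompactorConfig::parse_from([
        "compactor",
        "--write-buffer-partition-range-start", "0",
        "--write-buffer-partition-range-end", "7",
    ]);
    // With no flag or env var set, `topic` falls back to its default.
    assert_eq!(config.topic, "iox-shared");
}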

View File

@@ -2,6 +2,7 @@
 //!
 //! They can easily be re-used using `#[clap(flatten)]`.
 pub mod catalog_dsn;
+pub mod compactor;
 pub mod ingester;
 pub mod object_store;
 pub mod run_config;
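The `#[clap(flatten)]` mentioned in the module docs merges one parser struct's flags into another; the `Config` structs in the files below do exactly this with `CompactorConfig`. A small sketch of the pattern (the `MyConfig` wrapper is hypothetical):

use clap::Parser;
use clap_blocks::compactor::CompactorConfig;

#[derive(Debug, Parser)]
struct MyConfig {
    // All of CompactorConfig's flags surface on this parser too.
    #[clap(flatten)]
    compactor_config: CompactorConfig,
}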

View File

@@ -1,6 +1,7 @@
 //! Implementation of command line option for running all in one mode
 use std::{num::NonZeroU32, sync::Arc};

+use clap_blocks::compactor::CompactorConfig;
 use clap_blocks::{
     catalog_dsn::CatalogDsnConfig,
     ingester::IngesterConfig,
@@ -60,6 +61,9 @@ pub enum Error {
     #[error("Ingester error: {0}")]
     Ingester(#[from] influxdb_ioxd::server_type::ingester::Error),

+    #[error("error initializing compactor: {0}")]
+    Compactor(#[from] influxdb_ioxd::server_type::compactor::Error),
+
     #[error("Invalid config: {0}")]
     InvalidConfig(#[from] CommonServerStateError),
 }
@@ -148,6 +152,9 @@ pub struct Config {
     #[clap(flatten)]
     pub(crate) ingester_config: IngesterConfig,

+    #[clap(flatten)]
+    pub(crate) compactor_config: CompactorConfig,
+
     /// The address on which IOx will serve Router HTTP API requests
     #[clap(
         long = "--router-http-bind",
@@ -230,6 +237,7 @@ impl Config {
             catalog_dsn: self.catalog_dsn,
             write_buffer_config: self.write_buffer_config,
             ingester_config: self.ingester_config,
+            compactor_config: self.compactor_config,
         }
     }
 }
@@ -245,6 +253,7 @@ struct SpecializedConfig {
     catalog_dsn: CatalogDsnConfig,
     write_buffer_config: WriteBufferConfig,
     ingester_config: IngesterConfig,
+    compactor_config: CompactorConfig,
 }

 pub async fn command(config: Config) -> Result<()> {
@@ -256,6 +265,7 @@ pub async fn command(config: Config) -> Result<()> {
         catalog_dsn,
         mut write_buffer_config,
         ingester_config,
+        compactor_config,
     } = config.specialize();

     // Ensure at least one topic is automatically created in all in one mode
@@ -332,7 +342,6 @@ pub async fn command(config: Config) -> Result<()> {
     .await?;

     info!("starting compactor");
-    let sequencers = vec![]; // TODO sequencers
     let compactor = create_compactor_server_type(
         &common_state,
         Arc::clone(&metrics),
@@ -340,9 +349,9 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&object_store),
         Arc::clone(&exec),
         Arc::clone(&time_provider),
-        sequencers,
+        compactor_config,
     )
-    .await;
+    .await?;

     info!("starting querier");
     let querier = create_querier_server_type(
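`create_compactor_server_type` now returns a `Result`, so the call site swaps `.await` for `.await?`; the `#[from]` attribute on the new `Compactor` error variant is what lets `?` convert the inner error. A standalone sketch of that thiserror pattern (the type names here are hypothetical):

use thiserror::Error;

#[derive(Debug, Error)]
#[error("init failed")]
struct InitError;

#[derive(Debug, Error)]
enum Error {
    #[error("error initializing compactor: {0}")]
    Compactor(#[from] InitError),
}

fn create() -> Result<(), InitError> {
    Err(InitError)
}

fn command() -> Result<(), Error> {
    create()?; // `?` uses the From<InitError> impl that #[from] generates
    Ok(())
}

fn main() {
    assert!(matches!(command(), Err(Error::Compactor(_))));
}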

View File

@@ -1,6 +1,5 @@
 //! Implementation of command line option for running the compactor

-use data_types2::SequencerId;
 use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl};
 use observability_deps::tracing::*;
 use query::exec::Executor;
@@ -8,7 +7,9 @@ use std::sync::Arc;
 use thiserror::Error;
 use time::SystemProvider;

-use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig};
+use clap_blocks::{
+    catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, run_config::RunConfig,
+};
 use influxdb_ioxd::{
     self,
     server_type::{
@@ -34,6 +35,9 @@ pub enum Error {
     #[error("Cannot parse object store config: {0}")]
     ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
+
+    #[error("error initializing compactor: {0}")]
+    Compactor(#[from] influxdb_ioxd::server_type::compactor::Error),
 }

 #[derive(Debug, clap::Parser)]
@@ -58,13 +62,16 @@ pub struct Config {
     #[clap(flatten)]
     pub(crate) catalog_dsn: CatalogDsnConfig,

+    #[clap(flatten)]
+    pub(crate) compactor_config: CompactorConfig,
+
     /// Number of threads to use for the compactor query execution, compaction and persistence.
     #[clap(
         long = "--query-exec-thread-count",
         env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT",
         default_value = "4"
     )]
-    pub query_exect_thread_count: usize,
+    pub query_exec_thread_count: usize,
 }

 pub async fn command(config: Config) -> Result<(), Error> {
@@ -82,12 +89,9 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let object_store: Arc<DynObjectStore> =
         Arc::new(ObjectStoreMetrics::new(object_store, &*metric_registry));

-    let exec = Arc::new(Executor::new(config.query_exect_thread_count));
+    let exec = Arc::new(Executor::new(config.query_exec_thread_count));
     let time_provider = Arc::new(SystemProvider::new());

-    // TODO: modify config to let us get assigned sequence numbers
-    let sequencers: Vec<SequencerId> = vec![];
-
     let server_type = create_compactor_server_type(
         &common_state,
         metric_registry,
@@ -95,9 +99,9 @@ pub async fn command(config: Config) -> Result<(), Error> {
         object_store,
         exec,
         time_provider,
-        sequencers,
+        config.compactor_config,
     )
-    .await;
+    .await?;

     info!("starting compactor");
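Each flag also reads from its `INFLUXDB_IOX_…` environment variable, which is how deployments typically configure the standalone compactor. A minimal sketch of the env fallback, assuming the `CompactorConfig` from the first file; the values are illustrative:

use clap::Parser;
use clap_blocks::compactor::CompactorConfig;

fn main() {
    // Env vars satisfy the required range flags just like CLI flags would.
    std::env::set_var("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_START", "0");
    std::env::set_var("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0");
    let config = CompactorConfig::parse_from(["compactor"]);
    assert_eq!(config.write_buffer_partition_range_start, 0);
}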

View File

@@ -8,7 +8,7 @@ use compactor::{
     handler::{CompactorHandler, CompactorHandlerImpl},
     server::CompactorServer,
 };
-use data_types2::SequencerId;
+use data_types2::KafkaPartition;
 use hyper::{Body, Request, Response};
 use iox_catalog::interface::Catalog;
 use metric::Registry;
@@ -23,6 +23,22 @@ use crate::{
     rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
     server_type::{common_state::CommonServerState, RpcError, ServerType},
 };
+use clap_blocks::compactor::CompactorConfig;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Catalog error: {0}")]
+    Catalog(#[from] iox_catalog::interface::Error),
+
+    #[error("Kafka topic {0} not found in the catalog")]
+    KafkaTopicNotFound(String),
+
+    #[error("kafka_partition_range_start must be <= kafka_partition_range_end")]
+    KafkaRange,
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(Debug)]
 pub struct CompactorServerType<C: CompactorHandler> {
@@ -115,8 +131,33 @@ pub async fn create_compactor_server_type(
     object_store: Arc<DynObjectStore>,
     exec: Arc<Executor>,
     time_provider: Arc<dyn TimeProvider>,
-    sequencers: Vec<SequencerId>,
-) -> Arc<dyn ServerType> {
+    compactor_config: CompactorConfig,
+) -> Result<Arc<dyn ServerType>> {
+    if compactor_config.write_buffer_partition_range_start
+        > compactor_config.write_buffer_partition_range_end
+    {
+        return Err(Error::KafkaRange);
+    }
+
+    let mut txn = catalog.start_transaction().await?;
+    let kafka_topic = txn
+        .kafka_topics()
+        .get_by_name(&compactor_config.topic)
+        .await?
+        .ok_or(Error::KafkaTopicNotFound(compactor_config.topic))?;
+    let kafka_partitions: Vec<_> = (compactor_config.write_buffer_partition_range_start
+        ..=compactor_config.write_buffer_partition_range_end)
+        .map(KafkaPartition::new)
+        .collect();
+
+    let mut sequencers = Vec::with_capacity(kafka_partitions.len());
+    for k in kafka_partitions {
+        let s = txn.sequencers().create_or_get(&kafka_topic, k).await?;
+        sequencers.push(s.id);
+    }
+    txn.commit().await?;
+
     let compactor_handler = Arc::new(CompactorHandlerImpl::new(
         sequencers,
         catalog,
@@ -127,5 +168,5 @@ pub async fn create_compactor_server_type(
     ));

     let compactor = CompactorServer::new(metric_registry, compactor_handler);
-    Arc::new(CompactorServerType::new(compactor, common_state))
+    Ok(Arc::new(CompactorServerType::new(compactor, common_state)))
 }
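The partition range is inclusive on both ends, so a start of 0 and an end of 7 yields eight sequencers, and a start greater than the end is rejected before any catalog work happens. A standalone sketch of that mapping, with `KafkaPartition` stubbed as a newtype (the real one lives in `data_types2`):

#[derive(Debug, PartialEq)]
struct KafkaPartition(i32); // stand-in for data_types2::KafkaPartition

fn partitions(start: i32, end: i32) -> Result<Vec<KafkaPartition>, &'static str> {
    if start > end {
        return Err("kafka_partition_range_start must be <= kafka_partition_range_end");
    }
    // `..=` makes both endpoints part of the range.
    Ok((start..=end).map(KafkaPartition).collect())
}

fn main() {
    assert_eq!(partitions(0, 7).unwrap().len(), 8); // inclusive on both ends
    assert!(partitions(3, 2).is_err());
}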