feat(ingester2): run persistence task

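Configures the initialisation of an ingester2 instance to spawn a
persistence task (currently unused) and plumbs in the configuration it
needs: the query executor (and its thread count), the Parquet object
store, and the persist parallelism and queue depth parameters.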
pull/24376/head
Dom Dwyer 2022-12-01 15:32:58 +01:00
parent e234187a94
commit 66aab55534
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
9 changed files with 112 additions and 8 deletions

2
Cargo.lock generated
View File

@ -2794,8 +2794,10 @@ dependencies = [
"hyper",
"ingester2",
"iox_catalog",
"iox_query",
"ioxd_common",
"metric",
"parquet_file",
"thiserror",
"tokio",
"tokio-util",

View File

@ -29,4 +29,40 @@ pub struct Ingester2Config {
action
)]
pub concurrent_query_limit: usize,
/// The maximum number of persist tasks that can run simultaneously.
#[clap(
long = "persist-max-parallelism",
env = "INFLUXDB_IOX_PERSIST_MAX_PARALLELISM",
default_value = "5",
action
)]
pub persist_max_parallelism: usize,
/// The maximum number of persist tasks that can be queued for each worker.
///
/// Note that each partition is consistently hashed to the same worker -
/// this can cause uneven distribution of persist tasks across workers in
/// workloads with skewed / hot partitions.
#[clap(
long = "persist-worker-queue-depth",
env = "INFLUXDB_IOX_PERSIST_WORKER_QUEUE_DEPTH",
default_value = "10",
action
)]
pub persist_worker_queue_depth: usize,
/// The maximum number of persist tasks queued in the shared submission
/// queue. This is an advanced option; users should prefer
/// "--persist-worker-queue-depth".
///
/// This queue provides a buffer for persist tasks before they are hashed to
/// a worker and enqueued for the worker to process.
#[clap(
long = "persist-submission-queue-depth",
env = "INFLUXDB_IOX_PERSIST_SUBMISSION_QUEUE_DEPTH",
default_value = "5",
action
)]
pub persist_submission_queue_depth: usize,
}

View File

@ -3,29 +3,35 @@
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, run_config::RunConfig,
catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, object_store::make_object_store,
run_config::RunConfig,
};
use iox_query::exec::Executor;
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_ingester2::create_ingester_server_type;
use observability_deps::tracing::*;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
#[error("run: {0}")]
Run(#[from] main::Error),
#[error("Invalid config: {0}")]
#[error("invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("error initializing ingester2: {0}")]
Ingester(#[from] ioxd_ingester2::Error),
#[error("Catalog DSN error: {0}")]
#[error("catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
}
@ -54,6 +60,16 @@ pub struct Config {
#[clap(flatten)]
pub(crate) ingester_config: Ingester2Config,
/// Specify the size of the thread-pool for query execution, and the
/// separate compaction thread-pool.
#[clap(
long = "exec-thread-count",
env = "INFLUXDB_IOX_EXEC_THREAD_COUNT",
default_value = "4",
action
)]
pub exec_thread_count: usize,
}
pub async fn command(config: Config) -> Result<()> {
@ -65,11 +81,17 @@ pub async fn command(config: Config) -> Result<()> {
.get_catalog("ingester", Arc::clone(&metric_registry))
.await?;
let exec = Arc::new(Executor::new(config.exec_thread_count));
let object_store = make_object_store(config.run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;
let server_type = create_ingester_server_type(
&common_state,
catalog,
Arc::clone(&metric_registry),
&config.ingester_config,
exec,
ParquetStorage::new(object_store, StorageId::from("iox")),
)
.await?;

View File

@ -9,6 +9,8 @@ use generated_types::influxdata::iox::{
ingester::v1::write_service_server::{WriteService, WriteServiceServer},
};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use thiserror::Error;
use wal::Wal;
@ -19,6 +21,7 @@ use crate::{
table::name_resolver::{TableNameProvider, TableNameResolver},
BufferTree,
},
persist::handle::PersistHandle,
server::grpc::GrpcDelegate,
timestamp_oracle::TimestampOracle,
wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
@ -69,6 +72,7 @@ pub struct IngesterGuard<T> {
///
/// Aborted on drop.
rotation_task: tokio::task::JoinHandle<()>,
persist_task: tokio::task::JoinHandle<()>,
}
impl<T> IngesterGuard<T> {
@ -140,12 +144,18 @@ pub enum InitError {
/// value should be tuned to be slightly less than the interval between persist
/// operations, but not so long that it causes catalog load spikes at persist
/// time (which can be observed by the catalog instrumentation metrics).
#[allow(clippy::too_many_arguments)]
pub async fn new(
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
persist_background_fetch_time: Duration,
wal_directory: PathBuf,
wal_rotation_period: Duration,
persist_executor: Arc<Executor>,
persist_submission_queue_depth: usize,
persist_workers: usize,
persist_worker_queue_depth: usize,
object_store: ParquetStorage,
) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError> {
// Initialise the deferred namespace name resolver.
let namespace_name_provider: Arc<dyn NamespaceNameProvider> =
@ -210,6 +220,18 @@ pub async fn new(
.await
.map_err(|e| InitError::WalReplay(e.into()))?;
// Spawn the persist workers to compact partition data, convert it into
// Parquet files, and upload them to object storage.
let (_handle, persist_actor) = PersistHandle::new(
persist_submission_queue_depth,
persist_workers,
persist_worker_queue_depth,
persist_executor,
object_store,
Arc::clone(&catalog),
);
let persist_task = tokio::spawn(persist_actor.run());
// TODO: persist replayed ops, if any
// Build the chain of DmlSink that forms the write path.
@ -233,5 +255,6 @@ pub async fn new(
Ok(IngesterGuard {
rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp, catalog, metrics),
rotation_task: handle,
persist_task,
})
}

View File

@ -43,7 +43,7 @@ impl Drop for PersistActor {
impl PersistActor {
pub(super) fn new(
rx: mpsc::Receiver<PersistRequest>,
exec: Executor,
exec: Arc<Executor>,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
workers: usize,
@ -87,7 +87,7 @@ impl PersistActor {
}
pub(super) struct Inner {
pub(super) exec: Executor,
pub(super) exec: Arc<Executor>,
pub(super) store: ParquetStorage,
pub(super) catalog: Arc<dyn Catalog>,
}

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use observability_deps::tracing::info;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
use thiserror::Error;
@ -124,12 +125,21 @@ impl PersistHandle {
submission_queue_depth: usize,
n_workers: usize,
worker_queue_depth: usize,
exec: Executor,
exec: Arc<Executor>,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
) -> (Self, PersistActor) {
let (tx, rx) = mpsc::channel(submission_queue_depth);
// Log the important configuration parameters of the persist subsystem.
info!(
submission_queue_depth,
n_workers,
worker_queue_depth,
max_queued_tasks = submission_queue_depth + (n_workers * worker_queue_depth),
"initialised persist task"
);
let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth);
(Self { tx }, actor)

View File

@ -1,4 +1,4 @@
mod actor;
pub(super) mod compact;
mod context;
mod handle;
pub(crate) mod handle;

View File

@ -11,8 +11,10 @@ clap_blocks = { path = "../clap_blocks" }
hyper = "0.14"
ingester2 = { path = "../ingester2" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { version = "0.1.0", path = "../iox_query" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
parquet_file = { version = "0.1.0", path = "../parquet_file" }
thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = { version = "0.7.4" }

View File

@ -3,6 +3,7 @@ use clap_blocks::ingester2::Ingester2Config;
use hyper::{Body, Request, Response};
use ingester2::{IngesterGuard, IngesterRpcInterface};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
@ -12,6 +13,7 @@ use ioxd_common::{
setup_builder,
};
use metric::Registry;
use parquet_file::storage::ParquetStorage;
use std::{
fmt::{Debug, Display},
sync::Arc,
@ -143,6 +145,8 @@ pub async fn create_ingester_server_type(
catalog: Arc<dyn Catalog>,
metrics: Arc<Registry>,
ingester_config: &Ingester2Config,
exec: Arc<Executor>,
object_store: ParquetStorage,
) -> Result<Arc<dyn ServerType>> {
let grpc = ingester2::new(
catalog,
@ -150,6 +154,11 @@ pub async fn create_ingester_server_type(
PERSIST_BACKGROUND_FETCH_TIME,
ingester_config.wal_directory.clone(),
Duration::from_secs(ingester_config.wal_rotation_period_seconds),
exec,
ingester_config.persist_submission_queue_depth,
ingester_config.persist_max_parallelism,
ingester_config.persist_worker_queue_depth,
object_store,
)
.await?;