Merge pull request #6323 from influxdata/dom/wal-rotate-persist
feat(ingester2): persist data when rotating WAL segments
commit 464fcba98f

@@ -2794,8 +2794,10 @@ dependencies = [
  "hyper",
  "ingester2",
  "iox_catalog",
+ "iox_query",
  "ioxd_common",
  "metric",
+ "parquet_file",
  "thiserror",
  "tokio",
  "tokio-util",

@@ -29,4 +29,40 @@ pub struct Ingester2Config {
         action
     )]
     pub concurrent_query_limit: usize,
+
+    /// The maximum number of persist tasks that can run simultaneously.
+    #[clap(
+        long = "persist-max-parallelism",
+        env = "INFLUXDB_IOX_PERSIST_MAX_PARALLELISM",
+        default_value = "5",
+        action
+    )]
+    pub persist_max_parallelism: usize,
+
+    /// The maximum number of persist tasks that can be queued for each worker.
+    ///
+    /// Note that each partition is consistently hashed to the same worker -
+    /// this can cause uneven distribution of persist tasks across workers in
+    /// workloads with skewed / hot partitions.
+    #[clap(
+        long = "persist-worker-queue-depth",
+        env = "INFLUXDB_IOX_PERSIST_WORKER_QUEUE_DEPTH",
+        default_value = "10",
+        action
+    )]
+    pub persist_worker_queue_depth: usize,
+
+    /// The maximum number of persist tasks queued in the shared submission
+    /// queue. This is an advanced option; users should prefer
+    /// "--persist-worker-queue-depth".
+    ///
+    /// This queue provides a buffer for persist tasks before they are hashed
+    /// to a worker and enqueued for the worker to process.
+    #[clap(
+        long = "persist-submission-queue-depth",
+        env = "INFLUXDB_IOX_PERSIST_SUBMISSION_QUEUE_DEPTH",
+        default_value = "5",
+        action
+    )]
+    pub persist_submission_queue_depth: usize,
 }

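The worker-queue-depth documentation above hinges on each partition being consistently hashed to a fixed worker. A minimal sketch of that routing, assuming std's DefaultHasher and a string partition key (the ingester's real hasher and key type are not shown in this diff):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a partition to one of `n_workers` persist queues. The same key
/// always maps to the same worker, so a hot partition can fill one worker's
/// queue while others sit idle - the skew the doc comment warns about.
fn worker_for(partition_key: &str, n_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    (hasher.finish() as usize) % n_workers
}

fn main() {
    // Deterministic within a process: repeated lookups pick the same worker.
    assert_eq!(worker_for("2022-12-01", 5), worker_for("2022-12-01", 5));
    println!("worker = {}", worker_for("2022-12-01", 5));
}
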
@@ -3,29 +3,35 @@
 use super::main;
 use crate::process_info::setup_metric_registry;
 use clap_blocks::{
-    catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, run_config::RunConfig,
+    catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, object_store::make_object_store,
+    run_config::RunConfig,
 };
 use iox_query::exec::Executor;
 use ioxd_common::{
     server_type::{CommonServerState, CommonServerStateError},
     Service,
 };
 use ioxd_ingester2::create_ingester_server_type;
 use observability_deps::tracing::*;
+use parquet_file::storage::{ParquetStorage, StorageId};
 use std::sync::Arc;
 use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum Error {
-    #[error("Run: {0}")]
+    #[error("run: {0}")]
     Run(#[from] main::Error),
 
-    #[error("Invalid config: {0}")]
+    #[error("invalid config: {0}")]
     InvalidConfig(#[from] CommonServerStateError),
 
+    #[error("cannot parse object store config: {0}")]
+    ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
+
+    #[error("error initializing ingester2: {0}")]
+    Ingester(#[from] ioxd_ingester2::Error),
+
-    #[error("Catalog DSN error: {0}")]
+    #[error("catalog DSN error: {0}")]
     CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
 }

@@ -54,6 +60,16 @@ pub struct Config {
 
     #[clap(flatten)]
     pub(crate) ingester_config: Ingester2Config,
+
+    /// Specify the size of the thread-pool for query execution, and the
+    /// separate compaction thread-pool.
+    #[clap(
+        long = "exec-thread-count",
+        env = "INFLUXDB_IOX_EXEC_THREAD_COUNT",
+        default_value = "4",
+        action
+    )]
+    pub exec_thread_count: usize,
 }
 
 pub async fn command(config: Config) -> Result<()> {

@@ -65,11 +81,17 @@ pub async fn command(config: Config) -> Result<()> {
         .get_catalog("ingester", Arc::clone(&metric_registry))
         .await?;
 
+    let exec = Arc::new(Executor::new(config.exec_thread_count));
+    let object_store = make_object_store(config.run_config.object_store_config())
+        .map_err(Error::ObjectStoreParsing)?;
+
     let server_type = create_ingester_server_type(
         &common_state,
         catalog,
         Arc::clone(&metric_registry),
         &config.ingester_config,
+        exec,
+        ParquetStorage::new(object_store, StorageId::from("iox")),
     )
     .await?;
 

@@ -9,6 +9,8 @@ use generated_types::influxdata::iox::{
     ingester::v1::write_service_server::{WriteService, WriteServiceServer},
 };
 use iox_catalog::interface::Catalog;
+use iox_query::exec::Executor;
+use parquet_file::storage::ParquetStorage;
 use thiserror::Error;
 use wal::Wal;

@@ -19,6 +21,7 @@ use crate::{
         table::name_resolver::{TableNameProvider, TableNameResolver},
         BufferTree,
     },
+    persist::handle::PersistHandle,
     server::grpc::GrpcDelegate,
     timestamp_oracle::TimestampOracle,
     wal::{rotate_task::periodic_rotation, wal_sink::WalSink},

@@ -69,6 +72,7 @@ pub struct IngesterGuard<T> {
     ///
     /// Aborted on drop.
     rotation_task: tokio::task::JoinHandle<()>,
+    persist_task: tokio::task::JoinHandle<()>,
 }
 
 impl<T> IngesterGuard<T> {

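The "Aborted on drop" contract above is the usual tokio abort-on-drop guard. A minimal sketch of that pattern under the same field names (the rest of IngesterGuard is elided, and this Drop impl is an assumption about how the contract is enforced, not code from the diff):

struct Guard {
    rotation_task: tokio::task::JoinHandle<()>,
    persist_task: tokio::task::JoinHandle<()>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Tear down the background loops with the guard; abort() is
        // non-blocking and safe to call on an already-finished task.
        self.rotation_task.abort();
        self.persist_task.abort();
    }
}

#[tokio::main]
async fn main() {
    let guard = Guard {
        rotation_task: tokio::spawn(std::future::pending::<()>()),
        persist_task: tokio::spawn(std::future::pending::<()>()),
    };
    drop(guard); // both background tasks are aborted here
}
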
@@ -140,12 +144,18 @@ pub enum InitError {
 /// value should be tuned to be slightly less than the interval between persist
 /// operations, but not so long that it causes catalog load spikes at persist
 /// time (which can be observed by the catalog instrumentation metrics).
+#[allow(clippy::too_many_arguments)]
 pub async fn new(
     catalog: Arc<dyn Catalog>,
     metrics: Arc<metric::Registry>,
     persist_background_fetch_time: Duration,
     wal_directory: PathBuf,
     wal_rotation_period: Duration,
+    persist_executor: Arc<Executor>,
+    persist_submission_queue_depth: usize,
+    persist_workers: usize,
+    persist_worker_queue_depth: usize,
+    object_store: ParquetStorage,
 ) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError> {
     // Initialise the deferred namespace name resolver.
     let namespace_name_provider: Arc<dyn NamespaceNameProvider> =

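To make the tuning guidance in that doc comment concrete: if the WAL rotation period (and therefore the persist interval) is 5 minutes, a persist_background_fetch_time of roughly 4 to 4.5 minutes keeps the deferred values resolved just ahead of persist without bunching catalog fetches at persist time. The exact margin is a deployment choice; the diff does not prescribe one.
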
@@ -210,13 +220,30 @@ pub async fn new(
         .await
         .map_err(|e| InitError::WalReplay(e.into()))?;
 
+    // Spawn the persist workers to compact partition data, convert it into
+    // Parquet files, and upload them to object storage.
+    let (persist_handle, persist_actor) = PersistHandle::new(
+        persist_submission_queue_depth,
+        persist_workers,
+        persist_worker_queue_depth,
+        persist_executor,
+        object_store,
+        Arc::clone(&catalog),
+    );
+    let persist_task = tokio::spawn(persist_actor.run());
+
+    // TODO: persist replayed ops, if any
+
     // Build the chain of DmlSink that forms the write path.
     let write_path = WalSink::new(Arc::clone(&buffer), wal.write_handle().await);
 
     // Spawn a background thread to periodically rotate the WAL segment file.
-    let handle = tokio::spawn(periodic_rotation(wal, wal_rotation_period));
+    let handle = tokio::spawn(periodic_rotation(
+        wal,
+        wal_rotation_period,
+        Arc::clone(&buffer),
+        persist_handle,
+    ));
 
     // Restore the highest sequence number from the WAL files, and default to 0
     // if there were no files to replay.

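PersistHandle::new above follows the common handle/actor split: the constructor returns a cheap handle holding the channel sender plus an actor future the caller spawns. A minimal, self-contained sketch of the shape (illustrative names, not the ingester's real types):

use tokio::sync::mpsc;

struct Handle {
    tx: mpsc::Sender<String>,
}

struct Actor {
    rx: mpsc::Receiver<String>,
}

impl Actor {
    // The actor owns the receive side and exits once every handle is dropped.
    async fn run(mut self) {
        while let Some(req) = self.rx.recv().await {
            println!("persisting {req}");
        }
    }
}

fn new(submission_queue_depth: usize) -> (Handle, Actor) {
    // The bounded channel is what gives the submission queue its depth.
    let (tx, rx) = mpsc::channel(submission_queue_depth);
    (Handle { tx }, Actor { rx })
}

#[tokio::main]
async fn main() {
    let (handle, actor) = new(5);
    let task = tokio::spawn(actor.run()); // mirrors tokio::spawn(persist_actor.run())
    handle.tx.send("partition 1".to_owned()).await.unwrap();
    drop(handle); // closing the last sender lets the actor drain and exit
    task.await.unwrap();
}
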
@@ -233,5 +260,6 @@ pub async fn new(
     Ok(IngesterGuard {
         rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp, catalog, metrics),
         rotation_task: handle,
+        persist_task,
     })
 }

@@ -43,7 +43,7 @@ impl Drop for PersistActor {
 impl PersistActor {
     pub(super) fn new(
         rx: mpsc::Receiver<PersistRequest>,
-        exec: Executor,
+        exec: Arc<Executor>,
         store: ParquetStorage,
         catalog: Arc<dyn Catalog>,
         workers: usize,

@@ -87,7 +87,7 @@ impl PersistActor {
 }
 
 pub(super) struct Inner {
-    pub(super) exec: Executor,
+    pub(super) exec: Arc<Executor>,
     pub(super) store: ParquetStorage,
     pub(super) catalog: Arc<dyn Catalog>,
 }

@@ -2,6 +2,7 @@ use std::sync::Arc;
 
 use iox_catalog::interface::Catalog;
 use iox_query::exec::Executor;
+use observability_deps::tracing::info;
 use parking_lot::Mutex;
 use parquet_file::storage::ParquetStorage;
 use thiserror::Error;

@@ -124,12 +125,21 @@ impl PersistHandle {
         submission_queue_depth: usize,
         n_workers: usize,
         worker_queue_depth: usize,
-        exec: Executor,
+        exec: Arc<Executor>,
         store: ParquetStorage,
         catalog: Arc<dyn Catalog>,
     ) -> (Self, PersistActor) {
         let (tx, rx) = mpsc::channel(submission_queue_depth);
 
+        // Log the important configuration parameters of the persist subsystem.
+        info!(
+            submission_queue_depth,
+            n_workers,
+            worker_queue_depth,
+            max_queued_tasks = submission_queue_depth + (n_workers * worker_queue_depth),
+            "initialised persist task"
+        );
+
         let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth);
 
         (Self { tx }, actor)

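With the defaults introduced in the CLI changes above (submission queue depth 5, 5 workers from --persist-max-parallelism, worker queue depth 10), the bound logged here works out to 5 + (5 * 10) = 55 persist tasks queued across the subsystem before callers feel backpressure.
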
@@ -1,4 +1,4 @@
 mod actor;
 pub(super) mod compact;
 mod context;
-mod handle;
+pub(crate) mod handle;

@@ -1,20 +1,92 @@
+use futures::{stream, StreamExt};
 use observability_deps::tracing::*;
-use std::time::Duration;
+use std::{future, sync::Arc, time::Duration};
+
+use crate::{buffer_tree::BufferTree, persist::handle::PersistHandle};
+
+/// [`PERSIST_ENQUEUE_CONCURRENCY`] defines the parallelism used when acquiring
+/// partition locks and marking the partition as persisting.
+const PERSIST_ENQUEUE_CONCURRENCY: usize = 10;
 
 /// Rotate the `wal` segment file every `period` duration of time.
-pub(crate) async fn periodic_rotation(wal: wal::Wal, period: Duration) {
+pub(crate) async fn periodic_rotation(
+    wal: wal::Wal,
+    period: Duration,
+    buffer: Arc<BufferTree>,
+    persist: PersistHandle,
+) {
     let handle = wal.rotation_handle();
     let mut interval = tokio::time::interval(period);
 
     loop {
         interval.tick().await;
-        debug!("rotating wal file");
+        info!("rotating wal file");
 
         let stats = handle.rotate().await.expect("failed to rotate WAL");
-        info!(
+        debug!(
             closed_id = %stats.id(),
             segment_bytes = stats.size(),
             "rotated wal"
         );
+
+        // Drain the BufferTree of partition data and persist each one.
+        //
+        // Writes that landed in the partition buffer after the rotation but
+        // before the partition data is read will be included in the parquet
+        // file, but this is not a problem in the happy case (they will not
+        // appear in the next persist too).
+        //
+        // In the case of an ingester crash after these partitions (with their
+        // extra writes) have been persisted, the ingester will replay them and
+        // re-persist them, causing a small number of duplicate writes to be
+        // present in object storage that must be asynchronously compacted later
+        // - a small price to pay for not having to block ingest while the WAL
+        // is rotated, all outstanding writes + queries complete, and then all
+        // partitions are marked as persisting.
+
+        let notifications = stream::iter(buffer.partitions())
+            .filter_map(|p| {
+                async move {
+                    // Skip this partition if there is no data to persist
+                    let data = p.lock().mark_persisting()?;
+
+                    // Enqueue the partition for persistence.
+                    //
+                    // The persist task will call mark_persisted() on the
+                    // partition once complete.
+                    // Some(future::ready(persist.queue_persist(p, data).await))
+                    Some(future::ready((p, data)))
+                }
+            })
+            // Concurrently attempt to obtain partition locks and mark them as
+            // persisting. This will hide the latency of individual lock
+            // acquisitions.
+            .buffer_unordered(PERSIST_ENQUEUE_CONCURRENCY)
+            // Serialise adding partitions to the persist queue (a fast
+            // operation that doesn't benefit from contention at all).
+            .then(|(p, data)| {
+                let persist = persist.clone();
+                async move { persist.queue_persist(p, data).await }
+            })
+            .collect::<Vec<_>>()
+            .await;
+
+        debug!(
+            n_partitions = notifications.len(),
+            closed_id = %stats.id(),
+            "queued partitions for persist"
+        );
+
+        // Wait for all the persist completion notifications.
+        for n in notifications {
+            n.notified().await;
+        }
+
+        debug!(
+            closed_id = %stats.id(),
+            "partitions persisted"
+        );
     }
 }
+
+// TODO(test): rotate task

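The enqueue pipeline above leans on futures' buffer_unordered to overlap the slow step (taking partition locks) while keeping the fast tail step (queueing for persist) serialised. A minimal, self-contained sketch of that shape, with the slow step simulated by a sleep (the map/buffer_unordered arrangement differs slightly from the diff's filter_map shape, but the concurrency idea is the same):

use std::time::Duration;

use futures::{stream, StreamExt};

const CONCURRENCY: usize = 10; // mirrors PERSIST_ENQUEUE_CONCURRENCY

#[tokio::main]
async fn main() {
    let queued = stream::iter(0..20_u32)
        // Skip "partitions" with nothing to persist, as mark_persisting() does.
        .filter_map(|p| async move { (p % 2 == 0).then_some(p) })
        // The latency-bound step: up to CONCURRENCY of these run at once,
        // hiding the latency of any individual acquisition.
        .map(|p| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            p
        })
        .buffer_unordered(CONCURRENCY)
        // The fast, serialised tail step: enqueueing for persistence.
        .then(|p| async move { p })
        .collect::<Vec<_>>()
        .await;

    println!("queued {} partitions for persist", queued.len());
}
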
@@ -11,8 +11,10 @@ clap_blocks = { path = "../clap_blocks" }
 hyper = "0.14"
 ingester2 = { path = "../ingester2" }
 iox_catalog = { path = "../iox_catalog" }
+iox_query = { version = "0.1.0", path = "../iox_query" }
 ioxd_common = { path = "../ioxd_common" }
 metric = { path = "../metric" }
+parquet_file = { version = "0.1.0", path = "../parquet_file" }
 thiserror = "1.0.37"
 tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
 tokio-util = { version = "0.7.4" }

@@ -3,6 +3,7 @@ use clap_blocks::ingester2::Ingester2Config;
 use hyper::{Body, Request, Response};
 use ingester2::{IngesterGuard, IngesterRpcInterface};
 use iox_catalog::interface::Catalog;
+use iox_query::exec::Executor;
 use ioxd_common::{
     add_service,
     http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},

@@ -12,6 +13,7 @@ use ioxd_common::{
     setup_builder,
 };
 use metric::Registry;
+use parquet_file::storage::ParquetStorage;
 use std::{
     fmt::{Debug, Display},
     sync::Arc,

@@ -143,6 +145,8 @@ pub async fn create_ingester_server_type(
     catalog: Arc<dyn Catalog>,
     metrics: Arc<Registry>,
     ingester_config: &Ingester2Config,
+    exec: Arc<Executor>,
+    object_store: ParquetStorage,
 ) -> Result<Arc<dyn ServerType>> {
     let grpc = ingester2::new(
         catalog,

@@ -150,6 +154,11 @@ pub async fn create_ingester_server_type(
         PERSIST_BACKGROUND_FETCH_TIME,
         ingester_config.wal_directory.clone(),
         Duration::from_secs(ingester_config.wal_rotation_period_seconds),
+        exec,
+        ingester_config.persist_submission_queue_depth,
+        ingester_config.persist_max_parallelism,
+        ingester_config.persist_worker_queue_depth,
+        object_store,
     )
     .await?;
 