feat(ingester2): parallel partition persistence

Implements actor-based, parallel persistence in ingester2 with
controllable fan-out parallelism and queue depths.

This implementation encapsulates the complexity of persistence, queuing
and parallelism - the caller simply uses the handle to persist a
partition, while the actor handles fan-out to a set of persistence
workers, compaction in a separate thread-pool, and optional completion
notifications.

By consistently hashing persist jobs onto workers, parallelism is
achieved across partitions, while persists for the same partition are
serialised so that sort key updates are applied in order.
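
A rough usage sketch of the new API (names and signatures taken from the
code added below; construction of the executor, object store, catalog and
the in-memory partition data is assumed, and queue sizes are illustrative):

```rust
// Sketch only: `exec`, `store`, `catalog`, `partition` and `data` are
// assumed to already exist; the queue sizes are illustrative.
let (handle, actor) = PersistHandle::new(
    5,  // submission_queue_depth
    4,  // n_workers
    10, // worker_queue_depth
    exec,
    store,
    catalog,
);

// Run the actor in its own task; it stops once all handles are dropped.
tokio::spawn(actor.run());

// Enqueue a persist job and (optionally) wait for it to complete.
let notify = handle.queue_persist(partition, data).await;
notify.notified().await;
```
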
pull/24376/head
Dom Dwyer 2022-12-01 10:37:25 +01:00
parent 85abd09be0
commit f524687602
9 changed files with 1785 additions and 1 deletions

Cargo.lock (generated)

@@ -2495,6 +2495,7 @@ dependencies = [
"observability_deps",
"once_cell",
"parking_lot 0.12.1",
"parquet_file",
"paste",
"pin-project",
"predicate",
@@ -2502,6 +2503,7 @@ dependencies = [
"rand",
"schema",
"service_grpc_catalog",
"sharder",
"tempfile",
"test_helpers",
"thiserror",


@@ -31,12 +31,14 @@ object_store = "0.5.1"
observability_deps = { version = "0.1.0", path = "../observability_deps" }
once_cell = "1.16.0"
parking_lot = "0.12.1"
parquet_file = { version = "0.1.0", path = "../parquet_file" }
pin-project = "1.0.12"
predicate = { version = "0.1.0", path = "../predicate" }
prost = { version = "0.11.2", default-features = false, features = ["std"] }
rand = "0.8.5"
schema = { version = "0.1.0", path = "../schema" }
service_grpc_catalog = { version = "0.1.0", path = "../service_grpc_catalog" }
sharder = { version = "0.1.0", path = "../sharder" }
thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tonic = "0.8.3"


@@ -15,7 +15,7 @@ use super::{namespace::NamespaceName, table::TableName};
use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor};
mod buffer;
mod persisting;
pub(crate) mod persisting;
pub(crate) mod resolver;
/// The load state of the [`SortKey`] for a given partition.


@@ -75,6 +75,7 @@ mod arcmap;
mod buffer_tree;
mod deferred_load;
mod dml_sink;
mod persist;
mod query;
mod query_adaptor;
pub(crate) mod server;


@@ -0,0 +1,104 @@
use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use sharder::JumpHash;
use tokio::{sync::mpsc, task::JoinHandle};
use super::context::{Context, PersistRequest};
/// An actor implementation that fans out incoming persistence jobs to a set of
/// workers.
///
/// See [`PersistHandle`].
///
/// [`PersistHandle`]: super::handle::PersistHandle
#[must_use = "PersistActor must be run by calling run()"]
pub(crate) struct PersistActor {
rx: mpsc::Receiver<PersistRequest>,
/// The state/dependencies shared across all worker tasks.
inner: Arc<Inner>,
/// A consistent hash implementation used to consistently map buffers from
/// one partition to the same worker queue.
///
/// This ensures persistence is serialised per-partition, but in parallel
/// across partitions (up to the number of worker tasks).
persist_queues: JumpHash<mpsc::Sender<PersistRequest>>,
/// Task handles for the worker tasks, aborted on drop of this
/// [`PersistActor`].
tasks: Vec<JoinHandle<()>>,
}
impl Drop for PersistActor {
fn drop(&mut self) {
// Stop all background tasks when the actor goes out of scope.
self.tasks.iter().for_each(|v| v.abort())
}
}
impl PersistActor {
pub(super) fn new(
rx: mpsc::Receiver<PersistRequest>,
exec: Executor,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
workers: usize,
worker_queue_depth: usize,
) -> Self {
let inner = Arc::new(Inner {
exec,
store,
catalog,
});
let (tx_handles, tasks): (Vec<_>, Vec<_>) = (0..workers)
.map(|_| {
let inner = Arc::clone(&inner);
let (tx, rx) = mpsc::channel(worker_queue_depth);
(tx, tokio::spawn(run_task(inner, rx)))
})
.unzip();
// TODO(test): N workers running
// TODO(test): queue depths
Self {
rx,
inner,
persist_queues: JumpHash::new(tx_handles),
tasks,
}
}
/// Execute this actor task and block until all [`PersistHandle`] instances
/// are dropped.
///
/// [`PersistHandle`]: super::handle::PersistHandle
pub(crate) async fn run(mut self) {
while let Some(req) = self.rx.recv().await {
let tx = self.persist_queues.hash(req.partition_id());
tx.send(req).await.expect("persist worker has stopped;")
}
}
}
pub(super) struct Inner {
pub(super) exec: Executor,
pub(super) store: ParquetStorage,
pub(super) catalog: Arc<dyn Catalog>,
}
async fn run_task(inner: Arc<Inner>, mut rx: mpsc::Receiver<PersistRequest>) {
while let Some(req) = rx.recv().await {
let ctx = Context::new(req, Arc::clone(&inner));
let compacted = ctx.compact().await;
let (sort_key_update, parquet_table_data) = ctx.upload(compacted).await;
ctx.update_database(sort_key_update, parquet_table_data)
.await;
}
}
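
The routing property the workers rely on can be sketched in isolation:
`JumpHash` maps equal keys to the same queue, so all persist jobs for a
partition land on the same worker, while different partitions spread across
workers. A minimal, illustrative sketch, using a `u64` payload in place of
`PersistRequest` and assuming `JumpHash::hash()` returns a reference to the
selected sender (as the actor code above implies):

```rust
use data_types::PartitionId;
use sharder::JumpHash;
use tokio::sync::mpsc;

#[test]
fn same_partition_same_queue() {
    // Four stand-in worker queues; the senders are the hash ring members.
    let senders: Vec<_> = (0..4).map(|_| mpsc::channel::<u64>(10).0).collect();
    let queues = JumpHash::new(senders);

    // Equal partition IDs always resolve to the same queue (and thus the
    // same worker), which is what serialises persistence per partition.
    let a = queues.hash(PartitionId::new(42));
    let b = queues.hash(PartitionId::new(42));
    assert!(std::ptr::eq(a, b));
}
```
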

File diff suppressed because it is too large.


@@ -0,0 +1,384 @@
use std::sync::Arc;
use backoff::Backoff;
use data_types::{
CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber,
TableId,
};
use iox_catalog::interface::get_table_schema_by_id;
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet_file::metadata::IoxMetadata;
use schema::sort::SortKey;
use tokio::sync::Notify;
use uuid::Uuid;
use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{persisting::PersistingData, PartitionData, SortKeyState},
table::TableName,
},
deferred_load::DeferredLoad,
persist::compact::{compact_persisting_batch, CompactedStream},
TRANSITION_SHARD_ID,
};
use super::actor::Inner;
/// An internal type that contains all necessary information to run a persist task.
///
/// Used to communicate between actor handles & actor task.
#[derive(Debug)]
pub(super) struct PersistRequest {
complete: Arc<Notify>,
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
}
impl PersistRequest {
pub(super) fn new(partition: Arc<Mutex<PartitionData>>, data: PersistingData) -> Self {
Self {
complete: Arc::new(Notify::default()),
partition,
data,
}
}
/// Return the partition ID of the persisting data.
pub(super) fn partition_id(&self) -> PartitionId {
self.data.partition_id()
}
/// Obtain the completion notification handle for this request.
///
/// This notification is fired once persistence is complete.
pub(super) fn complete_notification(&self) -> Arc<Notify> {
Arc::clone(&self.complete)
}
}
pub(super) struct Context {
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
inner: Arc<Inner>,
/// IDs loaded from the partition at construction time.
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
/// The partition key for this partition.
partition_key: PartitionKey,
/// Deferred strings needed for persistence.
///
/// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`]
/// is constructed to load them in the background (if not already resolved)
/// in order to avoid incurring the query latency when the values are
/// needed.
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_name: Arc<DeferredLoad<TableName>>,
/// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`]
/// construction.
///
/// The [`SortKey`] MUST NOT change during persistence, as updates to the
/// sort key are not commutative and thus require serialising. This
/// precludes parallel persists of partitions, unless they can be proven not
/// to need to update the sort key.
sort_key: SortKeyState,
/// A notification signal to indicate to the caller that this partition has
/// persisted.
complete: Arc<Notify>,
}
impl Context {
/// Construct a persistence job [`Context`] from `req`.
///
/// Locks the [`PartitionData`] in `req` to read various properties which
/// are then cached in the [`Context`].
pub(super) fn new(req: PersistRequest, inner: Arc<Inner>) -> Self {
let partition_id = req.data.partition_id();
// Obtain the partition lock and load the immutable values that will be
// used during this persistence.
let s = {
let complete = req.complete_notification();
let p = Arc::clone(&req.partition);
let guard = p.lock();
assert_eq!(partition_id, guard.partition_id());
Self {
partition: req.partition,
data: req.data,
inner,
namespace_id: guard.namespace_id(),
table_id: guard.table_id(),
partition_id,
partition_key: guard.partition_key().clone(),
namespace_name: Arc::clone(guard.namespace_name()),
table_name: Arc::clone(guard.table_name()),
// Technically the sort key isn't immutable, but MUST NOT change
// during the execution of this persist.
sort_key: guard.sort_key().clone(),
complete,
}
};
// Pre-fetch the deferred values in a background thread (if not already
// resolved)
s.namespace_name.prefetch_now();
s.table_name.prefetch_now();
if let SortKeyState::Deferred(ref d) = s.sort_key {
d.prefetch_now();
}
s
}
pub(super) async fn compact(&self) -> CompactedStream {
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
"compacting partition"
);
assert!(!self.data.record_batches().is_empty());
// Run a compaction to sort the data and resolve any duplicate values.
//
// This demands the deferred load values and may have to wait for them
// to be loaded before compaction starts.
compact_persisting_batch(
&self.inner.exec,
self.sort_key.get().await,
self.table_name.get().await,
self.data.query_adaptor(),
)
.await
.expect("unable to compact persisting batch")
}
pub(super) async fn upload(
&self,
compacted: CompactedStream,
) -> (Option<SortKey>, ParquetFileParams) {
let CompactedStream {
stream: record_stream,
catalog_sort_key_update,
data_sort_key,
} = compacted;
// Generate a UUID to uniquely identify this parquet file in
// object storage.
let object_store_id = Uuid::new_v4();
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
sort_key = %data_sort_key,
"uploading partition parquet"
);
// Construct the metadata for this parquet file.
let iox_metadata = IoxMetadata {
object_store_id,
creation_timestamp: SystemProvider::new().now(),
shard_id: TRANSITION_SHARD_ID,
namespace_id: self.namespace_id,
namespace_name: Arc::clone(&*self.namespace_name.get().await),
table_id: self.table_id,
table_name: Arc::clone(&*self.table_name.get().await),
partition_id: self.partition_id,
partition_key: self.partition_key.clone(),
max_sequence_number: SequenceNumber::new(0), // TODO: not ordered!
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
};
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.inner
.store
.upload(record_stream, &iox_metadata)
.await
.expect("unexpected fatal persist error");
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
file_size,
"partition parquet uploaded"
);
// Read the table schema from the catalog to act as a map of column name
// -> column IDs.
let table_schema = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = self.inner.catalog.repositories().await;
get_table_schema_by_id(self.table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// Build the data that must be inserted into the parquet_files catalog
// table in order to make the file visible to queriers.
let parquet_table_data =
iox_metadata.to_parquet_file(self.partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("unknown column").id
});
(catalog_sort_key_update, parquet_table_data)
}
pub(crate) async fn update_database(
self,
sort_key_update: Option<SortKey>,
parquet_table_data: ParquetFileParams,
) {
// Extract the object store ID to the local scope so that it can easily
// be referenced in debug logging to aid correlation of persist events
// for a specific file.
let object_store_id = parquet_table_data.object_store_id;
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
?parquet_table_data,
?sort_key_update,
"updating catalog"
);
// If necessary, update the partition sort key in the catalog and update
// the local cached copy in the PartitionData.
//
// This update MUST be made visible before the parquet file, otherwise
// the consumer of the parquet file will observe an inconsistent sort
// key.
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&Default::default())
.retry_all_errors("update_sort_key", || async {
let mut repos = self.inner.catalog.repositories().await;
let _partition = repos
.partitions()
.update_sort_key(self.partition_id, &sort_key)
.await?;
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the sort key in the partition cache.
let old_key;
{
let mut guard = self.partition.lock();
old_key = guard.sort_key().clone();
guard.update_sort_key(Some(new_sort_key.clone()));
};
// Assert the serialisation of sort key updates.
//
// Both of these get() should not block due to both of the
// values having been previously resolved / used.
assert_eq!(old_key.get().await, self.sort_key.get().await);
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
old_sort_key = ?sort_key,
%new_sort_key,
"adjusted partition sort key"
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
Backoff::new(&Default::default())
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.inner.catalog.repositories().await;
let parquet_file = repos
.parquet_files()
.create(parquet_table_data.clone())
.await?;
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
?parquet_table_data,
parquet_file_id=?parquet_file.id,
"parquet file added to catalog"
);
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Mark the partition as having completed persistence, causing it to
// release the reference to the in-flight persistence data it is
// holding.
//
// This SHOULD cause the data to be dropped, but there MAY be ongoing
// queries that currently hold a reference to the data. In either case,
// the persisted data will be dropped "shortly".
self.partition.lock().mark_persisted(self.data);
info!(
%object_store_id,
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
"persisted partition"
);
// Notify all observers of this persistence task
self.complete.notify_waiters();
}
}
// TODO(test): persist
// TODO(test): persist completion notification


@@ -0,0 +1,173 @@
use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
use thiserror::Error;
use tokio::sync::{
mpsc::{self},
Notify,
};
use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData};
use super::{actor::PersistActor, context::PersistRequest};
#[derive(Debug, Error)]
pub(crate) enum PersistError {
#[error("persist queue is full")]
QueueFull,
}
/// A persistence task submission handle.
///
/// This type is cheap to clone to share across threads.
///
/// # Usage
///
/// The caller should construct a [`PersistHandle`] and [`PersistActor`] by
/// calling [`PersistHandle::new()`], and run the provided [`PersistActor`]
/// instance in another thread / task (by calling [`PersistActor::run()`]).
///
/// From this point on, the caller interacts only with the [`PersistHandle`]
/// which can be cheaply cloned and passed over thread/task boundaries to
/// enqueue persist tasks.
///
/// Dropping all [`PersistHandle`] instances stops the [`PersistActor`], which
/// immediately stops all workers.
///
/// # Topology
///
/// The persist actor uses an internal set of workers to parallelise
/// persistence operations across up to `n_workers` tasks.
///
/// Submitting a persistence request places the job into a bounded queue,
/// providing a buffer for persistence requests to wait for an available worker.
///
/// Two types of queue exist:
///
/// * Submission queue (bounded by `submission_queue_depth`)
/// * `n_workers` worker queues (each bounded by `worker_queue_depth`)
///
/// ```text
/// ┌─────────────┐
/// │PersistHandle├┐
/// └┬────────────┘├┐
/// └┬────────────┘│
/// └─────┬───────┘
/// │
/// submission_queue_depth
/// │
/// ▼
/// ╔═══════════════╗
/// ║ PersistActor ║
/// ╚═══════════════╝
/// │
/// ┌────────────────┼────────────────┐
/// │ │ │
/// │ │ │
/// worker_queue_depth worker_queue_depth worker_queue_depth
/// │ │ │
/// ▼ ▼ ▼
/// ┌────────────┐ ┌────────────┐ ┌────────────┐
/// │ Worker 1 │ │ Worker 2 │ │ Worker N │
/// └────────────┘ └────────────┘ └────────────┘
/// ▲ ▲ ▲
/// │
/// │ ▼ │
/// ┌ ─ ─ ─ ─ ─ ─
/// └ ─ ─ ─ ─▶ Executor │◀ ─ ─ ─ ─ ┘
/// └ ─ ─ ─ ─ ─ ─
/// ```
///
/// Compaction is performed in the provided [`Executor`] re-org thread-pool and
/// is shared across all workers.
///
/// At any one time, the number of outstanding persist tasks is bounded by:
///
/// ```text
///
/// submission_queue_depth + (workers * worker_queue_depth)
///
/// ```
///
/// At any one time, there may be at most `submission_queue_depth +
/// worker_queue_depth` outstanding persist jobs for a single partition.
///
/// These two queues are used to decouple submissions from individual workers -
/// this prevents a "hot" / backlogged worker with a full worker queue from
/// blocking tasks from being passed through to workers with spare capacity.
///
/// # Parallelism & Partition Serialisation
///
/// Persistence jobs are parallelised across partitions, with at most
/// `n_workers` persist operations executing in parallel.
///
/// Because updates of a partition's [`SortKey`] are not commutative, they must
/// be serialised. For this reason, persist operations for a given partition are
/// always placed in the same worker queue, ensuring they execute sequentially.
///
/// [`SortKey`]: schema::sort::SortKey
#[derive(Debug, Clone)]
pub(crate) struct PersistHandle {
tx: mpsc::Sender<PersistRequest>,
}
impl PersistHandle {
/// Initialise a new persist actor & obtain the first handle.
///
/// The caller should call [`PersistActor::run()`] in a separate
/// thread / task to start the persistence executor.
pub(crate) fn new(
submission_queue_depth: usize,
n_workers: usize,
worker_queue_depth: usize,
exec: Executor,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
) -> (Self, PersistActor) {
let (tx, rx) = mpsc::channel(submission_queue_depth);
let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth);
(Self { tx }, actor)
}
/// Place `data` from `partition` into the persistence queue.
///
/// This call (asynchronously) waits for space to become available in the
/// submission queue.
///
/// Once persistence is complete, the partition is locked, the sort key is
/// updated, and [`PartitionData::mark_persisted()`] is called with `data`.
///
/// Once all persistence-related tasks are complete, the returned [`Notify`]
/// broadcasts a notification.
///
/// # Panics
///
/// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated
/// between persistence starting and ending.
///
/// This will panic (asynchronously) if `data` was not from `partition` or
/// all worker threads have stopped.
pub(crate) async fn queue_persist(
&self,
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
) -> Arc<Notify> {
// Build the persist task request
let r = PersistRequest::new(partition, data);
let notify = r.complete_notification();
self.tx
.send(r)
.await
.expect("no persist worker tasks running");
notify
}
}
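
For concreteness, the queue bounds described in the `PersistHandle`
documentation above, worked through with assumed (purely illustrative)
sizings:

```rust
// Illustrative sizings, not values taken from this commit.
let submission_queue_depth = 5;
let n_workers = 4;
let worker_queue_depth = 10;

// System-wide bound on outstanding persist jobs:
//   submission_queue_depth + (workers * worker_queue_depth)
assert_eq!(submission_queue_depth + n_workers * worker_queue_depth, 45);

// Per-partition bound: all jobs for one partition are routed to a single
// worker, so only that worker's queue (plus the submission queue) can
// hold them.
assert_eq!(submission_queue_depth + worker_queue_depth, 15);
```
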


@@ -0,0 +1,4 @@
mod actor;
pub(super) mod compact;
mod context;
mod handle;