Merge branch 'main' into dom/buffer-tree-iter

pull/24376/head
kodiakhq[bot] 2022-12-02 16:10:38 +00:00 committed by GitHub
commit 1137f2fc7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1785 additions and 1 deletion

Cargo.lock

@@ -2495,6 +2495,7 @@ dependencies = [
"observability_deps",
"once_cell",
"parking_lot 0.12.1",
"parquet_file",
"paste",
"pin-project",
"predicate",
@@ -2502,6 +2503,7 @@ dependencies = [
"rand",
"schema",
"service_grpc_catalog",
"sharder",
"tempfile",
"test_helpers",
"thiserror",


@@ -31,12 +31,14 @@ object_store = "0.5.1"
observability_deps = { version = "0.1.0", path = "../observability_deps" }
once_cell = "1.16.0"
parking_lot = "0.12.1"
parquet_file = { version = "0.1.0", path = "../parquet_file" }
pin-project = "1.0.12"
predicate = { version = "0.1.0", path = "../predicate" }
prost = { version = "0.11.2", default-features = false, features = ["std"] }
rand = "0.8.5"
schema = { version = "0.1.0", path = "../schema" }
service_grpc_catalog = { version = "0.1.0", path = "../service_grpc_catalog" }
sharder = { version = "0.1.0", path = "../sharder" }
thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tonic = "0.8.3"


@@ -15,7 +15,7 @@ use super::{namespace::NamespaceName, table::TableName};
use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor};
mod buffer;
mod persisting;
pub(crate) mod persisting;
pub(crate) mod resolver;
/// The load state of the [`SortKey`] for a given partition.


@@ -75,6 +75,7 @@ mod arcmap;
mod buffer_tree;
mod deferred_load;
mod dml_sink;
mod persist;
mod query;
mod query_adaptor;
pub(crate) mod server;


@@ -0,0 +1,104 @@
use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use sharder::JumpHash;
use tokio::{sync::mpsc, task::JoinHandle};
use super::context::{Context, PersistRequest};
/// An actor implementation that fans out incoming persistence jobs to a set of
/// workers.
///
/// See [`PersistHandle`].
///
/// [`PersistHandle`]: super::handle::PersistHandle
#[must_use = "PersistActor must be run by calling run()"]
pub(crate) struct PersistActor {
rx: mpsc::Receiver<PersistRequest>,
/// The state/dependencies shared across all worker tasks.
inner: Arc<Inner>,
/// A consistent hash implementation used to consistently map buffers from
/// one partition to the same worker queue.
///
/// This ensures persistence is serialised per-partition, but in parallel
/// across partitions (up to the number of worker tasks).
persist_queues: JumpHash<mpsc::Sender<PersistRequest>>,
/// Task handles for the worker tasks, aborted on drop of this
/// [`PersistActor`].
tasks: Vec<JoinHandle<()>>,
}
impl Drop for PersistActor {
fn drop(&mut self) {
// Stop all background tasks when the actor goes out of scope.
self.tasks.iter().for_each(|v| v.abort())
}
}
impl PersistActor {
pub(super) fn new(
rx: mpsc::Receiver<PersistRequest>,
exec: Executor,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
workers: usize,
worker_queue_depth: usize,
) -> Self {
let inner = Arc::new(Inner {
exec,
store,
catalog,
});
let (tx_handles, tasks): (Vec<_>, Vec<_>) = (0..workers)
.map(|_| {
let inner = Arc::clone(&inner);
let (tx, rx) = mpsc::channel(worker_queue_depth);
(tx, tokio::spawn(run_task(inner, rx)))
})
.unzip();
// TODO(test): N workers running
// TODO(test): queue depths
Self {
rx,
inner,
persist_queues: JumpHash::new(tx_handles),
tasks,
}
}
/// Execute this actor task and block until all [`PersistHandle`] instances
/// are dropped.
///
/// [`PersistHandle`]: super::handle::PersistHandle
pub(crate) async fn run(mut self) {
while let Some(req) = self.rx.recv().await {
let tx = self.persist_queues.hash(req.partition_id());
tx.send(req).await.expect("persist worker has stopped")
}
}
}
pub(super) struct Inner {
pub(super) exec: Executor,
pub(super) store: ParquetStorage,
pub(super) catalog: Arc<dyn Catalog>,
}
async fn run_task(inner: Arc<Inner>, mut rx: mpsc::Receiver<PersistRequest>) {
while let Some(req) = rx.recv().await {
let ctx = Context::new(req, Arc::clone(&inner));
let compacted = ctx.compact().await;
let (sort_key_update, parquet_table_data) = ctx.upload(compacted).await;
ctx.update_database(sort_key_update, parquet_table_data)
.await;
}
}
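
The routing step in `run()` above is what gives the per-partition ordering guarantee: `JumpHash` (from the `sharder` crate) maps a partition ID onto one of a fixed set of worker queues, so every request for a partition lands on the same worker and executes sequentially, while different partitions proceed in parallel. The toy below sketches that idea with a hand-rolled jump-consistent-hash function standing in for `sharder::JumpHash` (an assumption about its internals) and plain `u64` partition IDs in place of real `PersistRequest`s:

use tokio::sync::mpsc;

// Jump consistent hash (Lamping & Veach): maps a key onto one of `buckets`
// stable buckets. Assumed here to be the same scheme `sharder::JumpHash` wraps.
fn jump_hash(mut key: u64, buckets: u32) -> u32 {
    let (mut b, mut j): (i64, i64) = (-1, 0);
    while j < i64::from(buckets) {
        b = j;
        key = key.wrapping_mul(2_862_933_555_777_941_757).wrapping_add(1);
        j = ((b + 1) as f64 * (2_147_483_648.0 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

#[tokio::main]
async fn main() {
    const WORKERS: u32 = 3;

    // One bounded queue per worker, mirroring `worker_queue_depth`.
    let (txs, tasks): (Vec<_>, Vec<_>) = (0..WORKERS)
        .map(|id| {
            let (tx, mut rx) = mpsc::channel::<u64>(5);
            let task = tokio::spawn(async move {
                // Requests for one partition arrive here in FIFO order,
                // so its persists are serialised.
                while let Some(partition_id) = rx.recv().await {
                    println!("worker {id} persisting partition {partition_id}");
                }
            });
            (tx, task)
        })
        .unzip();

    // The actor's routing step: same partition ID, same worker queue.
    for partition_id in [1_u64, 2, 3, 42, 1, 42] {
        let worker = jump_hash(partition_id, WORKERS) as usize;
        txs[worker].send(partition_id).await.unwrap();
    }

    drop(txs); // closing the queues lets the workers exit
    for task in tasks {
        task.await.unwrap();
    }
}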

File diff suppressed because it is too large


@@ -0,0 +1,384 @@
use std::sync::Arc;
use backoff::Backoff;
use data_types::{
CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber,
TableId,
};
use iox_catalog::interface::get_table_schema_by_id;
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet_file::metadata::IoxMetadata;
use schema::sort::SortKey;
use tokio::sync::Notify;
use uuid::Uuid;
use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{persisting::PersistingData, PartitionData, SortKeyState},
table::TableName,
},
deferred_load::DeferredLoad,
persist::compact::{compact_persisting_batch, CompactedStream},
TRANSITION_SHARD_ID,
};
use super::actor::Inner;
/// An internal type that contains all necessary information to run a persist task.
///
/// Used to communicate between actor handles & actor task.
#[derive(Debug)]
pub(super) struct PersistRequest {
complete: Arc<Notify>,
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
}
impl PersistRequest {
pub(super) fn new(partition: Arc<Mutex<PartitionData>>, data: PersistingData) -> Self {
Self {
complete: Arc::new(Notify::default()),
partition,
data,
}
}
/// Return the partition ID of the persisting data.
pub(super) fn partition_id(&self) -> PartitionId {
self.data.partition_id()
}
/// Obtain the completion notification handle for this request.
///
/// This notification is fired once persistence is complete.
pub(super) fn complete_notification(&self) -> Arc<Notify> {
Arc::clone(&self.complete)
}
}
pub(super) struct Context {
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
inner: Arc<Inner>,
/// IDs loaded from the partition at construction time.
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
// The partition key for this partition
partition_key: PartitionKey,
/// Deferred strings needed for persistence.
///
/// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`]
/// is constructed to load them in the background (if not already resolved)
/// in order to avoid incurring the query latency when the values are
/// needed.
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_name: Arc<DeferredLoad<TableName>>,
/// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`]
/// construction.
///
/// The [`SortKey`] MUST NOT change during persistence, as updates to the
/// sort key are not commutative and thus require serialising. This
/// precludes parallel persists of partitions, unless they can be proven not
/// to need to update the sort key.
sort_key: SortKeyState,
/// A notification signal to indicate to the caller that this partition has
/// persisted.
complete: Arc<Notify>,
}
impl Context {
/// Construct a persistence job [`Context`] from `req`.
///
/// Locks the [`PartitionData`] in `req` to read various properties which
/// are then cached in the [`Context`].
pub(super) fn new(req: PersistRequest, inner: Arc<Inner>) -> Self {
let partition_id = req.data.partition_id();
// Obtain the partition lock and load the immutable values that will be
// used during this persistence.
let s = {
let complete = req.complete_notification();
let p = Arc::clone(&req.partition);
let guard = p.lock();
assert_eq!(partition_id, guard.partition_id());
Self {
partition: req.partition,
data: req.data,
inner,
namespace_id: guard.namespace_id(),
table_id: guard.table_id(),
partition_id,
partition_key: guard.partition_key().clone(),
namespace_name: Arc::clone(guard.namespace_name()),
table_name: Arc::clone(guard.table_name()),
// Technically the sort key isn't immutable, but MUST NOT change
// during the execution of this persist.
sort_key: guard.sort_key().clone(),
complete,
}
};
// Pre-fetch the deferred values in a background thread (if not already
// resolved)
s.namespace_name.prefetch_now();
s.table_name.prefetch_now();
if let SortKeyState::Deferred(ref d) = s.sort_key {
d.prefetch_now();
}
s
}
pub(super) async fn compact(&self) -> CompactedStream {
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
"compacting partition"
);
assert!(!self.data.record_batches().is_empty());
// Run a compaction to sort the data and resolve any duplicate values.
//
// This demands the deferred load values and may have to wait for them
// to be loaded before compaction starts.
compact_persisting_batch(
&self.inner.exec,
self.sort_key.get().await,
self.table_name.get().await,
self.data.query_adaptor(),
)
.await
.expect("unable to compact persisting batch")
}
pub(super) async fn upload(
&self,
compacted: CompactedStream,
) -> (Option<SortKey>, ParquetFileParams) {
let CompactedStream {
stream: record_stream,
catalog_sort_key_update,
data_sort_key,
} = compacted;
// Generate a UUID to uniquely identify this parquet file in
// object storage.
let object_store_id = Uuid::new_v4();
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
sort_key = %data_sort_key,
"uploading partition parquet"
);
// Construct the metadata for this parquet file.
let iox_metadata = IoxMetadata {
object_store_id,
creation_timestamp: SystemProvider::new().now(),
shard_id: TRANSITION_SHARD_ID,
namespace_id: self.namespace_id,
namespace_name: Arc::clone(&*self.namespace_name.get().await),
table_id: self.table_id,
table_name: Arc::clone(&*self.table_name.get().await),
partition_id: self.partition_id,
partition_key: self.partition_key.clone(),
max_sequence_number: SequenceNumber::new(0), // TODO: not ordered!
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
};
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.inner
.store
.upload(record_stream, &iox_metadata)
.await
.expect("unexpected fatal persist error");
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
file_size,
"partition parquet uploaded"
);
// Read the table schema from the catalog to act as a map of column name
// -> column IDs.
let table_schema = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = self.inner.catalog.repositories().await;
get_table_schema_by_id(self.table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// Build the data that must be inserted into the parquet_files catalog
// table in order to make the file visible to queriers.
let parquet_table_data =
iox_metadata.to_parquet_file(self.partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("unknown column").id
});
(catalog_sort_key_update, parquet_table_data)
}
pub(crate) async fn update_database(
self,
sort_key_update: Option<SortKey>,
parquet_table_data: ParquetFileParams,
) {
// Extract the object store ID to the local scope so that it can easily
// be referenced in debug logging to aid correlation of persist events
// for a specific file.
let object_store_id = parquet_table_data.object_store_id;
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
?parquet_table_data,
?sort_key_update,
"updating catalog"
);
// If necessary, update the partition sort key in the catalog and update
// the local cached copy in the PartitionData.
//
// This update MUST be made visible before the parquet file, otherwise
// the consumer of the parquet file will observe an inconsistent sort
// key.
if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>();
Backoff::new(&Default::default())
.retry_all_errors("update_sort_key", || async {
let mut repos = self.inner.catalog.repositories().await;
let _partition = repos
.partitions()
.update_sort_key(self.partition_id, &sort_key)
.await?;
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Update the sort key in the partition cache.
let old_key;
{
let mut guard = self.partition.lock();
old_key = guard.sort_key().clone();
guard.update_sort_key(Some(new_sort_key.clone()));
};
// Assert the serialisation of sort key updates.
//
// Both of these get() should not block due to both of the
// values having been previously resolved / used.
assert_eq!(old_key.get().await, self.sort_key.get().await);
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
old_sort_key = ?sort_key,
%new_sort_key,
"adjusted partition sort key"
);
}
// Add the parquet file to the catalog.
//
// This has the effect of allowing the queriers to "discover" the
// parquet file by polling / querying the catalog.
Backoff::new(&Default::default())
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.inner.catalog.repositories().await;
let parquet_file = repos
.parquet_files()
.create(parquet_table_data.clone())
.await?;
debug!(
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%object_store_id,
?parquet_table_data,
parquet_file_id=?parquet_file.id,
"parquet file added to catalog"
);
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
})
.await
.expect("retry forever");
// Mark the partition as having completed persistence, causing it to
// release the reference to the in-flight persistence data it is
// holding.
//
// This SHOULD cause the data to be dropped, but there MAY be ongoing
// queries that currently hold a reference to the data. In either case,
// the persisted data will be dropped "shortly".
self.partition.lock().mark_persisted(self.data);
info!(
%object_store_id,
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
"persisted partition"
);
// Notify all observers of this persistence task
self.complete.notify_waiters();
}
}
// TODO(test): persist
// TODO(test): persist completion notification
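
The completion handshake above hinges on `tokio::sync::Notify` broadcast semantics: `notify_waiters()` stores no permit, so only `Notified` futures that are already registered when `update_database()` finishes will observe the signal, which is why `PersistRequest` creates the `Notify` up front and hands it to the caller before the job runs. Below is a minimal, self-contained sketch of that pattern, independent of the ingester types and assuming `Notified::enable()` from recent tokio releases:

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let complete = Arc::new(Notify::new());

    // Register interest *before* the work completes so the broadcast cannot
    // be missed (mirrors obtaining complete_notification() up front).
    let notified = complete.notified();
    tokio::pin!(notified);
    notified.as_mut().enable();

    // Stand-in for a persist worker signalling completion.
    let worker = tokio::spawn({
        let complete = Arc::clone(&complete);
        async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            // Mirrors `self.complete.notify_waiters()` in update_database().
            complete.notify_waiters();
        }
    });

    notified.await;
    println!("persistence observed as complete");
    worker.await.unwrap();
}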


@@ -0,0 +1,173 @@
use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
use thiserror::Error;
use tokio::sync::{
mpsc::{self},
Notify,
};
use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData};
use super::{actor::PersistActor, context::PersistRequest};
#[derive(Debug, Error)]
pub(crate) enum PersistError {
#[error("persist queue is full")]
QueueFull,
}
/// A persistence task submission handle.
///
/// This type is cheap to clone to share across threads.
///
/// # Usage
///
/// The caller should construct a [`PersistHandle`] and [`PersistActor`] by
/// calling [`PersistHandle::new()`], and run the provided [`PersistActor`]
/// instance in another thread / task (by calling [`PersistActor::run()`]).
///
/// From this point on, the caller interacts only with the [`PersistHandle`]
/// which can be cheaply cloned and passed over thread/task boundaries to
/// enqueue persist tasks.
///
/// Dropping all [`PersistHandle`] instances stops the [`PersistActor`], which
/// immediately stops all workers.
///
/// # Topology
///
/// The persist actor uses an internal worker group to parallelise persistence
/// operations across up to `n_workers` parallel tasks.
///
/// Submitting a persistence request places the job into a bounded queue,
/// providing a buffer for persistence requests to wait for an available worker.
///
/// Two types of queue exist:
///
/// * Submission queue (bounded by `submission_queue_depth`)
/// * `n_worker` worker queues (each bounded by `worker_queue_depth`)
///
/// ```text
/// ┌─────────────┐
/// │PersistHandle├┐
/// └┬────────────┘├┐
/// └┬────────────┘│
/// └─────┬───────┘
/// │
/// submission_queue_depth
/// │
/// ▼
/// ╔═══════════════╗
/// ║ PersistActor ║
/// ╚═══════════════╝
/// │
/// ┌────────────────┼────────────────┐
/// │ │ │
/// │ │ │
/// worker_queue_depth worker_queue_depth worker_queue_depth
/// │ │ │
/// ▼ ▼ ▼
/// ┌────────────┐ ┌────────────┐ ┌────────────┐
/// │ Worker 1 │ │ Worker 2 │ │ Worker N │
/// └────────────┘ └────────────┘ └────────────┘
/// ▲ ▲ ▲
/// │
/// │ ▼ │
/// ┌ ─ ─ ─ ─ ─ ─
/// └ ─ ─ ─ ─▶ Executor │◀ ─ ─ ─ ─ ┘
/// └ ─ ─ ─ ─ ─ ─
/// ```
///
/// Compaction is performed in the provided [`Executor`] re-org thread-pool,
/// which is shared across all workers.
///
/// At any one time, the number of outstanding persist tasks is bounded by:
///
/// ```text
///
/// submission_queue_depth + (workers * worker_queue_depth)
///
/// ```
///
/// At any one time, there may be at most `submission_queue_depth +
/// worker_queue_depth` outstanding persist jobs for a single partition.
///
/// These two queues are used to decouple submissions from individual workers -
/// this prevents a "hot" / backlogged worker with a full worker queue from
/// blocking tasks from being passed through to workers with spare capacity.
///
/// # Parallelism & Partition Serialisation
///
/// Persistence jobs are parallelised across partitions, with at most
/// `n_worker` persist executions running in parallel at any one time.
///
/// Because updates of a partition's [`SortKey`] are not commutative, they must
/// be serialised. For this reason, persist operations for a given partition are
/// always placed in the same worker queue, ensuring they execute sequentially.
///
/// [`SortKey`]: schema::sort::SortKey
#[derive(Debug, Clone)]
pub(crate) struct PersistHandle {
tx: mpsc::Sender<PersistRequest>,
}
impl PersistHandle {
/// Initialise a new persist actor & obtain the first handle.
///
/// The caller should call [`PersistActor::run()`] in a separate
/// thread / task to start the persistence executor.
pub(crate) fn new(
submission_queue_depth: usize,
n_workers: usize,
worker_queue_depth: usize,
exec: Executor,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
) -> (Self, PersistActor) {
let (tx, rx) = mpsc::channel(submission_queue_depth);
let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth);
(Self { tx }, actor)
}
/// Place `data` from `partition` into the persistence queue.
///
/// This call (asynchronously) waits for space to become available in the
/// submission queue.
///
/// Once persistence is complete, the partition will be locked and the sort
/// key will be updated, and [`PartitionData::mark_persisted()`] is called
/// with `data`.
///
/// Once all persistence related tasks are complete, the returned [`Notify`]
/// broadcasts a notification.
///
/// # Panics
///
/// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated
/// between persistence starting and ending.
///
/// This will panic (asynchronously) if `data` was not from `partition` or
/// all worker threads have stopped.
pub(crate) async fn queue_persist(
&self,
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
) -> Arc<Notify> {
// Build the persist task request
let r = PersistRequest::new(partition, data);
let notify = r.complete_notification();
self.tx
.send(r)
.await
.expect("no persist worker tasks running");
notify
}
}
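
Putting the pieces together, the intended call pattern from the doc comment above looks roughly like the sketch below. The queue depths are illustrative and the arguments are assumed to be constructed elsewhere in the ingester, with `PersistHandle` assumed to be in scope; with `submission_queue_depth = 5`, `n_workers = 4` and `worker_queue_depth = 10`, the bound described above works out to at most 5 + 4 * 10 = 45 outstanding persist jobs.

use std::sync::Arc;

use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;

use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData};

// Illustrative wiring only; assumes this lives where `PersistHandle` is
// visible and that the arguments are built elsewhere in the ingester.
async fn run_persist_system(
    exec: Executor,
    store: ParquetStorage,
    catalog: Arc<dyn Catalog>,
    partition: Arc<Mutex<PartitionData>>,
    data: PersistingData,
) {
    // submission_queue_depth = 5, n_workers = 4, worker_queue_depth = 10:
    // at most 5 + 4 * 10 = 45 persist jobs outstanding at any one time.
    let (handle, actor) = PersistHandle::new(5, 4, 10, exec, store, catalog);

    // Drive the actor on its own task; it exits once every handle is dropped.
    tokio::spawn(actor.run());

    // Enqueue a persist job and wait for the completion broadcast.
    let notify = handle.queue_persist(partition, data).await;
    notify.notified().await;
}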


@@ -0,0 +1,4 @@
mod actor;
pub(super) mod compact;
mod context;
mod handle;