From cd6d69c9b1a2cb86249779904b44b0b84155c703 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 6 Feb 2023 09:37:57 +0100
Subject: [PATCH] docs: high-level documentation for `compactor2` (#6818)

* docs: high-level documentation for `comapctor2`
* docs: typos & clarification
Co-authored-by: Nga Tran
Co-authored-by: Andrew Lamb
* docs: improve wording
* fix: typo
---------
Co-authored-by: Nga Tran
Co-authored-by: Andrew Lamb
---
 compactor2/img/driver.svg |  45 +++++++++++
 compactor2/src/lib.rs     | 159 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+)
 create mode 100644 compactor2/img/driver.svg

diff --git a/compactor2/img/driver.svg b/compactor2/img/driver.svg
new file mode 100644
index 0000000000..0f04ea608d
--- /dev/null
+++ b/compactor2/img/driver.svg
@@ -0,0 +1,45 @@
+[SVG figure: data & control flow of the driver. Pipeline: Partitions Source -> Partitions Stream ->
+per partition (concurrently), wrapped in time-out & error catching: Partition Files Source ->
+Files Filter -> Split -> Partition Filter -> Divide -> per branch (serial): Scratchpad Load ->
+Job Semaphore -> DataFusion Planner -> DataFusion Plan Exec -> Parquet Sink -> Scratchpad Publish ->
+Scratchpad Clean -> Commit -> Partition Done Sink. A Partition Info source feeds the per-partition
+stages. Note in the figure: "These are technically multiple sources and the results are cached
+after the first round."]
\ No newline at end of file
diff --git a/compactor2/src/lib.rs b/compactor2/src/lib.rs
index 5a129d7d6d..22e96f18cf 100644
--- a/compactor2/src/lib.rs
+++ b/compactor2/src/lib.rs
@@ -1,4 +1,162 @@
 //! The compactor.
+//!
+//!
+//! # Mission statement
+//!
+//! The compactor processes parquet files produced by the ingesters so that queriers can read them more
+//! efficiently. More precisely, it tries to meet the following -- partially conflicting -- objectives:
+//!
+//! - **O1 -- duplicates:** Duplicates across files should be removed[^duplicates_within_files]. Removing
+//!   duplicates at query time is costly.
+//! - **O2 -- overlaps:** The key-spaces (tags + timestamp) of parquet files should be strictly distinct.
+//!   Without this property the querier has no way to know that keys are unique across files and hence must
+//!   run costly de-duplication at query time.
+//! - **O3 -- minimize number of files:** There should be as few files as possible. Every file comes with
+//!   overhead both within the catalog and on the querier side.
+//! - **O4 -- max file size:** Parquet files must have a maximum (configurable) size (with a certain margin
+//!   on top due to technical reasons[^max_parquet_size]). Files that are too large cause memory issues in
+//!   the querier.
+//! - **O5 -- minimal changes:** The changes to the set of parquet files should be as minimal as possible to
+//!   increase cache efficiency on the querier side, reduce write amplification (rewriting the same row over
+//!   and over again), and decrease the catalog load (each file creation / deletion requires catalog updates).
+//! - **O6 -- catch-up:** The compactor must be able to keep up with the ingest tier and not "fall behind".
+//! - **O7 -- costs:** The compactor shall be cost-efficient.
+//!
+//! From these objectives, we can derive the following additional requirements -- some more obvious than others:
+//!
+//! - **R1 -- avoid re-compaction:** Avoid re-processing the same data multiple times. This can be derived
+//!   from *O5*, *O7*, and potentially *O6*.
+//! - **R2 -- parallelization:** Be able to run multiple compaction jobs at the same time. Derived from *O6*
+//!   and *O7*.
+//! - **R3 -- scale-out:** Be able to scale to multiple nodes. Derived from *O6* but may very much depend on
+//!   *R2*.
+//! - **R4 -- concurrent compute & IO:** Be able to decouple IO (and the involved latencies) from the actual
+//!   computation. Derived from *O6* and *O7*.
+//!
+//!
+//! # Crate Layout
+//!
+//! This crate tries to decouple "when to do what" from "how to do what". The "when" is described by the
+//! [driver], which provides an abstract framework. The "how" is described by [components], which act as
+//! puzzle pieces that are plugged into the [driver].
+//!
+//! The concrete selection of components and their setup is currently [hardcoded](crate::components::hardcoded).
+//! This setup process takes a [config] and creates a [component setup](crate::components::Components).
+//!
+//! The final flow graph looks like this:
+//!
+//! ```text
+//! (CLI/Env) --> (Config) --[hardcoded]--> (Components) --> (Driver)
+//! ```
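+//!
+//! The following sketch illustrates that decoupling. The names are placeholders only, not the actual items
+//! exported by [config], [components], and [driver]:
+//!
+//! ```
+//! # use std::sync::Arc;
+//! /// Simplified stand-in for the real config struct.
+//! #[derive(Debug)]
+//! pub struct Config {
+//!     /// Example tuning knob.
+//!     pub max_desired_file_size_bytes: u64,
+//! }
+//!
+//! /// Simplified stand-in for the real component bundle.
+//! #[derive(Debug)]
+//! pub struct Components {
+//!     // one `Arc<dyn ...>` per component interface
+//! }
+//!
+//! /// Simplified stand-in for the hardcoded setup function: it inspects the config once and picks
+//! /// concrete component implementations.
+//! pub fn hardcoded_components(config: &Config) -> Arc<Components> {
+//!     let _ = config.max_desired_file_size_bytes;
+//!     Arc::new(Components {})
+//! }
+//!
+//! /// Simplified stand-in for the driver entry point: it only ever sees the abstract components,
+//! /// never the config itself.
+//! pub async fn compact(components: Arc<Components>) {
+//!     let _ = components;
+//! }
+//! ```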
+//!
+//! ## Driver
+//! The driver is the heart of the compactor: it describes and executes the framework.
+//!
+//! This is the data and control flow of the driver:
+#![doc = include_str!("../img/driver.svg")]
+//!
+//! In general, modifications to the driver should be rare. Most changes should be implemented by using
+//! components.
+//!
+//! ## Components
+//! Components can be thought of as the puzzle pieces used by the driver. Technically there are two parts to
+//! them: the driver interfaces (traits) and the implementations.
+//!
+//! **Note: The code samples used in the "components" section are simplified and may not reflect actual
+//! components. They are mostly illustrations.**
+//!
+//! ### Interfaces
+//! Component interfaces should describe one single aspect of the framework. For example:
+//!
+//! ```
+//! # use std::fmt::{Debug, Display};
+//! # use data_types::ParquetFile;
+//! /// Calculates the importance of a file.
+//! pub trait FileImportance: Debug + Display + Send + Sync {
+//!     /// Rates a file by importance.
+//!     ///
+//!     /// Higher ratings mean "more important".
+//!     fn rate(&self, file: &ParquetFile) -> u64;
+//! }
+//! ```
+//!
+//! Looking at this interface, you will note the following aspects:
+//!
+//! - **`Debug`:** Allows the component to be used within our structs that implement [`Debug`].
+//! - **`Display`:** Allows producing easily human- (and machine-) readable dumps of the current component
+//!   setup. It also allows other components to implement [`Display`] more easily.
+//! - **immutable:** All methods take `&self`. Interior mutability (e.g. for caching) is allowed. This allows
+//!   easy parallelization.
+//! - **not-consuming:** No method takes `self`. This ensures that the trait is object-safe and can be put
+//!   into an [`Arc`].
+//! - **`Send + Sync`:** Together with the two previous points, this allows usage within an [`Arc`].
+//! - **single method:** Most interfaces will only have a single method. This is because they have one
+//!   dedicated job.
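+//!
+//! For illustration, a trivial implementation of this (hypothetical) trait could look like the following
+//! sketch -- not an actual component of this crate, and assuming `ParquetFile` exposes a `file_size_bytes`
+//! field:
+//!
+//! ```
+//! # use std::fmt::{Debug, Display};
+//! # use std::sync::Arc;
+//! # use data_types::ParquetFile;
+//! # pub trait FileImportance: Debug + Display + Send + Sync {
+//! #     fn rate(&self, file: &ParquetFile) -> u64;
+//! # }
+//! /// Rates files by their size: bigger files are "more important".
+//! #[derive(Debug)]
+//! pub struct SizeBasedImportance;
+//!
+//! impl Display for SizeBasedImportance {
+//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+//!         // A stable, human-readable name makes dumps of the component setup easy to read.
+//!         write!(f, "size_based")
+//!     }
+//! }
+//!
+//! impl FileImportance for SizeBasedImportance {
+//!     fn rate(&self, file: &ParquetFile) -> u64 {
+//!         file.file_size_bytes as u64
+//!     }
+//! }
+//!
+//! // Because the trait is object-safe, immutable, and `Send + Sync`, it can be shared via an `Arc`.
+//! let _component: Arc<dyn FileImportance> = Arc::new(SizeBasedImportance);
+//! ```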
+//!
+//! The above interface does not allow any IO because it is not async. It is important to keep interfaces
+//! that perform IO clearly separated from those that don't, so that this is visible to the user and allows
+//! for a proper driver design. An interface that performs IO looks like this:
+//!
+//! ```
+//! # use std::fmt::{Debug, Display};
+//! # use async_trait::async_trait;
+//! # use data_types::{Partition, PartitionId};
+//! /// Fetches partition info.
+//! #[async_trait]
+//! pub trait PartitionSource: Debug + Display + Send + Sync {
+//!     /// Fetches the info for a single partition.
+//!     ///
+//!     /// This method retries internally.
+//!     async fn fetch(&self, id: PartitionId) -> Partition;
+//! }
+//! ```
+//!
+//! For IO interfaces, all the points for normal interfaces apply. In addition, the following points are
+//! important:
+//!
+//! - **error & retries:** Most IO interfaces are NOT allowed to return an error. Instead they shall implement
+//!   internal retries (e.g. using [`backoff`]).
+//!
+//! ### Implementations
+//! The implementations of the aforementioned interfaces roughly fall into the following categories:
+//!
+//! - **true sources/sinks:** These are the actual (mostly object-store and catalog backed) implementations
+//!   used in production. There may be multiple variants that are used depending on the config. NO metrics or
+//!   logging is implemented directly within these (see "wrappers" below).
+//! - **assemblies:** Assemble components of other types into this type. E.g. a filter for a set of files
+//!   (`[file] -> [file]`) can be assembled from a filter for a single file (`file -> bool`).
+//! - **mocks:** Mocks can be controlled during initialization or later and may record the interaction with
+//!   them. They are mostly used for testing or development purposes.
+//! - **wrappers:** Wrap a component of the same type but add functionality on top. This can be logging,
+//!   metrics, assertions, data dumping, filtering, randomness, etc.
+//!
+//! This modular approach allows easy testing of components as well as consistency. E.g. swapping out a data
+//! source for another one will preserve the same logs and metrics (given that the same wrappers are used).
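+//!
+//! For illustration, a logging wrapper around the (hypothetical) `PartitionSource` from above might look
+//! like the following sketch. It simply delegates to the inner component; a real wrapper would use the
+//! shared logging / metrics infrastructure rather than `println!`:
+//!
+//! ```
+//! # use std::fmt::{Debug, Display};
+//! # use async_trait::async_trait;
+//! # use data_types::{Partition, PartitionId};
+//! # #[async_trait]
+//! # pub trait PartitionSource: Debug + Display + Send + Sync {
+//! #     async fn fetch(&self, id: PartitionId) -> Partition;
+//! # }
+//! /// Wrapper that logs all requests made to the inner [`PartitionSource`].
+//! #[derive(Debug)]
+//! pub struct LoggingPartitionSourceWrapper<T>
+//! where
+//!     T: PartitionSource,
+//! {
+//!     inner: T,
+//! }
+//!
+//! impl<T> Display for LoggingPartitionSourceWrapper<T>
+//! where
+//!     T: PartitionSource,
+//! {
+//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+//!         // Nest the inner component so that dumps show the whole wrapper stack.
+//!         write!(f, "logging({})", self.inner)
+//!     }
+//! }
+//!
+//! #[async_trait]
+//! impl<T> PartitionSource for LoggingPartitionSourceWrapper<T>
+//! where
+//!     T: PartitionSource,
+//! {
+//!     async fn fetch(&self, id: PartitionId) -> Partition {
+//!         // Log the request, then delegate to the wrapped component.
+//!         println!("fetching partition {id:?}");
+//!         self.inner.fetch(id).await
+//!     }
+//! }
+//! ```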
+//!
+//! ## Configuration
+//! The [config] allows a wide range of component setups. The config options fall roughly into the following
+//! categories:
+//!
+//! - **parameters:** Information required to run the compactor, e.g. the object store connection or sharding
+//!   information.
+//! - **tuning knobs:** Options like "maximum desired file size" or "number of concurrent jobs" allow
+//!   deployments to fine-tune the compactor behavior. They do NOT fundamentally change the behavior though.
+//! - **behavior switches:** Switches like "ignore that partitions were flagged", "run once and exit", or
+//!   "use this pre-defined set of partitions" change the behavior of the system. This may be used for
+//!   emergency situations or to debug / develop the compactor.
+//! - **algorithm versions:** This sets [`AlgoVersion`] and fundamentally changes the behavior, e.g. from a
+//!   simple demo approach to some experimental new system. Note that this only changes the setup of the
+//!   components; it does NOT change the driver.
+//!
+//!
+//! [^duplicates_within_files]: The compactor design would not fundamentally change if the ingester produced
+//!     output that contains duplicates within a single file. The current implementation, however, assumes
+//!     that this is NOT the case.
+//!
+//! [^max_parquet_size]: It is technically hard to determine the size of a parquet file before writing it. It
+//!     may be estimated by assuming that "sum size of input files >= sum size of output files". However,
+//!     there are certain cases where this does NOT hold, due to the way the data within the output parquet
+//!     file is split into data pages and then potentially dictionary- and RLE-encoded and [ZSTD]-compressed.
+//!
+//!
+//! [`AlgoVersion`]: crate::config::AlgoVersion
+//! [`Arc`]: std::sync::Arc
+//! [components]: crate::components
+//! [config]: crate::config
+//! [`Debug`]: std::fmt::Debug
+//! [`Display`]: std::fmt::Display
+//! [driver]: crate::driver
+//! [ZSTD]: https://github.com/facebook/zstd
 #![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
 #![warn(
     missing_copy_implementations,
@@ -10,6 +168,7 @@
     clippy::todo,
     clippy::dbg_macro
 )]
+#![allow(rustdoc::private_intra_doc_links)]

 pub mod compactor;
 mod components