Merge branch 'main' into dom/doc-persist-endpoint

pull/24376/head
Dom 2023-02-09 16:27:37 +00:00 committed by GitHub
commit f4e22528f0
16 changed files with 217 additions and 148 deletions

View File

@@ -16,7 +16,7 @@ use crate::{partition_info::PartitionInfo, plan_ir::PlanIR};
pub trait DataFusionPlanner: Debug + Display + Send + Sync {
async fn plan(
&self,
ir: PlanIR,
ir: &PlanIR,
partition: Arc<PartitionInfo>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;
}
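This hunk changes `DataFusionPlanner::plan` to borrow the plan IR (`&PlanIR`) instead of consuming it, so the caller can keep using the IR after planning — later in this merge, `run_compaction_plan` passes `&plan_ir` to the planner and then again to the new `ParquetFilesSink`, which reads `input_files()` from it. A minimal sketch of the borrow-versus-move difference, using hypothetical stand-in types rather than the crate's real `PlanIR`, `PartitionInfo`, or `ExecutionPlan`:

```rust
// Hypothetical, simplified stand-ins; not the crate's types. The point is
// only that a borrowed argument leaves the caller free to reuse the value.
use async_trait::async_trait;

#[derive(Debug)]
struct PlanIr {
    files: Vec<String>,
}

#[async_trait]
trait Planner: Send + Sync {
    async fn plan(&self, ir: &PlanIr) -> usize;
}

struct CountingPlanner;

#[async_trait]
impl Planner for CountingPlanner {
    async fn plan(&self, ir: &PlanIr) -> usize {
        ir.files.len()
    }
}

#[tokio::main]
async fn main() {
    let plan_ir = PlanIr {
        files: vec!["a.parquet".into(), "b.parquet".into()],
    };
    let n = CountingPlanner.plan(&plan_ir).await;
    // `plan_ir` was only borrowed, so it is still available here,
    // e.g. to compute per-file metadata after planning.
    println!("planned {n} inputs; first input: {:?}", plan_ir.files.first());
}
```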

View File

@@ -37,7 +37,7 @@ impl Display for PanicDataFusionPlanner {
impl DataFusionPlanner for PanicDataFusionPlanner {
async fn plan(
&self,
_ir: PlanIR,
_ir: &PlanIR,
_partition: Arc<PartitionInfo>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
Ok(Arc::new(PanicPlan))
@@ -111,7 +111,7 @@ mod tests {
let planner = PanicDataFusionPlanner::new();
let partition = partition_info();
let plan = planner
.plan(PlanIR::Compact { files: vec![] }, partition)
.plan(&PlanIR::Compact { files: vec![] }, partition)
.await
.unwrap();

View File

@@ -43,7 +43,7 @@ impl Display for V1DataFusionPlanner {
impl DataFusionPlanner for V1DataFusionPlanner {
async fn plan(
&self,
ir: PlanIR,
ir: &PlanIR,
partition: Arc<PartitionInfo>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let ctx = self.exec.new_context(ExecutorType::Reorg);
@@ -87,7 +87,7 @@ impl DataFusionPlanner for V1DataFusionPlanner {
&merged_schema,
query_chunks,
sort_key,
split_times,
split_times.clone(),
)
.map_err(|e| {
DataFusionError::Context(

View File

@@ -163,12 +163,12 @@ impl QueryChunk for QueryableParquetChunk {
}
pub fn to_query_chunks(
files: Vec<FileIR>,
files: &[FileIR],
partition_info: &PartitionInfo,
store: ParquetStorage,
) -> Vec<Arc<dyn QueryChunk>> {
files
.into_iter()
.iter()
.map(|file| {
Arc::new(to_queryable_parquet_chunk(
file,
@@ -181,7 +181,7 @@ pub fn to_query_chunks(
/// Convert to a QueryableParquetChunk
fn to_queryable_parquet_chunk(
file: FileIR,
file: &FileIR,
partition_info: &PartitionInfo,
store: ParquetStorage,
) -> QueryableParquetChunk {
@@ -220,7 +220,7 @@ fn to_queryable_parquet_chunk(
"built parquet chunk from metadata"
);
let parquet_chunk = ParquetChunk::new(Arc::new(file.file), schema, store);
let parquet_chunk = ParquetChunk::new(Arc::new(file.file.clone()), schema, store);
QueryableParquetChunk::new(
partition_id,
Arc::new(parquet_chunk),

View File

@@ -47,6 +47,7 @@ use super::{
dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
mock::MockParquetFileSink, object_store::ObjectStoreParquetFileSink, ParquetFileSink,
},
parquet_files_sink::dispatch::DispatchParquetFilesSink,
partition_done_sink::{
catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
@@ -302,7 +303,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.exec),
)),
df_plan_exec,
parquet_file_sink,
parquet_files_sink: Arc::new(DispatchParquetFilesSink::new(parquet_file_sink)),
round_split: Arc::new(AllNowRoundSplit::new()),
divide_initial: Arc::new(SingleBranchDivideInitial::new()),
scratchpad_gen,

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use self::{
commit::Commit, df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner,
divide_initial::DivideInitial, files_filter::FilesFilter, ir_planner::IRPlanner,
namespaces_source::NamespacesSource, parquet_file_sink::ParquetFileSink,
namespaces_source::NamespacesSource, parquet_files_sink::ParquetFilesSink,
partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
partition_filter::PartitionFilter, partition_source::PartitionSource,
partition_stream::PartitionStream, round_split::RoundSplit, scratchpad::ScratchpadGen,
@@ -24,6 +24,7 @@ pub mod ir_planner;
pub mod level_exist;
pub mod namespaces_source;
pub mod parquet_file_sink;
pub mod parquet_files_sink;
pub mod partition_done_sink;
pub mod partition_files_source;
pub mod partition_filter;
@@ -52,7 +53,7 @@ pub struct Components {
pub ir_planner: Arc<dyn IRPlanner>,
pub df_planner: Arc<dyn DataFusionPlanner>,
pub df_plan_exec: Arc<dyn DataFusionPlanExec>,
pub parquet_file_sink: Arc<dyn ParquetFileSink>,
pub parquet_files_sink: Arc<dyn ParquetFilesSink>,
pub round_split: Arc<dyn RoundSplit>,
pub divide_initial: Arc<dyn DivideInitial>,
pub scratchpad_gen: Arc<dyn ScratchpadGen>,

View File

@@ -25,3 +25,21 @@ pub trait ParquetFileSink: Debug + Display + Send + Sync {
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError>;
}
#[async_trait]
impl<T> ParquetFileSink for Arc<T>
where
T: ParquetFileSink + ?Sized,
{
async fn store(
&self,
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError> {
self.as_ref()
.store(stream, partition, level, max_l0_created_at)
.await
}
}
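The new blanket impl forwards `ParquetFileSink` through any `Arc<T>` (including `Arc<dyn ParquetFileSink>`, thanks to `?Sized`), which lets an already-`Arc`-wrapped sink satisfy the `T: ParquetFileSink` bound on `DispatchParquetFilesSink::new` in `hardcoded_components`. A minimal sketch of the same pattern on a simplified trait; the names below are illustrative, not from the crate:

```rust
use std::sync::Arc;

use async_trait::async_trait;

#[async_trait]
trait Sink: Send + Sync {
    async fn store(&self, batch: u64) -> Result<(), String>;
}

// Blanket impl: Arc<T> is itself a Sink (T may be unsized, e.g. `dyn Sink`),
// simply delegating to the wrapped value.
#[async_trait]
impl<T> Sink for Arc<T>
where
    T: Sink + ?Sized,
{
    async fn store(&self, batch: u64) -> Result<(), String> {
        self.as_ref().store(batch).await
    }
}

struct NoopSink;

#[async_trait]
impl Sink for NoopSink {
    async fn store(&self, _batch: u64) -> Result<(), String> {
        Ok(())
    }
}

// A generic consumer that only knows about `T: Sink`.
async fn run<T: Sink>(sink: T) -> Result<(), String> {
    sink.store(1).await
}

#[tokio::main]
async fn main() {
    // A type-erased sink still satisfies the generic bound, because
    // Arc<dyn Sink>: Sink via the blanket impl above.
    let sink: Arc<dyn Sink> = Arc::new(NoopSink);
    run(sink).await.unwrap();
}
```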

View File

@@ -0,0 +1,87 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, TryFutureExt, TryStreamExt};
use iox_time::Time;
use crate::{
components::parquet_file_sink::ParquetFileSink, error::DynError, partition_info::PartitionInfo,
plan_ir::PlanIR,
};
use super::ParquetFilesSink;
#[derive(Debug)]
pub struct DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
{
inner: Arc<T>,
}
impl<T> DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
{
pub fn new(inner: T) -> Self {
Self {
inner: Arc::new(inner),
}
}
}
impl<T> Display for DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "dispatch({})", self.inner)
}
}
#[async_trait]
impl<T> ParquetFilesSink for DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
{
async fn stream_into_file_sink(
&self,
streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
plan_ir: &PlanIR,
) -> Result<Vec<ParquetFileParams>, DynError> {
// compute max_l0_created_at
let max_l0_created_at: Time = plan_ir
.input_files()
.iter()
.map(|f| f.file.max_l0_created_at)
.max()
.expect("max_l0_created_at should have value")
.into();
let inner = Arc::clone(&self.inner);
streams
.into_iter()
.map(move |stream| {
let inner = Arc::clone(&inner);
let partition_info = Arc::clone(&partition_info);
async move {
inner
.store(stream, partition_info, target_level, max_l0_created_at)
.await
}
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Discard the streams that resulted in empty output / no file uploaded
// to the object store.
.try_filter_map(|v| futures::future::ready(Ok(v)))
// Collect all the persisted parquet files together.
.try_collect::<Vec<_>>()
.map_err(|e| Box::new(e) as _)
.await
}
}
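The comments in `stream_into_file_sink` summarise the two moving parts: `FuturesOrdered` drives all the `store` futures concurrently while yielding their results in submission order, and `try_filter_map(... Ok(v))` drops the `None` results from streams that produced no file. A small self-contained sketch of that combination, with plain numbers standing in for record-batch streams (the names here are illustrative):

```rust
use futures::{stream::FuturesOrdered, TryStreamExt};

// Pretend "store" operation: even inputs produce a file id, odd inputs
// produce no output (None), and nothing here fails.
async fn store(i: u32) -> Result<Option<u32>, String> {
    if i % 2 == 0 {
        Ok(Some(i * 10))
    } else {
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let stored: Vec<u32> = (0..5)
        .map(store)
        // FuturesOrdered polls all futures concurrently but yields their
        // results in the order the futures were pushed.
        .collect::<FuturesOrdered<_>>()
        // Discard the `None` results (nothing was uploaded for them).
        .try_filter_map(|v| futures::future::ready(Ok(v)))
        // Collect all the "persisted files" together.
        .try_collect()
        .await?;
    assert_eq!(stored, vec![0, 20, 40]);
    Ok(())
}
```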

View File

@@ -0,0 +1,23 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::physical_plan::SendableRecordBatchStream;
use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};
pub mod dispatch;
#[async_trait]
pub trait ParquetFilesSink: Debug + Display + Send + Sync {
async fn stream_into_file_sink(
&self,
streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
plan_ir: &PlanIR,
) -> Result<Vec<ParquetFileParams>, DynError>;
}

View File

@@ -98,7 +98,7 @@ pub fn log_components(components: &Components) {
ir_planner,
df_planner,
df_plan_exec,
parquet_file_sink,
parquet_files_sink,
round_split,
divide_initial,
scratchpad_gen,
@@ -122,7 +122,7 @@ pub fn log_components(components: &Components) {
%ir_planner,
%df_planner,
%df_plan_exec,
%parquet_file_sink,
%parquet_files_sink,
%round_split,
%divide_initial,
%scratchpad_gen,

View File

@@ -1,9 +1,7 @@
use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use iox_time::Time;
use futures::StreamExt;
use observability_deps::tracing::info;
use parquet_file::ParquetFilePath;
use tracker::InstrumentedAsyncSemaphore;
@@ -56,6 +54,7 @@ async fn compact_partition(
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>,
) {
info!(partition_id = partition_id.get(), "compact partition",);
let mut scratchpad = components.scratchpad_gen.pad();
let res = tokio::time::timeout(
@@ -78,6 +77,7 @@ async fn compact_partition(
.await;
scratchpad.clean().await;
info!(partition_id = partition_id.get(), "compacted partition",);
}
/// Main function to compact files of a single partition.
@@ -381,13 +381,6 @@ async fn run_compaction_plan(
return Ok(vec![]);
}
// compute max_l0_created_at
let max_l0_created_at = files
.iter()
.map(|f| f.max_l0_created_at)
.max()
.expect("max_l0_created_at should have value");
// stage files
let input_paths: Vec<ParquetFilePath> = files.iter().map(|f| f.into()).collect();
let input_uuids_inpad = scratchpad_ctx.load_to_scratchpad(&input_paths).await;
@@ -412,26 +405,37 @@
// We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
// DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
// knowledge, this is currently (2023-01-25) not the case but if this ever changes, then we are prepared.
let _permit = job_semaphore
let permit = job_semaphore
.acquire(None)
.await
.expect("semaphore not closed");
info!(
partition_id = partition_info.partition_id.get(),
"job semaphore acquired",
);
let plan = components
.df_planner
.plan(plan_ir, Arc::clone(partition_info))
.plan(&plan_ir, Arc::clone(partition_info))
.await?;
let streams = components.df_plan_exec.exec(plan);
let job = stream_into_file_sink(
let job = components.parquet_files_sink.stream_into_file_sink(
streams,
Arc::clone(partition_info),
target_level,
max_l0_created_at.into(),
Arc::clone(components),
&plan_ir,
);
// TODO: react to OOM and try to divide branch
job.await?
let res = job.await;
drop(permit);
info!(
partition_id = partition_info.partition_id.get(),
"job semaphore released",
);
res?
};
Ok(create)
@@ -544,32 +548,3 @@ async fn fetch_partition_info(
partition_key: partition.partition_key,
}))
}
fn stream_into_file_sink(
streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
max_l0_created_at: Time,
components: Arc<Components>,
) -> impl Future<Output = Result<Vec<ParquetFileParams>, DynError>> {
streams
.into_iter()
.map(move |stream| {
let components = Arc::clone(&components);
let partition_info = Arc::clone(&partition_info);
async move {
components
.parquet_file_sink
.store(stream, partition_info, target_level, max_l0_created_at)
.await
}
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Discard the streams that resulted in empty output / no file uploaded
// to the object store.
.try_filter_map(|v| futures::future::ready(Ok(v)))
// Collect all the persisted parquet files together.
.try_collect::<Vec<_>>()
.map_err(|e| Box::new(e) as _)
}
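The permit handling in the `run_compaction_plan` hunk above also changes: instead of an anonymous `_permit` binding held implicitly until the end of the block, the permit is now named, dropped explicitly once the job completes, and acquisition and release are both logged. A small sketch of that explicit acquire/drop pattern, using `tokio::sync::Semaphore` as a stand-in for the crate's `InstrumentedAsyncSemaphore` (this example does not depend on the crate):

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Stand-in for the job semaphore: at most two concurrent compaction jobs.
    let job_semaphore = Arc::new(Semaphore::new(2));

    let permit = job_semaphore
        .acquire()
        .await
        .expect("semaphore not closed");
    println!("job semaphore acquired");

    // ... plan + execute + persist while holding the permit ...
    let res: Result<usize, String> = Ok(3);

    // Release the permit explicitly before handling the result, so the
    // slot frees up even if result handling is slow or returns early.
    drop(permit);
    println!("job semaphore released");

    let n_files = res.expect("job failed");
    println!("persisted {n_files} parquet files");
}
```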

View File

@@ -26,6 +26,13 @@ impl PlanIR {
Self::Split { split_times, .. } => split_times.len() + 1,
}
}
pub fn input_files(&self) -> &[FileIR] {
match self {
Self::Compact { files } => files,
Self::Split { files, .. } => files,
}
}
}
impl Display for PlanIR {

View File

@@ -297,6 +297,7 @@ impl SkippedCompactionBuilder {
}
}
// Default values for the test setup builder
const SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
@@ -528,57 +529,42 @@ impl TestSetupBuilder<true> {
}
impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
pub fn with_shadow_mode(self) -> Self {
Self {
config: Config {
shadow_mode: true,
..self.config
},
..self
}
pub fn with_shadow_mode(mut self) -> Self {
self.config.shadow_mode = true;
self
}
/// Set compact version
pub fn with_compact_version(self, compact_version: AlgoVersion) -> Self {
Self {
config: Config {
compact_version,
..self.config
},
..self
}
pub fn with_compact_version(mut self, compact_version: AlgoVersion) -> Self {
self.config.compact_version = compact_version;
self
}
/// set min_num_l1_files_to_compact
pub fn with_min_num_l1_files_to_compact(self, min_num_l1_files_to_compact: usize) -> Self {
Self {
config: Config {
min_num_l1_files_to_compact,
..self.config
},
..self
}
pub fn with_min_num_l1_files_to_compact(mut self, min_num_l1_files_to_compact: usize) -> Self {
self.config.min_num_l1_files_to_compact = min_num_l1_files_to_compact;
self
}
/// Set max_input_files_per_partition
pub fn with_max_input_files_per_partition(self, max_input_files_per_partition: usize) -> Self {
Self {
config: Config {
max_input_files_per_partition,
..self.config
},
..self
}
pub fn with_max_input_files_per_partition(
mut self,
max_input_files_per_partition: usize,
) -> Self {
self.config.max_input_files_per_partition = max_input_files_per_partition;
self
}
pub fn simulate_without_object_store(self) -> Self {
Self {
config: Config {
simulate_without_object_store: true,
..self.config
},
..self
}
/// set simulate_without_object_store
pub fn simulate_without_object_store(mut self) -> Self {
self.config.simulate_without_object_store = true;
self
}
/// Set max_desired_file_size_bytes
pub fn with_max_desired_file_size_bytes(mut self, max_desired_file_size_bytes: u64) -> Self {
self.config.max_desired_file_size_bytes = max_desired_file_size_bytes;
self
}
pub async fn build(self) -> TestSetup {

View File

@@ -485,6 +485,7 @@ async fn test_pr6890() {
let setup = TestSetup::builder()
.await
.simulate_without_object_store()
.with_max_desired_file_size_bytes(20_000_000) // 20MB
.build()
.await;
@@ -540,54 +541,6 @@ async fn test_pr6890() {
- "L1, all files 1b "
- "L1.11[0,0] |------------------------------------L1.11-------------------------------------|"
- "L1.12[0,0] |------------------------------------L1.12-------------------------------------|"
- "L1.13[0,0] |------------------------------------L1.13-------------------------------------|"
- "L1.14[0,0] |------------------------------------L1.14-------------------------------------|"
- "L1.15[0,0] |------------------------------------L1.15-------------------------------------|"
- "L1.16[0,0] |------------------------------------L1.16-------------------------------------|"
- "L1.17[0,0] |------------------------------------L1.17-------------------------------------|"
- "L1.18[0,0] |------------------------------------L1.18-------------------------------------|"
- "L1.19[0,0] |------------------------------------L1.19-------------------------------------|"
- "L1.20[0,0] |------------------------------------L1.20-------------------------------------|"
- "L1.21[0,0] |------------------------------------L1.21-------------------------------------|"
- "L1.22[0,0] |------------------------------------L1.22-------------------------------------|"
- "L1.23[0,0] |------------------------------------L1.23-------------------------------------|"
- "L1.24[0,0] |------------------------------------L1.24-------------------------------------|"
- "L1.25[0,0] |------------------------------------L1.25-------------------------------------|"
- "L1.26[0,0] |------------------------------------L1.26-------------------------------------|"
- "L1.27[0,0] |------------------------------------L1.27-------------------------------------|"
- "L1.28[0,0] |------------------------------------L1.28-------------------------------------|"
- "L1.29[0,0] |------------------------------------L1.29-------------------------------------|"
- "L1.30[0,0] |------------------------------------L1.30-------------------------------------|"
- "L1.31[0,0] |------------------------------------L1.31-------------------------------------|"
- "L1.32[0,0] |------------------------------------L1.32-------------------------------------|"
- "L1.33[0,0] |------------------------------------L1.33-------------------------------------|"
- "L1.34[0,0] |------------------------------------L1.34-------------------------------------|"
- "L1.35[0,0] |------------------------------------L1.35-------------------------------------|"
- "L1.36[0,0] |------------------------------------L1.36-------------------------------------|"
- "L1.37[0,0] |------------------------------------L1.37-------------------------------------|"
- "L1.38[0,0] |------------------------------------L1.38-------------------------------------|"
- "L1.39[0,0] |------------------------------------L1.39-------------------------------------|"
- "L1.40[0,0] |------------------------------------L1.40-------------------------------------|"
- "L1.41[0,0] |------------------------------------L1.41-------------------------------------|"
- "L1.42[0,0] |------------------------------------L1.42-------------------------------------|"
- "L1.43[0,0] |------------------------------------L1.43-------------------------------------|"
- "L1.44[0,0] |------------------------------------L1.44-------------------------------------|"
- "L1.45[0,0] |------------------------------------L1.45-------------------------------------|"
- "L1.46[0,0] |------------------------------------L1.46-------------------------------------|"
- "L1.47[0,0] |------------------------------------L1.47-------------------------------------|"
- "L1.48[0,0] |------------------------------------L1.48-------------------------------------|"
- "L1.49[0,0] |------------------------------------L1.49-------------------------------------|"
- "L1.50[0,0] |------------------------------------L1.50-------------------------------------|"
- "L1.51[0,0] |------------------------------------L1.51-------------------------------------|"
- "L1.52[0,0] |------------------------------------L1.52-------------------------------------|"
- "L1.53[0,0] |------------------------------------L1.53-------------------------------------|"
- "L1.54[0,0] |------------------------------------L1.54-------------------------------------|"
- "L1.55[0,0] |------------------------------------L1.55-------------------------------------|"
- "L1.56[0,0] |------------------------------------L1.56-------------------------------------|"
- "L1.57[0,0] |------------------------------------L1.57-------------------------------------|"
- "L1.58[0,0] |------------------------------------L1.58-------------------------------------|"
- "L1.59[0,0] |------------------------------------L1.59-------------------------------------|"
- "L1.60[0,0] |------------------------------------L1.60-------------------------------------|"
"###
);
}

View File

@@ -0,0 +1,9 @@
-- Add a soft-deletion timestamp to the "namespace" table.
--
-- <https://github.com/influxdata/influxdb_iox/issues/6492>
ALTER TABLE
namespace
ADD
COLUMN deleted_at BIGINT DEFAULT NULL;
CREATE INDEX namespace_deleted_at_idx ON namespace (deleted_at);

View File

@@ -0,0 +1,9 @@
-- Add a soft-deletion timestamp to the "namespace" table.
--
-- <https://github.com/influxdata/influxdb_iox/issues/6492>
ALTER TABLE
namespace
ADD
COLUMN deleted_at numeric DEFAULT NULL;
CREATE INDEX namespace_deleted_at_idx ON namespace (deleted_at);