Merge branch 'main' into dependabot/cargo/clap-4.4.4

pull/24376/head
Dom 2023-09-19 10:12:09 +01:00 committed by GitHub
commit 18c45d39bb
2 changed files with 166 additions and 56 deletions

@@ -20,7 +20,7 @@ use crate::{
         timeout::{timeout_with_progress_checking, TimeoutWithProgress},
         Components,
     },
-    error::{DynError, ErrorKind, SimpleError},
+    error::{DynError, ErrorKind, ErrorKindExt, SimpleError},
     file_classification::{FileClassification, FilesForProgress},
     partition_info::PartitionInfo,
     PlanIR, RoundInfo,
@@ -556,76 +556,99 @@ async fn execute_plan(
     span.set_metadata("input_bytes", plan_ir.input_bytes().to_string());
     span.set_metadata("reason", plan_ir.reason());

+    // We'll start with 1 permit and if the job exhausts resources, increase it.
+    let mut requested_permits = 1;
+    let mut res: Result<Vec<ParquetFileParams>, DynError> = Ok(Vec::new());
+
     let create = {
         // use the address of the plan as a uniq identifier so logs can be matched despite the concurrency.
         let plan_id = format!("{:p}", &plan_ir);

-        info!(
-            partition_id = partition_info.partition_id.get(),
-            jobs_running = df_semaphore.holders_acquired(),
-            jobs_pending = df_semaphore.holders_pending(),
-            permits_acquired = df_semaphore.permits_acquired(),
-            permits_pending = df_semaphore.permits_pending(),
-            plan_id,
-            "requesting job semaphore",
-        );
+        while requested_permits <= df_semaphore.total_permits() {
+            info!(
+                partition_id = partition_info.partition_id.get(),
+                jobs_running = df_semaphore.holders_acquired(),
+                jobs_pending = df_semaphore.holders_pending(),
+                requested_permits,
+                permits_acquired = df_semaphore.permits_acquired(),
+                permits_pending = df_semaphore.permits_pending(),
+                plan_id,
+                "requesting job semaphore",
+            );

-        // draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the
-        // DataFusion computation (but BEFORE doing any additional external IO).
-        //
-        // We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
-        // DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
-        // knowledge, this is currently (2023-08-29) not the case but if this ever changes, then we are prepared.
-        let permit_span = span.child("acquire_permit");
-        let permit = df_semaphore
-            .acquire(None)
-            .await
-            .expect("semaphore not closed");
-        drop(permit_span);
+            // draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the
+            // DataFusion computation (but BEFORE doing any additional external IO).
+            //
+            // We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
+            // DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
+            // knowledge, this is currently (2023-08-29) not the case but if this ever changes, then we are prepared.
+            let permit_span = span.child("acquire_permit");
+            let permit = df_semaphore
+                .acquire_many(requested_permits as u32, None)
+                .await
+                .expect("semaphore not closed");
+            drop(permit_span);

-        info!(
-            partition_id = partition_info.partition_id.get(),
-            column_count = partition_info.column_count(),
-            input_files = plan_ir.n_input_files(),
-            plan_id,
-            "job semaphore acquired",
-        );
+            info!(
+                partition_id = partition_info.partition_id.get(),
+                column_count = partition_info.column_count(),
+                input_files = plan_ir.n_input_files(),
+                plan_id,
+                "job semaphore acquired",
+            );

-        let df_span = span.child_span("data_fusion");
-        let plan = components
-            .df_planner
-            .plan(&plan_ir, Arc::clone(partition_info))
-            .await?;
-        let streams = components.df_plan_exec.exec(Arc::<
-            dyn datafusion::physical_plan::ExecutionPlan,
-        >::clone(&plan));
-        let job = components.parquet_files_sink.stream_into_file_sink(
-            streams,
-            Arc::clone(partition_info),
-            plan_ir.target_level(),
-            &plan_ir,
-        );
+            let df_span = span.child_span("data_fusion");
+            let plan = components
+                .df_planner
+                .plan(&plan_ir, Arc::clone(partition_info))
+                .await?;
+            let streams = components.df_plan_exec.exec(Arc::<
+                dyn datafusion::physical_plan::ExecutionPlan,
+            >::clone(&plan));
+            let job = components.parquet_files_sink.stream_into_file_sink(
+                streams,
+                Arc::clone(partition_info),
+                plan_ir.target_level(),
+                &plan_ir,
+            );

-        // TODO: react to OOM and try to divide branch
-        let res = job.await;
+            res = job.await;

-        if let Some(span) = &df_span {
-            send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true);
-        };
+            if let Some(span) = &df_span {
+                send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true);
+            };

-        drop(permit);
-        drop(df_span);
+            drop(permit);
+            drop(df_span);

-        // inputs can be removed from the scratchpad as soon as we're done with compaction.
+            info!(
+                partition_id = partition_info.partition_id.get(),
+                plan_id, "job semaphore released",
+            );
+
+            if let Err(e) = &res {
+                match e.classify() {
+                    ErrorKind::OutOfMemory => {
+                        requested_permits *= 2;
+                        info!(
+                            partition_id = partition_info.partition_id.get(),
+                            plan_id,
+                            requested_permits,
+                            "job failed with out of memory error - increased permit request",
+                        );
+                    }
+                    _ => break,
+                }
+            } else {
+                break;
+            }
+        }
+
+        // inputs can be removed from the scratchpad as soon as we're done with compaction
         scratchpad_ctx
            .clean_from_scratchpad(&plan_ir.input_paths())
            .await;

-        info!(
-            partition_id = partition_info.partition_id.get(),
-            plan_id, "job semaphore released",
-        );
-
         res?
     };

@@ -2,6 +2,93 @@ syntax = "proto3";

 package influxdata.iox.gossip.v1;
 option go_package = "github.com/influxdata/iox/gossip/v1";

 import "influxdata/iox/gossip/v1/schema.proto";

+// An RPC service provided by peers wishing to take part in anti-entropy of
+// their schema caches.
+service AntiEntropyService {
+  // Return the computed Merkle Search Tree difference between the sender's
+  // serialised compact MST representation included in the request, and the
+  // receiver's local MST.
+  //
+  // The caller of this RPC sends their serialised MST page ranges, and the
+  // callee performs the tree diff, returning the key ranges identified as
+  // containing inconsistencies.
+  rpc GetTreeDiff(GetTreeDiffRequest) returns (GetTreeDiffResponse);
+
+  // Fetch all schemas in the peer cache within the specified inclusive key
+  // range bounds.
+  rpc GetRange(GetRangeRequest) returns (GetRangeResponse);
+}
+
+// Request the receiver perform a Merkle Search Tree diff against the provided
+// set of MST pages.
+message GetTreeDiffRequest {
+  // Serialised representation of the sender's MST.
+  repeated PageRange pages = 1;
+}
+
+// The set of namespace name ranges that contain inconsistencies.
+message GetTreeDiffResponse {
+  // Computed diff ranges containing MST inconsistencies between the two nodes.
+  repeated DiffRange ranges = 1;
+}
+
+// A compact representation of a single page in a Merkle Search Tree.
+message PageRange {
+  // Lexicographically minimum namespace name in this page (inclusive).
+  string min = 1;
+
+  // Lexicographically maximum namespace name in this page (inclusive).
+  string max = 2;
+
+  // A 16-byte MST page hash covering all entries in this page.
+  bytes page_hash = 3;
+}
+
+// An inclusive range of namespace names which contains at least one
+// inconsistent schema.
+message DiffRange {
+  // Lexicographically minimum inconsistent namespace name in this diff range
+  // (inclusive).
+  string min = 1;
+
+  // Lexicographically maximum inconsistent namespace name in this diff range
+  // (inclusive).
+  string max = 2;
+}
+
+// Fetch the namespace schemas with namespace names falling within the
+// specified inclusive range.
+message GetRangeRequest {
+  // Lexicographically minimum namespace name in this range to be fetched
+  // (inclusive).
+  string min = 1;
+
+  // Lexicographically maximum namespace name in this range to be fetched
+  // (inclusive).
+  string max = 2;
+}
+
+// A set of namespace schemas for a range request.
+message GetRangeResponse {
+  repeated NamespaceSchemaEntry namespaces = 1;
+}
+
+// A composition of a "namespace create" event and zero-or-more "table create"
+// events.
+//
+// Convergence is achieved by reapplying these gossip events and merging their
+// content into the local node's schema cache.
+message NamespaceSchemaEntry {
+  // The "namespace create" event containing namespace parameters.
+  influxdata.iox.gossip.v1.NamespaceCreated namespace = 1;
+
+  // The "table create" events containing the set of all tables and their
+  // parameters.
+  repeated influxdata.iox.gossip.v1.TableCreated tables = 2;
+}
+
+// A gossip frame sent to peers to begin a sync round / consistency check.
+message ConsistencyProbe {
+  // A 16-byte Merkle Search Tree root hash covering the schema cache content.
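Taken together, the two RPCs support a pull-based convergence round: a node sends its serialised MST pages, receives back the namespace-name ranges that disagree, then fetches and merges the schemas inside each range. A rough sketch of that flow follows, where `Peer` and `SchemaCache` are hypothetical stand-ins for the generated RPC client and the local cache, not types from this PR.

// Hypothetical Rust stand-ins mirroring the proto messages above.
struct PageRange {
    min: String,
    max: String,
    page_hash: [u8; 16],
}

struct DiffRange {
    min: String,
    max: String,
}

struct NamespaceSchemaEntry; // namespace create + table create events

// Stand-in for the RPC client generated from AntiEntropyService.
trait Peer {
    fn get_tree_diff(&self, pages: Vec<PageRange>) -> Vec<DiffRange>;
    fn get_range(&self, min: &str, max: &str) -> Vec<NamespaceSchemaEntry>;
}

// Stand-in for the local schema cache and its Merkle Search Tree.
trait SchemaCache {
    fn mst_pages(&self) -> Vec<PageRange>;
    fn merge(&mut self, entry: NamespaceSchemaEntry);
}

fn sync_round(peer: &impl Peer, cache: &mut impl SchemaCache) {
    // 1. Send our serialised MST pages; the peer diffs them against its own
    //    tree and returns the inconsistent namespace-name ranges.
    for range in peer.get_tree_diff(cache.mst_pages()) {
        // 2. Pull every schema in each inconsistent (inclusive) range and
        //    merge it locally; per the proto comments, convergence comes from
        //    reapplying these create events.
        for entry in peer.get_range(&range.min, &range.max) {
            cache.merge(entry);
        }
    }
}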