From 4ec07b5d80210c31b54545e19f001e5b4e21a5a7 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Sat, 2 Sep 2023 13:29:19 +0200 Subject: [PATCH 1/3] feat(gossip): schema cache anti-entropy service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Defines an RPC service to be used by two peers to converge their schema cache content by exchanging their serialised Merkle Search Tree pages (a compact representation of the MST itself). This will be used in the latter half of the following sync protocol: ┌─────┐ ┌────┐ │Local│ │Peer│ └──┬──┘ └─┬──┘ │ [1] Send content hash │ │────────────────────────> │ │ │ ┌───────────────┐ │ │Compute hash, │ │ │stop if equal │ │ └───────────────┘ │ │ │ ╔════════════════╗ │ ═══════════════════════╪═══╣ Switch to gRPC ╠═══╪═══════════════════════ │ ╚════════════════╝ │ │ │ │[2] Serialised MST pages│ │<──────────────────────── │ │ ┌──────────────┐ │ │Perform diff │ │ └──────────────┘ │ │ [3] Inconsistent pages │ │────────────────────────> │ │ │ │ ╔═══════╤════╪════════════════════════╪════════════╗ ║ LOOP │ For each inconsistent page │ ║ ╟───────┘ │ │ ║ ║ │ [4] Scheams │ ║ ║ │<──────────────────────── ║ ╚════════════╪════════════════════════╪════════════╝ ┌──┴──┐ ┌─┴──┐ │Local│ │Peer│ └─────┘ └────┘ The initial consistency probe request [1] is sent over gossip and is used to trigger a further sync of inconsistent MST content if necessary. This message is a no-op if the MSTs are found to be fully consistent. If an inconsistency is detected between the two peers, the protocol switches to perform RPC over TCP, calling the AntiEntropyService defined in this commit to complete the sync process. The receiver of the consistency probe [1] calls GetTreeDiff and provides the their MST pages [2], causing the local node to compute the diff between the two MSTs, and return the set of inconsistent ranges [3] that require convergence. Once the set of inconsistent ranges have been identified, the peer pulls all the schemas within those ranges and merges them into the local cache to ensure it has all the content of the source node. Once this protocol has run in both directions between two peers (and in absence of further updates between runs) then these two peers are guaranteed to have converged. --- .../iox/gossip/v1/schema_sync.proto | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto b/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto index 6aa389477f..a6a0b52fbb 100644 --- a/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto +++ b/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto @@ -2,6 +2,89 @@ syntax = "proto3"; package influxdata.iox.gossip.v1; option go_package = "github.com/influxdata/iox/gossip/v1"; +import "influxdata/iox/gossip/v1/schema.proto"; + +// An RPC service provided by peers wishing to take part in anti-entropy of +// their schema caches. +service AntiEntropyService { + // Return the computed Merkle Search Tree difference between the senders + // serialised compact MST representation included in the request, and the + // receivers local MST. + // + // The caller of this RPC sends their serialised MST page ranges, and the + // callee performs the tree diff, returning the key ranges identified as + // containing inconsistencies. + rpc GetTreeDiff(GetTreeDiffRequest) returns (GetTreeDiffResponse); + + // Fetch all schemas in the peer cache within the specified inclusive key + // range bounds. + rpc GetRange(GetRangeRequest) returns (GetRangeResponse); +} + +// Request the receiver perform a Merkle Search Tree diff against the provided +// set of MST pages. +message GetTreeDiffRequest { + // Serialised representation of the sender's MST. + repeated PageRange pages = 1; +} + +// The set of namespace name ranges that contain inconsistencies. +message GetTreeDiffResponse { + // Computed diff ranges containing MST inconsistencies between the two nodes. + repeated DiffRange ranges = 1; +} + +// A compact representation of a single page in a Merkle Search Tree. +message PageRange { + // Minimum namespace name in this page (inclusive). + string min = 1; + + // Maximum namespace name in this page (inclusive). + string max = 2; + + // A 16-byte MST page hash covering all entries in this page. + bytes page_hash = 3; +} + +// An inclusive range of namespace names which contains at least one +// inconsistent schema. +message DiffRange { + // Minimum inconsistent namespace name in this diff range (inclusive). + string min = 1; + + // Maximum inconsistent namespace name in this diff range (inclusive). + string max = 2; +} + +// Fetch the namespace schemas with namespace names falling within the specified +// inclusive range. +message GetRangeRequest { + // Minimum namespace name in this range to be fetched (inclusive). + string min = 1; + + // Maximum namespace name in this range to be fetched (inclusive). + string max = 2; +} + +// A set of namespace schemas for a range request. +message GetRangeResponse { + repeated NamespaceSchemaEntry namespaces = 1; +} + +// A composition of a "namespace create" event and zero-or-more "table create" +// events. +// +// Convergence is achieved by reapplying these gossip events and merging their +// content into the local node's schema cache. +message NamespaceSchemaEntry { + // The "namespace create" event containing namespace parameters. + influxdata.iox.gossip.v1.NamespaceCreated namespace = 1; + + // The "table create" events containing the set of all tables and their + // parameters. + repeated influxdata.iox.gossip.v1.TableCreated tables = 2; +} + // A gossip frame sent to peers to begin a sync round / consistency check. message ConsistencyProbe { // A 16-byte Merkle Search Tree root hash convering the schema cache content. From 80f8b55baa2480d3692f0a7f5f344e205f6ec4a9 Mon Sep 17 00:00:00 2001 From: Joe-Blount <73478756+Joe-Blount@users.noreply.github.com> Date: Mon, 18 Sep 2023 15:01:08 -0500 Subject: [PATCH 2/3] fix(compactor): retry OOM error at reduced concurrency (#8763) * fix(compactor): retry OOM error at reduced concurrency * chore: address comment --- compactor/src/driver.rs | 135 +++++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 56 deletions(-) diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index 6d4ff38c66..961fc7a0bd 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -20,7 +20,7 @@ use crate::{ timeout::{timeout_with_progress_checking, TimeoutWithProgress}, Components, }, - error::{DynError, ErrorKind, SimpleError}, + error::{DynError, ErrorKind, ErrorKindExt, SimpleError}, file_classification::{FileClassification, FilesForProgress}, partition_info::PartitionInfo, PlanIR, RoundInfo, @@ -556,76 +556,99 @@ async fn execute_plan( span.set_metadata("input_bytes", plan_ir.input_bytes().to_string()); span.set_metadata("reason", plan_ir.reason()); + // We'll start with 1 permit and if the job exhausts resources, increase it. + let mut requested_permits = 1; + let mut res: Result, DynError> = Ok(Vec::new()); + let create = { // use the address of the plan as a uniq identifier so logs can be matched despite the concurrency. let plan_id = format!("{:p}", &plan_ir); - info!( - partition_id = partition_info.partition_id.get(), - jobs_running = df_semaphore.holders_acquired(), - jobs_pending = df_semaphore.holders_pending(), - permits_acquired = df_semaphore.permits_acquired(), - permits_pending = df_semaphore.permits_pending(), - plan_id, - "requesting job semaphore", - ); + while requested_permits <= df_semaphore.total_permits() { + info!( + partition_id = partition_info.partition_id.get(), + jobs_running = df_semaphore.holders_acquired(), + jobs_pending = df_semaphore.holders_pending(), + requested_permits, + permits_acquired = df_semaphore.permits_acquired(), + permits_pending = df_semaphore.permits_pending(), + plan_id, + "requesting job semaphore", + ); - // draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the - // DataFusion computation (but BEFORE doing any additional external IO). - // - // We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case - // DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our - // knowledge, this is currently (2023-08-29) not the case but if this ever changes, then we are prepared. - let permit_span = span.child("acquire_permit"); - let permit = df_semaphore - .acquire(None) - .await - .expect("semaphore not closed"); - drop(permit_span); + // draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the + // DataFusion computation (but BEFORE doing any additional external IO). + // + // We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case + // DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our + // knowledge, this is currently (2023-08-29) not the case but if this ever changes, then we are prepared. + let permit_span = span.child("acquire_permit"); + let permit = df_semaphore + .acquire_many(requested_permits as u32, None) + .await + .expect("semaphore not closed"); + drop(permit_span); - info!( - partition_id = partition_info.partition_id.get(), - column_count = partition_info.column_count(), - input_files = plan_ir.n_input_files(), - plan_id, - "job semaphore acquired", - ); + info!( + partition_id = partition_info.partition_id.get(), + column_count = partition_info.column_count(), + input_files = plan_ir.n_input_files(), + plan_id, + "job semaphore acquired", + ); - let df_span = span.child_span("data_fusion"); - let plan = components - .df_planner - .plan(&plan_ir, Arc::clone(partition_info)) - .await?; - let streams = components.df_plan_exec.exec(Arc::< - dyn datafusion::physical_plan::ExecutionPlan, - >::clone(&plan)); - let job = components.parquet_files_sink.stream_into_file_sink( - streams, - Arc::clone(partition_info), - plan_ir.target_level(), - &plan_ir, - ); + let df_span = span.child_span("data_fusion"); + let plan = components + .df_planner + .plan(&plan_ir, Arc::clone(partition_info)) + .await?; + let streams = components.df_plan_exec.exec(Arc::< + dyn datafusion::physical_plan::ExecutionPlan, + >::clone(&plan)); + let job = components.parquet_files_sink.stream_into_file_sink( + streams, + Arc::clone(partition_info), + plan_ir.target_level(), + &plan_ir, + ); - // TODO: react to OOM and try to divide branch - let res = job.await; + res = job.await; - if let Some(span) = &df_span { - send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true); - }; + if let Some(span) = &df_span { + send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true); + }; - drop(permit); - drop(df_span); + drop(permit); + drop(df_span); - // inputs can be removed from the scratchpad as soon as we're done with compaction. + info!( + partition_id = partition_info.partition_id.get(), + plan_id, "job semaphore released", + ); + + if let Err(e) = &res { + match e.classify() { + ErrorKind::OutOfMemory => { + requested_permits *= 2; + info!( + partition_id = partition_info.partition_id.get(), + plan_id, + requested_permits, + "job failed with out of memory error - increased permit request", + ); + } + _ => break, + } + } else { + break; + } + } + + // inputs can be removed from the scratchpad as soon as we're done with compaction scratchpad_ctx .clean_from_scratchpad(&plan_ir.input_paths()) .await; - info!( - partition_id = partition_info.partition_id.get(), - plan_id, "job semaphore released", - ); - res? }; From ef1a7b0ce8d96db1a729aeaa4b7f878825d727a9 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 19 Sep 2023 10:45:32 +0200 Subject: [PATCH 3/3] docs: lexicographical ordering of min/max The min/max values are the minimum/maximum values when ordered lexicographically. --- .../influxdata/iox/gossip/v1/schema_sync.proto | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto b/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto index a6a0b52fbb..583e872812 100644 --- a/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto +++ b/generated_types/protos/influxdata/iox/gossip/v1/schema_sync.proto @@ -36,10 +36,10 @@ message GetTreeDiffResponse { // A compact representation of a single page in a Merkle Search Tree. message PageRange { - // Minimum namespace name in this page (inclusive). + // Lexicographically minimum namespace name in this page (inclusive). string min = 1; - // Maximum namespace name in this page (inclusive). + // Lexicographically maximum namespace name in this page (inclusive). string max = 2; // A 16-byte MST page hash covering all entries in this page. @@ -49,20 +49,24 @@ message PageRange { // An inclusive range of namespace names which contains at least one // inconsistent schema. message DiffRange { - // Minimum inconsistent namespace name in this diff range (inclusive). + // Lexicographically minimum inconsistent namespace name in this diff range + // (inclusive). string min = 1; - // Maximum inconsistent namespace name in this diff range (inclusive). + // Lexicographically maximum inconsistent namespace name in this diff range + // (inclusive). string max = 2; } // Fetch the namespace schemas with namespace names falling within the specified // inclusive range. message GetRangeRequest { - // Minimum namespace name in this range to be fetched (inclusive). + // Lexicographically minimum namespace name in this range to be fetched + // (inclusive). string min = 1; - // Maximum namespace name in this range to be fetched (inclusive). + // Lexicographically maximum namespace name in this range to be fetched + // (inclusive). string max = 2; }