chore: add tracing to compactor
parent 5521310005
commit 16939c849d
@@ -973,6 +973,7 @@ dependencies = [
 "test_helpers",
 "tokio",
 "tokio-util",
 "trace",
 "tracker",
 "uuid",
 "workspace-hack",
@@ -1017,6 +1018,7 @@ dependencies = [
 "parquet_file",
 "schema",
 "tokio",
 "trace",
 "tracker",
 "uuid",
 "workspace-hack",
@@ -25,6 +25,7 @@ rand = "0.8.3"
schema = { path = "../schema" }
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tokio-util = { version = "0.7.8" }
trace = { version = "0.1.0", path = "../trace" }
tracker = { path = "../tracker" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
@@ -57,6 +57,7 @@ impl Compactor {
_ = shutdown_captured.cancelled() => {}
_ = async {
    compact(
        config.trace_collector,
        config.partition_concurrency,
        config.partition_timeout,
        Arc::clone(&df_semaphore),
@@ -12,6 +12,8 @@ pub fn log_config(config: &Config) {
let Config {
    // no need to print the internal state of the registry
    metric_registry: _,
    // no need to print the internal state of the trace collector
    trace_collector: _,
    catalog,
    scheduler_config,
    parquet_store_real,
@@ -22,6 +22,9 @@ pub struct Config {
/// Metric registry.
pub metric_registry: Arc<metric::Registry>,

/// trace collector
pub trace_collector: Option<Arc<dyn trace::TraceCollector>>,

/// Central catalog.
pub catalog: Arc<dyn Catalog>,
@@ -5,6 +5,8 @@ use futures::{stream, StreamExt, TryStreamExt};
use observability_deps::tracing::info;
use parquet_file::ParquetFilePath;
use tokio::sync::watch::Sender;
use trace::span::Span;
use trace::span::SpanRecorder;
use tracker::InstrumentedAsyncSemaphore;

use crate::{
@@ -23,6 +25,7 @@ use crate::{
/// Tries to compact all eligible partitions, up to
/// partition_concurrency at a time.
pub async fn compact(
    trace_collector: Option<Arc<dyn trace::TraceCollector>>,
    partition_concurrency: NonZeroUsize,
    partition_timeout: Duration,
    df_semaphore: Arc<InstrumentedAsyncSemaphore>,
@@ -34,7 +37,25 @@ pub async fn compact(
.map(|partition_id| {
    let components = Arc::clone(components);

    // A root span is created for each partition. Later this can be linked to the
    // scheduler's span via something passed through partition_stream.
    let root_span: Option<Span> =
        match std::env::var("INFLUXDB_IOX_COMPACTION_PARTITION_TRACE") {
            Ok(v) => {
                if v == "all" || v == partition_id.get().to_string() {
                    trace_collector
                        .as_ref()
                        .map(|collector| Span::root("compaction", Arc::clone(collector)))
                } else {
                    None
                }
            }
            Err(_) => None,
        };
    let span = SpanRecorder::new(root_span);

    compact_partition(
        span,
        partition_id,
        partition_timeout,
        Arc::clone(&df_semaphore),
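For readability, the gating above can be read in isolation: a root "compaction" span is created only when the INFLUXDB_IOX_COMPACTION_PARTITION_TRACE environment variable is either "all" or the id of the partition being compacted, and only if a trace collector is configured. A minimal standalone sketch of that logic (the helper name and the plain i64 partition id are illustrative, not part of this change):

use std::sync::Arc;

use trace::{span::Span, TraceCollector};

// Sketch of the per-partition gating shown above. `partition_root_span` is a
// hypothetical helper; the real change performs this inline in `compact`.
fn partition_root_span(
    trace_collector: &Option<Arc<dyn TraceCollector>>,
    partition_id: i64,
) -> Option<Span> {
    match std::env::var("INFLUXDB_IOX_COMPACTION_PARTITION_TRACE") {
        Ok(v) if v == "all" || v == partition_id.to_string() => trace_collector
            .as_ref()
            .map(|collector| Span::root("compaction", Arc::clone(collector))),
        _ => None,
    }
}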
@@ -47,6 +68,7 @@ pub async fn compact(
}

async fn compact_partition(
    span: SpanRecorder,
    partition_id: PartitionId,
    partition_timeout: Duration,
    df_semaphore: Arc<InstrumentedAsyncSemaphore>,
@@ -60,6 +82,7 @@ async fn compact_partition(
let scratchpad = Arc::clone(&scratchpad);
async {
    try_compact_partition(
        span,
        partition_id,
        df_semaphore,
        components,
@@ -184,6 +207,7 @@ async fn compact_partition(
/// . If there are no L0s files in the partition, the first round can just compact L1s and L2s to L2s
/// . Round 2 happens or not depends on the stop condition
async fn try_compact_partition(
    span: SpanRecorder,
    partition_id: PartitionId,
    df_semaphore: Arc<InstrumentedAsyncSemaphore>,
    components: Arc<Components>,
@@ -199,6 +223,8 @@ async fn try_compact_partition(
// with mutliple calls to execute_branch is important to frequently clean the scratchpad and prevent
// high memory use.
loop {
    let round_span = span.child("round");

    let round_info = components
        .round_info_source
        .calculate(&partition_info, &files)
@@ -239,9 +265,11 @@ async fn try_compact_partition(
let df_semaphore = Arc::clone(&df_semaphore);
let transmit_progress_signal = Arc::clone(&transmit_progress_signal);
let scratchpad = Arc::clone(&scratchpad_ctx);
let branch_span = round_span.child("branch");

async move {
    execute_branch(
        branch_span,
        partition_id,
        branch,
        df_semaphore,
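The span hierarchy introduced by this change is root "compaction" → "round" → "branch", with further children ("run_plans", "download_objects", "upload_objects", "execute_plan", "acquire_permit", "data_fusion") created in the hunks below. A minimal sketch of the SpanRecorder pattern in use, assuming the usual IOx behavior that a recorder built from None is a no-op and that a child span is finished when its recorder is dropped:

use trace::span::SpanRecorder;

// Illustrative only; the child names mirror the ones introduced in this change.
fn record_round(span: &SpanRecorder) {
    let round_span = span.child("round");         // one child per compaction round
    let branch_span = round_span.child("branch"); // one child per branch in the round
    let upload_span = branch_span.child("upload_objects");
    // ... perform the uploads ...
    drop(upload_span); // ends the child span once the work it measures is done
}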
@@ -265,6 +293,7 @@ async fn try_compact_partition(
/// Compact or split given files
#[allow(clippy::too_many_arguments)]
async fn execute_branch(
    span: SpanRecorder,
    partition_id: PartitionId,
    branch: Vec<ParquetFile>,
    df_semaphore: Arc<InstrumentedAsyncSemaphore>,
@@ -315,6 +344,7 @@ async fn execute_branch(

// Compact & Split
let created_file_params = run_plans(
    span.child("run_plans"),
    split_or_compact.clone(),
    &partition_info,
    &components,
@@ -328,11 +358,13 @@ async fn execute_branch(
scratchpad_ctx.clean_from_scratchpad(&input_paths).await;

// upload files to real object store
let upload_span = span.child("upload_objects");
let created_file_params = upload_files_to_object_store(
    created_file_params,
    Arc::<dyn Scratchpad>::clone(&scratchpad_ctx),
)
.await;
drop(upload_span);

for file_param in &created_file_params {
    info!(
@@ -384,6 +416,7 @@ async fn execute_branch(

/// Compact or split given files
async fn run_plans(
    span: SpanRecorder,
    split_or_compact: FilesToSplitOrCompact,
    partition_info: &Arc<PartitionInfo>,
    components: &Arc<Components>,
@@ -392,9 +425,11 @@ async fn run_plans(
    scratchpad_ctx: Arc<dyn Scratchpad>,
) -> Result<Vec<ParquetFileParams>, DynError> {
    // stage files
    let download_span = span.child("download_objects");
    let input_uuids_inpad = scratchpad_ctx
        .load_to_scratchpad(&split_or_compact.file_input_paths())
        .await;
    drop(download_span);

    let plans = components.ir_planner.create_plans(
        Arc::clone(partition_info),
@@ -417,6 +452,7 @@ async fn run_plans(
)
.map(|plan_ir| {
    execute_plan(
        span.child("execute_plan"),
        plan_ir,
        partition_info,
        components,
@@ -431,6 +467,7 @@ async fn run_plans(
}

async fn execute_plan(
    span: SpanRecorder,
    plan_ir: PlanIR,
    partition_info: &Arc<PartitionInfo>,
    components: &Arc<Components>,
@@ -460,10 +497,13 @@ async fn execute_plan(
// We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
// DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
// knowledge, this is currently (2023-01-25) not the case but if this ever changes, then we are prepared.
let permit_span = span.child("acquire_permit");
let permit = df_semaphore
    .acquire_many(permits, None)
    .await
    .expect("semaphore not closed");
drop(permit_span);

info!(
    partition_id = partition_info.partition_id.get(),
    column_count = partition_info.column_count(),
@@ -473,6 +513,7 @@ async fn execute_plan(
    "job semaphore acquired",
);

let df_span = span.child("data_fusion");
let plan = components
    .df_planner
    .plan(&plan_ir, Arc::clone(partition_info))
@@ -489,6 +530,7 @@ async fn execute_plan(
let res = job.await;

drop(permit);
drop(df_span);
info!(
    partition_id = partition_info.partition_id.get(),
    plan_id, "job semaphore released",
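Taken together, execute_plan now measures its two long waits separately: "acquire_permit" covers only the time spent queueing on the DataFusion semaphore, while "data_fusion" covers planning plus execution and is dropped only after job.await returns. A compressed sketch of that timing pattern, with the function name and permit count purely illustrative:

use std::sync::Arc;

use trace::span::SpanRecorder;
use tracker::InstrumentedAsyncSemaphore;

// Sketch only: each child span is created right before the await it should
// measure and dropped right after it.
async fn guarded_execute(span: &SpanRecorder, df_semaphore: &Arc<InstrumentedAsyncSemaphore>) {
    let permit_span = span.child("acquire_permit");
    let permit = df_semaphore
        .acquire_many(10, None)
        .await
        .expect("semaphore not closed");
    drop(permit_span); // duration = time spent waiting for the permit

    let df_span = span.child("data_fusion");
    // ... plan and run the DataFusion job while holding `permit` ...
    drop(permit);
    drop(df_span); // duration = planning + execution
}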
@@ -24,6 +24,7 @@ observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
schema = { path = "../schema" }
tokio = { version = "1", features = ["macros", "rt", "sync"] }
trace = { version = "0.1.0", path = "../trace" }
tracker = { path = "../tracker" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
@@ -56,6 +56,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
use object_store::{path::Path, DynObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};
use schema::sort::SortKey;
use trace::{RingBufferTraceCollector, TraceCollector};
use tracker::AsyncSemaphoreMetrics;

// Default values for the test setup builder
@@ -118,8 +119,13 @@ impl TestSetupBuilder<false> {
let commit_wrapper = CommitRecorderBuilder::new(Arc::clone(&run_log))
    .with_invariant_check(Arc::clone(&invariant_check) as _);

let ring_buffer = Arc::new(RingBufferTraceCollector::new(5));
let trace_collector: Option<Arc<dyn TraceCollector>> =
    Some(Arc::new(Arc::clone(&ring_buffer)));

let config = Config {
    metric_registry: catalog.metric_registry(),
    trace_collector,
    catalog: catalog.catalog(),
    scheduler_config: SchedulerConfig::default(),
    parquet_store_real: catalog.parquet_store.clone(),
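Because the test setup now wires an in-memory RingBufferTraceCollector into the config, tests can in principle inspect the spans recorded during a compaction run. A hedged sketch, assuming the ring-buffer collector exposes the usual spans() accessor (not shown in this diff):

use std::sync::Arc;

use trace::RingBufferTraceCollector;

// Hypothetical test-side assertion; `spans()` is assumed to exist on the
// ring-buffer collector and is not introduced by this change.
fn assert_compaction_traced(ring_buffer: &Arc<RingBufferTraceCollector>) {
    let spans = ring_buffer.spans();
    assert!(
        spans.iter().any(|s| s.name == "compaction"),
        "expected a root compaction span to be recorded"
    );
}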
@@ -677,6 +683,7 @@ impl TestSetup {
let df_semaphore = Arc::new(
    Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10),
);
let trace_collector = config.trace_collector.clone();

// register scratchpad store
let runtime_env = self
@@ -692,6 +699,7 @@ impl TestSetup {
);

compact(
    trace_collector,
    NonZeroUsize::new(10).unwrap(),
    config.partition_timeout,
    df_semaphore,
@@ -160,6 +160,7 @@ pub async fn create_compactor_server_type(

let compactor = Compactor::start(Config {
    metric_registry: Arc::clone(&metric_registry),
    trace_collector: common_state.trace_collector(),
    catalog,
    scheduler_config: convert_scheduler_config(
        compactor_config.compactor_scheduler_config.clone(),