feat: track ingester jobs (#3836)
parent
10a1670341
commit
657ac249e9
|
@ -2101,6 +2101,7 @@ dependencies = [
|
|||
"tokio-util 0.7.0",
|
||||
"tonic",
|
||||
"trace",
|
||||
"tracker",
|
||||
"uuid",
|
||||
"workspace-hack",
|
||||
"write_buffer",
|
||||
|
|
|
@ -201,7 +201,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
object_store,
|
||||
write_buffer,
|
||||
Executor::new(config.query_exect_thread_count),
|
||||
&metric_registry,
|
||||
Arc::clone(&metric_registry),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
|
|
@ -38,6 +38,7 @@ thiserror = "1.0"
|
|||
time = { path = "../time" }
|
||||
tokio = { version = "1.17", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
|
||||
tonic = { version = "0.6" }
|
||||
tracker = { path = "../tracker" }
|
||||
uuid = { version = "0.8", features = ["v4"] }
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
write_buffer = { path = "../write_buffer" }
|
||||
|
|
|
@ -1418,6 +1418,7 @@ mod tests {
|
|||
let pause_size = w1.size() + 1;
|
||||
let manager = LifecycleManager::new(
|
||||
LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)),
|
||||
Arc::new(metric::Registry::new()),
|
||||
Arc::new(SystemProvider::new()),
|
||||
);
|
||||
let should_pause = data
|
||||
|
@ -1501,6 +1502,7 @@ mod tests {
|
|||
std::mem::drop(repos);
|
||||
let manager = LifecycleManager::new(
|
||||
LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
|
||||
Arc::new(metric::Registry::new()),
|
||||
Arc::new(SystemProvider::new()),
|
||||
);
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ impl IngestHandlerImpl {
|
|||
object_store: Arc<ObjectStore>,
|
||||
write_buffer: Arc<dyn WriteBufferReading>,
|
||||
exec: Executor,
|
||||
registry: &metric::Registry,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
) -> Result<Self> {
|
||||
// build the initial ingester data state
|
||||
let mut sequencers = BTreeMap::new();
|
||||
|
@ -139,12 +139,13 @@ impl IngestHandlerImpl {
|
|||
|
||||
let ingester_data = Arc::clone(&data);
|
||||
let kafka_topic_name = topic.name.clone();
|
||||
let ingest_metrics = WriteBufferIngestMetrics::new(registry, &topic.name);
|
||||
let ingest_metrics = WriteBufferIngestMetrics::new(&metric_registry, &topic.name);
|
||||
|
||||
// start the lifecycle manager
|
||||
let persister = Arc::clone(&data);
|
||||
let lifecycle_manager = Arc::new(LifecycleManager::new(
|
||||
lifecycle_config,
|
||||
metric_registry,
|
||||
Arc::new(SystemProvider::new()),
|
||||
));
|
||||
let manager = Arc::clone(&lifecycle_manager);
|
||||
|
@ -710,7 +711,7 @@ mod tests {
|
|||
object_store,
|
||||
reading,
|
||||
Executor::new(1),
|
||||
&metrics,
|
||||
Arc::clone(&metrics),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use iox_catalog::interface::PartitionId;
|
||||
use parking_lot::Mutex;
|
||||
use time::TimeProvider;
|
||||
use tracker::{
|
||||
AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory,
|
||||
TaskRegistryWithMetrics, TaskTracker,
|
||||
};
|
||||
|
||||
/// History size handed to `TaskRegistryWithHistory::new` when the job registry is built.
///
/// NOTE(review): presumably the number of reclaimed jobs kept in the history — confirm
/// against the `tracker` crate.
const JOB_HISTORY_SIZE: usize = 1000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Job {
|
||||
Persist { partition_id: PartitionId },
|
||||
}
|
||||
|
||||
impl Job {
|
||||
fn name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Persist { .. } => "persist",
|
||||
}
|
||||
}
|
||||
|
||||
fn partition_id(&self) -> Option<PartitionId> {
|
||||
match self {
|
||||
Self::Persist { partition_id, .. } => Some(*partition_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The global job registry
|
||||
#[derive(Debug)]
|
||||
pub struct JobRegistry {
|
||||
inner: Mutex<TaskRegistryWithMetrics<Job, TaskRegistryWithHistory<Job, TaskRegistry<Job>>>>,
|
||||
}
|
||||
|
||||
impl JobRegistry {
|
||||
pub fn new(
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Self {
|
||||
let registry = TaskRegistry::new(time_provider);
|
||||
let registry = TaskRegistryWithHistory::new(registry, JOB_HISTORY_SIZE);
|
||||
let registry =
|
||||
TaskRegistryWithMetrics::new(registry, metric_registry, Box::new(f_attributes));
|
||||
Self {
|
||||
inner: Mutex::new(registry),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register(&self, job: Job) -> (TaskTracker<Job>, TaskRegistration) {
|
||||
self.inner.lock().register(job)
|
||||
}
|
||||
|
||||
/// Reclaims jobs into the historical archive
|
||||
///
|
||||
/// Returns the number of remaining jobs
|
||||
///
|
||||
/// Should be called periodically
|
||||
pub fn reclaim(&self) -> usize {
|
||||
let mut lock = self.inner.lock();
|
||||
lock.reclaim();
|
||||
lock.tracked_len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the metric attributes for `job`.
///
/// Every job gets a `name` attribute; jobs that carry a partition id also get a
/// `partition_id` attribute holding that id rendered as a string.
fn f_attributes(job: &Job) -> metric::Attributes {
    let mut attrs = metric::Attributes::from(&[("name", job.name())]);

    if let Some(id) = job.partition_id() {
        let id_label = id.get().to_string();
        attrs.insert("partition_id", id_label);
    }

    attrs
}
|
|
@ -16,6 +16,7 @@
|
|||
pub mod compact;
|
||||
pub mod data;
|
||||
pub mod handler;
|
||||
mod job;
|
||||
pub mod lifecycle;
|
||||
pub mod persist;
|
||||
mod poison;
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
use crate::{
|
||||
data::Persister,
|
||||
job::{Job, JobRegistry},
|
||||
poison::{PoisonCabinet, PoisonPill},
|
||||
};
|
||||
use iox_catalog::interface::{PartitionId, SequenceNumber, SequencerId};
|
||||
|
@ -17,6 +18,7 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
use time::{Time, TimeProvider};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracker::TrackedFutureExt;
|
||||
|
||||
/// The lifecycle manager keeps track of the size and age of partitions across all sequencers.
|
||||
/// It triggers persistence based on keeping total memory usage around a set amount while
|
||||
|
@ -26,6 +28,7 @@ pub struct LifecycleManager {
|
|||
time_provider: Arc<dyn TimeProvider>,
|
||||
state: Mutex<LifecycleState>,
|
||||
persist_running: tokio::sync::Mutex<()>,
|
||||
job_registry: Arc<JobRegistry>,
|
||||
}
|
||||
|
||||
/// The configuration options for the lifecycle on the ingester.
|
||||
|
@ -113,12 +116,21 @@ pub struct PartitionLifecycleStats {
|
|||
impl LifecycleManager {
|
||||
/// Initialize a new lifecycle manager that will persist when `maybe_persist` is called
|
||||
/// if anything is over the size or age threshold.
|
||||
pub(crate) fn new(config: LifecycleConfig, time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
pub(crate) fn new(
|
||||
config: LifecycleConfig,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Self {
|
||||
let job_registry = Arc::new(JobRegistry::new(
|
||||
metric_registry,
|
||||
Arc::clone(&time_provider),
|
||||
));
|
||||
Self {
|
||||
config,
|
||||
time_provider,
|
||||
state: Default::default(),
|
||||
persist_running: Default::default(),
|
||||
job_registry,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,9 +241,13 @@ impl LifecycleManager {
|
|||
self.remove(s.partition_id);
|
||||
let persister = Arc::clone(persister);
|
||||
|
||||
let (_tracker, registration) = self.job_registry.register(Job::Persist {
|
||||
partition_id: s.partition_id,
|
||||
});
|
||||
tokio::task::spawn(async move {
|
||||
persister.persist(s.partition_id).await;
|
||||
})
|
||||
.track(registration)
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -301,6 +317,8 @@ pub(crate) async fn run_lifecycle_manager<P: Persister>(
|
|||
|
||||
manager.maybe_persist(&persister).await;
|
||||
|
||||
manager.job_registry.reclaim();
|
||||
|
||||
tokio::select!(
|
||||
_ = tokio::time::sleep(CHECK_INTERVAL) => {},
|
||||
_ = shutdown.cancelled() => {},
|
||||
|
@ -313,7 +331,7 @@ mod tests {
|
|||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use std::collections::BTreeSet;
|
||||
use time::{MockProvider, SystemProvider};
|
||||
use time::MockProvider;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestPersister {
|
||||
|
@ -358,9 +376,7 @@ mod tests {
|
|||
partition_size_threshold: 5,
|
||||
partition_age_threshold: Duration::from_nanos(0),
|
||||
};
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let tp = Arc::clone(&time_provider);
|
||||
let m = LifecycleManager::new(config, tp);
|
||||
let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config);
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
|
||||
// log first two writes at different times
|
||||
|
@ -393,8 +409,7 @@ mod tests {
|
|||
partition_size_threshold: 5,
|
||||
partition_age_threshold: Duration::from_nanos(0),
|
||||
};
|
||||
let time_provider = Arc::new(SystemProvider::new());
|
||||
let m = LifecycleManager::new(config, time_provider);
|
||||
let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
|
||||
assert!(!m.log_write(
|
||||
|
@ -426,9 +441,7 @@ mod tests {
|
|||
partition_size_threshold: 10,
|
||||
partition_age_threshold: Duration::from_nanos(5),
|
||||
};
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let tp = Arc::clone(&time_provider);
|
||||
let m = LifecycleManager::new(config, tp);
|
||||
let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config);
|
||||
let partition_id = PartitionId::new(1);
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
|
@ -472,10 +485,9 @@ mod tests {
|
|||
partition_size_threshold: 5,
|
||||
partition_age_threshold: Duration::from_millis(100),
|
||||
};
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
|
||||
let m = LifecycleManager::new(config, time_provider);
|
||||
let partition_id = PartitionId::new(1);
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
m.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 4);
|
||||
|
@ -515,8 +527,7 @@ mod tests {
|
|||
partition_age_threshold: Duration::from_millis(1000),
|
||||
};
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let m = LifecycleManager::new(config, time_provider);
|
||||
let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
|
||||
let partition_id = PartitionId::new(1);
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
m.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 8);
|
||||
|
@ -579,9 +590,7 @@ mod tests {
|
|||
partition_age_threshold: Duration::from_millis(1000),
|
||||
};
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let tp = Arc::clone(&time_provider);
|
||||
let m = LifecycleManager::new(config, tp);
|
||||
let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config);
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
m.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(1), 4);
|
||||
time_provider.inc(Duration::from_nanos(1));
|
||||
|
@ -603,4 +612,22 @@ mod tests {
|
|||
vec![(sequencer_id, SequenceNumber::new(3))]
|
||||
);
|
||||
}
|
||||
|
||||
struct TestLifecycleManger {
|
||||
m: LifecycleManager,
|
||||
time_provider: Arc<MockProvider>,
|
||||
}
|
||||
|
||||
impl TestLifecycleManger {
|
||||
fn new(config: LifecycleConfig) -> Self {
|
||||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let m = LifecycleManager::new(
|
||||
config,
|
||||
metric_registry,
|
||||
Arc::<MockProvider>::clone(&time_provider),
|
||||
);
|
||||
Self { m, time_provider }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ use tokio_util::sync::CancellationToken;
|
|||
|
||||
pub use future::{TrackedFuture, TrackedFutureExt};
|
||||
pub use history::TaskRegistryWithHistory;
|
||||
pub use metrics::TaskRegistryWithMetrics;
|
||||
pub use metrics::{FAttributes, TaskRegistryWithMetrics};
|
||||
pub use registry::{AbstractTaskRegistry, TaskId, TaskRegistry};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
|
|
Loading…
Reference in New Issue