diff --git a/data_types/src/job.rs b/data_types/src/job.rs
index 10c64919d6..ec6929eb8c 100644
--- a/data_types/src/job.rs
+++ b/data_types/src/job.rs
@@ -115,16 +115,11 @@ impl Job {
 pub enum OperationStatus {
     /// A task associated with the operation is running
     Running,
-    /// All tasks associated with the operation have finished
-    ///
-    /// Note: This does not indicate success or failure only that
-    /// no tasks associated with the operation are running
-    Complete,
+    /// All tasks associated with the operation have finished successfully
+    Success,
     /// The operation was cancelled and no associated tasks are running
     Cancelled,
     /// An operation error was returned
-    ///
-    /// Note: The tracker system currently will never return this
     Errored,
 }
 
@@ -135,10 +130,18 @@ pub enum OperationStatus {
 pub struct Operation {
     /// ID of the running operation
     pub id: usize,
-    /// Number of subtasks for this operation
-    pub task_count: u64,
-    /// Number of pending tasks for this operation
+    // The total number of created tasks
+    pub total_count: u64,
+    // The number of pending tasks
     pub pending_count: u64,
+    // The number of tasks that completed successfully
+    pub success_count: u64,
+    // The number of tasks that returned an error
+    pub error_count: u64,
+    // The number of tasks that were cancelled
+    pub cancelled_count: u64,
+    // The number of tasks that did not run to completion (e.g. panic)
+    pub dropped_count: u64,
     /// Wall time spent executing this operation
     pub wall_time: std::time::Duration,
     /// CPU time spent executing this operation
diff --git a/generated_types/protos/influxdata/iox/management/v1/jobs.proto b/generated_types/protos/influxdata/iox/management/v1/jobs.proto
index 68c64a50d9..8f22ed9b92 100644
--- a/generated_types/protos/influxdata/iox/management/v1/jobs.proto
+++ b/generated_types/protos/influxdata/iox/management/v1/jobs.proto
@@ -8,12 +8,24 @@ message OperationMetadata {
   // How many nanoseconds has it been since the job was submitted
   uint64 wall_nanos = 2;
 
-  // How many total tasks does this job have currently
-  uint64 task_count = 3;
+  // The total number of created tasks
+  uint64 total_count = 3;
 
-  // How many tasks for this job are still pending
+  // The number of pending tasks
   uint64 pending_count = 4;
 
+  // The number of tasks that completed successfully
+  uint64 success_count = 13;
+
+  // The number of tasks that returned an error
+  uint64 error_count = 14;
+
+  // The number of tasks that were cancelled
+  uint64 cancelled_count = 15;
+
+  // The number of tasks that did not run to completion (e.g. panic)
+  uint64 dropped_count = 16;
+
   // What kind of job is it?
oneof job { Dummy dummy = 5; diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs index 2b39019d30..2cb1c78221 100644 --- a/generated_types/src/job.rs +++ b/generated_types/src/job.rs @@ -157,7 +157,7 @@ impl TryFrom for data_types::job::Operation { let status = match &operation.result { None => OperationStatus::Running, - Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete, + Some(longrunning::operation::Result::Response(_)) => OperationStatus::Success, Some(longrunning::operation::Result::Error(status)) => { if status.code == tonic::Code::Cancelled as i32 { OperationStatus::Cancelled @@ -169,8 +169,12 @@ impl TryFrom for data_types::job::Operation { Ok(Self { id: operation.name.parse().field("name")?, - task_count: meta.task_count, + total_count: meta.total_count, pending_count: meta.pending_count, + success_count: meta.success_count, + error_count: meta.error_count, + cancelled_count: meta.cancelled_count, + dropped_count: meta.dropped_count, wall_time: std::time::Duration::from_nanos(meta.wall_nanos), cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos), job: meta.job.map(Into::into), diff --git a/query/src/exec/task.rs b/query/src/exec/task.rs index 0860282332..53512b9f97 100644 --- a/query/src/exec/task.rs +++ b/query/src/exec/task.rs @@ -9,6 +9,7 @@ use tracker::{TaskRegistration, TrackedFutureExt}; use futures::Future; use observability_deps::tracing::warn; +use std::convert::Infallible; /// The type of thing that the dedicated executor runs type Task = Pin + Send>>; @@ -83,7 +84,13 @@ impl DedicatedExecutor { let registration = TaskRegistration::new(); while let Ok(task) = rx.recv() { - tokio::task::spawn(task.track(registration.clone())); + tokio::task::spawn( + async move { + task.await; + Ok::<_, Infallible>(()) + } + .track(registration.clone()), + ); } // Wait for all tasks to finish diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 6554385ca9..e467fbfe55 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -1,13 +1,7 @@ //! Tests for the Influx gRPC queries -use std::{convert::TryFrom, num::NonZeroU32}; - use crate::scenarios::*; -use data_types::database_rules::LifecycleRules; -use server::{ - db::test_helpers::write_lp, - utils::{make_db, TestDb}, -}; +use server::{db::test_helpers::write_lp, utils::make_db}; use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; @@ -194,14 +188,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths { db, }; - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let data = lp_lines.join("\n"); write_lp(&db, &data).await; // roll over and load chunks into both RUB and OS diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 52ab2f66c1..e594aafff7 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -1,12 +1,9 @@ //! 
This module contains testing scenarios for Db use std::collections::HashMap; -use std::convert::TryFrom; -use std::num::NonZeroU32; use std::sync::Arc; use std::time::{Duration, Instant}; -use data_types::database_rules::LifecycleRules; use once_cell::sync::OnceCell; #[allow(unused_imports, dead_code, unused_macros)] @@ -16,7 +13,6 @@ use async_trait::async_trait; use server::utils::{ count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db, - TestDb, }; use server::{db::test_helpers::write_lp, Db}; @@ -138,14 +134,7 @@ impl DbSetup for NoData { // Scenario 4: the database has had data loaded into RB & Object Store and then deleted // - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let data = "cpu,region=west user=23.2 100"; write_lp(&db, data).await; // move data out of open chunk @@ -564,14 +553,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle { async fn make(&self) -> Vec { let partition_key = "1970-01-01T00"; - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; write_lp( &db, @@ -771,14 +753,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> }; // Scenario 4: One closed chunk in both RUb and OS - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) @@ -802,14 +777,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> }; // Scenario 5: One closed chunk in OS only - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) @@ -914,14 +882,7 @@ pub async fn make_two_chunk_scenarios( }; // in 2 read buffer chunks that also loaded into object store - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) @@ -960,14 +921,7 @@ pub async fn make_two_chunk_scenarios( }; // Scenario 6: Two closed chunk in OS only - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let table_names = write_lp(&db, data1).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) @@ -1077,14 +1031,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( }; // Scenario 2: One closed chunk in Parquet only - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - 
..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; let table_names = write_lp(&db, data).await; for table_name in &table_names { db.rollover_partition(&table_name, partition_key) diff --git a/server/src/db.rs b/server/src/db.rs index 77114930ce..b6792835da 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1392,13 +1392,7 @@ mod tests { #[tokio::test] async fn metrics_during_rollover() { - let test_db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await; + let test_db = make_db().await; let db = test_db.db; write_lp(db.as_ref(), "cpu bar=1 10").await; @@ -2454,14 +2448,7 @@ mod tests { #[tokio::test] async fn chunk_summaries() { // Test that chunk id listing is hooked up - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; // get three chunks: one open, one closed in mb and one close in rb write_lp(&db, "cpu bar=1 1").await; @@ -2541,14 +2528,7 @@ mod tests { #[tokio::test] async fn partition_summaries() { // Test that chunk id listing is hooked up - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; write_lp(&db, "cpu bar=1 1").await; let chunk_id = db @@ -2741,14 +2721,7 @@ mod tests { #[tokio::test] async fn write_chunk_to_object_store_in_background() { // Test that data can be written to object store using a background task - let db = TestDb::builder() - .lifecycle_rules(LifecycleRules { - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .build() - .await - .db; + let db = make_db().await.db; // create MB partition write_lp(db.as_ref(), "cpu bar=1 10").await; diff --git a/server/src/lib.rs b/server/src/lib.rs index c95bca0ba9..f4a5722c3b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -68,7 +68,7 @@ clippy::future_not_send )] -use std::convert::TryInto; +use std::convert::{Infallible, TryInto}; use std::sync::Arc; use async_trait::async_trait; @@ -872,8 +872,11 @@ where for duration in nanos { tokio::spawn( - tokio::time::sleep(tokio::time::Duration::from_nanos(duration)) - .track(registration.clone()), + async move { + tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await; + Ok::<_, Infallible>(()) + } + .track(registration.clone()), ); } diff --git a/server/src/utils.rs b/server/src/utils.rs index d1b55440d9..7827e4c278 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration}; +use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration}; use data_types::{ chunk_metadata::{ChunkStorage, ChunkSummary}, @@ -77,7 +77,11 @@ impl TestDbBuilder { .worker_cleanup_avg_sleep .unwrap_or_else(|| Duration::from_secs(1)); - rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_default(); + // default to quick lifecycle rules for faster tests + rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_else(|| LifecycleRules { + late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), + ..Default::default() + }); // set partion template if let Some(partition_template) = self.partition_template { diff --git a/src/influxdb_ioxd/rpc/operations.rs 
b/src/influxdb_ioxd/rpc/operations.rs index 5b906d4e60..79a8467499 100644 --- a/src/influxdb_ioxd/rpc/operations.rs +++ b/src/influxdb_ioxd/rpc/operations.rs @@ -18,7 +18,7 @@ use generated_types::{ influxdata::iox::management::v1 as management, protobuf_type_url, }; -use tracker::{TaskId, TaskStatus, TaskTracker}; +use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker}; use server::{ConnectionManager, Server}; use std::convert::TryInto; @@ -30,48 +30,44 @@ struct OperationsService { pub fn encode_tracker(tracker: TaskTracker) -> Result { let id = tracker.id(); - let is_cancelled = tracker.is_cancelled(); let status = tracker.get_status(); + let result = status.result(); - let (operation_metadata, is_complete) = match status { - TaskStatus::Creating => { - let metadata = management::OperationMetadata { - job: Some(tracker.metadata().clone().into()), - ..Default::default() - }; - - (metadata, false) - } + let operation_metadata = match status { + TaskStatus::Creating => management::OperationMetadata { + job: Some(tracker.metadata().clone().into()), + ..Default::default() + }, TaskStatus::Running { total_count, pending_count, cpu_nanos, - } => { - let metadata = management::OperationMetadata { - cpu_nanos: cpu_nanos as _, - task_count: total_count as _, - pending_count: pending_count as _, - job: Some(tracker.metadata().clone().into()), - ..Default::default() - }; - - (metadata, false) - } + } => management::OperationMetadata { + cpu_nanos: cpu_nanos as _, + total_count: total_count as _, + pending_count: pending_count as _, + job: Some(tracker.metadata().clone().into()), + ..Default::default() + }, TaskStatus::Complete { total_count, + success_count, + error_count, + cancelled_count, + dropped_count, cpu_nanos, wall_nanos, - } => { - let metadata = management::OperationMetadata { - cpu_nanos: cpu_nanos as _, - task_count: total_count as _, - wall_nanos: wall_nanos as _, - job: Some(tracker.metadata().clone().into()), - ..Default::default() - }; - - (metadata, true) - } + } => management::OperationMetadata { + cpu_nanos: cpu_nanos as _, + total_count: total_count as _, + success_count: success_count as _, + error_count: error_count as _, + cancelled_count: cancelled_count as _, + dropped_count: dropped_count as _, + wall_nanos: wall_nanos as _, + job: Some(tracker.metadata().clone().into()), + ..Default::default() + }, }; let mut buffer = BytesMut::new(); @@ -85,25 +81,33 @@ pub fn encode_tracker(tracker: TaskTracker) -> Result Some(operation::Result::Error(Status { + let result = match result { + Some(TaskResult::Success) => Some(operation::Result::Response(Any { + type_url: "type.googleapis.com/google.protobuf.Empty".to_string(), + value: Default::default(), + })), + Some(TaskResult::Cancelled) => Some(operation::Result::Error(Status { code: tonic::Code::Cancelled as _, message: "Job cancelled".to_string(), details: vec![], })), - - (true, false) => Some(operation::Result::Response(Any { - type_url: "type.googleapis.com/google.protobuf.Empty".to_string(), - value: Default::default(), // TODO: Verify this is correct + Some(TaskResult::Dropped) => Some(operation::Result::Error(Status { + code: tonic::Code::Internal as _, + message: "Job did not run to completion, possible panic".to_string(), + details: vec![], })), - - _ => None, + Some(TaskResult::Error) => Some(operation::Result::Error(Status { + code: tonic::Code::Internal as _, + message: "Job returned an error".to_string(), + details: vec![], + })), + None => None, }; Ok(Operation { name: id.to_string(), metadata: Some(metadata), - 
done: is_complete, + done: result.is_some(), result, }) } diff --git a/tests/end_to_end_cases/operations_api.rs b/tests/end_to_end_cases/operations_api.rs index 35b9cd55ba..6c1270690a 100644 --- a/tests/end_to_end_cases/operations_api.rs +++ b/tests/end_to_end_cases/operations_api.rs @@ -40,7 +40,7 @@ async fn test_operations() { let job = meta.job.expect("expected a job"); - assert_eq!(meta.task_count, 2); + assert_eq!(meta.total_count, 2); assert_eq!(meta.pending_count, 1); assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos })); @@ -76,7 +76,8 @@ async fn test_operations() { assert!(meta.wall_nanos > 0); assert!(meta.cpu_nanos > 0); assert_eq!(meta.pending_count, 0); - assert_eq!(meta.task_count, 2); + assert_eq!(meta.total_count, 2); + assert_eq!(meta.cancelled_count, 1); match waited.result { Some(operations::generated_types::operation::Result::Error(status)) => { diff --git a/tests/end_to_end_cases/operations_cli.rs b/tests/end_to_end_cases/operations_cli.rs index ac802b3889..2b0e678cad 100644 --- a/tests/end_to_end_cases/operations_cli.rs +++ b/tests/end_to_end_cases/operations_cli.rs @@ -24,7 +24,7 @@ async fn test_start_stop() { ) .expect("expected JSON output"); - assert_eq!(stdout.task_count, 1); + assert_eq!(stdout.total_count, 1); match stdout.job { Some(Job::Dummy { nanos }) => assert_eq!(nanos, vec![duration]), _ => panic!("expected dummy job got {:?}", stdout.job), @@ -82,7 +82,8 @@ async fn test_start_stop() { .expect("expected JSON output"); assert_eq!(completed.pending_count, 0); - assert_eq!(completed.task_count, 1); + assert_eq!(completed.total_count, 1); + assert_eq!(completed.cancelled_count, 1); assert_eq!(completed.status, OperationStatus::Cancelled); assert_eq!(&completed.job, &operations[0].job) } diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 4ea55984d3..6a1443a08d 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -111,11 +111,28 @@ struct TrackerState { pending_futures: AtomicUsize, pending_registrations: AtomicUsize, + ok_futures: AtomicUsize, + err_futures: AtomicUsize, + cancelled_futures: AtomicUsize, + notify: Notify, } +/// Returns a summary of the task execution +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TaskResult { + /// All futures completed successfully + Success, + /// Some futures were cancelled, and none were dropped or errored + Cancelled, + /// Some futures were dropped, and none errored + Dropped, + /// Some futures returned an error + Error, +} + /// The status of the tracked task -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum TaskStatus { /// More futures can be registered Creating, @@ -141,6 +158,14 @@ pub enum TaskStatus { Complete { /// The number of created futures total_count: usize, + /// The number of futures that completed successfully + success_count: usize, + /// The number of futures that returned an error + error_count: usize, + /// The number of futures that were aborted + cancelled_count: usize, + /// The number of futures that were dropped without running to completion (e.g. 
panic) + dropped_count: usize, /// The total amount of CPU time spent executing the futures cpu_nanos: usize, /// The number of nanoseconds between tracker registration and @@ -169,7 +194,7 @@ impl TaskStatus { } } - /// If the job has competed, returns the total amount of wall clock time + /// If the job has completed, returns the total amount of wall clock time /// spent executing futures pub fn wall_nanos(&self) -> Option { match self { @@ -178,6 +203,34 @@ impl TaskStatus { Self::Complete { wall_nanos, .. } => Some(*wall_nanos), } } + + /// Returns the result of the job if it has completed, otherwise None + pub fn result(&self) -> Option { + match self { + TaskStatus::Creating => None, + TaskStatus::Running { .. } => None, + TaskStatus::Complete { + total_count, + success_count, + error_count, + dropped_count, + cancelled_count, + .. + } => { + if *error_count != 0 { + Some(TaskResult::Error) + } else if *dropped_count != 0 { + Some(TaskResult::Dropped) + } else if *cancelled_count != 0 { + Some(TaskResult::Cancelled) + } else { + // Sanity check + assert_eq!(total_count, success_count); + Some(TaskResult::Success) + } + } + } + } } /// A Tracker can be used to monitor/cancel/wait for a set of associated futures @@ -279,11 +332,29 @@ where pending_count: self.state.pending_futures.load(Ordering::Relaxed), cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), }, - (true, true) => TaskStatus::Complete { - total_count: self.state.created_futures.load(Ordering::Relaxed), - cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), - wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed), - }, + (true, true) => { + let total_count = self.state.created_futures.load(Ordering::Relaxed); + let success_count = self.state.ok_futures.load(Ordering::Relaxed); + let error_count = self.state.err_futures.load(Ordering::Relaxed); + let cancelled_count = self.state.cancelled_futures.load(Ordering::Relaxed); + + // Failure of this would imply a future reported its completion status multiple + // times or a future was created without incrementing created_futures. 
+ // Both of these should be impossible + let dropped_count = total_count + .checked_sub(success_count + error_count + cancelled_count) + .expect("invalid tracker state"); + + TaskStatus::Complete { + total_count, + success_count, + error_count, + cancelled_count, + dropped_count, + cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), + wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed), + } + } } } @@ -352,6 +423,9 @@ impl Default for TaskRegistration { created_futures: AtomicUsize::new(0), pending_futures: AtomicUsize::new(0), pending_registrations: AtomicUsize::new(1), + ok_futures: AtomicUsize::new(0), + err_futures: AtomicUsize::new(0), + cancelled_futures: AtomicUsize::new(0), notify: Notify::new(), }); @@ -401,8 +475,18 @@ mod tests { use std::time::Duration; use super::*; + use futures::FutureExt; + use std::convert::Infallible; use tokio::sync::oneshot; + fn pending() -> futures::future::Pending> { + futures::future::pending() + } + + fn ready_ok() -> futures::future::Ready> { + futures::future::ready(Ok(())) + } + #[tokio::test] async fn test_lifecycle() { let (sender, receive) = oneshot::channel(); @@ -451,7 +535,7 @@ mod tests { let (_, registration) = registry.register(()); { - let f = futures::future::pending::<()>().track(registration); + let f = pending().track(registration); assert_eq!(registry.running().len(), 1); @@ -467,9 +551,9 @@ mod tests { let (_, registration) = registry.register(()); { - let f = futures::future::pending::<()>().track(registration.clone()); + let f = pending().track(registration.clone()); { - let f = futures::future::pending::<()>().track(registration); + let f = pending().track(registration); assert_eq!(registry.running().len(), 1); std::mem::drop(f); } @@ -485,7 +569,7 @@ mod tests { let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); - let task = tokio::spawn(futures::future::pending::<()>().track(registration)); + let task = tokio::spawn(pending().track(registration)); let tracked = registry.running(); assert_eq!(tracked.len(), 1); @@ -503,7 +587,7 @@ mod tests { let (tracker, registration) = registry.register(()); tracker.cancel(); - let task1 = tokio::spawn(futures::future::pending::<()>().track(registration)); + let task1 = tokio::spawn(pending().track(registration)); let result1 = task1.await.unwrap(); assert!(result1.is_err()); @@ -515,8 +599,8 @@ mod tests { let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); - let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone())); - let task2 = tokio::spawn(futures::future::pending::<()>().track(registration)); + let task1 = tokio::spawn(pending().track(registration.clone())); + let task2 = tokio::spawn(pending().track(registration)); let tracked = registry.running(); assert_eq!(tracked.len(), 1); @@ -539,11 +623,11 @@ mod tests { let (_, registration2) = registry.register(2); let (_, registration3) = registry.register(3); - let task1 = tokio::spawn(futures::future::pending::<()>().track(registration1.clone())); - let task2 = tokio::spawn(futures::future::pending::<()>().track(registration1)); - let task3 = tokio::spawn(futures::future::ready(()).track(registration2.clone())); - let task4 = tokio::spawn(futures::future::pending::<()>().track(registration2)); - let task5 = tokio::spawn(futures::future::pending::<()>().track(registration3)); + let task1 = tokio::spawn(pending().track(registration1.clone())); + let task2 = tokio::spawn(pending().track(registration1)); + let task3 = 
tokio::spawn(ready_ok().track(registration2.clone())); + let task4 = tokio::spawn(pending().track(registration2)); + let task5 = tokio::spawn(pending().track(registration3)); let running = sorted(registry.running()); let tracked = sorted(registry.tracked()); @@ -637,25 +721,25 @@ mod tests { let (tracker2, registration2) = registry.register(2); let (tracker3, registration3) = registry.register(3); - let task1 = - tokio::spawn(tokio::time::sleep(Duration::from_millis(100)).track(registration1)); - let task2 = tokio::spawn( - async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration2), - ); + let async_task = || async move { + tokio::time::sleep(Duration::from_millis(100)).await; + Ok::<_, Infallible>(()) + }; - let task3 = tokio::spawn( - async move { std::thread::sleep(Duration::from_millis(100)) } - .track(registration3.clone()), - ); + let blocking_task = || async move { + std::thread::sleep(Duration::from_millis(100)); + Ok::<_, Infallible>(()) + }; - let task4 = tokio::spawn( - async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration3), - ); + let task1 = tokio::spawn(async_task().track(registration1)); + let task2 = tokio::spawn(blocking_task().track(registration2)); + let task3 = tokio::spawn(blocking_task().track(registration3.clone())); + let task4 = tokio::spawn(blocking_task().track(registration3)); - task1.await.unwrap().unwrap(); - task2.await.unwrap().unwrap(); - task3.await.unwrap().unwrap(); - task4.await.unwrap().unwrap(); + task1.await.unwrap().unwrap().unwrap(); + task2.await.unwrap().unwrap().unwrap(); + task3.await.unwrap().unwrap().unwrap(); + task4.await.unwrap().unwrap().unwrap(); let assert_fuzzy = |actual: usize, expected: std::time::Duration| { // Number of milliseconds of toleration @@ -710,8 +794,8 @@ mod tests { let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); - let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone())); - task1.await.unwrap().unwrap(); + let task1 = tokio::spawn(ready_ok().track(registration.clone())); + task1.await.unwrap().unwrap().unwrap(); let tracked = registry.tracked(); assert_eq!(tracked.len(), 1); @@ -721,13 +805,138 @@ mod tests { let reclaimed: Vec<_> = registry.reclaim().collect(); assert_eq!(reclaimed.len(), 0); - let task2 = tokio::spawn(futures::future::ready(()).track(registration)); - task2.await.unwrap().unwrap(); + let task2 = tokio::spawn(ready_ok().track(registration)); + task2.await.unwrap().unwrap().unwrap(); let reclaimed: Vec<_> = registry.reclaim().collect(); assert_eq!(reclaimed.len(), 1); } + #[tokio::test] + async fn test_failure() { + let mut registry = TaskRegistry::new(); + let zero_clocks = |mut status: TaskStatus| { + match &mut status { + TaskStatus::Creating => {} + TaskStatus::Running { cpu_nanos, .. } => { + *cpu_nanos = 0; + } + TaskStatus::Complete { + wall_nanos, + cpu_nanos, + .. 
+ } => { + *wall_nanos = 0; + *cpu_nanos = 0; + } + } + status + }; + + let (task, registration) = registry.register(()); + let (sender, receiver) = oneshot::channel(); + let handle = tokio::spawn(receiver.track(registration)); + + sender.send(()).unwrap(); + handle.await.unwrap().unwrap().unwrap(); + assert_eq!(task.get_status().result(), Some(TaskResult::Success)); + assert_eq!( + zero_clocks(task.get_status()), + TaskStatus::Complete { + total_count: 1, + success_count: 1, + error_count: 0, + cancelled_count: 0, + dropped_count: 0, + cpu_nanos: 0, + wall_nanos: 0 + } + ); + + let (task, registration) = registry.register(()); + let (sender, receiver) = oneshot::channel::<()>(); + let handle = tokio::spawn(receiver.track(registration)); + + std::mem::drop(sender); + handle.await.unwrap().unwrap().expect_err("expected error"); + assert_eq!(task.get_status().result(), Some(TaskResult::Error)); + assert_eq!( + zero_clocks(task.get_status()), + TaskStatus::Complete { + total_count: 1, + success_count: 0, + error_count: 1, + cancelled_count: 0, + dropped_count: 0, + cpu_nanos: 0, + wall_nanos: 0 + } + ); + + let (task, registration) = registry.register(()); + let handle = tokio::spawn(pending().track(registration)); + + task.cancel(); + handle.await.unwrap().expect_err("expected aborted"); + + assert_eq!(task.get_status().result(), Some(TaskResult::Cancelled)); + assert_eq!( + zero_clocks(task.get_status()), + TaskStatus::Complete { + total_count: 1, + success_count: 0, + error_count: 0, + cancelled_count: 1, + dropped_count: 0, + cpu_nanos: 0, + wall_nanos: 0 + } + ); + + let (task, registration) = registry.register(()); + std::mem::drop(pending().track(registration)); + + assert_eq!(task.get_status().result(), Some(TaskResult::Dropped)); + assert_eq!( + zero_clocks(task.get_status()), + TaskStatus::Complete { + total_count: 1, + success_count: 0, + error_count: 0, + cancelled_count: 0, + dropped_count: 1, + cpu_nanos: 0, + wall_nanos: 0 + } + ); + + let (task, registration) = registry.register(()); + let handle = tokio::spawn( + async move { + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + panic!("test"); + } + .inspect(|_output: &Result<(), Infallible>| {}) + .track(registration), + ); + + handle.await.unwrap_err(); + + assert_eq!(task.get_status().result(), Some(TaskResult::Dropped)); + assert_eq!( + zero_clocks(task.get_status()), + TaskStatus::Complete { + total_count: 1, + success_count: 0, + error_count: 0, + cancelled_count: 0, + dropped_count: 1, + cpu_nanos: 0, + wall_nanos: 0 + } + ); + } + #[tokio::test] async fn test_join() { use std::future::Future; @@ -737,20 +946,10 @@ mod tests { let (tracker, registration) = registry.register(()); let (s1, r1) = oneshot::channel(); - let task1 = tokio::spawn( - async move { - r1.await.unwrap(); - } - .track(registration.clone()), - ); + let task1 = tokio::spawn(r1.track(registration.clone())); let (s2, r2) = oneshot::channel(); - let task2 = tokio::spawn( - async move { - r2.await.unwrap(); - } - .track(registration.clone()), - ); + let task2 = tokio::spawn(r2.track(registration.clone())); // This executor goop is necessary to get a future into // a state where it is waiting on the Notify resource @@ -767,7 +966,7 @@ mod tests { assert!(matches!(tracker.get_status(), TaskStatus::Creating)); s1.send(()).unwrap(); - task1.await.unwrap().unwrap(); + task1.await.unwrap().unwrap().unwrap(); assert!(matches!(tracker.get_status(), TaskStatus::Creating)); @@ -775,7 +974,7 @@ mod tests { assert_eq!(poll, Poll::Pending); 
s2.send(()).unwrap(); - task2.await.unwrap().unwrap(); + task2.await.unwrap().unwrap().unwrap(); assert!(matches!(tracker.get_status(), TaskStatus::Creating)); diff --git a/tracker/src/task/future.rs b/tracker/src/task/future.rs index a003b16a4a..10c005c38d 100644 --- a/tracker/src/task/future.rs +++ b/tracker/src/task/future.rs @@ -11,7 +11,7 @@ use std::sync::Arc; /// An extension trait that provides `self.track(registration)` allowing /// associating this future with a `TrackerRegistration` -pub trait TrackedFutureExt: Future { +pub trait TrackedFutureExt: TryFuture { fn track(self, registration: TaskRegistration) -> TrackedFuture where Self: Sized, @@ -28,17 +28,18 @@ pub trait TrackedFutureExt: Future { // The future returned by CancellationToken::cancelled borrows the token // In order to ensure we get a future with a static lifetime // we box them up together and let async work its magic - let abort = Box::pin(async move { token.cancelled().await }); + let cancel = Box::pin(async move { token.cancelled().await }); TrackedFuture { inner: self, - abort, + cancel, tracker, + complete: false, } } } -impl TrackedFutureExt for T where T: Future {} +impl TrackedFutureExt for T where T: TryFuture {} /// The `Future` returned by `TrackedFutureExt::track()` /// Unregisters the future from the registered `TrackerRegistry` on drop @@ -46,36 +47,53 @@ impl TrackedFutureExt for T where T: Future {} /// `TrackerRegistry::terminate` #[pin_project(PinnedDrop)] #[allow(missing_debug_implementations)] -pub struct TrackedFuture { +pub struct TrackedFuture { #[pin] inner: F, #[pin] - abort: BoxFuture<'static, ()>, + cancel: BoxFuture<'static, ()>, tracker: Arc, + complete: bool, } -impl Future for TrackedFuture { - type Output = Result; +impl Future for TrackedFuture { + type Output = Result, future::Aborted>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.as_mut().project().abort.poll(cx).is_ready() { + assert!(!self.complete, "It is illegal to poll a completed future"); + if self.as_mut().project().cancel.poll(cx).is_ready() { + *self.as_mut().project().complete = true; + self.tracker + .cancelled_futures + .fetch_add(1, Ordering::Relaxed); return Poll::Ready(Err(future::Aborted {})); } let start = Instant::now(); - let poll = self.as_mut().project().inner.poll(cx); + let poll = self.as_mut().project().inner.try_poll(cx); let delta = start.elapsed().as_nanos() as usize; self.tracker.cpu_nanos.fetch_add(delta, Ordering::Relaxed); - poll.map(Ok) + match poll { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + match v.is_ok() { + true => self.tracker.ok_futures.fetch_add(1, Ordering::Relaxed), + false => self.tracker.err_futures.fetch_add(1, Ordering::Relaxed), + }; + + *self.as_mut().project().complete = true; + Poll::Ready(Ok(v)) + } + } } } #[pinned_drop] -impl PinnedDrop for TrackedFuture { +impl PinnedDrop for TrackedFuture { fn drop(self: Pin<&mut Self>) { - let state = &self.project().tracker; + let state: &TrackerState = &self.project().tracker; let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;