Merge branch 'main' into main

pull/24376/head
kodiakhq[bot] 2021-07-19 10:00:23 +00:00 committed by GitHub
commit eb00a72782
14 changed files with 402 additions and 239 deletions


@@ -115,16 +115,11 @@ impl Job {
 pub enum OperationStatus {
     /// A task associated with the operation is running
     Running,
-    /// All tasks associated with the operation have finished
-    ///
-    /// Note: This does not indicate success or failure only that
-    /// no tasks associated with the operation are running
-    Complete,
+    /// All tasks associated with the operation have finished successfully
+    Success,
     /// The operation was cancelled and no associated tasks are running
     Cancelled,
     /// An operation error was returned
-    ///
-    /// Note: The tracker system currently will never return this
     Errored,
 }
@@ -135,10 +130,18 @@ pub enum OperationStatus {
 pub struct Operation {
     /// ID of the running operation
     pub id: usize,
-    /// Number of subtasks for this operation
-    pub task_count: u64,
-    /// Number of pending tasks for this operation
+    // The total number of created tasks
+    pub total_count: u64,
+    // The number of pending tasks
     pub pending_count: u64,
+    // The number of tasks that completed successfully
+    pub success_count: u64,
+    // The number of tasks that returned an error
+    pub error_count: u64,
+    // The number of tasks that were cancelled
+    pub cancelled_count: u64,
+    // The number of tasks that did not run to completion (e.g. panic)
+    pub dropped_count: u64,
     /// Wall time spent executing this operation
     pub wall_time: std::time::Duration,
     /// CPU time spent executing this operation

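The counters added above partition an operation's tasks: while it runs, pending_count tracks unfinished tasks, and once it completes every created task lands in exactly one of the success / error / cancelled / dropped buckets (the tracker derives dropped_count by subtraction later in this diff). A minimal illustrative sketch of that invariant, using a stand-alone struct rather than the crate's actual Operation type:

    // Illustrative only: mirrors the counters added above, not the crate's real type.
    struct Counts {
        total: u64,
        pending: u64,
        success: u64,
        error: u64,
        cancelled: u64,
        dropped: u64,
    }

    // Once an operation has completed, every created task falls into exactly one
    // finished bucket, so the finished buckets sum back to the total.
    fn is_consistent(c: &Counts) -> bool {
        c.pending == 0 && c.success + c.error + c.cancelled + c.dropped == c.total
    }

    fn main() {
        // e.g. a two-task job where one task succeeded and one was cancelled
        let c = Counts { total: 2, pending: 0, success: 1, error: 0, cancelled: 1, dropped: 0 };
        assert!(is_consistent(&c));
    }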

@@ -8,12 +8,24 @@ message OperationMetadata {
   // How many nanoseconds has it been since the job was submitted
   uint64 wall_nanos = 2;
 
-  // How many total tasks does this job have currently
-  uint64 task_count = 3;
+  // The total number of created tasks
+  uint64 total_count = 3;
 
-  // How many tasks for this job are still pending
+  // The number of pending tasks
   uint64 pending_count = 4;
 
+  // The number of tasks that completed successfully
+  uint64 success_count = 13;
+
+  // The number of tasks that returned an error
+  uint64 error_count = 14;
+
+  // The number of tasks that were cancelled
+  uint64 cancelled_count = 15;
+
+  // The number of tasks that did not run to completion (e.g. panic)
+  uint64 dropped_count = 16;
+
   // What kind of job is it?
   oneof job {
     Dummy dummy = 5;


@@ -157,7 +157,7 @@ impl TryFrom<longrunning::Operation> for data_types::job::Operation {
         let status = match &operation.result {
             None => OperationStatus::Running,
-            Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete,
+            Some(longrunning::operation::Result::Response(_)) => OperationStatus::Success,
             Some(longrunning::operation::Result::Error(status)) => {
                 if status.code == tonic::Code::Cancelled as i32 {
                     OperationStatus::Cancelled
@@ -169,8 +169,12 @@ impl TryFrom<longrunning::Operation> for data_types::job::Operation {
         Ok(Self {
             id: operation.name.parse().field("name")?,
-            task_count: meta.task_count,
+            total_count: meta.total_count,
             pending_count: meta.pending_count,
+            success_count: meta.success_count,
+            error_count: meta.error_count,
+            cancelled_count: meta.cancelled_count,
+            dropped_count: meta.dropped_count,
             wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
             cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
             job: meta.job.map(Into::into),


@@ -9,6 +9,7 @@ use tracker::{TaskRegistration, TrackedFutureExt};
 use futures::Future;
 use observability_deps::tracing::warn;
+use std::convert::Infallible;
 
 /// The type of thing that the dedicated executor runs
 type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
@@ -83,7 +84,13 @@ impl DedicatedExecutor {
                 let registration = TaskRegistration::new();
 
                 while let Ok(task) = rx.recv() {
-                    tokio::task::spawn(task.track(registration.clone()));
+                    tokio::task::spawn(
+                        async move {
+                            task.await;
+                            Ok::<_, Infallible>(())
+                        }
+                        .track(registration.clone()),
+                    );
                 }
 
                 // Wait for all tasks to finish

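The async wrapper introduced above exists because track() is changed later in this diff to require a TryFuture, i.e. a future that resolves to a Result; an infallible task is adapted by returning Ok::<_, Infallible>(()) so the tracker can count it as a success. A stand-alone sketch of the same wrapping pattern, shown without the tracker crate (assumes only tokio):

    // Illustrative only: the adapter pattern used above, without the tracker crate.
    use std::convert::Infallible;

    async fn infallible_work() {
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    }

    #[tokio::main]
    async fn main() {
        // Wrap a future that cannot fail so it yields a Result, as `.track()` now expects;
        // in the real code this wrapped future is what gets passed to `.track(registration.clone())`.
        let wrapped = async move {
            infallible_work().await;
            Ok::<_, Infallible>(())
        };

        let result: Result<(), Infallible> = tokio::spawn(wrapped).await.unwrap();
        assert!(result.is_ok());
    }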

@@ -1,13 +1,7 @@
 //! Tests for the Influx gRPC queries
 
-use std::{convert::TryFrom, num::NonZeroU32};
-
 use crate::scenarios::*;
-use data_types::database_rules::LifecycleRules;
-use server::{
-    db::test_helpers::write_lp,
-    utils::{make_db, TestDb},
-};
+use server::{db::test_helpers::write_lp, utils::make_db};
 
 use arrow::util::pretty::pretty_format_batches;
 use async_trait::async_trait;
@@ -194,14 +188,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
             db,
         };
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
         let data = lp_lines.join("\n");
         write_lp(&db, &data).await;
         // roll over and load chunks into both RUB and OS


@@ -1,12 +1,9 @@
 //! This module contains testing scenarios for Db
 
 use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::num::NonZeroU32;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use data_types::database_rules::LifecycleRules;
 use once_cell::sync::OnceCell;
 
 #[allow(unused_imports, dead_code, unused_macros)]
@@ -16,7 +13,6 @@ use async_trait::async_trait;
 use server::utils::{
     count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
-    TestDb,
 };
 use server::{db::test_helpers::write_lp, Db};
@@ -138,14 +134,7 @@ impl DbSetup for NoData {
         // Scenario 4: the database has had data loaded into RB & Object Store and then deleted
         //
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
         let data = "cpu,region=west user=23.2 100";
         write_lp(&db, data).await;
         // move data out of open chunk
@@ -564,14 +553,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
     async fn make(&self) -> Vec<DbScenario> {
         let partition_key = "1970-01-01T00";
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
 
         write_lp(
             &db,
@@ -771,14 +753,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
     };
 
     // Scenario 4: One closed chunk in both RUb and OS
-    let db = TestDb::builder()
-        .lifecycle_rules(LifecycleRules {
-            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-            ..Default::default()
-        })
-        .build()
-        .await
-        .db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data).await;
     for table_name in &table_names {
         db.rollover_partition(&table_name, partition_key)
@@ -802,14 +777,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
     };
 
     // Scenario 5: One closed chunk in OS only
-    let db = TestDb::builder()
-        .lifecycle_rules(LifecycleRules {
-            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-            ..Default::default()
-        })
-        .build()
-        .await
-        .db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data).await;
     for table_name in &table_names {
         db.rollover_partition(&table_name, partition_key)
@@ -914,14 +882,7 @@ pub async fn make_two_chunk_scenarios(
     };
 
     // in 2 read buffer chunks that also loaded into object store
-    let db = TestDb::builder()
-        .lifecycle_rules(LifecycleRules {
-            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-            ..Default::default()
-        })
-        .build()
-        .await
-        .db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1).await;
     for table_name in &table_names {
         db.rollover_partition(&table_name, partition_key)
@@ -960,14 +921,7 @@ pub async fn make_two_chunk_scenarios(
     };
 
     // Scenario 6: Two closed chunk in OS only
-    let db = TestDb::builder()
-        .lifecycle_rules(LifecycleRules {
-            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-            ..Default::default()
-        })
-        .build()
-        .await
-        .db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1).await;
     for table_name in &table_names {
         db.rollover_partition(&table_name, partition_key)
@@ -1077,14 +1031,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
     };
 
     // Scenario 2: One closed chunk in Parquet only
-    let db = TestDb::builder()
-        .lifecycle_rules(LifecycleRules {
-            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-            ..Default::default()
-        })
-        .build()
-        .await
-        .db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data).await;
     for table_name in &table_names {
         db.rollover_partition(&table_name, partition_key)


@@ -1392,13 +1392,7 @@ mod tests {
     #[tokio::test]
     async fn metrics_during_rollover() {
-        let test_db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await;
+        let test_db = make_db().await;
         let db = test_db.db;
         write_lp(db.as_ref(), "cpu bar=1 10").await;
@@ -2454,14 +2448,7 @@ mod tests {
     #[tokio::test]
     async fn chunk_summaries() {
         // Test that chunk id listing is hooked up
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
 
         // get three chunks: one open, one closed in mb and one close in rb
         write_lp(&db, "cpu bar=1 1").await;
@@ -2541,14 +2528,7 @@ mod tests {
     #[tokio::test]
     async fn partition_summaries() {
         // Test that chunk id listing is hooked up
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
         write_lp(&db, "cpu bar=1 1").await;
         let chunk_id = db
@@ -2741,14 +2721,7 @@ mod tests {
     #[tokio::test]
     async fn write_chunk_to_object_store_in_background() {
         // Test that data can be written to object store using a background task
-        let db = TestDb::builder()
-            .lifecycle_rules(LifecycleRules {
-                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
-                ..Default::default()
-            })
-            .build()
-            .await
-            .db;
+        let db = make_db().await.db;
 
         // create MB partition
         write_lp(db.as_ref(), "cpu bar=1 10").await;


@@ -68,7 +68,7 @@
     clippy::future_not_send
 )]
 
-use std::convert::TryInto;
+use std::convert::{Infallible, TryInto};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -872,8 +872,11 @@ where
         for duration in nanos {
             tokio::spawn(
-                tokio::time::sleep(tokio::time::Duration::from_nanos(duration))
-                    .track(registration.clone()),
+                async move {
+                    tokio::time::sleep(tokio::time::Duration::from_nanos(duration)).await;
+                    Ok::<_, Infallible>(())
+                }
+                .track(registration.clone()),
             );
         }


@@ -1,4 +1,4 @@
-use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration};
+use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
 
 use data_types::{
     chunk_metadata::{ChunkStorage, ChunkSummary},
@@ -77,7 +77,11 @@ impl TestDbBuilder {
             .worker_cleanup_avg_sleep
             .unwrap_or_else(|| Duration::from_secs(1));
 
-        rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_default();
+        // default to quick lifecycle rules for faster tests
+        rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_else(|| LifecycleRules {
+            late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
+            ..Default::default()
+        });
 
         // set partion template
         if let Some(partition_template) = self.partition_template {


@@ -18,7 +18,7 @@ use generated_types::{
     influxdata::iox::management::v1 as management,
     protobuf_type_url,
 };
-use tracker::{TaskId, TaskStatus, TaskTracker};
+use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker};
 
 use server::{ConnectionManager, Server};
 use std::convert::TryInto;
@@ -30,48 +30,44 @@ struct OperationsService<M: ConnectionManager> {
 pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> {
     let id = tracker.id();
-    let is_cancelled = tracker.is_cancelled();
     let status = tracker.get_status();
+    let result = status.result();
 
-    let (operation_metadata, is_complete) = match status {
-        TaskStatus::Creating => {
-            let metadata = management::OperationMetadata {
-                job: Some(tracker.metadata().clone().into()),
-                ..Default::default()
-            };
-
-            (metadata, false)
-        }
+    let operation_metadata = match status {
+        TaskStatus::Creating => management::OperationMetadata {
+            job: Some(tracker.metadata().clone().into()),
+            ..Default::default()
+        },
         TaskStatus::Running {
             total_count,
             pending_count,
             cpu_nanos,
-        } => {
-            let metadata = management::OperationMetadata {
-                cpu_nanos: cpu_nanos as _,
-                task_count: total_count as _,
-                pending_count: pending_count as _,
-                job: Some(tracker.metadata().clone().into()),
-                ..Default::default()
-            };
-
-            (metadata, false)
-        }
+        } => management::OperationMetadata {
+            cpu_nanos: cpu_nanos as _,
+            total_count: total_count as _,
+            pending_count: pending_count as _,
+            job: Some(tracker.metadata().clone().into()),
+            ..Default::default()
+        },
         TaskStatus::Complete {
             total_count,
+            success_count,
+            error_count,
+            cancelled_count,
+            dropped_count,
             cpu_nanos,
             wall_nanos,
-        } => {
-            let metadata = management::OperationMetadata {
-                cpu_nanos: cpu_nanos as _,
-                task_count: total_count as _,
-                wall_nanos: wall_nanos as _,
-                job: Some(tracker.metadata().clone().into()),
-                ..Default::default()
-            };
-
-            (metadata, true)
-        }
+        } => management::OperationMetadata {
+            cpu_nanos: cpu_nanos as _,
+            total_count: total_count as _,
+            success_count: success_count as _,
+            error_count: error_count as _,
+            cancelled_count: cancelled_count as _,
+            dropped_count: dropped_count as _,
+            wall_nanos: wall_nanos as _,
+            job: Some(tracker.metadata().clone().into()),
+            ..Default::default()
+        },
     };
 
     let mut buffer = BytesMut::new();
@@ -85,25 +81,33 @@ pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Sta
         value: buffer.freeze(),
     };
 
-    let result = match (is_complete, is_cancelled) {
-        (true, true) => Some(operation::Result::Error(Status {
+    let result = match result {
+        Some(TaskResult::Success) => Some(operation::Result::Response(Any {
+            type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
+            value: Default::default(),
+        })),
+        Some(TaskResult::Cancelled) => Some(operation::Result::Error(Status {
             code: tonic::Code::Cancelled as _,
             message: "Job cancelled".to_string(),
             details: vec![],
         })),
-
-        (true, false) => Some(operation::Result::Response(Any {
-            type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
-            value: Default::default(), // TODO: Verify this is correct
+        Some(TaskResult::Dropped) => Some(operation::Result::Error(Status {
+            code: tonic::Code::Internal as _,
+            message: "Job did not run to completion, possible panic".to_string(),
+            details: vec![],
         })),
-
-        _ => None,
+        Some(TaskResult::Error) => Some(operation::Result::Error(Status {
+            code: tonic::Code::Internal as _,
+            message: "Job returned an error".to_string(),
+            details: vec![],
+        })),
+        None => None,
     };
 
     Ok(Operation {
         name: id.to_string(),
         metadata: Some(metadata),
-        done: is_complete,
+        done: result.is_some(),
         result,
     })
 }


@@ -40,7 +40,7 @@ async fn test_operations() {
     let job = meta.job.expect("expected a job");
 
-    assert_eq!(meta.task_count, 2);
+    assert_eq!(meta.total_count, 2);
     assert_eq!(meta.pending_count, 1);
     assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos }));
@@ -76,7 +76,8 @@ async fn test_operations() {
     assert!(meta.wall_nanos > 0);
     assert!(meta.cpu_nanos > 0);
     assert_eq!(meta.pending_count, 0);
-    assert_eq!(meta.task_count, 2);
+    assert_eq!(meta.total_count, 2);
+    assert_eq!(meta.cancelled_count, 1);
 
     match waited.result {
         Some(operations::generated_types::operation::Result::Error(status)) => {


@@ -24,7 +24,7 @@ async fn test_start_stop() {
     )
     .expect("expected JSON output");
 
-    assert_eq!(stdout.task_count, 1);
+    assert_eq!(stdout.total_count, 1);
     match stdout.job {
         Some(Job::Dummy { nanos }) => assert_eq!(nanos, vec![duration]),
         _ => panic!("expected dummy job got {:?}", stdout.job),
@@ -82,7 +82,8 @@ async fn test_start_stop() {
     .expect("expected JSON output");
 
     assert_eq!(completed.pending_count, 0);
-    assert_eq!(completed.task_count, 1);
+    assert_eq!(completed.total_count, 1);
+    assert_eq!(completed.cancelled_count, 1);
     assert_eq!(completed.status, OperationStatus::Cancelled);
     assert_eq!(&completed.job, &operations[0].job)
 }


@@ -111,11 +111,28 @@ struct TrackerState {
     pending_futures: AtomicUsize,
     pending_registrations: AtomicUsize,
+    ok_futures: AtomicUsize,
+    err_futures: AtomicUsize,
+    cancelled_futures: AtomicUsize,
 
     notify: Notify,
 }
 
+/// Returns a summary of the task execution
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum TaskResult {
+    /// All futures completed successfully
+    Success,
+    /// Some futures were cancelled, and none were dropped or errored
+    Cancelled,
+    /// Some futures were dropped, and none errored
+    Dropped,
+    /// Some futures returned an error
+    Error,
+}
+
 /// The status of the tracked task
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub enum TaskStatus {
     /// More futures can be registered
     Creating,
@@ -141,6 +158,14 @@ pub enum TaskStatus {
     Complete {
         /// The number of created futures
         total_count: usize,
+        /// The number of futures that completed successfully
+        success_count: usize,
+        /// The number of futures that returned an error
+        error_count: usize,
+        /// The number of futures that were aborted
+        cancelled_count: usize,
+        /// The number of futures that were dropped without running to completion (e.g. panic)
+        dropped_count: usize,
         /// The total amount of CPU time spent executing the futures
         cpu_nanos: usize,
         /// The number of nanoseconds between tracker registration and
@@ -169,7 +194,7 @@ impl TaskStatus {
         }
     }
 
-    /// If the job has competed, returns the total amount of wall clock time
+    /// If the job has completed, returns the total amount of wall clock time
     /// spent executing futures
     pub fn wall_nanos(&self) -> Option<usize> {
         match self {
@@ -178,6 +203,34 @@ impl TaskStatus {
             Self::Complete { wall_nanos, .. } => Some(*wall_nanos),
         }
     }
+
+    /// Returns the result of the job if it has completed, otherwise None
+    pub fn result(&self) -> Option<TaskResult> {
+        match self {
+            TaskStatus::Creating => None,
+            TaskStatus::Running { .. } => None,
+            TaskStatus::Complete {
+                total_count,
+                success_count,
+                error_count,
+                dropped_count,
+                cancelled_count,
+                ..
+            } => {
+                if *error_count != 0 {
+                    Some(TaskResult::Error)
+                } else if *dropped_count != 0 {
+                    Some(TaskResult::Dropped)
+                } else if *cancelled_count != 0 {
+                    Some(TaskResult::Cancelled)
+                } else {
+                    // Sanity check
+                    assert_eq!(total_count, success_count);
+                    Some(TaskResult::Success)
+                }
+            }
+        }
+    }
 }
 
 /// A Tracker can be used to monitor/cancel/wait for a set of associated futures
@@ -279,11 +332,29 @@ where
                 pending_count: self.state.pending_futures.load(Ordering::Relaxed),
                 cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
             },
-            (true, true) => TaskStatus::Complete {
-                total_count: self.state.created_futures.load(Ordering::Relaxed),
-                cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
-                wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed),
-            },
+            (true, true) => {
+                let total_count = self.state.created_futures.load(Ordering::Relaxed);
+                let success_count = self.state.ok_futures.load(Ordering::Relaxed);
+                let error_count = self.state.err_futures.load(Ordering::Relaxed);
+                let cancelled_count = self.state.cancelled_futures.load(Ordering::Relaxed);
+
+                // Failure of this would imply a future reported its completion status multiple
+                // times or a future was created without incrementing created_futures.
+                // Both of these should be impossible
+                let dropped_count = total_count
+                    .checked_sub(success_count + error_count + cancelled_count)
+                    .expect("invalid tracker state");
+
+                TaskStatus::Complete {
+                    total_count,
+                    success_count,
+                    error_count,
+                    cancelled_count,
+                    dropped_count,
+                    cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
+                    wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed),
+                }
+            }
         }
     }
@@ -352,6 +423,9 @@ impl Default for TaskRegistration {
             created_futures: AtomicUsize::new(0),
             pending_futures: AtomicUsize::new(0),
             pending_registrations: AtomicUsize::new(1),
+            ok_futures: AtomicUsize::new(0),
+            err_futures: AtomicUsize::new(0),
+            cancelled_futures: AtomicUsize::new(0),
             notify: Notify::new(),
         });
@@ -401,8 +475,18 @@ mod tests {
     use std::time::Duration;
 
     use super::*;
+    use futures::FutureExt;
+    use std::convert::Infallible;
     use tokio::sync::oneshot;
 
+    fn pending() -> futures::future::Pending<Result<(), Infallible>> {
+        futures::future::pending()
+    }
+
+    fn ready_ok() -> futures::future::Ready<Result<(), Infallible>> {
+        futures::future::ready(Ok(()))
+    }
+
     #[tokio::test]
     async fn test_lifecycle() {
         let (sender, receive) = oneshot::channel();
@@ -451,7 +535,7 @@ mod tests {
         let (_, registration) = registry.register(());
 
         {
-            let f = futures::future::pending::<()>().track(registration);
+            let f = pending().track(registration);
 
             assert_eq!(registry.running().len(), 1);
@@ -467,9 +551,9 @@ mod tests {
         let (_, registration) = registry.register(());
 
         {
-            let f = futures::future::pending::<()>().track(registration.clone());
+            let f = pending().track(registration.clone());
             {
-                let f = futures::future::pending::<()>().track(registration);
+                let f = pending().track(registration);
                 assert_eq!(registry.running().len(), 1);
                 std::mem::drop(f);
             }
@@ -485,7 +569,7 @@ mod tests {
         let mut registry = TaskRegistry::new();
         let (_, registration) = registry.register(());
 
-        let task = tokio::spawn(futures::future::pending::<()>().track(registration));
+        let task = tokio::spawn(pending().track(registration));
 
         let tracked = registry.running();
         assert_eq!(tracked.len(), 1);
@@ -503,7 +587,7 @@ mod tests {
         let (tracker, registration) = registry.register(());
         tracker.cancel();
 
-        let task1 = tokio::spawn(futures::future::pending::<()>().track(registration));
+        let task1 = tokio::spawn(pending().track(registration));
 
         let result1 = task1.await.unwrap();
         assert!(result1.is_err());
@@ -515,8 +599,8 @@ mod tests {
         let mut registry = TaskRegistry::new();
         let (_, registration) = registry.register(());
 
-        let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone()));
-        let task2 = tokio::spawn(futures::future::pending::<()>().track(registration));
+        let task1 = tokio::spawn(pending().track(registration.clone()));
+        let task2 = tokio::spawn(pending().track(registration));
 
         let tracked = registry.running();
         assert_eq!(tracked.len(), 1);
@@ -539,11 +623,11 @@ mod tests {
         let (_, registration2) = registry.register(2);
         let (_, registration3) = registry.register(3);
 
-        let task1 = tokio::spawn(futures::future::pending::<()>().track(registration1.clone()));
-        let task2 = tokio::spawn(futures::future::pending::<()>().track(registration1));
-        let task3 = tokio::spawn(futures::future::ready(()).track(registration2.clone()));
-        let task4 = tokio::spawn(futures::future::pending::<()>().track(registration2));
-        let task5 = tokio::spawn(futures::future::pending::<()>().track(registration3));
+        let task1 = tokio::spawn(pending().track(registration1.clone()));
+        let task2 = tokio::spawn(pending().track(registration1));
+        let task3 = tokio::spawn(ready_ok().track(registration2.clone()));
+        let task4 = tokio::spawn(pending().track(registration2));
+        let task5 = tokio::spawn(pending().track(registration3));
 
         let running = sorted(registry.running());
         let tracked = sorted(registry.tracked());
@@ -637,25 +721,25 @@ mod tests {
         let (tracker2, registration2) = registry.register(2);
         let (tracker3, registration3) = registry.register(3);
 
-        let task1 =
-            tokio::spawn(tokio::time::sleep(Duration::from_millis(100)).track(registration1));
-        let task2 = tokio::spawn(
-            async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration2),
-        );
-        let task3 = tokio::spawn(
-            async move { std::thread::sleep(Duration::from_millis(100)) }
-                .track(registration3.clone()),
-        );
-        let task4 = tokio::spawn(
-            async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration3),
-        );
+        let async_task = || async move {
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            Ok::<_, Infallible>(())
+        };
+
+        let blocking_task = || async move {
+            std::thread::sleep(Duration::from_millis(100));
+            Ok::<_, Infallible>(())
+        };
+
+        let task1 = tokio::spawn(async_task().track(registration1));
+        let task2 = tokio::spawn(blocking_task().track(registration2));
+        let task3 = tokio::spawn(blocking_task().track(registration3.clone()));
+        let task4 = tokio::spawn(blocking_task().track(registration3));
 
-        task1.await.unwrap().unwrap();
-        task2.await.unwrap().unwrap();
-        task3.await.unwrap().unwrap();
-        task4.await.unwrap().unwrap();
+        task1.await.unwrap().unwrap().unwrap();
+        task2.await.unwrap().unwrap().unwrap();
+        task3.await.unwrap().unwrap().unwrap();
+        task4.await.unwrap().unwrap().unwrap();
 
         let assert_fuzzy = |actual: usize, expected: std::time::Duration| {
             // Number of milliseconds of toleration
@@ -710,8 +794,8 @@ mod tests {
         let mut registry = TaskRegistry::new();
         let (_, registration) = registry.register(());
 
-        let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone()));
-        task1.await.unwrap().unwrap();
+        let task1 = tokio::spawn(ready_ok().track(registration.clone()));
+        task1.await.unwrap().unwrap().unwrap();
 
         let tracked = registry.tracked();
         assert_eq!(tracked.len(), 1);
@@ -721,13 +805,138 @@ mod tests {
         let reclaimed: Vec<_> = registry.reclaim().collect();
         assert_eq!(reclaimed.len(), 0);
 
-        let task2 = tokio::spawn(futures::future::ready(()).track(registration));
-        task2.await.unwrap().unwrap();
+        let task2 = tokio::spawn(ready_ok().track(registration));
+        task2.await.unwrap().unwrap().unwrap();
 
         let reclaimed: Vec<_> = registry.reclaim().collect();
         assert_eq!(reclaimed.len(), 1);
     }
 
+    #[tokio::test]
+    async fn test_failure() {
+        let mut registry = TaskRegistry::new();
+        let zero_clocks = |mut status: TaskStatus| {
+            match &mut status {
+                TaskStatus::Creating => {}
+                TaskStatus::Running { cpu_nanos, .. } => {
+                    *cpu_nanos = 0;
+                }
+                TaskStatus::Complete {
+                    wall_nanos,
+                    cpu_nanos,
+                    ..
+                } => {
+                    *wall_nanos = 0;
+                    *cpu_nanos = 0;
+                }
+            }
+            status
+        };
+
+        let (task, registration) = registry.register(());
+        let (sender, receiver) = oneshot::channel();
+        let handle = tokio::spawn(receiver.track(registration));
+
+        sender.send(()).unwrap();
+        handle.await.unwrap().unwrap().unwrap();
+
+        assert_eq!(task.get_status().result(), Some(TaskResult::Success));
+        assert_eq!(
+            zero_clocks(task.get_status()),
+            TaskStatus::Complete {
+                total_count: 1,
+                success_count: 1,
+                error_count: 0,
+                cancelled_count: 0,
+                dropped_count: 0,
+                cpu_nanos: 0,
+                wall_nanos: 0
+            }
+        );
+
+        let (task, registration) = registry.register(());
+        let (sender, receiver) = oneshot::channel::<()>();
+        let handle = tokio::spawn(receiver.track(registration));
+
+        std::mem::drop(sender);
+        handle.await.unwrap().unwrap().expect_err("expected error");
+
+        assert_eq!(task.get_status().result(), Some(TaskResult::Error));
+        assert_eq!(
+            zero_clocks(task.get_status()),
+            TaskStatus::Complete {
+                total_count: 1,
+                success_count: 0,
+                error_count: 1,
+                cancelled_count: 0,
+                dropped_count: 0,
+                cpu_nanos: 0,
+                wall_nanos: 0
+            }
+        );
+
+        let (task, registration) = registry.register(());
+        let handle = tokio::spawn(pending().track(registration));
+
+        task.cancel();
+        handle.await.unwrap().expect_err("expected aborted");
+
+        assert_eq!(task.get_status().result(), Some(TaskResult::Cancelled));
+        assert_eq!(
+            zero_clocks(task.get_status()),
+            TaskStatus::Complete {
+                total_count: 1,
+                success_count: 0,
+                error_count: 0,
+                cancelled_count: 1,
+                dropped_count: 0,
+                cpu_nanos: 0,
+                wall_nanos: 0
+            }
+        );
+
+        let (task, registration) = registry.register(());
+        std::mem::drop(pending().track(registration));
+
+        assert_eq!(task.get_status().result(), Some(TaskResult::Dropped));
+        assert_eq!(
+            zero_clocks(task.get_status()),
+            TaskStatus::Complete {
+                total_count: 1,
+                success_count: 0,
+                error_count: 0,
+                cancelled_count: 0,
+                dropped_count: 1,
+                cpu_nanos: 0,
+                wall_nanos: 0
+            }
+        );
+
+        let (task, registration) = registry.register(());
+        let handle = tokio::spawn(
+            async move {
+                tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
+                panic!("test");
+            }
+            .inspect(|_output: &Result<(), Infallible>| {})
+            .track(registration),
+        );
+
+        handle.await.unwrap_err();
+
+        assert_eq!(task.get_status().result(), Some(TaskResult::Dropped));
+        assert_eq!(
+            zero_clocks(task.get_status()),
+            TaskStatus::Complete {
+                total_count: 1,
+                success_count: 0,
+                error_count: 0,
+                cancelled_count: 0,
+                dropped_count: 1,
+                cpu_nanos: 0,
+                wall_nanos: 0
+            }
+        );
+    }
+
     #[tokio::test]
     async fn test_join() {
         use std::future::Future;
@@ -737,20 +946,10 @@ mod tests {
         let (tracker, registration) = registry.register(());
 
         let (s1, r1) = oneshot::channel();
-        let task1 = tokio::spawn(
-            async move {
-                r1.await.unwrap();
-            }
-            .track(registration.clone()),
-        );
+        let task1 = tokio::spawn(r1.track(registration.clone()));
 
         let (s2, r2) = oneshot::channel();
-        let task2 = tokio::spawn(
-            async move {
-                r2.await.unwrap();
-            }
-            .track(registration.clone()),
-        );
+        let task2 = tokio::spawn(r2.track(registration.clone()));
 
         // This executor goop is necessary to get a future into
         // a state where it is waiting on the Notify resource
@@ -767,7 +966,7 @@
         assert!(matches!(tracker.get_status(), TaskStatus::Creating));
 
         s1.send(()).unwrap();
-        task1.await.unwrap().unwrap();
+        task1.await.unwrap().unwrap().unwrap();
 
         assert!(matches!(tracker.get_status(), TaskStatus::Creating));
@@ -775,7 +974,7 @@
         assert_eq!(poll, Poll::Pending);
 
         s2.send(()).unwrap();
-        task2.await.unwrap().unwrap();
+        task2.await.unwrap().unwrap().unwrap();
 
         assert!(matches!(tracker.get_status(), TaskStatus::Creating));

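One consequence visible throughout the rewritten tests above: awaiting a spawned tracked task now goes through three nested Results, namely tokio's join result, the tracker's future::Aborted for cancellation, and the tracked future's own Result (hence the task.await.unwrap().unwrap().unwrap() pattern). A stand-alone sketch that reproduces the same nesting with plain tokio and stand-in types, not the tracker crate itself:

    // Illustrative only: the three Result layers behind the triple unwrap.
    use std::convert::Infallible;

    // Stand-in for future::Aborted (the real type comes from the futures crate).
    #[derive(Debug)]
    struct Aborted;

    #[tokio::main]
    async fn main() {
        // Innermost: the tracked future's own Result; middle: cancellation; outer: JoinError.
        let handle = tokio::spawn(async {
            let inner: Result<(), Infallible> = Ok(());
            Ok::<_, Aborted>(inner)
        });

        let joined = handle.await.unwrap(); // layer 1: JoinError if the task panicked
        let not_cancelled = joined.unwrap(); // layer 2: Aborted if the job was cancelled
        not_cancelled.unwrap(); // layer 3: the tracked future's own error type
    }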

@@ -11,7 +11,7 @@ use std::sync::Arc;
 /// An extension trait that provides `self.track(registration)` allowing
 /// associating this future with a `TrackerRegistration`
-pub trait TrackedFutureExt: Future {
+pub trait TrackedFutureExt: TryFuture {
     fn track(self, registration: TaskRegistration) -> TrackedFuture<Self>
     where
         Self: Sized,
@@ -28,17 +28,18 @@ pub trait TrackedFutureExt: Future {
         // The future returned by CancellationToken::cancelled borrows the token
         // In order to ensure we get a future with a static lifetime
         // we box them up together and let async work its magic
-        let abort = Box::pin(async move { token.cancelled().await });
+        let cancel = Box::pin(async move { token.cancelled().await });
 
         TrackedFuture {
             inner: self,
-            abort,
+            cancel,
             tracker,
+            complete: false,
         }
     }
 }
 
-impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
+impl<T: ?Sized> TrackedFutureExt for T where T: TryFuture {}
 
 /// The `Future` returned by `TrackedFutureExt::track()`
 /// Unregisters the future from the registered `TrackerRegistry` on drop
@@ -46,36 +47,53 @@ impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
 /// `TrackerRegistry::terminate`
 #[pin_project(PinnedDrop)]
 #[allow(missing_debug_implementations)]
-pub struct TrackedFuture<F: Future> {
+pub struct TrackedFuture<F: TryFuture> {
     #[pin]
     inner: F,
     #[pin]
-    abort: BoxFuture<'static, ()>,
+    cancel: BoxFuture<'static, ()>,
     tracker: Arc<TrackerState>,
+    complete: bool,
 }
 
-impl<F: Future> Future for TrackedFuture<F> {
-    type Output = Result<F::Output, future::Aborted>;
+impl<F: TryFuture> Future for TrackedFuture<F> {
+    type Output = Result<Result<F::Ok, F::Error>, future::Aborted>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        if self.as_mut().project().abort.poll(cx).is_ready() {
+        assert!(!self.complete, "It is illegal to poll a completed future");
+
+        if self.as_mut().project().cancel.poll(cx).is_ready() {
+            *self.as_mut().project().complete = true;
+            self.tracker
+                .cancelled_futures
+                .fetch_add(1, Ordering::Relaxed);
             return Poll::Ready(Err(future::Aborted {}));
         }
 
         let start = Instant::now();
-        let poll = self.as_mut().project().inner.poll(cx);
+        let poll = self.as_mut().project().inner.try_poll(cx);
         let delta = start.elapsed().as_nanos() as usize;
 
         self.tracker.cpu_nanos.fetch_add(delta, Ordering::Relaxed);
 
-        poll.map(Ok)
+        match poll {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(v) => {
+                match v.is_ok() {
+                    true => self.tracker.ok_futures.fetch_add(1, Ordering::Relaxed),
+                    false => self.tracker.err_futures.fetch_add(1, Ordering::Relaxed),
+                };
+                *self.as_mut().project().complete = true;
+                Poll::Ready(Ok(v))
+            }
+        }
     }
 }
 
 #[pinned_drop]
-impl<F: Future> PinnedDrop for TrackedFuture<F> {
+impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
     fn drop(self: Pin<&mut Self>) {
-        let state = &self.project().tracker;
+        let state: &TrackerState = &self.project().tracker;
 
         let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;