From 348f38c2c95247739fcf1300a50a55678022b1f9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 16 Apr 2021 16:48:20 -0400 Subject: [PATCH] feat: add status column --- server/src/db/system_tables.rs | 17 ++++++++--------- server/src/query_tests/sql.rs | 14 +++++++------- tracker/src/task.rs | 21 +++++++++++++++++++++ 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index cc9abe7283..133f45c4d8 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -23,7 +23,7 @@ use arrow_deps::{ use data_types::{ chunk::ChunkSummary, error::ErrorLogger, job::Job, partition_metadata::PartitionSummary, }; -use tracker::{TaskStatus, TaskTracker}; +use tracker::TaskTracker; use super::catalog::Catalog; use crate::JobRegistry; @@ -208,14 +208,11 @@ fn from_partition_summaries(partitions: Vec) -> Result>) -> Result { let ids = StringArray::from_iter(jobs.iter().map(|job| Some(job.id().to_string()))); - - let cpu_time_used = - Time64NanosecondArray::from_iter(jobs.iter().map(|job| match job.get_status() { - TaskStatus::Creating => None, - TaskStatus::Running { cpu_nanos, .. } => Some(cpu_nanos as i64), - TaskStatus::Complete { cpu_nanos, .. 
} => Some(cpu_nanos as i64), - })); - + let statuses = StringArray::from_iter(jobs.iter().map(|job| Some(job.get_status().name()))); + let cpu_time_used = Time64NanosecondArray::from_iter( + jobs.iter() + .map(|job| job.get_status().cpu_nanos().map(|n| n as i64)), + ); let db_names = StringArray::from_iter(jobs.iter().map(|job| job.metadata().db_name())); let partition_keys = StringArray::from_iter(jobs.iter().map(|job| job.metadata().partition_key())); @@ -225,6 +222,7 @@ fn from_task_trackers(jobs: Vec>) -> Result { let schema = Schema::new(vec![ Field::new("id", ids.data_type().clone(), false), + Field::new("status", statuses.data_type().clone(), false), Field::new("cpu_time_used", cpu_time_used.data_type().clone(), true), Field::new("db_name", db_names.data_type().clone(), true), Field::new("partition_key", partition_keys.data_type().clone(), true), @@ -236,6 +234,7 @@ fn from_task_trackers(jobs: Vec>) -> Result { Arc::new(schema), vec![ Arc::new(ids), + Arc::new(statuses), Arc::new(cpu_time_used), Arc::new(db_names), Arc::new(partition_keys), diff --git a/server/src/query_tests/sql.rs b/server/src/query_tests/sql.rs index a986cdabb5..05ea2cbff9 100644 --- a/server/src/query_tests/sql.rs +++ b/server/src/query_tests/sql.rs @@ -314,19 +314,19 @@ async fn sql_select_from_system_columns() { async fn sql_select_from_system_operations() { test_helpers::maybe_start_logging(); let expected = vec![ - "+----+-----------+-------------+---------------+----------+---------------------------------+", - "| id | took_time | db_name | partition_key | chunk_id | description |", - "+----+-----------+-------------+---------------+----------+---------------------------------+", - "| 0 | true | placeholder | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |", - "| 1 | true | placeholder | 1970-01-01T00 | 0 | Writing chunk to Object Storage |", - "+----+-----------+-------------+---------------+----------+---------------------------------+", + 
"+----+----------+-----------+-------------+---------------+----------+---------------------------------+", + "| id | status | took_time | db_name | partition_key | chunk_id | description |", + "+----+----------+-----------+-------------+---------------+----------+---------------------------------+", + "| 0 | Complete | true | placeholder | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |", + "| 1 | Complete | true | placeholder | 1970-01-01T00 | 0 | Writing chunk to Object Storage |", + "+----+----------+-----------+-------------+---------------+----------+---------------------------------+", ]; // Check that the cpu time used reported is greater than zero as it isn't // repeatable run_sql_test_case!( TwoMeasurementsManyFieldsLifecycle {}, - "SELECT id, CAST(cpu_time_used AS BIGINT) > 0 as took_time, db_name, partition_key, chunk_id, description from system.operations", + "SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_time, db_name, partition_key, chunk_id, description from system.operations", &expected ); } diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 2baeca1d6b..60d80d488a 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -148,6 +148,27 @@ pub enum TaskStatus { }, } +impl TaskStatus { + /// return a human readable name for this status + pub fn name(&self) -> &'static str { + match self { + Self::Creating => "Creating", + Self::Running { .. } => "Running", + Self::Complete { .. } => "Complete", + } + } + + /// If the job has completed, returns the total amount of CPU time + /// spent executing futures + pub fn cpu_nanos(&self) -> Option<usize> { + match self { + Self::Creating => None, + Self::Running { cpu_nanos, .. } => Some(*cpu_nanos), + Self::Complete { cpu_nanos, .. } => Some(*cpu_nanos), + } + } +} + /// A Tracker can be used to monitor/cancel/wait for a set of associated futures #[derive(Debug)] pub struct TaskTracker {