feat: add status column

pull/24376/head
Andrew Lamb 2021-04-16 16:48:20 -04:00 committed by kodiakhq[bot]
parent 9c74074095
commit 348f38c2c9
3 changed files with 36 additions and 16 deletions

View File

@ -23,7 +23,7 @@ use arrow_deps::{
use data_types::{
chunk::ChunkSummary, error::ErrorLogger, job::Job, partition_metadata::PartitionSummary,
};
use tracker::{TaskStatus, TaskTracker};
use tracker::TaskTracker;
use super::catalog::Catalog;
use crate::JobRegistry;
@ -208,14 +208,11 @@ fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<RecordB
fn from_task_trackers(jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {
let ids = StringArray::from_iter(jobs.iter().map(|job| Some(job.id().to_string())));
let cpu_time_used =
Time64NanosecondArray::from_iter(jobs.iter().map(|job| match job.get_status() {
TaskStatus::Creating => None,
TaskStatus::Running { cpu_nanos, .. } => Some(cpu_nanos as i64),
TaskStatus::Complete { cpu_nanos, .. } => Some(cpu_nanos as i64),
}));
let statuses = StringArray::from_iter(jobs.iter().map(|job| Some(job.get_status().name())));
let cpu_time_used = Time64NanosecondArray::from_iter(
jobs.iter()
.map(|job| job.get_status().cpu_nanos().map(|n| n as i64)),
);
let db_names = StringArray::from_iter(jobs.iter().map(|job| job.metadata().db_name()));
let partition_keys =
StringArray::from_iter(jobs.iter().map(|job| job.metadata().partition_key()));
@ -225,6 +222,7 @@ fn from_task_trackers(jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {
let schema = Schema::new(vec![
Field::new("id", ids.data_type().clone(), false),
Field::new("status", statuses.data_type().clone(), false),
Field::new("cpu_time_used", cpu_time_used.data_type().clone(), true),
Field::new("db_name", db_names.data_type().clone(), true),
Field::new("partition_key", partition_keys.data_type().clone(), true),
@ -236,6 +234,7 @@ fn from_task_trackers(jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {
Arc::new(schema),
vec![
Arc::new(ids),
Arc::new(statuses),
Arc::new(cpu_time_used),
Arc::new(db_names),
Arc::new(partition_keys),

View File

@ -314,19 +314,19 @@ async fn sql_select_from_system_columns() {
async fn sql_select_from_system_operations() {
test_helpers::maybe_start_logging();
let expected = vec![
"+----+-----------+-------------+---------------+----------+---------------------------------+",
"| id | took_time | db_name | partition_key | chunk_id | description |",
"+----+-----------+-------------+---------------+----------+---------------------------------+",
"| 0 | true | placeholder | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |",
"| 1 | true | placeholder | 1970-01-01T00 | 0 | Writing chunk to Object Storage |",
"+----+-----------+-------------+---------------+----------+---------------------------------+",
"+----+----------+-----------+-------------+---------------+----------+---------------------------------+",
"| id | status | took_time | db_name | partition_key | chunk_id | description |",
"+----+----------+-----------+-------------+---------------+----------+---------------------------------+",
"| 0 | Complete | true | placeholder | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |",
"| 1 | Complete | true | placeholder | 1970-01-01T00 | 0 | Writing chunk to Object Storage |",
"+----+----------+-----------+-------------+---------------+----------+---------------------------------+",
];
// Check that the cpu time used reported is greater than zero as it isn't
// repeatable
run_sql_test_case!(
TwoMeasurementsManyFieldsLifecycle {},
"SELECT id, CAST(cpu_time_used AS BIGINT) > 0 as took_time, db_name, partition_key, chunk_id, description from system.operations",
"SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_time, db_name, partition_key, chunk_id, description from system.operations",
&expected
);
}

View File

@ -148,6 +148,27 @@ pub enum TaskStatus {
},
}
impl TaskStatus {
/// Returns a human-readable name for this status variant
/// ("Creating", "Running", or "Complete").
pub fn name(&self) -> &'static str {
match self {
Self::Creating => "Creating",
Self::Running { .. } => "Running",
Self::Complete { .. } => "Complete",
}
}
/// Returns the CPU time (in nanoseconds) spent executing this
/// task's futures, once the task has started (`Running` or
/// `Complete`); returns `None` while the task is still `Creating`.
pub fn cpu_nanos(&self) -> Option<usize> {
match self {
Self::Creating => None,
// Dereference the matched field to copy it out of the borrowed enum
Self::Running { cpu_nanos, .. } => Some(*cpu_nanos),
Self::Complete { cpu_nanos, .. } => Some(*cpu_nanos),
}
}
}
/// A Tracker can be used to monitor/cancel/wait for a set of associated futures
#[derive(Debug)]
pub struct TaskTracker<T> {