Merge pull request #2415 from influxdata/crepererum/job_start_time_in_system_table

feat: add start time to `operations` system table
kodiakhq[bot] 2021-08-26 08:11:49 +00:00 committed by GitHub
commit c81a650409
2 changed files with 20 additions and 9 deletions


@@ -390,20 +390,20 @@ async fn sql_select_from_system_chunk_columns() {
 async fn sql_select_from_system_operations() {
     test_helpers::maybe_start_logging();
     let expected = vec![
-        "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
-        "| id | status  | took_cpu_time | took_wall_time | table_name | partition_key | chunk_ids | description                         |",
-        "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
-        "| 0  | Success | true          | true           | h2o        | 1970-01-01T00 | 0         | Compacting chunk to ReadBuffer      |",
-        "| 1  | Success | true          | true           | h2o        | 1970-01-01T00 | 0, 1      | Persisting chunks to object storage |",
-        "| 2  | Success | true          | true           | h2o        | 1970-01-01T00 | 2         | Writing chunk to Object Storage     |",
-        "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
+        "+----+---------+------------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
+        "| id | status  | start_time | took_cpu_time | took_wall_time | table_name | partition_key | chunk_ids | description                         |",
+        "+----+---------+------------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
+        "| 0  | Success | true       | true          | true           | h2o        | 1970-01-01T00 | 0         | Compacting chunk to ReadBuffer      |",
+        "| 1  | Success | true       | true          | true           | h2o        | 1970-01-01T00 | 0, 1      | Persisting chunks to object storage |",
+        "| 2  | Success | true       | true          | true           | h2o        | 1970-01-01T00 | 2         | Writing chunk to Object Storage     |",
+        "+----+---------+------------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
     ];
     // Check that the cpu time used reported is greater than zero as it isn't
     // repeatable
     run_sql_test_case!(
         TwoMeasurementsManyFieldsLifecycle {},
-        "SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, chunk_ids, description from system.operations",
+        "SELECT id, status, CAST(start_time as BIGINT) > 0 as start_time, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, chunk_ids, description from system.operations",
         &expected
     );
 }
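
The start_time values themselves differ on every run, so the query above treats the new column exactly like cpu_time_used and wall_time_used: it casts the timestamp to a BIGINT and only asserts that the result is positive (i.e. after the Unix epoch), keeping the expected table stable. A minimal sketch of that stabilization pattern, with a purely hypothetical end_time column added to show how it generalizes:

fn main() {
    // Project every wall-clock dependent column to a boolean so the golden
    // output stays identical across runs. `end_time` is a hypothetical
    // column, used here only to illustrate the pattern.
    let stabilized = "SELECT id, status, \
        CAST(start_time AS BIGINT) > 0 AS started, \
        CAST(end_time AS BIGINT) > 0 AS finished \
        FROM system.operations";
    println!("{}", stabilized);
}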


@@ -1,9 +1,10 @@
 use std::sync::Arc;
-use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray};
+use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, TimestampNanosecondArray};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow::error::Result;
 use arrow::record_batch::RecordBatch;
+use data_types::instant::to_approximate_datetime;
 use itertools::Itertools;
 use data_types::error::ErrorLogger;
@@ -47,6 +48,11 @@ fn operations_schema() -> SchemaRef {
     Arc::new(Schema::new(vec![
         Field::new("id", DataType::Utf8, false),
         Field::new("status", DataType::Utf8, false),
+        Field::new(
+            "start_time",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            false,
+        ),
         Field::new("cpu_time_used", ts.clone(), true),
         Field::new("wall_time_used", ts, true),
         Field::new("table_name", DataType::Utf8, true),
@@ -85,6 +91,10 @@ fn from_task_trackers(
             }
         })
         .collect::<StringArray>();
+    let start_time = jobs
+        .iter()
+        .map(|job| Some(to_approximate_datetime(job.start_instant()).timestamp_nanos()))
+        .collect::<TimestampNanosecondArray>();
     let cpu_time_used = jobs
         .iter()
         .map(|job| job.get_status().cpu_nanos().map(|n| n as i64))
@@ -119,6 +129,7 @@ fn from_task_trackers(
         vec![
             Arc::new(ids) as ArrayRef,
             Arc::new(statuses),
+            Arc::new(start_time),
             Arc::new(cpu_time_used),
             Arc::new(wall_time_used),
             Arc::new(table_names),
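
std::time::Instant is a monotonic clock reading with no wall-clock meaning, so it cannot be turned into a timestamp directly; a helper like to_approximate_datetime presumably anchors the instant against "now" on both the monotonic and the wall clock and applies the measured offset. Below is a rough, self-contained sketch of that idea and of how the resulting nanoseconds would feed the new Arrow column; the function body and the job_starts input are assumptions for illustration, not the crate's actual code.

use std::time::Instant;

use arrow::array::{Array, TimestampNanosecondArray};
use chrono::{DateTime, Duration, Utc};

// Assumed behaviour of a `to_approximate_datetime`-style helper: measure the
// gap between the instant and `Instant::now()`, then apply that gap to the
// current wall-clock time. The result is only approximate by construction.
fn approximate_datetime(instant: Instant) -> DateTime<Utc> {
    let now_instant = Instant::now();
    let now_wall = Utc::now();
    if instant <= now_instant {
        // The instant lies in the past: subtract the elapsed gap.
        now_wall - Duration::from_std(now_instant - instant).expect("gap fits chrono range")
    } else {
        // The instant lies in the future: add the remaining gap.
        now_wall + Duration::from_std(instant - now_instant).expect("gap fits chrono range")
    }
}

fn main() {
    // Stand-ins for the `start_instant()` values of two tracked jobs.
    let job_starts = vec![Instant::now(), Instant::now()];

    // Same shape as the column construction in the diff above: one optional
    // nanosecond value per job, collected into a TimestampNanosecondArray.
    let start_time: TimestampNanosecondArray = job_starts
        .iter()
        .map(|start| Some(approximate_datetime(*start).timestamp_nanos()))
        .collect();

    println!("start_time column has {} rows", start_time.len());
}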