From 20d06e3225527f7fa532c0fddeae415bcd769ec4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 22 Jul 2021 18:16:09 +0100 Subject: [PATCH] feat: include more information in system.operations table (#2097) * feat: include more information in system.operations table * chore: review feedback Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- data_types/Cargo.toml | 4 +- data_types/src/job.rs | 91 +++++++------- generated_types/src/job.rs | 142 +++++++++++----------- query_tests/src/sql.rs | 16 +-- server/src/db/lifecycle/compact.rs | 4 +- server/src/db/lifecycle/drop.rs | 5 +- server/src/db/lifecycle/move_chunk.rs | 8 +- server/src/db/lifecycle/persist.rs | 4 +- server/src/db/lifecycle/write.rs | 6 +- server/src/db/system_tables/operations.rs | 36 ++++-- server/src/lib.rs | 32 +++-- tests/end_to_end_cases/management_cli.rs | 22 ++-- tests/end_to_end_cases/sql_cli.rs | 14 +-- tests/end_to_end_cases/system_tables.rs | 12 +- tracker/src/task.rs | 12 ++ 15 files changed, 221 insertions(+), 187 deletions(-) diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index fd145052f9..8ea61d25e2 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -10,12 +10,12 @@ readme = "README.md" chrono = { version = "0.4", features = ["serde"] } influxdb_line_protocol = { path = "../influxdb_line_protocol" } num_cpus = "1.13.0" +observability_deps = { path = "../observability_deps" } +once_cell = { version = "1.4.0", features = ["parking_lot"] } percent-encoding = "2.1.0" regex = "1.4" serde = { version = "1.0", features = ["rc", "derive"] } snafu = "0.6" -observability_deps = { path = "../observability_deps" } -once_cell = { version = "1.4.0", features = ["parking_lot"] } [dev-dependencies] # In alphabetical order test_helpers = { path = "../test_helpers" } diff --git a/data_types/src/job.rs b/data_types/src/job.rs index ec6929eb8c..37018bd058 100644 --- a/data_types/src/job.rs +++ b/data_types/src/job.rs @@ -1,5 +1,10 @@ +use std::sync::Arc; + use serde::{Deserialize, Serialize}; +use crate::chunk_metadata::ChunkAddr; +use crate::partition_metadata::PartitionAddr; + /// Metadata associated with a set of background tasks /// Used in combination with TrackerRegistry /// @@ -11,87 +16,87 @@ pub enum Job { }, /// Move a chunk from mutable buffer to read buffer - CloseChunk { - db_name: String, - partition_key: String, - table_name: String, - chunk_id: u32, + CompactChunk { + chunk: ChunkAddr, }, /// Write a chunk from read buffer to object store WriteChunk { - db_name: String, - partition_key: String, - table_name: String, - chunk_id: u32, + chunk: ChunkAddr, }, /// Compact a set of chunks CompactChunks { - db_name: String, - partition_key: String, - table_name: String, + partition: PartitionAddr, chunks: Vec, }, /// Split and persist a set of chunks PersistChunks { - db_name: String, - partition_key: String, - table_name: String, + partition: PartitionAddr, chunks: Vec, }, /// Drop chunk from memory and (if persisted) from object store. DropChunk { - db_name: String, - partition_key: String, - table_name: String, - chunk_id: u32, + chunk: ChunkAddr, }, /// Wipe preserved catalog WipePreservedCatalog { - db_name: String, + db_name: Arc, }, } impl Job { - /// Returns the database name assocated with this job, if any - pub fn db_name(&self) -> Option<&str> { + /// Returns the database name associated with this job, if any + pub fn db_name(&self) -> Option<&Arc> { match self { Self::Dummy { .. } => None, - Self::CloseChunk { db_name, .. } => Some(db_name), - Self::WriteChunk { db_name, .. } => Some(db_name), - Self::CompactChunks { db_name, .. } => Some(db_name), - Self::PersistChunks { db_name, .. } => Some(db_name), - Self::DropChunk { db_name, .. } => Some(db_name), - Self::WipePreservedCatalog { db_name, .. } => Some(db_name), + Self::CompactChunk { chunk, .. } => Some(&chunk.db_name), + Self::WriteChunk { chunk, .. } => Some(&chunk.db_name), + Self::CompactChunks { partition, .. } => Some(&partition.db_name), + Self::PersistChunks { partition, .. } => Some(&partition.db_name), + Self::DropChunk { chunk, .. } => Some(&chunk.db_name), + Self::WipePreservedCatalog { db_name, .. } => Some(&db_name), } } - /// Returns the partition name assocated with this job, if any - pub fn partition_key(&self) -> Option<&str> { + /// Returns the partition name associated with this job, if any + pub fn partition_key(&self) -> Option<&Arc> { match self { Self::Dummy { .. } => None, - Self::CloseChunk { partition_key, .. } => Some(partition_key), - Self::WriteChunk { partition_key, .. } => Some(partition_key), - Self::CompactChunks { partition_key, .. } => Some(partition_key), - Self::PersistChunks { partition_key, .. } => Some(partition_key), - Self::DropChunk { partition_key, .. } => Some(partition_key), + Self::CompactChunk { chunk, .. } => Some(&chunk.partition_key), + Self::WriteChunk { chunk, .. } => Some(&chunk.partition_key), + Self::CompactChunks { partition, .. } => Some(&partition.partition_key), + Self::PersistChunks { partition, .. } => Some(&partition.partition_key), + Self::DropChunk { chunk, .. } => Some(&chunk.partition_key), Self::WipePreservedCatalog { .. } => None, } } - /// Returns the chunk_id associated with this job, if any - pub fn chunk_id(&self) -> Option { + /// Returns the table name associated with this job, if any + pub fn table_name(&self) -> Option<&Arc> { match self { Self::Dummy { .. } => None, - Self::CloseChunk { chunk_id, .. } => Some(*chunk_id), - Self::WriteChunk { chunk_id, .. } => Some(*chunk_id), - Self::CompactChunks { .. } => None, - Self::PersistChunks { .. } => None, - Self::DropChunk { chunk_id, .. } => Some(*chunk_id), + Self::CompactChunk { chunk, .. } => Some(&chunk.table_name), + Self::WriteChunk { chunk, .. } => Some(&chunk.table_name), + Self::CompactChunks { partition, .. } => Some(&partition.table_name), + Self::PersistChunks { partition, .. } => Some(&partition.table_name), + Self::DropChunk { chunk, .. } => Some(&chunk.table_name), + Self::WipePreservedCatalog { .. } => None, + } + } + + /// Returns the chunk_ids associated with this job, if any + pub fn chunk_ids(&self) -> Option> { + match self { + Self::Dummy { .. } => None, + Self::CompactChunk { chunk, .. } => Some(vec![chunk.chunk_id]), + Self::WriteChunk { chunk, .. } => Some(vec![chunk.chunk_id]), + Self::CompactChunks { chunks, .. } => Some(chunks.clone()), + Self::PersistChunks { chunks, .. } => Some(chunks.clone()), + Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]), Self::WipePreservedCatalog { .. } => None, } } @@ -100,7 +105,7 @@ impl Job { pub fn description(&self) -> &str { match self { Self::Dummy { .. } => "Dummy Job, for testing", - Self::CloseChunk { .. } => "Loading chunk to ReadBuffer", + Self::CompactChunk { .. } => "Compacting chunk to ReadBuffer", Self::WriteChunk { .. } => "Writing chunk to Object Storage", Self::CompactChunks { .. } => "Compacting chunks to ReadBuffer", Self::PersistChunks { .. } => "Persisting chunks to object storage", diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs index 2cb1c78221..0ecb310d78 100644 --- a/generated_types/src/job.rs +++ b/generated_types/src/job.rs @@ -1,70 +1,54 @@ use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt}; use crate::influxdata::iox::management::v1 as management; use crate::protobuf_type_url_eq; +use data_types::chunk_metadata::ChunkAddr; use data_types::job::{Job, OperationStatus}; +use data_types::partition_metadata::PartitionAddr; use std::convert::TryFrom; +use std::sync::Arc; impl From for management::operation_metadata::Job { fn from(job: Job) -> Self { match job { Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }), - Job::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - } => Self::CloseChunk(management::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, + Job::CompactChunk { chunk } => Self::CloseChunk(management::CloseChunk { + db_name: chunk.db_name.to_string(), + partition_key: chunk.partition_key.to_string(), + table_name: chunk.table_name.to_string(), + chunk_id: chunk.chunk_id, }), - Job::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - } => Self::WriteChunk(management::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, + Job::WriteChunk { chunk } => Self::WriteChunk(management::WriteChunk { + db_name: chunk.db_name.to_string(), + partition_key: chunk.partition_key.to_string(), + table_name: chunk.table_name.to_string(), + chunk_id: chunk.chunk_id, }), Job::WipePreservedCatalog { db_name } => { - Self::WipePreservedCatalog(management::WipePreservedCatalog { db_name }) + Self::WipePreservedCatalog(management::WipePreservedCatalog { + db_name: db_name.to_string(), + }) } - Job::CompactChunks { - db_name, - partition_key, - table_name, - chunks, - } => Self::CompactChunks(management::CompactChunks { - db_name, - partition_key, - table_name, - chunks, - }), - Job::PersistChunks { - db_name, - partition_key, - table_name, - chunks, - } => Self::PersistChunks(management::PersistChunks { - db_name, - partition_key, - table_name, - chunks, - }), - Job::DropChunk { - db_name, - partition_key, - table_name, - chunk_id, - } => Self::DropChunk(management::DropChunk { - db_name, - partition_key, - table_name, - chunk_id, + Job::CompactChunks { partition, chunks } => { + Self::CompactChunks(management::CompactChunks { + db_name: partition.db_name.to_string(), + partition_key: partition.partition_key.to_string(), + table_name: partition.table_name.to_string(), + chunks, + }) + } + Job::PersistChunks { partition, chunks } => { + Self::PersistChunks(management::PersistChunks { + db_name: partition.db_name.to_string(), + partition_key: partition.partition_key.to_string(), + table_name: partition.table_name.to_string(), + chunks, + }) + } + Job::DropChunk { chunk } => Self::DropChunk(management::DropChunk { + db_name: chunk.db_name.to_string(), + partition_key: chunk.partition_key.to_string(), + table_name: chunk.table_name.to_string(), + chunk_id: chunk.chunk_id, }), } } @@ -80,11 +64,13 @@ impl From for Job { partition_key, table_name, chunk_id, - }) => Self::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, + }) => Self::CompactChunk { + chunk: ChunkAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + partition_key: Arc::from(partition_key.as_str()), + chunk_id, + }, }, Job::WriteChunk(management::WriteChunk { db_name, @@ -92,13 +78,17 @@ impl From for Job { table_name, chunk_id, }) => Self::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, + chunk: ChunkAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + partition_key: Arc::from(partition_key.as_str()), + chunk_id, + }, }, Job::WipePreservedCatalog(management::WipePreservedCatalog { db_name }) => { - Self::WipePreservedCatalog { db_name } + Self::WipePreservedCatalog { + db_name: Arc::from(db_name.as_str()), + } } Job::CompactChunks(management::CompactChunks { db_name, @@ -106,9 +96,11 @@ impl From for Job { table_name, chunks, }) => Self::CompactChunks { - db_name, - partition_key, - table_name, + partition: PartitionAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + partition_key: Arc::from(partition_key.as_str()), + }, chunks, }, Job::PersistChunks(management::PersistChunks { @@ -117,9 +109,11 @@ impl From for Job { table_name, chunks, }) => Self::PersistChunks { - db_name, - partition_key, - table_name, + partition: PartitionAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + partition_key: Arc::from(partition_key.as_str()), + }, chunks, }, Job::DropChunk(management::DropChunk { @@ -128,10 +122,12 @@ impl From for Job { table_name, chunk_id, }) => Self::DropChunk { - db_name, - partition_key, - table_name, - chunk_id, + chunk: ChunkAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from(table_name.as_str()), + partition_key: Arc::from(partition_key.as_str()), + chunk_id, + }, }, } } diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 6fc2c13550..252e0af2d1 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -347,20 +347,20 @@ async fn sql_select_from_system_chunk_columns() { async fn sql_select_from_system_operations() { test_helpers::maybe_start_logging(); let expected = vec![ - "+----+----------+---------------+----------------+---------------+----------+-------------------------------------+", - "| id | status | took_cpu_time | took_wall_time | partition_key | chunk_id | description |", - "+----+----------+---------------+----------------+---------------+----------+-------------------------------------+", - "| 0 | Complete | true | true | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |", - "| 1 | Complete | true | true | 1970-01-01T00 | | Persisting chunks to object storage |", - "| 2 | Complete | true | true | 1970-01-01T00 | 2 | Writing chunk to Object Storage |", - "+----+----------+---------------+----------------+---------------+----------+-------------------------------------+", + "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+", + "| id | status | took_cpu_time | took_wall_time | table_name | partition_key | chunk_ids | description |", + "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+", + "| 0 | Success | true | true | h2o | 1970-01-01T00 | 0 | Compacting chunk to ReadBuffer |", + "| 1 | Success | true | true | h2o | 1970-01-01T00 | 0, 1 | Persisting chunks to object storage |", + "| 2 | Success | true | true | h2o | 1970-01-01T00 | 2 | Writing chunk to Object Storage |", + "+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+", ]; // Check that the cpu time used reported is greater than zero as it isn't // repeatable run_sql_test_case!( TwoMeasurementsManyFieldsLifecycle {}, - "SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, partition_key, chunk_id, description from system.operations", + "SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, chunk_ids, description from system.operations", &expected ); } diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index e864fb8926..a12c4acb29 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -37,9 +37,7 @@ pub(crate) fn compact_chunks( let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect(); let (tracker, registration) = db.jobs.register(Job::CompactChunks { - db_name: partition.db_name().to_string(), - partition_key: partition.key().to_string(), - table_name: table_name.clone(), + partition: partition.addr().clone(), chunks: chunk_ids.clone(), }); diff --git a/server/src/db/lifecycle/drop.rs b/server/src/db/lifecycle/drop.rs index 5c1847a1cd..ad737ec911 100644 --- a/server/src/db/lifecycle/drop.rs +++ b/server/src/db/lifecycle/drop.rs @@ -31,10 +31,7 @@ pub fn drop_chunk( let chunk_id = guard.id(); let (tracker, registration) = db.jobs.register(Job::DropChunk { - db_name: partition.db_name().to_string(), - partition_key: partition_key.clone(), - table_name: table_name.clone(), - chunk_id, + chunk: guard.addr().clone(), }); guard.set_dropping(®istration)?; diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs index 4f4ef4ce04..e3973f64d3 100644 --- a/server/src/db/lifecycle/move_chunk.rs +++ b/server/src/db/lifecycle/move_chunk.rs @@ -23,12 +23,8 @@ pub fn move_chunk_to_read_buffer( )> { let db = Arc::clone(&guard.data().db); let addr = guard.addr().clone(); - // TODO: Use ChunkAddr within Job - let (tracker, registration) = db.jobs.register(Job::CloseChunk { - db_name: addr.db_name.to_string(), - partition_key: addr.partition_key.to_string(), - table_name: addr.table_name.to_string(), - chunk_id: addr.chunk_id, + let (tracker, registration) = db.jobs.register(Job::CompactChunk { + chunk: addr.clone(), }); // update the catalog to say we are processing this chunk and diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 84c87488f4..901eeed69c 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -43,9 +43,7 @@ pub fn persist_chunks( let flush_timestamp = max_persistable_timestamp.timestamp_nanos(); let (tracker, registration) = db.jobs.register(Job::PersistChunks { - db_name: partition.db_name().to_string(), - partition_key: partition.key().to_string(), - table_name: table_name.clone(), + partition: partition.addr().clone(), chunks: chunk_ids.clone(), }); diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index 02eb4acc49..1076adce47 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -54,12 +54,8 @@ pub(super) fn write_chunk_to_object_store( let db = Arc::clone(&chunk.data().db); let addr = chunk.addr().clone(); - // TODO: Use ChunkAddr within Job let (tracker, registration) = db.jobs.register(Job::WriteChunk { - db_name: addr.db_name.to_string(), - partition_key: addr.partition_key.to_string(), - table_name: addr.table_name.to_string(), - chunk_id: addr.chunk_id, + chunk: addr.clone(), }); // update the catalog to say we are processing this chunk and diff --git a/server/src/db/system_tables/operations.rs b/server/src/db/system_tables/operations.rs index d8b2af0ac2..7bc5443ef6 100644 --- a/server/src/db/system_tables/operations.rs +++ b/server/src/db/system_tables/operations.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, UInt32Array}; +use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::error::Result; use arrow::record_batch::RecordBatch; +use itertools::Itertools; use data_types::error::ErrorLogger; use data_types::job::Job; @@ -45,11 +46,12 @@ fn operations_schema() -> SchemaRef { let ts = DataType::Time64(TimeUnit::Nanosecond); Arc::new(Schema::new(vec![ Field::new("id", DataType::Utf8, false), - Field::new("status", DataType::Utf8, true), + Field::new("status", DataType::Utf8, false), Field::new("cpu_time_used", ts.clone(), true), Field::new("wall_time_used", ts, true), + Field::new("table_name", DataType::Utf8, true), Field::new("partition_key", DataType::Utf8, true), - Field::new("chunk_id", DataType::UInt32, true), + Field::new("chunk_ids", DataType::Utf8, true), Field::new("description", DataType::Utf8, true), ])) } @@ -61,7 +63,12 @@ fn from_task_trackers( ) -> Result { let jobs = jobs .into_iter() - .filter(|job| job.metadata().db_name() == Some(db_name)) + .filter(|job| { + job.metadata() + .db_name() + .map(|x| x.as_ref() == db_name) + .unwrap_or(false) + }) .collect::>(); let ids = jobs @@ -70,7 +77,13 @@ fn from_task_trackers( .collect::(); let statuses = jobs .iter() - .map(|job| Some(job.get_status().name())) + .map(|job| { + let status = job.get_status(); + match status.result() { + Some(result) => Some(result.name()), + None => Some(status.name()), + } + }) .collect::(); let cpu_time_used = jobs .iter() @@ -84,10 +97,18 @@ fn from_task_trackers( .iter() .map(|job| job.metadata().partition_key()) .collect::(); + let table_names = jobs + .iter() + .map(|job| job.metadata().table_name()) + .collect::(); let chunk_ids = jobs .iter() - .map(|job| job.metadata().chunk_id()) - .collect::(); + .map(|job| { + job.metadata() + .chunk_ids() + .map(|ids| ids.into_iter().join(", ")) + }) + .collect::(); let descriptions = jobs .iter() .map(|job| Some(job.metadata().description())) @@ -100,6 +121,7 @@ fn from_task_trackers( Arc::new(statuses), Arc::new(cpu_time_used), Arc::new(wall_time_used), + Arc::new(table_names), Arc::new(partition_keys), Arc::new(chunk_ids), Arc::new(descriptions), diff --git a/server/src/lib.rs b/server/src/lib.rs index e2dc829e31..8cffddce44 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1091,7 +1091,7 @@ where }; let (tracker, registration) = self.jobs.register(Job::WipePreservedCatalog { - db_name: db_name.to_string(), + db_name: Arc::from(db_name.as_str()), }); let state = Arc::clone(&self.stage); @@ -1341,6 +1341,7 @@ mod tests { use arrow_util::assert_batches_eq; use async_trait::async_trait; use bytes::Bytes; + use data_types::chunk_metadata::ChunkAddr; use data_types::database_rules::{ HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG, }; @@ -1827,19 +1828,24 @@ mod tests { .unwrap(); // start the close (note this is not an async) - let partition_key = ""; - let table_name = "cpu"; - let db_name_string = db_name.to_string(); + let chunk_addr = ChunkAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from("cpu"), + partition_key: Arc::from(""), + chunk_id: 0, + }; let tracker = server - .close_chunk(db_name, table_name, partition_key, 0) + .close_chunk( + db_name, + chunk_addr.table_name.as_ref(), + chunk_addr.partition_key.as_ref(), + chunk_addr.chunk_id, + ) .unwrap(); let metadata = tracker.metadata(); - let expected_metadata = Job::CloseChunk { - db_name: db_name_string, - partition_key: partition_key.to_string(), - table_name: table_name.to_string(), - chunk_id: 0, + let expected_metadata = Job::CompactChunk { + chunk: chunk_addr.clone(), }; assert_eq!(metadata, &expected_metadata); @@ -2275,7 +2281,7 @@ mod tests { .unwrap(); let metadata = tracker.metadata(); let expected_metadata = Job::WipePreservedCatalog { - db_name: db_name_non_existing.to_string(), + db_name: Arc::from(db_name_non_existing.as_str()), }; assert_eq!(metadata, &expected_metadata); tracker.join().await; @@ -2296,7 +2302,7 @@ mod tests { .unwrap(); let metadata = tracker.metadata(); let expected_metadata = Job::WipePreservedCatalog { - db_name: db_name_rules_broken.to_string(), + db_name: Arc::from(db_name_rules_broken.as_str()), }; assert_eq!(metadata, &expected_metadata); tracker.join().await; @@ -2317,7 +2323,7 @@ mod tests { .unwrap(); let metadata = tracker.metadata(); let expected_metadata = Job::WipePreservedCatalog { - db_name: db_name_catalog_broken.to_string(), + db_name: Arc::from(db_name_catalog_broken.as_str()), }; assert_eq!(metadata, &expected_metadata); tracker.join().await; diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 3e0be27290..c3c93a8012 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -1,9 +1,13 @@ +use std::sync::Arc; + use assert_cmd::Command; +use predicates::prelude::*; + +use data_types::chunk_metadata::ChunkAddr; use data_types::{ chunk_metadata::ChunkStorage, job::{Job, Operation}, }; -use predicates::prelude::*; use test_helpers::make_temp_file; use crate::{ @@ -571,11 +575,13 @@ async fn test_close_partition_chunk() { ) .expect("Expected JSON output"); - let expected_job = Job::CloseChunk { - db_name, - partition_key: "cpu".into(), - table_name: "cpu".into(), - chunk_id: 0, + let expected_job = Job::CompactChunk { + chunk: ChunkAddr { + db_name: Arc::from(db_name.as_str()), + table_name: Arc::from("cpu"), + partition_key: Arc::from("cpu"), + chunk_id: 0, + }, }; assert_eq!( @@ -630,7 +636,9 @@ async fn test_wipe_persisted_catalog() { ) .expect("Expected JSON output"); - let expected_job = Job::WipePreservedCatalog { db_name }; + let expected_job = Job::WipePreservedCatalog { + db_name: Arc::from(db_name.as_str()), + }; assert_eq!( Some(expected_job), diff --git a/tests/end_to_end_cases/sql_cli.rs b/tests/end_to_end_cases/sql_cli.rs index 432a686200..a712a925c5 100644 --- a/tests/end_to_end_cases/sql_cli.rs +++ b/tests/end_to_end_cases/sql_cli.rs @@ -321,24 +321,24 @@ async fn test_sql_observer_operations() { .expect("failed to wait operation"); let expected_output = r#" -+---------------+----------+-----------------------------+ -| partition_key | chunk_id | description | -+---------------+----------+-----------------------------+ -| cpu | 0 | Loading chunk to ReadBuffer | -+---------------+----------+-----------------------------+ ++------------+---------------+-----------+--------------------------------+ +| table_name | partition_key | chunk_ids | description | ++------------+---------------+-----------+--------------------------------+ +| cpu | cpu | 0 | Compacting chunk to ReadBuffer | ++------------+---------------+-----------+--------------------------------+ "# .trim(); let query = format!( r#" select - partition_key, chunk_id, description + table_name, partition_key, chunk_ids, description from operations where database_name = '{}' order by - partition_key, chunk_id, description + table_name, partition_key, chunk_ids, description "#, db_name ); diff --git a/tests/end_to_end_cases/system_tables.rs b/tests/end_to_end_cases/system_tables.rs index 0dc5e173ec..530911f236 100644 --- a/tests/end_to_end_cases/system_tables.rs +++ b/tests/end_to_end_cases/system_tables.rs @@ -39,7 +39,7 @@ async fn test_operations() { .expect("failed to wait operation"); let mut client = fixture.flight_client(); - let sql_query = "select chunk_id, status, description from system.operations"; + let sql_query = "select chunk_ids, status, description from system.operations"; let query_results = client.perform_query(&db_name1, sql_query).await.unwrap(); @@ -48,11 +48,11 @@ async fn test_operations() { // parameterize on db_name1 let expected_read_data = vec![ - "+----------+----------+-----------------------------+", - "| chunk_id | status | description |", - "+----------+----------+-----------------------------+", - "| 0 | Complete | Loading chunk to ReadBuffer |", - "+----------+----------+-----------------------------+", + "+-----------+---------+--------------------------------+", + "| chunk_ids | status | description |", + "+-----------+---------+--------------------------------+", + "| 0 | Success | Compacting chunk to ReadBuffer |", + "+-----------+---------+--------------------------------+", ]; assert_batches_eq!(expected_read_data, &batches); diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 6a1443a08d..90de1133e2 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -131,6 +131,18 @@ pub enum TaskResult { Error, } +impl TaskResult { + /// return a human readable name for this result + pub fn name(&self) -> &'static str { + match self { + TaskResult::Success => "Success", + TaskResult::Cancelled => "Cancelled", + TaskResult::Dropped => "Dropped", + TaskResult::Error => "Error", + } + } +} + /// The status of the tracked task #[derive(Debug, Clone, Eq, PartialEq)] pub enum TaskStatus {