feat: include more information in system.operations table (#2097)
* feat: include more information in system.operations table * chore: review feedback Co-authored-by: Andrew Lamb <alamb@influxdata.com> Co-authored-by: Andrew Lamb <alamb@influxdata.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
b3d6e3ed7b
commit
20d06e3225
|
|
@ -10,12 +10,12 @@ readme = "README.md"
|
|||
chrono = { version = "0.4", features = ["serde"] }
|
||||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
num_cpus = "1.13.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = { version = "1.4.0", features = ["parking_lot"] }
|
||||
percent-encoding = "2.1.0"
|
||||
regex = "1.4"
|
||||
serde = { version = "1.0", features = ["rc", "derive"] }
|
||||
snafu = "0.6"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = { version = "1.4.0", features = ["parking_lot"] }
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,10 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::chunk_metadata::ChunkAddr;
|
||||
use crate::partition_metadata::PartitionAddr;
|
||||
|
||||
/// Metadata associated with a set of background tasks
|
||||
/// Used in combination with TrackerRegistry
|
||||
///
|
||||
|
|
@ -11,87 +16,87 @@ pub enum Job {
|
|||
},
|
||||
|
||||
/// Move a chunk from mutable buffer to read buffer
|
||||
CloseChunk {
|
||||
db_name: String,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
chunk_id: u32,
|
||||
CompactChunk {
|
||||
chunk: ChunkAddr,
|
||||
},
|
||||
|
||||
/// Write a chunk from read buffer to object store
|
||||
WriteChunk {
|
||||
db_name: String,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
chunk_id: u32,
|
||||
chunk: ChunkAddr,
|
||||
},
|
||||
|
||||
/// Compact a set of chunks
|
||||
CompactChunks {
|
||||
db_name: String,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
partition: PartitionAddr,
|
||||
chunks: Vec<u32>,
|
||||
},
|
||||
|
||||
/// Split and persist a set of chunks
|
||||
PersistChunks {
|
||||
db_name: String,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
partition: PartitionAddr,
|
||||
chunks: Vec<u32>,
|
||||
},
|
||||
|
||||
/// Drop chunk from memory and (if persisted) from object store.
|
||||
DropChunk {
|
||||
db_name: String,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
chunk_id: u32,
|
||||
chunk: ChunkAddr,
|
||||
},
|
||||
|
||||
/// Wipe preserved catalog
|
||||
WipePreservedCatalog {
|
||||
db_name: String,
|
||||
db_name: Arc<str>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Job {
|
||||
/// Returns the database name assocated with this job, if any
|
||||
pub fn db_name(&self) -> Option<&str> {
|
||||
/// Returns the database name associated with this job, if any
|
||||
pub fn db_name(&self) -> Option<&Arc<str>> {
|
||||
match self {
|
||||
Self::Dummy { .. } => None,
|
||||
Self::CloseChunk { db_name, .. } => Some(db_name),
|
||||
Self::WriteChunk { db_name, .. } => Some(db_name),
|
||||
Self::CompactChunks { db_name, .. } => Some(db_name),
|
||||
Self::PersistChunks { db_name, .. } => Some(db_name),
|
||||
Self::DropChunk { db_name, .. } => Some(db_name),
|
||||
Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
|
||||
Self::CompactChunk { chunk, .. } => Some(&chunk.db_name),
|
||||
Self::WriteChunk { chunk, .. } => Some(&chunk.db_name),
|
||||
Self::CompactChunks { partition, .. } => Some(&partition.db_name),
|
||||
Self::PersistChunks { partition, .. } => Some(&partition.db_name),
|
||||
Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
|
||||
Self::WipePreservedCatalog { db_name, .. } => Some(&db_name),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the partition name assocated with this job, if any
|
||||
pub fn partition_key(&self) -> Option<&str> {
|
||||
/// Returns the partition name associated with this job, if any
|
||||
pub fn partition_key(&self) -> Option<&Arc<str>> {
|
||||
match self {
|
||||
Self::Dummy { .. } => None,
|
||||
Self::CloseChunk { partition_key, .. } => Some(partition_key),
|
||||
Self::WriteChunk { partition_key, .. } => Some(partition_key),
|
||||
Self::CompactChunks { partition_key, .. } => Some(partition_key),
|
||||
Self::PersistChunks { partition_key, .. } => Some(partition_key),
|
||||
Self::DropChunk { partition_key, .. } => Some(partition_key),
|
||||
Self::CompactChunk { chunk, .. } => Some(&chunk.partition_key),
|
||||
Self::WriteChunk { chunk, .. } => Some(&chunk.partition_key),
|
||||
Self::CompactChunks { partition, .. } => Some(&partition.partition_key),
|
||||
Self::PersistChunks { partition, .. } => Some(&partition.partition_key),
|
||||
Self::DropChunk { chunk, .. } => Some(&chunk.partition_key),
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the chunk_id associated with this job, if any
|
||||
pub fn chunk_id(&self) -> Option<u32> {
|
||||
/// Returns the table name associated with this job, if any
|
||||
pub fn table_name(&self) -> Option<&Arc<str>> {
|
||||
match self {
|
||||
Self::Dummy { .. } => None,
|
||||
Self::CloseChunk { chunk_id, .. } => Some(*chunk_id),
|
||||
Self::WriteChunk { chunk_id, .. } => Some(*chunk_id),
|
||||
Self::CompactChunks { .. } => None,
|
||||
Self::PersistChunks { .. } => None,
|
||||
Self::DropChunk { chunk_id, .. } => Some(*chunk_id),
|
||||
Self::CompactChunk { chunk, .. } => Some(&chunk.table_name),
|
||||
Self::WriteChunk { chunk, .. } => Some(&chunk.table_name),
|
||||
Self::CompactChunks { partition, .. } => Some(&partition.table_name),
|
||||
Self::PersistChunks { partition, .. } => Some(&partition.table_name),
|
||||
Self::DropChunk { chunk, .. } => Some(&chunk.table_name),
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the chunk_ids associated with this job, if any
|
||||
pub fn chunk_ids(&self) -> Option<Vec<u32>> {
|
||||
match self {
|
||||
Self::Dummy { .. } => None,
|
||||
Self::CompactChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
|
||||
Self::WriteChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
|
||||
Self::CompactChunks { chunks, .. } => Some(chunks.clone()),
|
||||
Self::PersistChunks { chunks, .. } => Some(chunks.clone()),
|
||||
Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
}
|
||||
}
|
||||
|
|
@ -100,7 +105,7 @@ impl Job {
|
|||
pub fn description(&self) -> &str {
|
||||
match self {
|
||||
Self::Dummy { .. } => "Dummy Job, for testing",
|
||||
Self::CloseChunk { .. } => "Loading chunk to ReadBuffer",
|
||||
Self::CompactChunk { .. } => "Compacting chunk to ReadBuffer",
|
||||
Self::WriteChunk { .. } => "Writing chunk to Object Storage",
|
||||
Self::CompactChunks { .. } => "Compacting chunks to ReadBuffer",
|
||||
Self::PersistChunks { .. } => "Persisting chunks to object storage",
|
||||
|
|
|
|||
|
|
@ -1,70 +1,54 @@
|
|||
use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt};
|
||||
use crate::influxdata::iox::management::v1 as management;
|
||||
use crate::protobuf_type_url_eq;
|
||||
use data_types::chunk_metadata::ChunkAddr;
|
||||
use data_types::job::{Job, OperationStatus};
|
||||
use data_types::partition_metadata::PartitionAddr;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
impl From<Job> for management::operation_metadata::Job {
|
||||
fn from(job: Job) -> Self {
|
||||
match job {
|
||||
Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
|
||||
Job::CloseChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
} => Self::CloseChunk(management::CloseChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
Job::CompactChunk { chunk } => Self::CloseChunk(management::CloseChunk {
|
||||
db_name: chunk.db_name.to_string(),
|
||||
partition_key: chunk.partition_key.to_string(),
|
||||
table_name: chunk.table_name.to_string(),
|
||||
chunk_id: chunk.chunk_id,
|
||||
}),
|
||||
Job::WriteChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
} => Self::WriteChunk(management::WriteChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
Job::WriteChunk { chunk } => Self::WriteChunk(management::WriteChunk {
|
||||
db_name: chunk.db_name.to_string(),
|
||||
partition_key: chunk.partition_key.to_string(),
|
||||
table_name: chunk.table_name.to_string(),
|
||||
chunk_id: chunk.chunk_id,
|
||||
}),
|
||||
Job::WipePreservedCatalog { db_name } => {
|
||||
Self::WipePreservedCatalog(management::WipePreservedCatalog { db_name })
|
||||
Self::WipePreservedCatalog(management::WipePreservedCatalog {
|
||||
db_name: db_name.to_string(),
|
||||
})
|
||||
}
|
||||
Job::CompactChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunks,
|
||||
} => Self::CompactChunks(management::CompactChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunks,
|
||||
}),
|
||||
Job::PersistChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunks,
|
||||
} => Self::PersistChunks(management::PersistChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunks,
|
||||
}),
|
||||
Job::DropChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
} => Self::DropChunk(management::DropChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
Job::CompactChunks { partition, chunks } => {
|
||||
Self::CompactChunks(management::CompactChunks {
|
||||
db_name: partition.db_name.to_string(),
|
||||
partition_key: partition.partition_key.to_string(),
|
||||
table_name: partition.table_name.to_string(),
|
||||
chunks,
|
||||
})
|
||||
}
|
||||
Job::PersistChunks { partition, chunks } => {
|
||||
Self::PersistChunks(management::PersistChunks {
|
||||
db_name: partition.db_name.to_string(),
|
||||
partition_key: partition.partition_key.to_string(),
|
||||
table_name: partition.table_name.to_string(),
|
||||
chunks,
|
||||
})
|
||||
}
|
||||
Job::DropChunk { chunk } => Self::DropChunk(management::DropChunk {
|
||||
db_name: chunk.db_name.to_string(),
|
||||
partition_key: chunk.partition_key.to_string(),
|
||||
table_name: chunk.table_name.to_string(),
|
||||
chunk_id: chunk.chunk_id,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
|
@ -80,11 +64,13 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
}) => Self::CloseChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
}) => Self::CompactChunk {
|
||||
chunk: ChunkAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
partition_key: Arc::from(partition_key.as_str()),
|
||||
chunk_id,
|
||||
},
|
||||
},
|
||||
Job::WriteChunk(management::WriteChunk {
|
||||
db_name,
|
||||
|
|
@ -92,13 +78,17 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
table_name,
|
||||
chunk_id,
|
||||
}) => Self::WriteChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
chunk: ChunkAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
partition_key: Arc::from(partition_key.as_str()),
|
||||
chunk_id,
|
||||
},
|
||||
},
|
||||
Job::WipePreservedCatalog(management::WipePreservedCatalog { db_name }) => {
|
||||
Self::WipePreservedCatalog { db_name }
|
||||
Self::WipePreservedCatalog {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
}
|
||||
}
|
||||
Job::CompactChunks(management::CompactChunks {
|
||||
db_name,
|
||||
|
|
@ -106,9 +96,11 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
table_name,
|
||||
chunks,
|
||||
}) => Self::CompactChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
partition: PartitionAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
partition_key: Arc::from(partition_key.as_str()),
|
||||
},
|
||||
chunks,
|
||||
},
|
||||
Job::PersistChunks(management::PersistChunks {
|
||||
|
|
@ -117,9 +109,11 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
table_name,
|
||||
chunks,
|
||||
}) => Self::PersistChunks {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
partition: PartitionAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
partition_key: Arc::from(partition_key.as_str()),
|
||||
},
|
||||
chunks,
|
||||
},
|
||||
Job::DropChunk(management::DropChunk {
|
||||
|
|
@ -128,10 +122,12 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
table_name,
|
||||
chunk_id,
|
||||
}) => Self::DropChunk {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
chunk_id,
|
||||
chunk: ChunkAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
partition_key: Arc::from(partition_key.as_str()),
|
||||
chunk_id,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -347,20 +347,20 @@ async fn sql_select_from_system_chunk_columns() {
|
|||
async fn sql_select_from_system_operations() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let expected = vec![
|
||||
"+----+----------+---------------+----------------+---------------+----------+-------------------------------------+",
|
||||
"| id | status | took_cpu_time | took_wall_time | partition_key | chunk_id | description |",
|
||||
"+----+----------+---------------+----------------+---------------+----------+-------------------------------------+",
|
||||
"| 0 | Complete | true | true | 1970-01-01T00 | 0 | Loading chunk to ReadBuffer |",
|
||||
"| 1 | Complete | true | true | 1970-01-01T00 | | Persisting chunks to object storage |",
|
||||
"| 2 | Complete | true | true | 1970-01-01T00 | 2 | Writing chunk to Object Storage |",
|
||||
"+----+----------+---------------+----------------+---------------+----------+-------------------------------------+",
|
||||
"+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
|
||||
"| id | status | took_cpu_time | took_wall_time | table_name | partition_key | chunk_ids | description |",
|
||||
"+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
|
||||
"| 0 | Success | true | true | h2o | 1970-01-01T00 | 0 | Compacting chunk to ReadBuffer |",
|
||||
"| 1 | Success | true | true | h2o | 1970-01-01T00 | 0, 1 | Persisting chunks to object storage |",
|
||||
"| 2 | Success | true | true | h2o | 1970-01-01T00 | 2 | Writing chunk to Object Storage |",
|
||||
"+----+---------+---------------+----------------+------------+---------------+-----------+-------------------------------------+",
|
||||
];
|
||||
|
||||
// Check that the cpu time used reported is greater than zero as it isn't
|
||||
// repeatable
|
||||
run_sql_test_case!(
|
||||
TwoMeasurementsManyFieldsLifecycle {},
|
||||
"SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, partition_key, chunk_id, description from system.operations",
|
||||
"SELECT id, status, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, chunk_ids, description from system.operations",
|
||||
&expected
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,9 +37,7 @@ pub(crate) fn compact_chunks(
|
|||
let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect();
|
||||
|
||||
let (tracker, registration) = db.jobs.register(Job::CompactChunks {
|
||||
db_name: partition.db_name().to_string(),
|
||||
partition_key: partition.key().to_string(),
|
||||
table_name: table_name.clone(),
|
||||
partition: partition.addr().clone(),
|
||||
chunks: chunk_ids.clone(),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -31,10 +31,7 @@ pub fn drop_chunk(
|
|||
let chunk_id = guard.id();
|
||||
|
||||
let (tracker, registration) = db.jobs.register(Job::DropChunk {
|
||||
db_name: partition.db_name().to_string(),
|
||||
partition_key: partition_key.clone(),
|
||||
table_name: table_name.clone(),
|
||||
chunk_id,
|
||||
chunk: guard.addr().clone(),
|
||||
});
|
||||
|
||||
guard.set_dropping(®istration)?;
|
||||
|
|
|
|||
|
|
@ -23,12 +23,8 @@ pub fn move_chunk_to_read_buffer(
|
|||
)> {
|
||||
let db = Arc::clone(&guard.data().db);
|
||||
let addr = guard.addr().clone();
|
||||
// TODO: Use ChunkAddr within Job
|
||||
let (tracker, registration) = db.jobs.register(Job::CloseChunk {
|
||||
db_name: addr.db_name.to_string(),
|
||||
partition_key: addr.partition_key.to_string(),
|
||||
table_name: addr.table_name.to_string(),
|
||||
chunk_id: addr.chunk_id,
|
||||
let (tracker, registration) = db.jobs.register(Job::CompactChunk {
|
||||
chunk: addr.clone(),
|
||||
});
|
||||
|
||||
// update the catalog to say we are processing this chunk and
|
||||
|
|
|
|||
|
|
@ -43,9 +43,7 @@ pub fn persist_chunks(
|
|||
let flush_timestamp = max_persistable_timestamp.timestamp_nanos();
|
||||
|
||||
let (tracker, registration) = db.jobs.register(Job::PersistChunks {
|
||||
db_name: partition.db_name().to_string(),
|
||||
partition_key: partition.key().to_string(),
|
||||
table_name: table_name.clone(),
|
||||
partition: partition.addr().clone(),
|
||||
chunks: chunk_ids.clone(),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -54,12 +54,8 @@ pub(super) fn write_chunk_to_object_store(
|
|||
let db = Arc::clone(&chunk.data().db);
|
||||
let addr = chunk.addr().clone();
|
||||
|
||||
// TODO: Use ChunkAddr within Job
|
||||
let (tracker, registration) = db.jobs.register(Job::WriteChunk {
|
||||
db_name: addr.db_name.to_string(),
|
||||
partition_key: addr.partition_key.to_string(),
|
||||
table_name: addr.table_name.to_string(),
|
||||
chunk_id: addr.chunk_id,
|
||||
chunk: addr.clone(),
|
||||
});
|
||||
|
||||
// update the catalog to say we are processing this chunk and
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, UInt32Array};
|
||||
use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
|
||||
use arrow::error::Result;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use itertools::Itertools;
|
||||
|
||||
use data_types::error::ErrorLogger;
|
||||
use data_types::job::Job;
|
||||
|
|
@ -45,11 +46,12 @@ fn operations_schema() -> SchemaRef {
|
|||
let ts = DataType::Time64(TimeUnit::Nanosecond);
|
||||
Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Utf8, false),
|
||||
Field::new("status", DataType::Utf8, true),
|
||||
Field::new("status", DataType::Utf8, false),
|
||||
Field::new("cpu_time_used", ts.clone(), true),
|
||||
Field::new("wall_time_used", ts, true),
|
||||
Field::new("table_name", DataType::Utf8, true),
|
||||
Field::new("partition_key", DataType::Utf8, true),
|
||||
Field::new("chunk_id", DataType::UInt32, true),
|
||||
Field::new("chunk_ids", DataType::Utf8, true),
|
||||
Field::new("description", DataType::Utf8, true),
|
||||
]))
|
||||
}
|
||||
|
|
@ -61,7 +63,12 @@ fn from_task_trackers(
|
|||
) -> Result<RecordBatch> {
|
||||
let jobs = jobs
|
||||
.into_iter()
|
||||
.filter(|job| job.metadata().db_name() == Some(db_name))
|
||||
.filter(|job| {
|
||||
job.metadata()
|
||||
.db_name()
|
||||
.map(|x| x.as_ref() == db_name)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let ids = jobs
|
||||
|
|
@ -70,7 +77,13 @@ fn from_task_trackers(
|
|||
.collect::<StringArray>();
|
||||
let statuses = jobs
|
||||
.iter()
|
||||
.map(|job| Some(job.get_status().name()))
|
||||
.map(|job| {
|
||||
let status = job.get_status();
|
||||
match status.result() {
|
||||
Some(result) => Some(result.name()),
|
||||
None => Some(status.name()),
|
||||
}
|
||||
})
|
||||
.collect::<StringArray>();
|
||||
let cpu_time_used = jobs
|
||||
.iter()
|
||||
|
|
@ -84,10 +97,18 @@ fn from_task_trackers(
|
|||
.iter()
|
||||
.map(|job| job.metadata().partition_key())
|
||||
.collect::<StringArray>();
|
||||
let table_names = jobs
|
||||
.iter()
|
||||
.map(|job| job.metadata().table_name())
|
||||
.collect::<StringArray>();
|
||||
let chunk_ids = jobs
|
||||
.iter()
|
||||
.map(|job| job.metadata().chunk_id())
|
||||
.collect::<UInt32Array>();
|
||||
.map(|job| {
|
||||
job.metadata()
|
||||
.chunk_ids()
|
||||
.map(|ids| ids.into_iter().join(", "))
|
||||
})
|
||||
.collect::<StringArray>();
|
||||
let descriptions = jobs
|
||||
.iter()
|
||||
.map(|job| Some(job.metadata().description()))
|
||||
|
|
@ -100,6 +121,7 @@ fn from_task_trackers(
|
|||
Arc::new(statuses),
|
||||
Arc::new(cpu_time_used),
|
||||
Arc::new(wall_time_used),
|
||||
Arc::new(table_names),
|
||||
Arc::new(partition_keys),
|
||||
Arc::new(chunk_ids),
|
||||
Arc::new(descriptions),
|
||||
|
|
|
|||
|
|
@ -1091,7 +1091,7 @@ where
|
|||
};
|
||||
|
||||
let (tracker, registration) = self.jobs.register(Job::WipePreservedCatalog {
|
||||
db_name: db_name.to_string(),
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
});
|
||||
|
||||
let state = Arc::clone(&self.stage);
|
||||
|
|
@ -1341,6 +1341,7 @@ mod tests {
|
|||
use arrow_util::assert_batches_eq;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use data_types::chunk_metadata::ChunkAddr;
|
||||
use data_types::database_rules::{
|
||||
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
|
||||
};
|
||||
|
|
@ -1827,19 +1828,24 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// start the close (note this is not an async)
|
||||
let partition_key = "";
|
||||
let table_name = "cpu";
|
||||
let db_name_string = db_name.to_string();
|
||||
let chunk_addr = ChunkAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from("cpu"),
|
||||
partition_key: Arc::from(""),
|
||||
chunk_id: 0,
|
||||
};
|
||||
let tracker = server
|
||||
.close_chunk(db_name, table_name, partition_key, 0)
|
||||
.close_chunk(
|
||||
db_name,
|
||||
chunk_addr.table_name.as_ref(),
|
||||
chunk_addr.partition_key.as_ref(),
|
||||
chunk_addr.chunk_id,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let metadata = tracker.metadata();
|
||||
let expected_metadata = Job::CloseChunk {
|
||||
db_name: db_name_string,
|
||||
partition_key: partition_key.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
chunk_id: 0,
|
||||
let expected_metadata = Job::CompactChunk {
|
||||
chunk: chunk_addr.clone(),
|
||||
};
|
||||
assert_eq!(metadata, &expected_metadata);
|
||||
|
||||
|
|
@ -2275,7 +2281,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let metadata = tracker.metadata();
|
||||
let expected_metadata = Job::WipePreservedCatalog {
|
||||
db_name: db_name_non_existing.to_string(),
|
||||
db_name: Arc::from(db_name_non_existing.as_str()),
|
||||
};
|
||||
assert_eq!(metadata, &expected_metadata);
|
||||
tracker.join().await;
|
||||
|
|
@ -2296,7 +2302,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let metadata = tracker.metadata();
|
||||
let expected_metadata = Job::WipePreservedCatalog {
|
||||
db_name: db_name_rules_broken.to_string(),
|
||||
db_name: Arc::from(db_name_rules_broken.as_str()),
|
||||
};
|
||||
assert_eq!(metadata, &expected_metadata);
|
||||
tracker.join().await;
|
||||
|
|
@ -2317,7 +2323,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let metadata = tracker.metadata();
|
||||
let expected_metadata = Job::WipePreservedCatalog {
|
||||
db_name: db_name_catalog_broken.to_string(),
|
||||
db_name: Arc::from(db_name_catalog_broken.as_str()),
|
||||
};
|
||||
assert_eq!(metadata, &expected_metadata);
|
||||
tracker.join().await;
|
||||
|
|
|
|||
|
|
@ -1,9 +1,13 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use assert_cmd::Command;
|
||||
use predicates::prelude::*;
|
||||
|
||||
use data_types::chunk_metadata::ChunkAddr;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkStorage,
|
||||
job::{Job, Operation},
|
||||
};
|
||||
use predicates::prelude::*;
|
||||
use test_helpers::make_temp_file;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -571,11 +575,13 @@ async fn test_close_partition_chunk() {
|
|||
)
|
||||
.expect("Expected JSON output");
|
||||
|
||||
let expected_job = Job::CloseChunk {
|
||||
db_name,
|
||||
partition_key: "cpu".into(),
|
||||
table_name: "cpu".into(),
|
||||
chunk_id: 0,
|
||||
let expected_job = Job::CompactChunk {
|
||||
chunk: ChunkAddr {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from("cpu"),
|
||||
partition_key: Arc::from("cpu"),
|
||||
chunk_id: 0,
|
||||
},
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
|
|
@ -630,7 +636,9 @@ async fn test_wipe_persisted_catalog() {
|
|||
)
|
||||
.expect("Expected JSON output");
|
||||
|
||||
let expected_job = Job::WipePreservedCatalog { db_name };
|
||||
let expected_job = Job::WipePreservedCatalog {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
Some(expected_job),
|
||||
|
|
|
|||
|
|
@ -321,24 +321,24 @@ async fn test_sql_observer_operations() {
|
|||
.expect("failed to wait operation");
|
||||
|
||||
let expected_output = r#"
|
||||
+---------------+----------+-----------------------------+
|
||||
| partition_key | chunk_id | description |
|
||||
+---------------+----------+-----------------------------+
|
||||
| cpu | 0 | Loading chunk to ReadBuffer |
|
||||
+---------------+----------+-----------------------------+
|
||||
+------------+---------------+-----------+--------------------------------+
|
||||
| table_name | partition_key | chunk_ids | description |
|
||||
+------------+---------------+-----------+--------------------------------+
|
||||
| cpu | cpu | 0 | Compacting chunk to ReadBuffer |
|
||||
+------------+---------------+-----------+--------------------------------+
|
||||
"#
|
||||
.trim();
|
||||
|
||||
let query = format!(
|
||||
r#"
|
||||
select
|
||||
partition_key, chunk_id, description
|
||||
table_name, partition_key, chunk_ids, description
|
||||
from
|
||||
operations
|
||||
where
|
||||
database_name = '{}'
|
||||
order by
|
||||
partition_key, chunk_id, description
|
||||
table_name, partition_key, chunk_ids, description
|
||||
"#,
|
||||
db_name
|
||||
);
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ async fn test_operations() {
|
|||
.expect("failed to wait operation");
|
||||
|
||||
let mut client = fixture.flight_client();
|
||||
let sql_query = "select chunk_id, status, description from system.operations";
|
||||
let sql_query = "select chunk_ids, status, description from system.operations";
|
||||
|
||||
let query_results = client.perform_query(&db_name1, sql_query).await.unwrap();
|
||||
|
||||
|
|
@ -48,11 +48,11 @@ async fn test_operations() {
|
|||
// parameterize on db_name1
|
||||
|
||||
let expected_read_data = vec![
|
||||
"+----------+----------+-----------------------------+",
|
||||
"| chunk_id | status | description |",
|
||||
"+----------+----------+-----------------------------+",
|
||||
"| 0 | Complete | Loading chunk to ReadBuffer |",
|
||||
"+----------+----------+-----------------------------+",
|
||||
"+-----------+---------+--------------------------------+",
|
||||
"| chunk_ids | status | description |",
|
||||
"+-----------+---------+--------------------------------+",
|
||||
"| 0 | Success | Compacting chunk to ReadBuffer |",
|
||||
"+-----------+---------+--------------------------------+",
|
||||
];
|
||||
|
||||
assert_batches_eq!(expected_read_data, &batches);
|
||||
|
|
|
|||
|
|
@ -131,6 +131,18 @@ pub enum TaskResult {
|
|||
Error,
|
||||
}
|
||||
|
||||
impl TaskResult {
|
||||
/// return a human readable name for this result
|
||||
pub fn name(&self) -> &'static str {
|
||||
match self {
|
||||
TaskResult::Success => "Success",
|
||||
TaskResult::Cancelled => "Cancelled",
|
||||
TaskResult::Dropped => "Dropped",
|
||||
TaskResult::Error => "Error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The status of the tracked task
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum TaskStatus {
|
||||
|
|
|
|||
Loading…
Reference in New Issue