diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs index e3755598a2..7835b99d64 100644 --- a/data_types/src/chunk_metadata.rs +++ b/data_types/src/chunk_metadata.rs @@ -63,7 +63,7 @@ impl ChunkStorage { } /// Any lifecycle action currently in progress for this chunk -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum ChunkLifecycleAction { /// Chunk is in the process of being moved to the read buffer Moving, @@ -107,6 +107,9 @@ pub struct ChunkSummary { /// How is this chunk stored? pub storage: ChunkStorage, + /// Is there any outstanding lifecycle action for this chunk? + pub lifecycle_action: Option, + /// The total estimated size of this chunk, in bytes pub estimated_bytes: usize, @@ -154,6 +157,7 @@ impl ChunkSummary { table_name: Arc, id: u32, storage: ChunkStorage, + lifecycle_action: Option, estimated_bytes: usize, row_count: usize, ) -> Self { @@ -162,6 +166,7 @@ impl ChunkSummary { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, time_of_first_write: None, diff --git a/generated_types/protos/influxdata/iox/management/v1/chunk.proto b/generated_types/protos/influxdata/iox/management/v1/chunk.proto index 7d342da57d..3b96b06277 100644 --- a/generated_types/protos/influxdata/iox/management/v1/chunk.proto +++ b/generated_types/protos/influxdata/iox/management/v1/chunk.proto @@ -24,6 +24,23 @@ enum ChunkStorage { CHUNK_STORAGE_OBJECT_STORE_ONLY = 5; } + +// Is there any lifecycle action currently outstanding for this chunk? 
+enum ChunkLifecycleAction { + // No lifecycle action is currently in progress for this chunk + CHUNK_LIFECYCLE_ACTION_UNSPECIFIED = 0; + + // Chunk is in the process of being moved to the read buffer + CHUNK_LIFECYCLE_ACTION_MOVING = 1; + + // Chunk is in the process of being written to object storage + CHUNK_LIFECYCLE_ACTION_PERSISTING = 2; + + // Chunk is in the process of being compacted + CHUNK_LIFECYCLE_ACTION_COMPACTING = 3; +} + + // `Chunk` represents part of a partition of data in a database. // A chunk can contain one or more tables. message Chunk { @@ -39,6 +56,9 @@ message Chunk { // Which storage system the chunk is located in ChunkStorage storage = 3; + // Is there any outstanding lifecycle action for this chunk? + ChunkLifecycleAction lifecycle_action = 10; + // The total estimated size of this chunk, in bytes uint64 estimated_bytes = 4; diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs index 78b5150e6f..4a858045ef 100644 --- a/generated_types/src/chunk.rs +++ b/generated_types/src/chunk.rs @@ -1,6 +1,6 @@ use crate::google::{FieldViolation, FromField}; use crate::influxdata::iox::management::v1 as management; -use data_types::chunk_metadata::{ChunkStorage, ChunkSummary}; +use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage, ChunkSummary}; use std::convert::{TryFrom, TryInto}; use std::sync::Arc; @@ -12,6 +12,7 @@ impl From for management::Chunk { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, time_of_first_write, @@ -21,6 +22,8 @@ impl From for management::Chunk { let storage: management::ChunkStorage = storage.into(); let storage = storage.into(); // convert to i32 + let lifecycle_action: management::ChunkLifecycleAction = lifecycle_action.into(); + let lifecycle_action = lifecycle_action.into(); // convert to i32 let estimated_bytes = estimated_bytes as u64; let row_count = row_count as u64; @@ -37,6 +40,7 @@ impl From for management::Chunk { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, 
time_of_first_write, @@ -58,6 +62,17 @@ impl From for management::ChunkStorage { } } +impl From> for management::ChunkLifecycleAction { + fn from(lifecycle_action: Option) -> Self { + match lifecycle_action { + Some(ChunkLifecycleAction::Moving) => Self::Moving, + Some(ChunkLifecycleAction::Persisting) => Self::Persisting, + Some(ChunkLifecycleAction::Compacting) => Self::Compacting, + None => Self::Unspecified, + } + } +} + /// Conversion code from management API chunk structure impl TryFrom for ChunkSummary { type Error = FieldViolation; @@ -65,6 +80,7 @@ impl TryFrom for ChunkSummary { fn try_from(proto: management::Chunk) -> Result { // Use prost enum conversion let storage = proto.storage().scope("storage")?; + let lifecycle_action = proto.lifecycle_action().scope("lifecycle_action")?; let time_of_first_write = proto .time_of_first_write @@ -112,6 +128,7 @@ impl TryFrom for ChunkSummary { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, time_of_first_write, @@ -138,6 +155,23 @@ impl TryFrom for ChunkStorage { } } +impl TryFrom for Option { + type Error = FieldViolation; + + fn try_from(proto: management::ChunkLifecycleAction) -> Result { + match proto { + management::ChunkLifecycleAction::Moving => Ok(Some(ChunkLifecycleAction::Moving)), + management::ChunkLifecycleAction::Persisting => { + Ok(Some(ChunkLifecycleAction::Persisting)) + } + management::ChunkLifecycleAction::Compacting => { + Ok(Some(ChunkLifecycleAction::Compacting)) + } + management::ChunkLifecycleAction::Unspecified => Ok(None), + } + } +} + #[cfg(test)] mod test { use super::*; @@ -151,6 +185,7 @@ mod test { estimated_bytes: 1234, row_count: 321, storage: management::ChunkStorage::ObjectStoreOnly.into(), + lifecycle_action: management::ChunkLifecycleAction::Moving.into(), time_of_first_write: None, time_of_last_write: None, time_closed: None, @@ -164,6 +199,7 @@ mod test { estimated_bytes: 1234, row_count: 321, storage: ChunkStorage::ObjectStoreOnly, + 
lifecycle_action: Some(ChunkLifecycleAction::Moving), time_of_first_write: None, time_of_last_write: None, time_closed: None, @@ -185,6 +221,7 @@ mod test { estimated_bytes: 1234, row_count: 321, storage: ChunkStorage::ObjectStoreOnly, + lifecycle_action: Some(ChunkLifecycleAction::Persisting), time_of_first_write: None, time_of_last_write: None, time_closed: None, @@ -199,6 +236,7 @@ mod test { estimated_bytes: 1234, row_count: 321, storage: management::ChunkStorage::ObjectStoreOnly.into(), + lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), time_of_first_write: None, time_of_last_write: None, time_closed: None, diff --git a/server/src/db.rs b/server/src/db.rs index 0368661712..701e6f5ced 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2199,6 +2199,7 @@ mod tests { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, .. @@ -2208,6 +2209,7 @@ mod tests { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, ) @@ -2238,6 +2240,7 @@ mod tests { Arc::from("cpu"), 0, ChunkStorage::OpenMutableBuffer, + None, 70, 1, )]; @@ -2341,12 +2344,15 @@ mod tests { let chunk_summaries = db.chunk_summaries().expect("expected summary to return"); let chunk_summaries = normalize_summaries(chunk_summaries); + let lifecycle_action = None; + let expected = vec![ ChunkSummary::new_without_timestamps( Arc::from("1970-01-01T00"), Arc::from("cpu"), 0, ChunkStorage::ReadBufferAndObjectStore, + lifecycle_action, 2139, // size of RB and OS chunks 1, ), @@ -2355,6 +2361,7 @@ mod tests { Arc::from("cpu"), 1, ChunkStorage::OpenMutableBuffer, + lifecycle_action, 64, 1, ), @@ -2363,6 +2370,7 @@ mod tests { Arc::from("cpu"), 0, ChunkStorage::ClosedMutableBuffer, + lifecycle_action, 2190, 1, ), @@ -2371,6 +2379,7 @@ mod tests { Arc::from("cpu"), 1, ChunkStorage::OpenMutableBuffer, + lifecycle_action, 87, 1, ), diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index b2f03e37c2..6ad79289fe 100644 --- 
a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -419,11 +419,17 @@ impl CatalogChunk { pub fn summary(&self) -> ChunkSummary { let (row_count, storage) = self.storage(); + let lifecycle_action = self + .lifecycle_action + .as_ref() + .map(|tracker| *tracker.metadata()); + ChunkSummary { partition_key: Arc::clone(&self.addr.partition_key), table_name: Arc::clone(&self.addr.table_name), id: self.addr.chunk_id, storage, + lifecycle_action, estimated_bytes: self.size(), row_count, time_of_first_write: self.time_of_first_write, diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index b12dd09f84..733ecc56ae 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -196,6 +196,7 @@ fn chunk_summaries_schema() -> SchemaRef { Field::new("partition_key", DataType::Utf8, false), Field::new("table_name", DataType::Utf8, false), Field::new("storage", DataType::Utf8, false), + Field::new("lifecycle_action", DataType::Utf8, true), Field::new("estimated_bytes", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false), Field::new("time_of_first_write", ts.clone(), true), @@ -218,6 +219,10 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< .iter() .map(|c| Some(c.storage.as_str())) .collect::(); + let lifecycle_action = chunks + .iter() + .map(|c| c.lifecycle_action.map(|a| a.name())) + .collect::(); let estimated_bytes = chunks .iter() .map(|c| Some(c.estimated_bytes as u64)) @@ -245,10 +250,11 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< RecordBatch::try_new( schema, vec![ - Arc::new(id), // as ArrayRef, + Arc::new(id), Arc::new(partition_key), Arc::new(table_name), Arc::new(storage), + Arc::new(lifecycle_action), Arc::new(estimated_bytes), Arc::new(row_counts), Arc::new(time_of_first_write), @@ -597,7 +603,7 @@ mod tests { use arrow_util::assert_batches_eq; use chrono::NaiveDateTime; use data_types::{ - chunk_metadata::{ChunkColumnSummary, 
ChunkStorage}, + chunk_metadata::{ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, }; @@ -609,6 +615,7 @@ mod tests { table_name: Arc::from("table1"), id: 0, storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action: None, estimated_bytes: 23754, row_count: 11, time_of_first_write: Some(DateTime::from_utc( @@ -623,6 +630,7 @@ mod tests { table_name: Arc::from("table1"), id: 0, storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action: Some(ChunkLifecycleAction::Persisting), estimated_bytes: 23454, row_count: 22, time_of_first_write: None, @@ -635,12 +643,12 @@ mod tests { ]; let expected = vec![ - "+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", - "| id | partition_key | table_name | storage | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", - "+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", - "| 0 | p1 | table1 | OpenMutableBuffer | 23754 | 11 | 1970-01-01 00:00:10 | | |", - "| 0 | p1 | table1 | OpenMutableBuffer | 23454 | 22 | | 1970-01-01 00:01:20 | |", - "+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", + "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", + "| id | partition_key | table_name | storage | lifecycle_action | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", + "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", + "| 0 | p1 | table1 | 
OpenMutableBuffer | | 23754 | 11 | 1970-01-01 00:00:10 | | |", + "| 0 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23454 | 22 | | 1970-01-01 00:01:20 | |", + "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", ]; let schema = chunk_summaries_schema(); @@ -787,6 +795,8 @@ mod tests { #[test] fn test_assemble_chunk_columns() { + let lifecycle_action = None; + let summaries = vec![ ( Arc::new(TableSummary { @@ -814,6 +824,7 @@ mod tests { table_name: "t1".into(), id: 42, storage: ChunkStorage::ReadBuffer, + lifecycle_action, estimated_bytes: 23754, row_count: 11, time_of_first_write: None, @@ -847,6 +858,7 @@ mod tests { table_name: "t1".into(), id: 43, storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action, estimated_bytes: 23754, row_count: 11, time_of_first_write: None, @@ -874,6 +886,7 @@ mod tests { table_name: "t2".into(), id: 44, storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action, estimated_bytes: 23754, row_count: 11, time_of_first_write: None, diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 0f2b06f70f..0943afca1c 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -269,7 +269,9 @@ async fn test_create_get_update_database() { #[tokio::test] async fn test_chunk_get() { - use generated_types::influxdata::iox::management::v1::{Chunk, ChunkStorage}; + use generated_types::influxdata::iox::management::v1::{ + Chunk, ChunkLifecycleAction, ChunkStorage, + }; let fixture = ServerFixture::create_shared().await; let mut management_client = fixture.management_client(); @@ -308,12 +310,15 @@ async fn test_chunk_get() { let chunks = normalize_chunks(chunks); + let lifecycle_action = ChunkLifecycleAction::Unspecified.into(); + let expected: Vec = vec![ Chunk { partition_key: "cpu".into(), table_name: 
"cpu".into(), id: 0, - storage: ChunkStorage::OpenMutableBuffer as i32, + storage: ChunkStorage::OpenMutableBuffer.into(), + lifecycle_action, estimated_bytes: 100, row_count: 2, time_of_first_write: None, @@ -324,7 +329,8 @@ async fn test_chunk_get() { partition_key: "disk".into(), table_name: "disk".into(), id: 0, - storage: ChunkStorage::OpenMutableBuffer as i32, + storage: ChunkStorage::OpenMutableBuffer.into(), + lifecycle_action, estimated_bytes: 82, row_count: 1, time_of_first_write: None, @@ -492,7 +498,8 @@ async fn test_list_partition_chunks() { partition_key: "cpu".into(), table_name: "cpu".into(), id: 0, - storage: ChunkStorage::OpenMutableBuffer as i32, + storage: ChunkStorage::OpenMutableBuffer.into(), + lifecycle_action: ChunkLifecycleAction::Unspecified.into(), estimated_bytes: 100, row_count: 2, time_of_first_write: None, @@ -813,6 +820,7 @@ fn normalize_chunks(chunks: Vec) -> Vec { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, .. @@ -822,6 +830,7 @@ fn normalize_chunks(chunks: Vec) -> Vec { table_name, id, storage, + lifecycle_action, estimated_bytes, row_count, time_of_first_write: None,