feat: Add any lifecycle_action to system.chunks and API (#1947)

pull/24376/head
Andrew Lamb 2021-07-09 13:38:29 -04:00 committed by GitHub
parent 7af560aa99
commit 9534220035
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 14 deletions

View File

@ -63,7 +63,7 @@ impl ChunkStorage {
} }
/// Any lifecycle action currently in progress for this chunk /// Any lifecycle action currently in progress for this chunk
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ChunkLifecycleAction { pub enum ChunkLifecycleAction {
/// Chunk is in the process of being moved to the read buffer /// Chunk is in the process of being moved to the read buffer
Moving, Moving,
@ -107,6 +107,9 @@ pub struct ChunkSummary {
/// How is this chunk stored? /// How is this chunk stored?
pub storage: ChunkStorage, pub storage: ChunkStorage,
/// Is there any outstanding lifecycle action for this chunk?
pub lifecycle_action: Option<ChunkLifecycleAction>,
/// The total estimated size of this chunk, in bytes /// The total estimated size of this chunk, in bytes
pub estimated_bytes: usize, pub estimated_bytes: usize,
@ -154,6 +157,7 @@ impl ChunkSummary {
table_name: Arc<str>, table_name: Arc<str>,
id: u32, id: u32,
storage: ChunkStorage, storage: ChunkStorage,
lifecycle_action: Option<ChunkLifecycleAction>,
estimated_bytes: usize, estimated_bytes: usize,
row_count: usize, row_count: usize,
) -> Self { ) -> Self {
@ -162,6 +166,7 @@ impl ChunkSummary {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
time_of_first_write: None, time_of_first_write: None,

View File

@ -24,6 +24,23 @@ enum ChunkStorage {
CHUNK_STORAGE_OBJECT_STORE_ONLY = 5; CHUNK_STORAGE_OBJECT_STORE_ONLY = 5;
} }
// Is there any lifecycle action currently outstanding for this chunk?
enum ChunkLifecycleAction {
  // No lifecycle action is currently outstanding for this chunk
  CHUNK_LIFECYCLE_ACTION_UNSPECIFIED = 0;

  // Chunk is in the process of being moved to the read buffer
  CHUNK_LIFECYCLE_ACTION_MOVING = 1;

  // Chunk is in the process of being written to object storage
  CHUNK_LIFECYCLE_ACTION_PERSISTING = 2;

  // Chunk is in the process of being compacted
  CHUNK_LIFECYCLE_ACTION_COMPACTING = 3;
}
// `Chunk` represents part of a partition of data in a database. // `Chunk` represents part of a partition of data in a database.
// A chunk can contain one or more tables. // A chunk can contain one or more tables.
message Chunk { message Chunk {
@ -39,6 +56,9 @@ message Chunk {
// Which storage system the chunk is located in // Which storage system the chunk is located in
ChunkStorage storage = 3; ChunkStorage storage = 3;
// Is there any outstanding lifecycle action for this chunk?
ChunkLifecycleAction lifecycle_action = 10;
// The total estimated size of this chunk, in bytes // The total estimated size of this chunk, in bytes
uint64 estimated_bytes = 4; uint64 estimated_bytes = 4;

View File

@ -1,6 +1,6 @@
use crate::google::{FieldViolation, FromField}; use crate::google::{FieldViolation, FromField};
use crate::influxdata::iox::management::v1 as management; use crate::influxdata::iox::management::v1 as management;
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary}; use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage, ChunkSummary};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::sync::Arc; use std::sync::Arc;
@ -12,6 +12,7 @@ impl From<ChunkSummary> for management::Chunk {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
@ -21,6 +22,8 @@ impl From<ChunkSummary> for management::Chunk {
let storage: management::ChunkStorage = storage.into(); let storage: management::ChunkStorage = storage.into();
let storage = storage.into(); // convert to i32 let storage = storage.into(); // convert to i32
let lifecycle_action: management::ChunkLifecycleAction = lifecycle_action.into();
let lifecycle_action = lifecycle_action.into(); // convert to i32
let estimated_bytes = estimated_bytes as u64; let estimated_bytes = estimated_bytes as u64;
let row_count = row_count as u64; let row_count = row_count as u64;
@ -37,6 +40,7 @@ impl From<ChunkSummary> for management::Chunk {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
@ -58,6 +62,17 @@ impl From<ChunkStorage> for management::ChunkStorage {
} }
} }
/// Converts the optional in-memory lifecycle action into its protobuf
/// counterpart, mapping `None` to the `UNSPECIFIED` sentinel variant.
impl From<Option<ChunkLifecycleAction>> for management::ChunkLifecycleAction {
    fn from(lifecycle_action: Option<ChunkLifecycleAction>) -> Self {
        // `None` means "no action outstanding", which the proto models as
        // the zero-valued UNSPECIFIED variant.
        lifecycle_action.map_or(Self::Unspecified, |action| match action {
            ChunkLifecycleAction::Moving => Self::Moving,
            ChunkLifecycleAction::Persisting => Self::Persisting,
            ChunkLifecycleAction::Compacting => Self::Compacting,
        })
    }
}
/// Conversion code from management API chunk structure /// Conversion code from management API chunk structure
impl TryFrom<management::Chunk> for ChunkSummary { impl TryFrom<management::Chunk> for ChunkSummary {
type Error = FieldViolation; type Error = FieldViolation;
@ -65,6 +80,7 @@ impl TryFrom<management::Chunk> for ChunkSummary {
fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> { fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> {
// Use prost enum conversion // Use prost enum conversion
let storage = proto.storage().scope("storage")?; let storage = proto.storage().scope("storage")?;
let lifecycle_action = proto.lifecycle_action().scope("lifecycle_action")?;
let time_of_first_write = proto let time_of_first_write = proto
.time_of_first_write .time_of_first_write
@ -112,6 +128,7 @@ impl TryFrom<management::Chunk> for ChunkSummary {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
@ -138,6 +155,23 @@ impl TryFrom<management::ChunkStorage> for ChunkStorage {
} }
} }
impl TryFrom<management::ChunkLifecycleAction> for Option<ChunkLifecycleAction> {
type Error = FieldViolation;
fn try_from(proto: management::ChunkLifecycleAction) -> Result<Self, Self::Error> {
match proto {
management::ChunkLifecycleAction::Moving => Ok(Some(ChunkLifecycleAction::Moving)),
management::ChunkLifecycleAction::Persisting => {
Ok(Some(ChunkLifecycleAction::Persisting))
}
management::ChunkLifecycleAction::Compacting => {
Ok(Some(ChunkLifecycleAction::Compacting))
}
management::ChunkLifecycleAction::Unspecified => Ok(None),
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -151,6 +185,7 @@ mod test {
estimated_bytes: 1234, estimated_bytes: 1234,
row_count: 321, row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Moving.into(),
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,
@ -164,6 +199,7 @@ mod test {
estimated_bytes: 1234, estimated_bytes: 1234,
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Moving),
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,
@ -185,6 +221,7 @@ mod test {
estimated_bytes: 1234, estimated_bytes: 1234,
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,
@ -199,6 +236,7 @@ mod test {
estimated_bytes: 1234, estimated_bytes: 1234,
row_count: 321, row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Persisting.into(),
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,

View File

@ -2199,6 +2199,7 @@ mod tests {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
.. ..
@ -2208,6 +2209,7 @@ mod tests {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
) )
@ -2238,6 +2240,7 @@ mod tests {
Arc::from("cpu"), Arc::from("cpu"),
0, 0,
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
None,
70, 70,
1, 1,
)]; )];
@ -2341,12 +2344,15 @@ mod tests {
let chunk_summaries = db.chunk_summaries().expect("expected summary to return"); let chunk_summaries = db.chunk_summaries().expect("expected summary to return");
let chunk_summaries = normalize_summaries(chunk_summaries); let chunk_summaries = normalize_summaries(chunk_summaries);
let lifecycle_action = None;
let expected = vec![ let expected = vec![
ChunkSummary::new_without_timestamps( ChunkSummary::new_without_timestamps(
Arc::from("1970-01-01T00"), Arc::from("1970-01-01T00"),
Arc::from("cpu"), Arc::from("cpu"),
0, 0,
ChunkStorage::ReadBufferAndObjectStore, ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
2139, // size of RB and OS chunks 2139, // size of RB and OS chunks
1, 1,
), ),
@ -2355,6 +2361,7 @@ mod tests {
Arc::from("cpu"), Arc::from("cpu"),
1, 1,
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
lifecycle_action,
64, 64,
1, 1,
), ),
@ -2363,6 +2370,7 @@ mod tests {
Arc::from("cpu"), Arc::from("cpu"),
0, 0,
ChunkStorage::ClosedMutableBuffer, ChunkStorage::ClosedMutableBuffer,
lifecycle_action,
2190, 2190,
1, 1,
), ),
@ -2371,6 +2379,7 @@ mod tests {
Arc::from("cpu"), Arc::from("cpu"),
1, 1,
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
lifecycle_action,
87, 87,
1, 1,
), ),

View File

@ -419,11 +419,17 @@ impl CatalogChunk {
pub fn summary(&self) -> ChunkSummary { pub fn summary(&self) -> ChunkSummary {
let (row_count, storage) = self.storage(); let (row_count, storage) = self.storage();
let lifecycle_action = self
.lifecycle_action
.as_ref()
.map(|tracker| *tracker.metadata());
ChunkSummary { ChunkSummary {
partition_key: Arc::clone(&self.addr.partition_key), partition_key: Arc::clone(&self.addr.partition_key),
table_name: Arc::clone(&self.addr.table_name), table_name: Arc::clone(&self.addr.table_name),
id: self.addr.chunk_id, id: self.addr.chunk_id,
storage, storage,
lifecycle_action,
estimated_bytes: self.size(), estimated_bytes: self.size(),
row_count, row_count,
time_of_first_write: self.time_of_first_write, time_of_first_write: self.time_of_first_write,

View File

@ -196,6 +196,7 @@ fn chunk_summaries_schema() -> SchemaRef {
Field::new("partition_key", DataType::Utf8, false), Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false), Field::new("table_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false), Field::new("storage", DataType::Utf8, false),
Field::new("lifecycle_action", DataType::Utf8, true),
Field::new("estimated_bytes", DataType::UInt64, false), Field::new("estimated_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_first_write", ts.clone(), true), Field::new("time_of_first_write", ts.clone(), true),
@ -218,6 +219,10 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
.iter() .iter()
.map(|c| Some(c.storage.as_str())) .map(|c| Some(c.storage.as_str()))
.collect::<StringArray>(); .collect::<StringArray>();
let lifecycle_action = chunks
.iter()
.map(|c| c.lifecycle_action.map(|a| a.name()))
.collect::<StringArray>();
let estimated_bytes = chunks let estimated_bytes = chunks
.iter() .iter()
.map(|c| Some(c.estimated_bytes as u64)) .map(|c| Some(c.estimated_bytes as u64))
@ -245,10 +250,11 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
RecordBatch::try_new( RecordBatch::try_new(
schema, schema,
vec![ vec![
Arc::new(id), // as ArrayRef, Arc::new(id),
Arc::new(partition_key), Arc::new(partition_key),
Arc::new(table_name), Arc::new(table_name),
Arc::new(storage), Arc::new(storage),
Arc::new(lifecycle_action),
Arc::new(estimated_bytes), Arc::new(estimated_bytes),
Arc::new(row_counts), Arc::new(row_counts),
Arc::new(time_of_first_write), Arc::new(time_of_first_write),
@ -597,7 +603,7 @@ mod tests {
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use data_types::{ use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkStorage}, chunk_metadata::{ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
}; };
@ -609,6 +615,7 @@ mod tests {
table_name: Arc::from("table1"), table_name: Arc::from("table1"),
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
estimated_bytes: 23754, estimated_bytes: 23754,
row_count: 11, row_count: 11,
time_of_first_write: Some(DateTime::from_utc( time_of_first_write: Some(DateTime::from_utc(
@ -623,6 +630,7 @@ mod tests {
table_name: Arc::from("table1"), table_name: Arc::from("table1"),
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
estimated_bytes: 23454, estimated_bytes: 23454,
row_count: 22, row_count: 22,
time_of_first_write: None, time_of_first_write: None,
@ -635,12 +643,12 @@ mod tests {
]; ];
let expected = vec![ let expected = vec![
"+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+",
"| id | partition_key | table_name | storage | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", "| id | partition_key | table_name | storage | lifecycle_action | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | 23754 | 11 | 1970-01-01 00:00:10 | | |", "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | 11 | 1970-01-01 00:00:10 | | |",
"| 0 | p1 | table1 | OpenMutableBuffer | 23454 | 22 | | 1970-01-01 00:01:20 | |", "| 0 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23454 | 22 | | 1970-01-01 00:01:20 | |",
"+----+---------------+------------+-------------------+-----------------+-----------+---------------------+---------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+",
]; ];
let schema = chunk_summaries_schema(); let schema = chunk_summaries_schema();
@ -787,6 +795,8 @@ mod tests {
#[test] #[test]
fn test_assemble_chunk_columns() { fn test_assemble_chunk_columns() {
let lifecycle_action = None;
let summaries = vec![ let summaries = vec![
( (
Arc::new(TableSummary { Arc::new(TableSummary {
@ -814,6 +824,7 @@ mod tests {
table_name: "t1".into(), table_name: "t1".into(),
id: 42, id: 42,
storage: ChunkStorage::ReadBuffer, storage: ChunkStorage::ReadBuffer,
lifecycle_action,
estimated_bytes: 23754, estimated_bytes: 23754,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,
@ -847,6 +858,7 @@ mod tests {
table_name: "t1".into(), table_name: "t1".into(),
id: 43, id: 43,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
estimated_bytes: 23754, estimated_bytes: 23754,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,
@ -874,6 +886,7 @@ mod tests {
table_name: "t2".into(), table_name: "t2".into(),
id: 44, id: 44,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
estimated_bytes: 23754, estimated_bytes: 23754,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,

View File

@ -269,7 +269,9 @@ async fn test_create_get_update_database() {
#[tokio::test] #[tokio::test]
async fn test_chunk_get() { async fn test_chunk_get() {
use generated_types::influxdata::iox::management::v1::{Chunk, ChunkStorage}; use generated_types::influxdata::iox::management::v1::{
Chunk, ChunkLifecycleAction, ChunkStorage,
};
let fixture = ServerFixture::create_shared().await; let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client(); let mut management_client = fixture.management_client();
@ -308,12 +310,15 @@ async fn test_chunk_get() {
let chunks = normalize_chunks(chunks); let chunks = normalize_chunks(chunks);
let lifecycle_action = ChunkLifecycleAction::Unspecified.into();
let expected: Vec<Chunk> = vec![ let expected: Vec<Chunk> = vec![
Chunk { Chunk {
partition_key: "cpu".into(), partition_key: "cpu".into(),
table_name: "cpu".into(), table_name: "cpu".into(),
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32, storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
estimated_bytes: 100, estimated_bytes: 100,
row_count: 2, row_count: 2,
time_of_first_write: None, time_of_first_write: None,
@ -324,7 +329,8 @@ async fn test_chunk_get() {
partition_key: "disk".into(), partition_key: "disk".into(),
table_name: "disk".into(), table_name: "disk".into(),
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32, storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
estimated_bytes: 82, estimated_bytes: 82,
row_count: 1, row_count: 1,
time_of_first_write: None, time_of_first_write: None,
@ -492,7 +498,8 @@ async fn test_list_partition_chunks() {
partition_key: "cpu".into(), partition_key: "cpu".into(),
table_name: "cpu".into(), table_name: "cpu".into(),
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32, storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action: ChunkLifecycleAction::Unspecified.into(),
estimated_bytes: 100, estimated_bytes: 100,
row_count: 2, row_count: 2,
time_of_first_write: None, time_of_first_write: None,
@ -813,6 +820,7 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
.. ..
@ -822,6 +830,7 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
table_name, table_name,
id, id,
storage, storage,
lifecycle_action,
estimated_bytes, estimated_bytes,
row_count, row_count,
time_of_first_write: None, time_of_first_write: None,