diff --git a/server/src/db.rs b/server/src/db.rs index 92d6979fe4..ea882bb232 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -470,7 +470,7 @@ impl Db { })? { let mut chunk = chunk.write(); - chunk.set_closed().context(RollingOverPartition { + chunk.freeze().context(RollingOverPartition { partition_key, table_name, })?; @@ -1091,7 +1091,7 @@ fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option threshold.get() { - chunk.set_closed().expect("cannot close open chunk"); + chunk.freeze().expect("cannot close open chunk"); } } } @@ -1299,9 +1299,7 @@ mod tests { test_helpers::{try_write_lp, write_lp}, *, }; - use crate::db::catalog::chunk::{ - ChunkStage, ChunkStageFrozen, ChunkStageFrozenRepr, ChunkStagePersisted, - }; + use crate::db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}; use crate::query_tests::utils::{make_db, TestDb}; use ::test_helpers::assert_contains; use arrow::record_batch::RecordBatch; @@ -2095,17 +2093,17 @@ mod tests { assert_eq!(chunks.len(), 2); assert!(matches!( chunks[0].read().stage(), - ChunkStage::Frozen(ChunkStageFrozen { - meta: _, - representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_) - }) + ChunkStage::Frozen { + representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_), + .. + } )); assert!(matches!( chunks[1].read().stage(), - ChunkStage::Frozen(ChunkStageFrozen { - meta: _, - representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_) - }) + ChunkStage::Frozen { + representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_), + .. + } )); } @@ -2902,8 +2900,8 @@ mod tests { partition.chunk(table_name, *chunk_id).unwrap() }; let chunk = chunk.read(); - if let ChunkStage::Persisted(stage) = chunk.stage() { - paths_expected.push(stage.parquet.table_path().display()); + if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { + paths_expected.push(parquet.table_path().display()); } else { panic!("Wrong chunk state."); } @@ -2953,11 +2951,10 @@ mod tests { let chunk = chunk.read(); assert!(matches!( chunk.stage(), - ChunkStage::Persisted(ChunkStagePersisted { - parquet: _, + ChunkStage::Persisted { read_buffer: None, - meta: _, - }) + .. + } )); } @@ -2995,8 +2992,8 @@ mod tests { partition.chunk(table_name.clone(), chunk_id).unwrap() }; let chunk = chunk.read(); - if let ChunkStage::Persisted(stage) = chunk.stage() { - paths_keep.push(stage.parquet.table_path()); + if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { + paths_keep.push(parquet.table_path()); } else { panic!("Wrong chunk state."); } diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index d42374845b..f0b2ee94bb 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -98,12 +98,14 @@ pub enum Error { }, #[snafu(display( - "Can not add an empty chunk to the catalog {}:{}", + "Cannot add an empty chunk to the catalog {}:{}:{}", partition_key, + table_name, chunk_id ))] ChunkIsEmpty { partition_key: String, + table_name: String, chunk_id: u32, }, diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 5d18d50f39..25b159d75b 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -49,15 +49,6 @@ pub struct ChunkMetadata { pub schema: Arc, } -/// A chunk in an _open_ stage. -/// -/// Chunks in this stage are writable (= can receive new data) and are never preserved. -#[derive(Debug)] -pub struct ChunkStageOpen { - /// Mutable Buffer that receives writes. - pub mb_chunk: MBChunk, -} - /// Different memory representations of a frozen chunk. #[derive(Debug)] pub enum ChunkStageFrozenRepr { @@ -70,32 +61,6 @@ pub enum ChunkStageFrozenRepr { ReadBuffer(Arc), } -/// A chunk in an _frozen stage. -/// -/// Chunks in this stage cannot be modified but are not yet persisted. They can however be compacted which will take two -/// or more chunks and creates a single new frozen chunk. -#[derive(Debug)] -pub struct ChunkStageFrozen { - /// Metadata (statistics, schema) about this chunk - pub meta: Arc, - - /// Internal memory representation of the frozen chunk. - pub representation: ChunkStageFrozenRepr, -} - -/// Chunk in _persisted_ stage. -#[derive(Debug)] -pub struct ChunkStagePersisted { - /// Metadata (statistics, schema) about this chunk - pub meta: Arc, - - /// Parquet chunk that lives immutable within the object store. - pub parquet: Arc, - - /// In-memory version of the parquet data. - pub read_buffer: Option>, -} - /// Represents the current lifecycle stage a chunk is in. /// /// # Stages @@ -128,22 +93,47 @@ pub struct ChunkStagePersisted { /// (according to the diagram shown above). The chunk stage is considered unchanged as long as the job is running. #[derive(Debug)] pub enum ChunkStage { - /// Chunk can receive new data. It is not persisted. - Open(ChunkStageOpen), + /// A chunk in an _open_ stage. + /// + /// Chunks in this stage are writable (= can receive new data) and are not preserved. + Open { + /// Mutable Buffer that receives writes. + mb_chunk: MBChunk, + }, - /// Chunk cannot receive new data. It is not persisted. - Frozen(ChunkStageFrozen), + /// A chunk in an _frozen stage. + /// + /// Chunks in this stage cannot be modified but are not yet persisted. They can however be compacted which will take two + /// or more chunks and creates a single new frozen chunk. + Frozen { + /// Metadata (statistics, schema) about this chunk + meta: Arc, + /// Internal memory representation of the frozen chunk. + representation: ChunkStageFrozenRepr, + }, + + /// Chunk in _persisted_ stage. + /// /// Chunk cannot receive new data. It is persisted. - Persisted(ChunkStagePersisted), + Persisted { + /// Metadata (statistics, schema) about this chunk + meta: Arc, + + /// Parquet chunk that lives immutable within the object store. + parquet: Arc, + + /// In-memory version of the parquet data. + read_buffer: Option>, + }, } impl ChunkStage { pub fn name(&self) -> &'static str { match self { - Self::Open(_) => "Open", - Self::Frozen(_) => "Frozen", - Self::Persisted(_) => "Persisted", + Self::Open { .. } => "Open", + Self::Frozen { .. } => "Frozen", + Self::Persisted { .. } => "Persisted", } } } @@ -245,12 +235,13 @@ impl Chunk { chunk.rows() > 0, ChunkIsEmpty { partition_key: partition_key.as_ref(), + table_name: chunk.table_name().as_ref(), chunk_id, } ); let table_name = Arc::clone(&chunk.table_name()); - let stage = ChunkStage::Open(ChunkStageOpen { mb_chunk: chunk }); + let stage = ChunkStage::Open { mb_chunk: chunk }; metrics .state @@ -288,11 +279,11 @@ impl Chunk { schema: chunk.full_schema(), }); - let stage = ChunkStage::Persisted(ChunkStagePersisted { + let stage = ChunkStage::Persisted { parquet: chunk, read_buffer: None, meta, - }); + }; Self { partition_key: Arc::from(partition_key.as_ref()), @@ -362,8 +353,8 @@ impl Chunk { /// Return ChunkSummary metadata for this chunk pub fn summary(&self) -> ChunkSummary { let (row_count, storage) = match &self.stage { - ChunkStage::Open(stage) => (stage.mb_chunk.rows(), ChunkStorage::OpenMutableBuffer), - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Open { mb_chunk, .. } => (mb_chunk.rows(), ChunkStorage::OpenMutableBuffer), + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => { (repr.rows(), ChunkStorage::ClosedMutableBuffer) } @@ -371,9 +362,13 @@ impl Chunk { (repr.rows() as usize, ChunkStorage::ReadBuffer) } }, - ChunkStage::Persisted(stage) => { - let rows = stage.parquet.rows() as usize; - let storage = if stage.read_buffer.is_some() { + ChunkStage::Persisted { + parquet, + read_buffer, + .. + } => { + let rows = parquet.rows() as usize; + let storage = if read_buffer.is_some() { ChunkStorage::ReadBufferAndObjectStore } else { ChunkStorage::ObjectStoreOnly @@ -407,15 +402,15 @@ impl Chunk { } let columns: Vec = match &self.stage { - ChunkStage::Open(stage) => stage.mb_chunk.column_sizes().map(to_summary).collect(), - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Open { mb_chunk, .. } => mb_chunk.column_sizes().map(to_summary).collect(), + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => { repr.column_sizes().map(to_summary).collect() } ChunkStageFrozenRepr::ReadBuffer(repr) => repr.column_sizes(&self.table_name), }, - ChunkStage::Persisted(stage) => { - if let Some(read_buffer) = &stage.read_buffer { + ChunkStage::Persisted { read_buffer, .. } => { + if let Some(read_buffer) = &read_buffer { read_buffer.column_sizes(&self.table_name) } else { // TODO parquet statistics @@ -430,12 +425,12 @@ impl Chunk { /// Return the summary information about the table stored in this Chunk pub fn table_summary(&self) -> Arc { match &self.stage { - ChunkStage::Open(stage) => { + ChunkStage::Open { mb_chunk, .. } => { // The stats for open chunks change so can't be cached - Arc::new(stage.mb_chunk.table_summary()) + Arc::new(mb_chunk.table_summary()) } - ChunkStage::Frozen(stage) => Arc::clone(&stage.meta.table_summary), - ChunkStage::Persisted(stage) => Arc::clone(&stage.meta.table_summary), + ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary), + ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary), } } @@ -443,14 +438,18 @@ impl Chunk { /// chunk pub fn size(&self) -> usize { match &self.stage { - ChunkStage::Open(stage) => stage.mb_chunk.size(), - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Open { mb_chunk, .. } => mb_chunk.size(), + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => repr.size(), ChunkStageFrozenRepr::ReadBuffer(repr) => repr.size(), }, - ChunkStage::Persisted(stage) => { - let mut size = stage.parquet.size(); - if let Some(read_buffer) = &stage.read_buffer { + ChunkStage::Persisted { + parquet, + read_buffer, + .. + } => { + let mut size = parquet.size(); + if let Some(read_buffer) = &read_buffer { size += read_buffer.size(); } size @@ -464,42 +463,48 @@ impl Chunk { /// Must be in open or closed state pub fn mutable_buffer(&mut self) -> Result<&mut MBChunk> { match &mut self.stage { - ChunkStage::Open(stage) => Ok(&mut stage.mb_chunk), + ChunkStage::Open { mb_chunk, .. } => Ok(mb_chunk), stage => unexpected_state!(self, "mutable buffer reference", "Open or Closed", stage), } } - /// Set the chunk to the Closed state - pub fn set_closed(&mut self) -> Result> { + /// Set chunk to _frozen_ state. + /// + /// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage (no-op) and will + /// fail for other stages. + pub fn freeze(&mut self) -> Result<()> { match &self.stage { - ChunkStage::Open(stage) => { + ChunkStage::Open { mb_chunk, .. } => { assert!(self.time_closed.is_none()); self.time_closed = Some(Utc::now()); - let s = stage.mb_chunk.snapshot(); + let s = mb_chunk.snapshot(); self.metrics .state .inc_with_labels(&[KeyValue::new("state", "closed")]); self.metrics.immutable_chunk_size.observe_with_labels( - stage.mb_chunk.size() as f64, + mb_chunk.size() as f64, &[KeyValue::new("state", "closed")], ); // Cache table summary + schema let metadata = ChunkMetadata { - table_summary: Arc::new(stage.mb_chunk.table_summary()), + table_summary: Arc::new(mb_chunk.table_summary()), schema: s.full_schema(), }; - self.stage = ChunkStage::Frozen(ChunkStageFrozen { + self.stage = ChunkStage::Frozen { representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)), meta: Arc::new(metadata), - }); - - Ok(s) + }; + Ok(()) + } + &ChunkStage::Frozen { .. } => { + // already frozen => no-op + Ok(()) } _ => { - unexpected_state!(self, "setting closed", "Open or Closed", &self.stage) + unexpected_state!(self, "setting closed", "Open or Frozen", &self.stage) } } } @@ -507,16 +512,16 @@ impl Chunk { /// Set the chunk to the Moving state, returning a handle to the underlying /// storage /// - /// If called on an open chunk will first close the chunk + /// If called on an open chunk will first [`freeze`](Self::freeze) the chunk pub fn set_moving(&mut self, registration: &TaskRegistration) -> Result> { // This ensures the closing logic is consistent but doesn't break code that // assumes a chunk can be moved from open - if matches!(self.stage, ChunkStage::Open(_)) { - self.set_closed()?; + if matches!(self.stage, ChunkStage::Open { .. }) { + self.freeze()?; } match &self.stage { - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => { let chunk = Arc::clone(repr); self.set_lifecycle_action(ChunkLifecycleAction::Moving, registration)?; @@ -553,7 +558,7 @@ impl Chunk { /// storage. pub fn set_moved(&mut self, chunk: Arc) -> Result<()> { match &mut self.stage { - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(_) => { self.metrics .state @@ -564,7 +569,7 @@ impl Chunk { &[KeyValue::new("state", "moved")], ); - stage.representation = ChunkStageFrozenRepr::ReadBuffer(chunk); + *representation = ChunkStageFrozenRepr::ReadBuffer(chunk); self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?; Ok(()) } @@ -590,8 +595,8 @@ impl Chunk { registration: &TaskRegistration, ) -> Result> { match &self.stage { - ChunkStage::Frozen(stage) => { - match &stage.representation { + ChunkStage::Frozen { representation, .. } => { + match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(_) => { // TODO: ideally we would support all Frozen representations InternalChunkState { @@ -624,9 +629,13 @@ impl Chunk { /// underlying storage pub fn set_written_to_object_store(&mut self, chunk: Arc) -> Result<()> { match &self.stage { - ChunkStage::Frozen(stage) => { - let meta = Arc::clone(&stage.meta); - match &stage.representation { + ChunkStage::Frozen { + representation, + meta, + .. + } => { + let meta = Arc::clone(&meta); + match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(_) => { // TODO: ideally we would support all Frozen representations InternalChunkState { @@ -652,11 +661,11 @@ impl Chunk { &[KeyValue::new("state", "rub_and_os")], ); - self.stage = ChunkStage::Persisted(ChunkStagePersisted { + self.stage = ChunkStage::Persisted { meta, parquet: chunk, read_buffer: Some(db), - }); + }; Ok(()) } } @@ -674,19 +683,23 @@ impl Chunk { pub fn set_unload_from_read_buffer(&mut self) -> Result> { match &mut self.stage { - ChunkStage::Persisted(stage) => { - if let Some(read_buffer) = &stage.read_buffer { + ChunkStage::Persisted { + parquet, + read_buffer, + .. + } => { + if let Some(read_buffer_inner) = &read_buffer { self.metrics .state .inc_with_labels(&[KeyValue::new("state", "os")]); self.metrics.immutable_chunk_size.observe_with_labels( - stage.parquet.size() as f64, + parquet.size() as f64, &[KeyValue::new("state", "os")], ); - let rub_chunk = Arc::clone(read_buffer); - stage.read_buffer = None; + let rub_chunk = Arc::clone(read_buffer_inner); + *read_buffer = None; Ok(rub_chunk) } else { // TODO: do we really need to error here or should unloading an unloaded chunk be a no-op? @@ -747,3 +760,157 @@ impl Chunk { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::convert::TryFrom; + + use super::*; + use data_types::server_id::ServerId; + use entry::{test_helpers::lp_to_entry, ClockValue}; + use mutable_buffer::chunk::{Chunk as MBChunk, ChunkMetrics as MBChunkMetrics}; + use parquet_file::{ + chunk::Chunk as ParquetChunk, + test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store}, + }; + + #[test] + fn test_new_open() { + let server_id = ServerId::try_from(1).unwrap(); + let table_name = "table1"; + let partition_key = "part1"; + let chunk_id = 0; + + // works with non-empty MBChunk + let mb_chunk = make_mb_chunk(table_name, server_id); + let chunk = Chunk::new_open( + chunk_id, + partition_key, + mb_chunk, + ChunkMetrics::new_unregistered(), + ) + .unwrap(); + assert!(matches!(chunk.stage(), &ChunkStage::Open { .. })); + + // fails with empty MBChunk + let mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered()); + assert_eq!( + Chunk::new_open( + chunk_id, + partition_key, + mb_chunk, + ChunkMetrics::new_unregistered(), + ) + .unwrap_err() + .to_string(), + "Cannot add an empty chunk to the catalog part1:table1:0" + ); + } + + #[tokio::test] + async fn test_freeze() { + let mut chunk = make_open_chunk(); + + // close it + chunk.freeze().unwrap(); + assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. })); + + // closing a second time is a no-op + chunk.freeze().unwrap(); + assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. })); + + // closing a chunk in persisted state will fail + let mut chunk = make_persisted_chunk().await; + assert_eq!(chunk.freeze().unwrap_err().to_string(), "Internal Error: unexpected chunk state for part1:table1:0 during setting closed. Expected Open or Frozen, got Persisted"); + } + + #[test] + fn test_lifecycle_action() { + let mut chunk = make_open_chunk(); + let registration = TaskRegistration::new(); + + // no action to begin with + assert_eq!(chunk.lifecycle_action(), None); + + // set some action + chunk + .set_lifecycle_action(ChunkLifecycleAction::Moving, ®istration) + .unwrap(); + assert_eq!( + chunk.lifecycle_action(), + Some(&ChunkLifecycleAction::Moving) + ); + + // setting an action while there is one running fails + assert_eq!(chunk.set_lifecycle_action(ChunkLifecycleAction::Moving, ®istration).unwrap_err().to_string(), "Internal Error: A lifecycle action \'Moving to the Read Buffer\' is already in progress for part1:table1:0"); + + // finishing the wrong action fails + assert_eq!(chunk.finish_lifecycle_action(ChunkLifecycleAction::Compacting).unwrap_err().to_string(), "Internal Error: Unexpected chunk state for part1:table1:0. Expected Compacting, got Moving to the Read Buffer"); + + // finish some action + chunk + .finish_lifecycle_action(ChunkLifecycleAction::Moving) + .unwrap(); + + // finishing w/o any action in progress will fail + assert_eq!(chunk.finish_lifecycle_action(ChunkLifecycleAction::Moving).unwrap_err().to_string(), "Internal Error: Unexpected chunk state for part1:table1:0. Expected Moving to the Read Buffer, got None"); + + // now we can set another action + chunk + .set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration) + .unwrap(); + assert_eq!( + chunk.lifecycle_action(), + Some(&ChunkLifecycleAction::Compacting) + ); + } + + fn make_mb_chunk(table_name: &str, server_id: ServerId) -> MBChunk { + let mut mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered()); + let entry = lp_to_entry(&format!("{} bar=1 10", table_name)); + let write = entry.partition_writes().unwrap().remove(0); + let batch = write.table_batches().remove(0); + mb_chunk + .write_table_batch(ClockValue::try_from(1).unwrap(), server_id, batch) + .unwrap(); + mb_chunk + } + + async fn make_parquet_chunk(chunk_id: u32) -> ParquetChunk { + let object_store = make_object_store(); + make_parquet_chunk_with_store(object_store, "foo", chunk_id).await + } + + fn make_open_chunk() -> Chunk { + let server_id = ServerId::try_from(1).unwrap(); + let table_name = "table1"; + let partition_key = "part1"; + let chunk_id = 0; + + // assemble MBChunk + let mb_chunk = make_mb_chunk(table_name, server_id); + + Chunk::new_open( + chunk_id, + partition_key, + mb_chunk, + ChunkMetrics::new_unregistered(), + ) + .unwrap() + } + + async fn make_persisted_chunk() -> Chunk { + let partition_key = "part1"; + let chunk_id = 0; + + // assemble ParquetChunk + let parquet_chunk = make_parquet_chunk(chunk_id).await; + + Chunk::new_object_store_only( + chunk_id, + partition_key, + Arc::new(parquet_chunk), + ChunkMetrics::new_unregistered(), + ) + } +} diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index ae043a234c..41b73d7a29 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -193,7 +193,7 @@ impl Partition { .values() .find(|chunk| { let chunk = chunk.read(); - matches!(chunk.stage(), ChunkStage::Open(_)) + matches!(chunk.stage(), ChunkStage::Open { .. }) }) .cloned()), None => UnknownTable { diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 82e5015333..6215811b76 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -106,19 +106,23 @@ impl DbChunk { use super::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}; let (state, meta) = match chunk.stage() { - ChunkStage::Open(stage) => { - let snapshot = stage.mb_chunk.snapshot(); + ChunkStage::Open { mb_chunk, .. } => { + let snapshot = mb_chunk.snapshot(); let state = State::MutableBuffer { chunk: Arc::clone(&snapshot), }; let meta = ChunkMetadata { - table_summary: Arc::new(stage.mb_chunk.table_summary()), + table_summary: Arc::new(mb_chunk.table_summary()), schema: snapshot.full_schema(), }; (state, Arc::new(meta)) } - ChunkStage::Frozen(stage) => { - let state = match &stage.representation { + ChunkStage::Frozen { + representation, + meta, + .. + } => { + let state = match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => State::MutableBuffer { chunk: Arc::clone(snapshot), }, @@ -127,20 +131,25 @@ impl DbChunk { partition_key, }, }; - (state, Arc::clone(&stage.meta)) + (state, Arc::clone(&meta)) } - ChunkStage::Persisted(stage) => { - let state = if let Some(read_buffer) = &stage.read_buffer { + ChunkStage::Persisted { + parquet, + read_buffer, + meta, + .. + } => { + let state = if let Some(read_buffer) = &read_buffer { State::ReadBuffer { chunk: Arc::clone(read_buffer), partition_key, } } else { State::ParquetFile { - chunk: Arc::clone(&stage.parquet), + chunk: Arc::clone(&parquet), } }; - (state, Arc::clone(&stage.meta)) + (state, Arc::clone(&meta)) } }; @@ -161,10 +170,10 @@ impl DbChunk { use super::catalog::chunk::ChunkStage; let (state, meta) = match chunk.stage() { - ChunkStage::Persisted(stage) => { - let chunk = Arc::clone(&stage.parquet); + ChunkStage::Persisted { parquet, meta, .. } => { + let chunk = Arc::clone(&parquet); let state = State::ParquetFile { chunk }; - (state, Arc::clone(&stage.meta)) + (state, Arc::clone(&meta)) } _ => { panic!("Internal error: This chunk's stage is not Persisted"); diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 1e3829f1f8..7ac04e435f 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -9,7 +9,7 @@ use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job}; use tracker::{RwLock, TaskTracker}; use super::{ - catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozen, ChunkStageFrozenRepr}, + catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozenRepr}, Db, }; use data_types::database_rules::SortOrder; @@ -132,7 +132,7 @@ trait ChunkMover { } match chunk_guard.stage() { - ChunkStage::Open(_) => { + ChunkStage::Open { .. } => { open_partitions.insert(chunk_guard.key().to_string()); if move_tracker.is_none() && would_move { let partition_key = chunk_guard.key().to_string(); @@ -145,7 +145,7 @@ trait ChunkMover { Some(self.move_to_read_buffer(partition_key, table_name, chunk_id)); } } - ChunkStage::Frozen(stage) => match &stage.representation { + ChunkStage::Frozen { representation, .. } => match &representation { ChunkStageFrozenRepr::MutableBufferSnapshot(_) if move_tracker.is_none() => { let partition_key = chunk_guard.key().to_string(); let table_name = chunk_guard.table_name().to_string(); @@ -156,7 +156,7 @@ trait ChunkMover { move_tracker = Some(self.move_to_read_buffer(partition_key, table_name, chunk_id)); } - ChunkStageFrozenRepr::ReadBuffer(_) if would_write => { + ChunkStageFrozenRepr::ReadBuffer { .. } if would_write => { let partition_key = chunk_guard.key().to_string(); let table_name = chunk_guard.table_name().to_string(); let chunk_id = chunk_guard.id(); @@ -197,12 +197,12 @@ trait ChunkMover { if (rules.drop_non_persisted && matches!( chunk_guard.stage(), - ChunkStage::Frozen(ChunkStageFrozen { + ChunkStage::Frozen { representation: ChunkStageFrozenRepr::ReadBuffer(_), - meta: _, - }) + .. + } )) - || matches!(chunk_guard.stage(), ChunkStage::Persisted(_)) + || matches!(chunk_guard.stage(), ChunkStage::Persisted { .. }) { let partition_key = chunk_guard.key().to_string(); let table_name = chunk_guard.table_name().to_string(); @@ -416,7 +416,7 @@ mod tests { /// Transitions a new ("open") chunk into the "moving" state. fn transition_to_moving(mut chunk: Chunk) -> Chunk { - chunk.set_closed().unwrap(); + chunk.freeze().unwrap(); chunk.set_moving(&Default::default()).unwrap(); chunk } @@ -907,7 +907,7 @@ mod tests { mover.check_for_work(from_secs(80)); assert_eq!(mover.events, vec![]); - mover.chunks[0].write().set_closed().unwrap(); + mover.chunks[0].write().freeze().unwrap(); // As soon as closed can move mover.check_for_work(from_secs(80));