Merge pull request #1598 from influxdata/crepererum/issue1513-b

refactor: clean-up chunk structure + add tests
pull/24376/head
kodiakhq[bot] 2021-06-02 07:14:47 +00:00 committed by GitHub
commit 399d5f1ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 315 additions and 140 deletions

View File

@ -470,7 +470,7 @@ impl Db {
})?
{
let mut chunk = chunk.write();
chunk.set_closed().context(RollingOverPartition {
chunk.freeze().context(RollingOverPartition {
partition_key,
table_name,
})?;
@ -1091,7 +1091,7 @@ fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option<N
let size = mb_chunk.size();
if size > threshold.get() {
chunk.set_closed().expect("cannot close open chunk");
chunk.freeze().expect("cannot close open chunk");
}
}
}
@ -1299,9 +1299,7 @@ mod tests {
test_helpers::{try_write_lp, write_lp},
*,
};
use crate::db::catalog::chunk::{
ChunkStage, ChunkStageFrozen, ChunkStageFrozenRepr, ChunkStagePersisted,
};
use crate::db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr};
use crate::query_tests::utils::{make_db, TestDb};
use ::test_helpers::assert_contains;
use arrow::record_batch::RecordBatch;
@ -2095,17 +2093,17 @@ mod tests {
assert_eq!(chunks.len(), 2);
assert!(matches!(
chunks[0].read().stage(),
ChunkStage::Frozen(ChunkStageFrozen {
meta: _,
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_)
})
ChunkStage::Frozen {
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
..
}
));
assert!(matches!(
chunks[1].read().stage(),
ChunkStage::Frozen(ChunkStageFrozen {
meta: _,
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_)
})
ChunkStage::Frozen {
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
..
}
));
}
@ -2902,8 +2900,8 @@ mod tests {
partition.chunk(table_name, *chunk_id).unwrap()
};
let chunk = chunk.read();
if let ChunkStage::Persisted(stage) = chunk.stage() {
paths_expected.push(stage.parquet.table_path().display());
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_expected.push(parquet.table_path().display());
} else {
panic!("Wrong chunk state.");
}
@ -2953,11 +2951,10 @@ mod tests {
let chunk = chunk.read();
assert!(matches!(
chunk.stage(),
ChunkStage::Persisted(ChunkStagePersisted {
parquet: _,
ChunkStage::Persisted {
read_buffer: None,
meta: _,
})
..
}
));
}
@ -2995,8 +2992,8 @@ mod tests {
partition.chunk(table_name.clone(), chunk_id).unwrap()
};
let chunk = chunk.read();
if let ChunkStage::Persisted(stage) = chunk.stage() {
paths_keep.push(stage.parquet.table_path());
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_keep.push(parquet.table_path());
} else {
panic!("Wrong chunk state.");
}

View File

@ -98,12 +98,14 @@ pub enum Error {
},
#[snafu(display(
"Can not add an empty chunk to the catalog {}:{}",
"Cannot add an empty chunk to the catalog {}:{}:{}",
partition_key,
table_name,
chunk_id
))]
ChunkIsEmpty {
partition_key: String,
table_name: String,
chunk_id: u32,
},

View File

@ -49,15 +49,6 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
}
/// A chunk in an _open_ stage.
///
/// Chunks in this stage are writable (= can receive new data) and are never preserved.
#[derive(Debug)]
pub struct ChunkStageOpen {
/// Mutable Buffer that receives writes.
pub mb_chunk: MBChunk,
}
/// Different memory representations of a frozen chunk.
#[derive(Debug)]
pub enum ChunkStageFrozenRepr {
@ -70,32 +61,6 @@ pub enum ChunkStageFrozenRepr {
ReadBuffer(Arc<ReadBufferChunk>),
}
/// A chunk in an _frozen stage.
///
/// Chunks in this stage cannot be modified but are not yet persisted. They can however be compacted which will take two
/// or more chunks and creates a single new frozen chunk.
#[derive(Debug)]
pub struct ChunkStageFrozen {
/// Metadata (statistics, schema) about this chunk
pub meta: Arc<ChunkMetadata>,
/// Internal memory representation of the frozen chunk.
pub representation: ChunkStageFrozenRepr,
}
/// Chunk in _persisted_ stage.
#[derive(Debug)]
pub struct ChunkStagePersisted {
/// Metadata (statistics, schema) about this chunk
pub meta: Arc<ChunkMetadata>,
/// Parquet chunk that lives immutable within the object store.
pub parquet: Arc<ParquetChunk>,
/// In-memory version of the parquet data.
pub read_buffer: Option<Arc<ReadBufferChunk>>,
}
/// Represents the current lifecycle stage a chunk is in.
///
/// # Stages
@ -128,22 +93,47 @@ pub struct ChunkStagePersisted {
/// (according to the diagram shown above). The chunk stage is considered unchanged as long as the job is running.
#[derive(Debug)]
pub enum ChunkStage {
/// Chunk can receive new data. It is not persisted.
Open(ChunkStageOpen),
/// A chunk in an _open_ stage.
///
/// Chunks in this stage are writable (= can receive new data) and are not preserved.
Open {
/// Mutable Buffer that receives writes.
mb_chunk: MBChunk,
},
/// Chunk cannot receive new data. It is not persisted.
Frozen(ChunkStageFrozen),
/// A chunk in a _frozen_ stage.
///
/// Chunks in this stage cannot be modified but are not yet persisted. They can, however, be compacted, which takes two
/// or more chunks and creates a single new frozen chunk.
Frozen {
/// Metadata (statistics, schema) about this chunk
meta: Arc<ChunkMetadata>,
/// Internal memory representation of the frozen chunk.
representation: ChunkStageFrozenRepr,
},
/// Chunk in _persisted_ stage.
///
/// Chunk cannot receive new data. It is persisted.
Persisted(ChunkStagePersisted),
Persisted {
/// Metadata (statistics, schema) about this chunk
meta: Arc<ChunkMetadata>,
/// Parquet chunk that lives immutable within the object store.
parquet: Arc<ParquetChunk>,
/// In-memory version of the parquet data.
read_buffer: Option<Arc<ReadBufferChunk>>,
},
}
impl ChunkStage {
pub fn name(&self) -> &'static str {
match self {
Self::Open(_) => "Open",
Self::Frozen(_) => "Frozen",
Self::Persisted(_) => "Persisted",
Self::Open { .. } => "Open",
Self::Frozen { .. } => "Frozen",
Self::Persisted { .. } => "Persisted",
}
}
}
@ -245,12 +235,13 @@ impl Chunk {
chunk.rows() > 0,
ChunkIsEmpty {
partition_key: partition_key.as_ref(),
table_name: chunk.table_name().as_ref(),
chunk_id,
}
);
let table_name = Arc::clone(&chunk.table_name());
let stage = ChunkStage::Open(ChunkStageOpen { mb_chunk: chunk });
let stage = ChunkStage::Open { mb_chunk: chunk };
metrics
.state
@ -288,11 +279,11 @@ impl Chunk {
schema: chunk.full_schema(),
});
let stage = ChunkStage::Persisted(ChunkStagePersisted {
let stage = ChunkStage::Persisted {
parquet: chunk,
read_buffer: None,
meta,
});
};
Self {
partition_key: Arc::from(partition_key.as_ref()),
@ -362,8 +353,8 @@ impl Chunk {
/// Return ChunkSummary metadata for this chunk
pub fn summary(&self) -> ChunkSummary {
let (row_count, storage) = match &self.stage {
ChunkStage::Open(stage) => (stage.mb_chunk.rows(), ChunkStorage::OpenMutableBuffer),
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Open { mb_chunk, .. } => (mb_chunk.rows(), ChunkStorage::OpenMutableBuffer),
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => {
(repr.rows(), ChunkStorage::ClosedMutableBuffer)
}
@ -371,9 +362,13 @@ impl Chunk {
(repr.rows() as usize, ChunkStorage::ReadBuffer)
}
},
ChunkStage::Persisted(stage) => {
let rows = stage.parquet.rows() as usize;
let storage = if stage.read_buffer.is_some() {
ChunkStage::Persisted {
parquet,
read_buffer,
..
} => {
let rows = parquet.rows() as usize;
let storage = if read_buffer.is_some() {
ChunkStorage::ReadBufferAndObjectStore
} else {
ChunkStorage::ObjectStoreOnly
@ -407,15 +402,15 @@ impl Chunk {
}
let columns: Vec<ChunkColumnSummary> = match &self.stage {
ChunkStage::Open(stage) => stage.mb_chunk.column_sizes().map(to_summary).collect(),
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Open { mb_chunk, .. } => mb_chunk.column_sizes().map(to_summary).collect(),
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => {
repr.column_sizes().map(to_summary).collect()
}
ChunkStageFrozenRepr::ReadBuffer(repr) => repr.column_sizes(&self.table_name),
},
ChunkStage::Persisted(stage) => {
if let Some(read_buffer) = &stage.read_buffer {
ChunkStage::Persisted { read_buffer, .. } => {
if let Some(read_buffer) = &read_buffer {
read_buffer.column_sizes(&self.table_name)
} else {
// TODO parquet statistics
@ -430,12 +425,12 @@ impl Chunk {
/// Return the summary information about the table stored in this Chunk
pub fn table_summary(&self) -> Arc<TableSummary> {
match &self.stage {
ChunkStage::Open(stage) => {
ChunkStage::Open { mb_chunk, .. } => {
// The stats for open chunks change so can't be cached
Arc::new(stage.mb_chunk.table_summary())
Arc::new(mb_chunk.table_summary())
}
ChunkStage::Frozen(stage) => Arc::clone(&stage.meta.table_summary),
ChunkStage::Persisted(stage) => Arc::clone(&stage.meta.table_summary),
ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary),
ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary),
}
}
@ -443,14 +438,18 @@ impl Chunk {
/// chunk
pub fn size(&self) -> usize {
match &self.stage {
ChunkStage::Open(stage) => stage.mb_chunk.size(),
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Open { mb_chunk, .. } => mb_chunk.size(),
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => repr.size(),
ChunkStageFrozenRepr::ReadBuffer(repr) => repr.size(),
},
ChunkStage::Persisted(stage) => {
let mut size = stage.parquet.size();
if let Some(read_buffer) = &stage.read_buffer {
ChunkStage::Persisted {
parquet,
read_buffer,
..
} => {
let mut size = parquet.size();
if let Some(read_buffer) = &read_buffer {
size += read_buffer.size();
}
size
@ -464,42 +463,48 @@ impl Chunk {
/// Must be in open or closed state
pub fn mutable_buffer(&mut self) -> Result<&mut MBChunk> {
match &mut self.stage {
ChunkStage::Open(stage) => Ok(&mut stage.mb_chunk),
ChunkStage::Open { mb_chunk, .. } => Ok(mb_chunk),
stage => unexpected_state!(self, "mutable buffer reference", "Open or Closed", stage),
}
}
/// Set the chunk to the Closed state
pub fn set_closed(&mut self) -> Result<Arc<MBChunkSnapshot>> {
/// Set chunk to _frozen_ state.
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage (no-op) and will
/// fail for other stages.
pub fn freeze(&mut self) -> Result<()> {
match &self.stage {
ChunkStage::Open(stage) => {
ChunkStage::Open { mb_chunk, .. } => {
assert!(self.time_closed.is_none());
self.time_closed = Some(Utc::now());
let s = stage.mb_chunk.snapshot();
let s = mb_chunk.snapshot();
self.metrics
.state
.inc_with_labels(&[KeyValue::new("state", "closed")]);
self.metrics.immutable_chunk_size.observe_with_labels(
stage.mb_chunk.size() as f64,
mb_chunk.size() as f64,
&[KeyValue::new("state", "closed")],
);
// Cache table summary + schema
let metadata = ChunkMetadata {
table_summary: Arc::new(stage.mb_chunk.table_summary()),
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
};
self.stage = ChunkStage::Frozen(ChunkStageFrozen {
self.stage = ChunkStage::Frozen {
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)),
meta: Arc::new(metadata),
});
Ok(s)
};
Ok(())
}
&ChunkStage::Frozen { .. } => {
// already frozen => no-op
Ok(())
}
_ => {
unexpected_state!(self, "setting closed", "Open or Closed", &self.stage)
unexpected_state!(self, "setting closed", "Open or Frozen", &self.stage)
}
}
}
@ -507,16 +512,16 @@ impl Chunk {
/// Set the chunk to the Moving state, returning a handle to the underlying
/// storage
///
/// If called on an open chunk will first close the chunk
/// If called on an open chunk will first [`freeze`](Self::freeze) the chunk
pub fn set_moving(&mut self, registration: &TaskRegistration) -> Result<Arc<MBChunkSnapshot>> {
// This ensures the closing logic is consistent but doesn't break code that
// assumes a chunk can be moved from open
if matches!(self.stage, ChunkStage::Open(_)) {
self.set_closed()?;
if matches!(self.stage, ChunkStage::Open { .. }) {
self.freeze()?;
}
match &self.stage {
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(repr) => {
let chunk = Arc::clone(repr);
self.set_lifecycle_action(ChunkLifecycleAction::Moving, registration)?;
@ -553,7 +558,7 @@ impl Chunk {
/// storage.
pub fn set_moved(&mut self, chunk: Arc<ReadBufferChunk>) -> Result<()> {
match &mut self.stage {
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
self.metrics
.state
@ -564,7 +569,7 @@ impl Chunk {
&[KeyValue::new("state", "moved")],
);
stage.representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
Ok(())
}
@ -590,8 +595,8 @@ impl Chunk {
registration: &TaskRegistration,
) -> Result<Arc<ReadBufferChunk>> {
match &self.stage {
ChunkStage::Frozen(stage) => {
match &stage.representation {
ChunkStage::Frozen { representation, .. } => {
match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
// TODO: ideally we would support all Frozen representations
InternalChunkState {
@ -624,9 +629,13 @@ impl Chunk {
/// underlying storage
pub fn set_written_to_object_store(&mut self, chunk: Arc<ParquetChunk>) -> Result<()> {
match &self.stage {
ChunkStage::Frozen(stage) => {
let meta = Arc::clone(&stage.meta);
match &stage.representation {
ChunkStage::Frozen {
representation,
meta,
..
} => {
let meta = Arc::clone(&meta);
match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
// TODO: ideally we would support all Frozen representations
InternalChunkState {
@ -652,11 +661,11 @@ impl Chunk {
&[KeyValue::new("state", "rub_and_os")],
);
self.stage = ChunkStage::Persisted(ChunkStagePersisted {
self.stage = ChunkStage::Persisted {
meta,
parquet: chunk,
read_buffer: Some(db),
});
};
Ok(())
}
}
@ -674,19 +683,23 @@ impl Chunk {
pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<ReadBufferChunk>> {
match &mut self.stage {
ChunkStage::Persisted(stage) => {
if let Some(read_buffer) = &stage.read_buffer {
ChunkStage::Persisted {
parquet,
read_buffer,
..
} => {
if let Some(read_buffer_inner) = &read_buffer {
self.metrics
.state
.inc_with_labels(&[KeyValue::new("state", "os")]);
self.metrics.immutable_chunk_size.observe_with_labels(
stage.parquet.size() as f64,
parquet.size() as f64,
&[KeyValue::new("state", "os")],
);
let rub_chunk = Arc::clone(read_buffer);
stage.read_buffer = None;
let rub_chunk = Arc::clone(read_buffer_inner);
*read_buffer = None;
Ok(rub_chunk)
} else {
// TODO: do we really need to error here or should unloading an unloaded chunk be a no-op?
@ -747,3 +760,157 @@ impl Chunk {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::convert::TryFrom;

    use super::*;
    use data_types::server_id::ServerId;
    use entry::{test_helpers::lp_to_entry, ClockValue};
    use mutable_buffer::chunk::{Chunk as MBChunk, ChunkMetrics as MBChunkMetrics};
    use parquet_file::{
        chunk::Chunk as ParquetChunk,
        test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store},
    };

    /// `Chunk::new_open` must yield an `Open`-stage chunk for a non-empty
    /// mutable-buffer chunk and must reject an empty one with a
    /// `ChunkIsEmpty` error that names partition, table, and chunk id.
    #[test]
    fn test_new_open() {
        let server_id = ServerId::try_from(1).unwrap();
        let table_name = "table1";
        let partition_key = "part1";
        let chunk_id = 0;

        // works with non-empty MBChunk
        let mb_chunk = make_mb_chunk(table_name, server_id);
        let chunk = Chunk::new_open(
            chunk_id,
            partition_key,
            mb_chunk,
            ChunkMetrics::new_unregistered(),
        )
        .unwrap();
        assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));

        // fails with empty MBChunk
        let mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered());
        assert_eq!(
            Chunk::new_open(
                chunk_id,
                partition_key,
                mb_chunk,
                ChunkMetrics::new_unregistered(),
            )
            .unwrap_err()
            .to_string(),
            "Cannot add an empty chunk to the catalog part1:table1:0"
        );
    }

    /// `freeze` transitions Open -> Frozen, is a no-op on a chunk that is
    /// already Frozen, and fails for a Persisted chunk.
    ///
    /// Uses `tokio::test` because building the persisted fixture requires
    /// async object-store access (see `make_persisted_chunk`).
    #[tokio::test]
    async fn test_freeze() {
        let mut chunk = make_open_chunk();

        // close it
        chunk.freeze().unwrap();
        assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. }));

        // closing a second time is a no-op
        chunk.freeze().unwrap();
        assert!(matches!(chunk.stage(), &ChunkStage::Frozen { .. }));

        // closing a chunk in persisted state will fail
        let mut chunk = make_persisted_chunk().await;
        assert_eq!(chunk.freeze().unwrap_err().to_string(), "Internal Error: unexpected chunk state for part1:table1:0 during setting closed. Expected Open or Frozen, got Persisted");
    }

    /// Exercises the lifecycle-action state machine: an action can be set
    /// only when none is in progress, must be finished with the matching
    /// action kind, and a new action may be set after finishing.
    #[test]
    fn test_lifecycle_action() {
        let mut chunk = make_open_chunk();
        let registration = TaskRegistration::new();

        // no action to begin with
        assert_eq!(chunk.lifecycle_action(), None);

        // set some action
        chunk
            .set_lifecycle_action(ChunkLifecycleAction::Moving, &registration)
            .unwrap();
        assert_eq!(
            chunk.lifecycle_action(),
            Some(&ChunkLifecycleAction::Moving)
        );

        // setting an action while there is one running fails
        assert_eq!(chunk.set_lifecycle_action(ChunkLifecycleAction::Moving, &registration).unwrap_err().to_string(), "Internal Error: A lifecycle action \'Moving to the Read Buffer\' is already in progress for part1:table1:0");

        // finishing the wrong action fails
        assert_eq!(chunk.finish_lifecycle_action(ChunkLifecycleAction::Compacting).unwrap_err().to_string(), "Internal Error: Unexpected chunk state for part1:table1:0. Expected Compacting, got Moving to the Read Buffer");

        // finish some action
        chunk
            .finish_lifecycle_action(ChunkLifecycleAction::Moving)
            .unwrap();

        // finishing w/o any action in progress will fail
        assert_eq!(chunk.finish_lifecycle_action(ChunkLifecycleAction::Moving).unwrap_err().to_string(), "Internal Error: Unexpected chunk state for part1:table1:0. Expected Moving to the Read Buffer, got None");

        // now we can set another action
        chunk
            .set_lifecycle_action(ChunkLifecycleAction::Compacting, &registration)
            .unwrap();
        assert_eq!(
            chunk.lifecycle_action(),
            Some(&ChunkLifecycleAction::Compacting)
        );
    }

    /// Builds a non-empty mutable-buffer chunk by writing a single
    /// line-protocol row (`<table> bar=1 10`) into a fresh `MBChunk`.
    fn make_mb_chunk(table_name: &str, server_id: ServerId) -> MBChunk {
        let mut mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered());
        let entry = lp_to_entry(&format!("{} bar=1 10", table_name));
        let write = entry.partition_writes().unwrap().remove(0);
        let batch = write.table_batches().remove(0);
        mb_chunk
            .write_table_batch(ClockValue::try_from(1).unwrap(), server_id, batch)
            .unwrap();
        mb_chunk
    }

    /// Builds a parquet chunk backed by an in-memory test object store.
    async fn make_parquet_chunk(chunk_id: u32) -> ParquetChunk {
        let object_store = make_object_store();
        make_parquet_chunk_with_store(object_store, "foo", chunk_id).await
    }

    /// Builds a catalog chunk in the _open_ stage for partition "part1",
    /// table "table1", chunk id 0.
    fn make_open_chunk() -> Chunk {
        let server_id = ServerId::try_from(1).unwrap();
        let table_name = "table1";
        let partition_key = "part1";
        let chunk_id = 0;

        // assemble MBChunk
        let mb_chunk = make_mb_chunk(table_name, server_id);

        Chunk::new_open(
            chunk_id,
            partition_key,
            mb_chunk,
            ChunkMetrics::new_unregistered(),
        )
        .unwrap()
    }

    /// Builds a catalog chunk directly in the _persisted_ (object-store-only)
    /// stage, bypassing the open/frozen transitions.
    async fn make_persisted_chunk() -> Chunk {
        let partition_key = "part1";
        let chunk_id = 0;

        // assemble ParquetChunk
        let parquet_chunk = make_parquet_chunk(chunk_id).await;

        Chunk::new_object_store_only(
            chunk_id,
            partition_key,
            Arc::new(parquet_chunk),
            ChunkMetrics::new_unregistered(),
        )
    }
}

View File

@ -193,7 +193,7 @@ impl Partition {
.values()
.find(|chunk| {
let chunk = chunk.read();
matches!(chunk.stage(), ChunkStage::Open(_))
matches!(chunk.stage(), ChunkStage::Open { .. })
})
.cloned()),
None => UnknownTable {

View File

@ -106,19 +106,23 @@ impl DbChunk {
use super::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr};
let (state, meta) = match chunk.stage() {
ChunkStage::Open(stage) => {
let snapshot = stage.mb_chunk.snapshot();
ChunkStage::Open { mb_chunk, .. } => {
let snapshot = mb_chunk.snapshot();
let state = State::MutableBuffer {
chunk: Arc::clone(&snapshot),
};
let meta = ChunkMetadata {
table_summary: Arc::new(stage.mb_chunk.table_summary()),
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
};
(state, Arc::new(meta))
}
ChunkStage::Frozen(stage) => {
let state = match &stage.representation {
ChunkStage::Frozen {
representation,
meta,
..
} => {
let state = match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => State::MutableBuffer {
chunk: Arc::clone(snapshot),
},
@ -127,20 +131,25 @@ impl DbChunk {
partition_key,
},
};
(state, Arc::clone(&stage.meta))
(state, Arc::clone(&meta))
}
ChunkStage::Persisted(stage) => {
let state = if let Some(read_buffer) = &stage.read_buffer {
ChunkStage::Persisted {
parquet,
read_buffer,
meta,
..
} => {
let state = if let Some(read_buffer) = &read_buffer {
State::ReadBuffer {
chunk: Arc::clone(read_buffer),
partition_key,
}
} else {
State::ParquetFile {
chunk: Arc::clone(&stage.parquet),
chunk: Arc::clone(&parquet),
}
};
(state, Arc::clone(&stage.meta))
(state, Arc::clone(&meta))
}
};
@ -161,10 +170,10 @@ impl DbChunk {
use super::catalog::chunk::ChunkStage;
let (state, meta) = match chunk.stage() {
ChunkStage::Persisted(stage) => {
let chunk = Arc::clone(&stage.parquet);
ChunkStage::Persisted { parquet, meta, .. } => {
let chunk = Arc::clone(&parquet);
let state = State::ParquetFile { chunk };
(state, Arc::clone(&stage.meta))
(state, Arc::clone(&meta))
}
_ => {
panic!("Internal error: This chunk's stage is not Persisted");

View File

@ -9,7 +9,7 @@ use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};
use tracker::{RwLock, TaskTracker};
use super::{
catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozen, ChunkStageFrozenRepr},
catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozenRepr},
Db,
};
use data_types::database_rules::SortOrder;
@ -132,7 +132,7 @@ trait ChunkMover {
}
match chunk_guard.stage() {
ChunkStage::Open(_) => {
ChunkStage::Open { .. } => {
open_partitions.insert(chunk_guard.key().to_string());
if move_tracker.is_none() && would_move {
let partition_key = chunk_guard.key().to_string();
@ -145,7 +145,7 @@ trait ChunkMover {
Some(self.move_to_read_buffer(partition_key, table_name, chunk_id));
}
}
ChunkStage::Frozen(stage) => match &stage.representation {
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) if move_tracker.is_none() => {
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
@ -156,7 +156,7 @@ trait ChunkMover {
move_tracker =
Some(self.move_to_read_buffer(partition_key, table_name, chunk_id));
}
ChunkStageFrozenRepr::ReadBuffer(_) if would_write => {
ChunkStageFrozenRepr::ReadBuffer { .. } if would_write => {
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
@ -197,12 +197,12 @@ trait ChunkMover {
if (rules.drop_non_persisted
&& matches!(
chunk_guard.stage(),
ChunkStage::Frozen(ChunkStageFrozen {
ChunkStage::Frozen {
representation: ChunkStageFrozenRepr::ReadBuffer(_),
meta: _,
})
..
}
))
|| matches!(chunk_guard.stage(), ChunkStage::Persisted(_))
|| matches!(chunk_guard.stage(), ChunkStage::Persisted { .. })
{
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
@ -416,7 +416,7 @@ mod tests {
/// Transitions a new ("open") chunk into the "moving" state.
fn transition_to_moving(mut chunk: Chunk) -> Chunk {
chunk.set_closed().unwrap();
chunk.freeze().unwrap();
chunk.set_moving(&Default::default()).unwrap();
chunk
}
@ -907,7 +907,7 @@ mod tests {
mover.check_for_work(from_secs(80));
assert_eq!(mover.events, vec![]);
mover.chunks[0].write().set_closed().unwrap();
mover.chunks[0].write().freeze().unwrap();
// As soon as closed can move
mover.check_for_work(from_secs(80));