feat: add 2 more chunk states for writing chunks to object store
parent
30701bff3e
commit
f66c67692c
|
@ -409,6 +409,9 @@ impl Db {
|
|||
partition_key: &str,
|
||||
chunk_id: u32,
|
||||
) -> Result<Arc<DBChunk>> {
|
||||
// TODO: the below was done for MB --> Parquet. Need to rewrite to RB
|
||||
// --> Parquet
|
||||
|
||||
// Get the chunk from the catalog
|
||||
let chunk = {
|
||||
let partition =
|
||||
|
@ -428,30 +431,40 @@ impl Db {
|
|||
|
||||
// update the catalog to say we are processing this chunk and
|
||||
// then drop the lock while we do the work
|
||||
// TODO: this read_buffer in the near future will be rb_chunk
|
||||
//let read_buffer = {
|
||||
let mb_chunk = {
|
||||
let mut chunk = chunk.write();
|
||||
|
||||
// TODO: Make sure this is set to the corresponding "moving to object store
|
||||
// state"
|
||||
// chunk.set_writing_to_object_store().context(LoadingChunkToParquet {
|
||||
// partition_key,
|
||||
// chunk_id,
|
||||
// })?
|
||||
chunk.set_moving().context(LoadingChunkToParquet {
|
||||
partition_key,
|
||||
chunk_id,
|
||||
})?
|
||||
};
|
||||
|
||||
// TODO: Change to the right state that move data to object store
|
||||
debug!(%partition_key, %chunk_id, "chunk marked MOVING , loading tables into object store");
|
||||
debug!(%partition_key, %chunk_id, "chunk marked WRITING , loading tables into object store");
|
||||
|
||||
//Get all tables in this chunk
|
||||
let mut batches = Vec::new();
|
||||
let table_stats = mb_chunk.table_summaries();
|
||||
// let table_stats = mb_chunk
|
||||
// .table_stats()
|
||||
// .expect("Figuring out what tables are in the mutable buffer");
|
||||
let table_stats = mb_chunk.table_summaries(); // read_buffer.table_summaries(partition_key, &[chunk_id]);
|
||||
|
||||
// Create a parquet chunk for this chunk
|
||||
let _chunk = Chunk::new(partition_key.to_string(), chunk_id);
|
||||
|
||||
// TODO: This code will be removed when the read_buffer become rb_chunk
|
||||
// let partition_data = read_buffer.data.write().unwrap();
|
||||
// let partition = partition_data
|
||||
// .partitions
|
||||
// .get_mut(partition_key).expect("Read buffer partition not found");
|
||||
|
||||
// let chunk_data = partition.data.write().unwrap();
|
||||
// let rb_chunk = chunk_data.chunks.get_mut(&chunk_id).expect("Read buffer chunk
|
||||
// not found");
|
||||
|
||||
for stats in table_stats {
|
||||
debug!(%partition_key, %chunk_id, table=%stats.name, "loading table to object store");
|
||||
mb_chunk
|
||||
|
|
|
@ -31,10 +31,14 @@ pub enum ChunkState {
|
|||
Moving(Arc<MBChunk>),
|
||||
|
||||
/// Chunk has been completely loaded in the read buffer
|
||||
Moved(Arc<ReadBufferDb>), // todo use read buffer chunk here
|
||||
/// Still need both MB and RD for moving the MB to Object Store
|
||||
Moved(Arc<ReadBufferDb>), // todo use read buffer chunk instead of ReadBufferDb
|
||||
|
||||
/// Chunk has been completely loaded in the object store
|
||||
ObjectStore(Arc<ParquetChunk>),
|
||||
// Chunk is actively writing to object store
|
||||
WritingToObjectStore(Arc<ReadBufferDb>), // todo use read buffer chunk instead of ReadBufferD
|
||||
|
||||
// Chunk has been completely written into object store
|
||||
WrittenToObjectStore(Arc<ReadBufferDb>, Arc<ParquetChunk>),
|
||||
}
|
||||
|
||||
impl ChunkState {
|
||||
|
@ -45,7 +49,8 @@ impl ChunkState {
|
|||
Self::Closing(_) => "Closing",
|
||||
Self::Moving(_) => "Moving",
|
||||
Self::Moved(_) => "Moved",
|
||||
Self::ObjectStore(_) => "ObjectStore",
|
||||
Self::WritingToObjectStore(_) => "Writing to Object Store",
|
||||
Self::WrittenToObjectStore(_, _) => "Written to Object Store",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -178,7 +183,12 @@ impl Chunk {
|
|||
ChunkState::Moved(db) => {
|
||||
db.has_table(self.partition_key.as_str(), table_name, &[self.id])
|
||||
}
|
||||
ChunkState::ObjectStore(chunk) => chunk.has_table(table_name),
|
||||
ChunkState::WritingToObjectStore(db) => {
|
||||
db.has_table(self.partition_key.as_str(), table_name, &[self.id])
|
||||
}
|
||||
ChunkState::WrittenToObjectStore(db, _) => {
|
||||
db.has_table(self.partition_key.as_str(), table_name, &[self.id])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,7 +201,12 @@ impl Chunk {
|
|||
ChunkState::Moved(db) => {
|
||||
db.all_table_names(self.partition_key.as_str(), &[self.id], names)
|
||||
}
|
||||
ChunkState::ObjectStore(chunk) => chunk.all_table_names(names),
|
||||
ChunkState::WritingToObjectStore(db) => {
|
||||
db.all_table_names(self.partition_key.as_str(), &[self.id], names)
|
||||
}
|
||||
ChunkState::WrittenToObjectStore(db, _) => {
|
||||
db.all_table_names(self.partition_key.as_str(), &[self.id], names)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,7 +220,14 @@ impl Chunk {
|
|||
ChunkState::Moved(db) => db
|
||||
.chunks_size(self.partition_key.as_str(), &[self.id])
|
||||
.unwrap_or(0) as usize,
|
||||
ChunkState::ObjectStore(chunk) => chunk.size(),
|
||||
ChunkState::WritingToObjectStore(db) => db
|
||||
.chunks_size(self.partition_key.as_str(), &[self.id])
|
||||
.unwrap_or(0) as usize,
|
||||
ChunkState::WrittenToObjectStore(db, parquet_chunk) => {
|
||||
parquet_chunk.size()
|
||||
+ db.chunks_size(self.partition_key.as_str(), &[self.id])
|
||||
.unwrap_or(0) as usize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,17 +300,15 @@ impl Chunk {
|
|||
}
|
||||
}
|
||||
|
||||
/// Set the chunk to the ObjectStore state, returning a handle to the
|
||||
/// underlying storage
|
||||
pub fn set_object_store(&mut self, chunk: Arc<ParquetChunk>) -> Result<Arc<ParquetChunk>> {
|
||||
/// Set the chunk to the MovingToObjectStore state
|
||||
pub fn set_writing_to_object_store(&mut self) -> Result<Arc<ReadBufferDb>> {
|
||||
let mut s = ChunkState::Invalid;
|
||||
std::mem::swap(&mut s, &mut self.state);
|
||||
|
||||
// TODO: Need to see from which state we can persist to object store
|
||||
match s {
|
||||
ChunkState::Moved(_) => {
|
||||
self.state = ChunkState::ObjectStore(Arc::clone(&chunk));
|
||||
Ok(chunk)
|
||||
ChunkState::Moved(db) => {
|
||||
self.state = ChunkState::WritingToObjectStore(Arc::clone(&db));
|
||||
Ok(db)
|
||||
}
|
||||
state => {
|
||||
self.state = state;
|
||||
|
@ -296,4 +316,27 @@ impl Chunk {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the chunk to the MovedToObjectStore state, returning a handle to the
|
||||
/// underlying storage
|
||||
pub fn set_written_to_object_store(&mut self, chunk: Arc<ParquetChunk>) -> Result<()> {
|
||||
let mut s = ChunkState::Invalid;
|
||||
std::mem::swap(&mut s, &mut self.state);
|
||||
|
||||
match s {
|
||||
ChunkState::WritingToObjectStore(db) => {
|
||||
self.state = ChunkState::WrittenToObjectStore(db, chunk);
|
||||
Ok(())
|
||||
}
|
||||
state => {
|
||||
self.state = state;
|
||||
unexpected_state!(
|
||||
self,
|
||||
"setting object store",
|
||||
"MovingToObjectStore",
|
||||
&self.state
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,7 +124,15 @@ impl DBChunk {
|
|||
chunk_id,
|
||||
}
|
||||
}
|
||||
super::catalog::chunk::ChunkState::ObjectStore(chunk) => {
|
||||
ChunkState::WritingToObjectStore(db) => {
|
||||
let db = Arc::clone(db);
|
||||
Self::ReadBuffer {
|
||||
db,
|
||||
partition_key,
|
||||
chunk_id,
|
||||
}
|
||||
}
|
||||
ChunkState::WrittenToObjectStore(_, chunk) => {
|
||||
let chunk = Arc::clone(chunk);
|
||||
Self::ParquetFile { chunk }
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue