feat: Add more changes

pull/24376/head
Nga Tran 2021-03-30 18:31:09 -04:00
parent 1a0e698571
commit 0bcd52d5c9
4 changed files with 65 additions and 3 deletions

parquet_file/Cargo.toml Normal file

parquet_file/src/lib.rs Normal file

@ -380,10 +380,13 @@ impl Db {
Ok(DBChunk::snapshot(&chunk))
}
pub fn load_chunk_to_parquet(&self,
pub fn load_chunk_to_object_store(&self,
partition_key: &str,
chunk_id: u32,
) -> Result<Arc<DBChunk>> {
// TODO: Refactor this because it is exactly the same as the one inside load_chunk_to_read_buffer
// Get the chunk from the catalog
let chunk = {
let partition = self
.catalog
@ -399,7 +402,65 @@ impl Db {
chunk_id,
})?
};
Ok(&chunk)
// update the catalog to say we are processing this chunk and
// then drop the lock while we do the work
let mb_chunk = {
let mut chunk = chunk.write();
// TODO: Make sure this is set to the corresponding "moving to object store" state
chunk.set_moving().context(LoadingChunk {
partition_key,
chunk_id,
})?
};
// TODO: Change to the right state for moving data to the object store
debug!(%partition_key, %chunk_id, "chunk marked MOVING , loading tables into object store");
// Get all tables in this chunk
let mut batches = Vec::new();
let table_stats = mb_chunk
.table_stats()
.expect("Figuring out what tables are in the mutable buffer");
for stats in table_stats {
debug!(%partition_key, %chunk_id, table=%stats.name, "loading table to object store");
mb_chunk
.table_to_arrow(&mut batches, &stats.name, Selection::All)
// It is probably reasonable to recover from this error
// (reset the chunk state to Open) but until that is
// implemented (and tested) just panic
.expect("Loading chunk to mutable buffer");
for batch in batches.drain(..) {
// TODO: function to write this batch to a parquet file
// TODO: remove this when done
// // As implemented now, taking this write lock will wait
// // until all reads of the read buffer complete and
// // then will block all reads while the insert is occurring
// self.read_buffer
// .upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
}
}
// Relock the chunk again (nothing else should have been able
// to modify the chunk state while we were moving it)
let mut chunk = chunk.write();
// update the catalog to say we are done processing
chunk
// TODO: set to the corresponding state
.set_moved(Arc::clone(&self.read_buffer))
.context(LoadingChunk {
partition_key,
chunk_id,
})?;
// TODO: record the right state here
debug!(%partition_key, %chunk_id, "chunk marked MOVED. Persisting to object store complete");
Ok(DBChunk::snapshot(&chunk))
}
/// Returns the next write sequence number

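The TODO above ("function to write this batch to a parquet file") is the core missing piece. As a rough sketch only, not part of this commit, each drained RecordBatch could be serialized with the parquet crate's ArrowWriter into an in-memory buffer whose bytes are then handed to the object store. The helper name below is hypothetical, and it assumes arrow_deps re-exports the parquet crate the same way it re-exports datafusion:

// Hypothetical sketch: encode one RecordBatch as Parquet bytes
use arrow_deps::arrow::record_batch::RecordBatch;
use arrow_deps::parquet::arrow::ArrowWriter;

fn batch_to_parquet_bytes(batch: &RecordBatch) -> arrow_deps::parquet::errors::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    // Default writer properties are fine for a sketch
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
    writer.write(batch)?;
    writer.close()?; // consumes the writer, releasing the borrow of `buffer`
    Ok(buffer)
}

The resulting Vec<u8> would then be written under some chunk-specific key in the object store before the chunk is marked as moved.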

@ -2,6 +2,7 @@ use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use data_types::chunk::{ChunkStorage, ChunkSummary};
use internal_types::{schema::Schema, selection::Selection};
use mutable_buffer::chunk::Chunk as MBChunk;
use object_store::ObjectStore;
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
use snafu::{ResultExt, Snafu};
@ -69,7 +70,7 @@ pub enum DBChunk {
chunk_id: u32,
},
ParquetFile {
path: String, // name of the parquet file
path: ObjectStore,
partition_key: Arc<String>,
chunk_id: u32, // ID of the chunk stored in this parquet file
}
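Note that the reworked variant stores the whole ObjectStore in a field still named path; presumably a later revision will hold the file's location within the store (plus a shared handle) rather than the store itself. A minimal construction sketch, where every value is a placeholder since this diff does not yet build the variant anywhere:

// Illustrative only; `store` stands in for however the ObjectStore
// handle ends up being threaded through to this point
let parquet_chunk = DBChunk::ParquetFile {
    path: store,
    partition_key: Arc::new(partition_key.to_string()),
    chunk_id,
};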