Merge branch 'main' into mkmik-patch-1
commit e545ea4acb
@@ -252,7 +252,8 @@ impl File {
         }
     }

-    fn path(&self, location: &FilePath) -> PathBuf {
+    /// Return full path of the given location
+    pub fn path(&self, location: &FilePath) -> PathBuf {
         let mut path = self.root.clone();
         path.push_path(location);
         path.to_raw()
@@ -1,11 +1,11 @@
 use snafu::{OptionExt, ResultExt, Snafu};
-use std::collections::BTreeSet;
+use std::{collections::BTreeSet, sync::Arc};

 use crate::table::Table;
 use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
 use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
 use internal_types::{schema::Schema, selection::Selection};
-use object_store::path::Path;
+use object_store::{path::Path, ObjectStore};
 use query::predicate::Predicate;
 use tracker::{MemRegistry, MemTracker};
@@ -94,11 +94,17 @@ impl Chunk {
         &mut self,
         table_summary: TableSummary,
         file_location: Path,
+        store: Arc<ObjectStore>,
         schema: Schema,
         range: Option<TimestampRange>,
     ) {
-        self.tables
-            .push(Table::new(table_summary, file_location, schema, range));
+        self.tables.push(Table::new(
+            table_summary,
+            file_location,
+            store,
+            schema,
+            range,
+        ));
     }

     /// Return true if this chunk includes the given table
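For orientation, here is a minimal sketch of a caller threading the object store handle through the new `add_table` signature. Only the names visible in this diff (`Chunk`, `add_table`, and the parameter types) are real; the surrounding helper function is hypothetical.

    // Hypothetical helper: registers a table's parquet file with the chunk,
    // passing along the store handle the table will later use to read that file.
    fn register_table(
        chunk: &mut Chunk,
        table_summary: TableSummary,
        file_location: Path,
        store: Arc<ObjectStore>,
        schema: Schema,
        range: Option<TimestampRange>,
    ) {
        chunk.add_table(table_summary, file_location, store, schema, range);
    }

The real caller in this commit is `Db::write_chunk_to_object_store` (see the `parquet_chunk.add_table(stats, path, Arc::clone(&self.store), ...)` hunk further down), which follows the same shape.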
@@ -3,15 +3,12 @@
 use arrow_deps::{
     arrow::{
         datatypes::{Schema, SchemaRef},
         error::ArrowError,
         error::Result as ArrowResult,
         record_batch::RecordBatch,
     },
-    datafusion::{
-        error::DataFusionError,
-        physical_plan::{
-            parquet::RowGroupPredicateBuilder, RecordBatchStream, SendableRecordBatchStream,
-        },
+    datafusion::physical_plan::{
+        common::SizedRecordBatchStream, parquet::RowGroupPredicateBuilder, RecordBatchStream,
+        SendableRecordBatchStream,
     },
     parquet::{
         self,
@@ -19,16 +16,17 @@ use arrow_deps::{
         file::{reader::FileReader, serialized_reader::SerializedFileReader, writer::TryClone},
     },
 };
+use bytes::Bytes;
+use data_types::server_id::ServerId;
+use futures::{Stream, StreamExt};
 use internal_types::selection::Selection;
 use object_store::{
     path::{ObjectStorePath, Path},
-    ObjectStore, ObjectStoreApi,
+    ObjectStore, ObjectStoreApi, ObjectStoreIntegration,
 };
+use parking_lot::Mutex;
 use query::predicate::Predicate;
-
-use bytes::Bytes;
-use data_types::server_id::ServerId;
-use futures::{Stream, StreamExt};
-use parking_lot::Mutex;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     fs::File,
@@ -36,10 +34,6 @@ use std::{
     sync::Arc,
     task::{Context, Poll},
 };
-use tokio::{
-    sync::mpsc::{channel, Receiver, Sender},
-    task,
-};
-use tokio_stream::wrappers::ReceiverStream;

 #[derive(Debug, Snafu)]
@@ -257,27 +251,17 @@ impl Storage {
         predicate: &Predicate,
         selection: Selection<'_>,
         schema: SchemaRef,
-        path: &Path,
+        path: Path,
+        store: Arc<ObjectStore>,
     ) -> Result<SendableRecordBatchStream> {
-        // TODO: support non local file object store
-        if !path.local_file() {
-            panic!("Non local file object store not supported")
-        }
-
         // The below code is based on
         // datafusion::physical_plan::parquet::ParquetExec::execute
         // Will be improved as we go

-        let (response_tx, response_rx): (
-            Sender<ArrowResult<RecordBatch>>,
-            Receiver<ArrowResult<RecordBatch>>,
-        ) = channel(2);
-
-        // Full path of the parquet file
-        // Might need different function if this display function does not show the full
-        // path
-        let filename = path.display();
-        // println!("Parquet filename: {}", filename); // TODO: remove after tests
+        // let (response_tx, response_rx): (
+        //     Sender<ArrowResult<RecordBatch>>,
+        //     Receiver<ArrowResult<RecordBatch>>,
+        // ) = channel(2);

         // Indices of columns in the schema needed to read
         let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));
@@ -292,50 +276,158 @@ impl Storage {
         // Limit of total rows to read
         let limit: Option<usize> = None; // Todo: this should be a parameter of the function

-        // Todo: Convert this tokio into using thread pool Andrew has implemented
-        // recently
-        task::spawn_blocking(move || {
-            if let Err(e) = Self::read_file(
-                filename,
-                projection.as_slice(),
-                predicate_builder.as_ref(),
-                batch_size,
-                response_tx,
-                limit,
-            ) {
-                println!("Parquet reader thread terminated due to error: {:?}", e);
-            }
-        });
+        // TODO: These commented-out code lines will either be used or deleted when #1082 is done
+        // TODO: Until this read_filter is async, we cannot make this multi-threaded yet
+        // because it returns wrong results if other threads rerun before full results are returned
+        // task::spawn_blocking(move || {
+        //     if let Err(e) = Self::read_file(
+        //         path,
+        //         Arc::clone(&store),
+        //         projection.as_slice(),
+        //         predicate_builder.as_ref(),
+        //         batch_size,
+        //         response_tx,
+        //         limit,
+        //     ) {
+        //         println!("Parquet reader thread terminated due to error: {:?}", e);
+        //     }
+        // });

-        Ok(Box::pin(ParquetStream {
-            schema,
-            inner: ReceiverStream::new(response_rx),
-        }))
+        // Ok(Box::pin(ParquetStream {
+        //     schema,
+        //     inner: ReceiverStream::new(response_rx),
+        // }))
+
+        let mut batches: Vec<Arc<RecordBatch>> = vec![];
+        if let Err(e) = Self::read_file(
+            path,
+            Arc::clone(&store),
+            projection.as_slice(),
+            predicate_builder.as_ref(),
+            batch_size,
+            &mut batches,
+            limit,
+        ) {
+            return Err(e);
+        }
+
+        // TODO: remove when #1082 done
+        // println!("Record batches from read_file: {:#?}", batches);
+
+        Ok(Box::pin(SizedRecordBatchStream::new(schema, batches)))
     }

-    fn send_result(
-        response_tx: &Sender<ArrowResult<RecordBatch>>,
-        result: ArrowResult<RecordBatch>,
-    ) -> Result<()> {
-        // Note this function is running on its own blocking tokio thread so blocking
-        // here is ok.
-        response_tx
-            .blocking_send(result)
-            .map_err(|e| DataFusionError::Execution(e.to_string()))
-            .context(SendResult)?;
-        Ok(())
-    }
+    // TODO notes: implemented this for #1082 but it turns out we might not be able to use it
+    // because #1342 needs to finish before #1082 is fully tested. This function will
+    // be either used or removed when #1082 is done
+    //
+    // fn send_result(
+    //     response_tx: &Sender<ArrowResult<RecordBatch>>,
+    //     result: ArrowResult<RecordBatch>,
+    // ) -> Result<()> {
+    //     // Note this function is running on its own blocking tokio thread so blocking
+    //     // here is ok.
+    //     response_tx
+    //         .blocking_send(result)
+    //         .map_err(|e| DataFusionError::Execution(e.to_string()))
+    //         .context(SendResult)?;
+    //     Ok(())
+    // }
+
+    // TODO: see the notes for send_result above
+    // fn read_file(
+    //     path: Path,
+    //     store: Arc<ObjectStore>,
+    //     projection: &[usize],
+    //     predicate_builder: Option<&RowGroupPredicateBuilder>,
+    //     batch_size: usize,
+    //     response_tx: Sender<ArrowResult<RecordBatch>>,
+    //     limit: Option<usize>,
+    // ) -> Result<()> {
+
+    //     // TODO: support non local file object store
+    //     let (file_root, file_path) = match (&store.0, path) {
+    //         (ObjectStoreIntegration::File(file), Path::File(location)) => (file, location),
+    //         (_, _) => {
+    //             panic!("Non local file object store not supported")
+    //         }
+    //     };
+    //     // Get full string path
+    //     let full_path = format!("{:?}", file_root.path(&file_path));
+    //     let full_path = full_path.trim_matches('"');
+    //     println!("Full path filename: {}", full_path);
+
+    //     let mut total_rows = 0;
+
+    //     let file = File::open(&full_path).context(OpenFile)?;
+    //     let mut file_reader = SerializedFileReader::new(file).context(SerializedFileReaderError)?;
+    //     if let Some(predicate_builder) = predicate_builder {
+    //         let row_group_predicate =
+    //             predicate_builder.build_row_group_predicate(file_reader.metadata().row_groups());
+    //         file_reader.filter_row_groups(&row_group_predicate); //filter out
+    //                                                              // row group based
+    //                                                              // on the predicate
+    //     }
+    //     let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+    //     let mut batch_reader = arrow_reader
+    //         .get_record_reader_by_columns(projection.to_owned(), batch_size)
+    //         .context(ParquetArrowReaderError)?;
+    //     loop {
+    //         match batch_reader.next() {
+    //             Some(Ok(batch)) => {
+    //                 //println!("ParquetExec got new batch from {}", filename);
+    //                 total_rows += batch.num_rows();
+    //                 Self::send_result(&response_tx, Ok(batch))?;
+    //                 if limit.map(|l| total_rows >= l).unwrap_or(false) {
+    //                     break;
+    //                 }
+    //             }
+    //             None => {
+    //                 break;
+    //             }
+    //             Some(Err(e)) => {
+    //                 let err_msg =
+    //                     //format!("Error reading batch from {}: {}", filename, e.to_string());
+    //                     format!("Error reading batch: {}", e.to_string());
+    //                 // send error to operator
+    //                 Self::send_result(&response_tx, Err(ArrowError::ParquetError(err_msg)))?;
+    //                 // terminate thread with error
+    //                 return Err(e).context(ReadingFile);
+    //             }
+    //         }
+    //     }
+
+    //     // finished reading files (dropping response_tx will close
+    //     // channel)
+    //     Ok(())
+    // }
+
+    /// Read the given path of the parquet file and return record batches satisfying
+    /// the given predicate_builder
     fn read_file(
-        filename: String,
+        path: Path,
+        store: Arc<ObjectStore>,
         projection: &[usize],
         predicate_builder: Option<&RowGroupPredicateBuilder>,
         batch_size: usize,
-        response_tx: Sender<ArrowResult<RecordBatch>>,
+        batches: &mut Vec<Arc<RecordBatch>>,
         limit: Option<usize>,
     ) -> Result<()> {
+        // TODO: support non local file object store. Ticket #1342
+        let (file_root, file_path) = match (&store.0, path) {
+            (ObjectStoreIntegration::File(file), Path::File(location)) => (file, location),
+            (_, _) => {
+                panic!("Non local file object store not supported")
+            }
+        };
+        // Get full string path
+        let full_path = format!("{:?}", file_root.path(&file_path));
+        let full_path = full_path.trim_matches('"');
+        //println!("Full path filename: {}", full_path); // TODO: to be removed after both #1082 and #1342 done

         let mut total_rows = 0;

-        let file = File::open(&filename).context(OpenFile)?;
+        let file = File::open(&full_path).context(OpenFile)?;
         let mut file_reader = SerializedFileReader::new(file).context(SerializedFileReaderError)?;
         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate =
@@ -351,9 +443,10 @@ impl Storage {
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    //println!("ParquetExec got new batch from {}", filename);
+                    //println!("ParquetExec got new batch from {}", filename); TODO: remove when #1082 done
+                    //println!("Batch value: {:#?}", batch);
                     total_rows += batch.num_rows();
-                    Self::send_result(&response_tx, Ok(batch))?;
+                    batches.push(Arc::new(batch));
                     if limit.map(|l| total_rows >= l).unwrap_or(false) {
                         break;
                     }
@@ -362,18 +455,11 @@ impl Storage {
                     break;
                 }
                 Some(Err(e)) => {
-                    let err_msg =
-                        format!("Error reading batch from {}: {}", filename, e.to_string());
-                    // send error to operator
-                    Self::send_result(&response_tx, Err(ArrowError::ParquetError(err_msg)))?;
-                    // terminate thread with error
                     return Err(e).context(ReadingFile);
                 }
             }
         }

-        // finished reading files (dropping response_tx will close
-        // channel)
         Ok(())
     }
 }
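Taken together, the storage hunks above replace the channel-based background reader with a synchronous path: `read_filter` resolves the parquet file through the object store, reads every record batch into memory, and returns them as a `SizedRecordBatchStream`. A condensed sketch of that flow, using only names that appear in this diff (error handling and the row-group pruning details elided):

    // Inside read_filter (sketch): collect all batches eagerly, then wrap them
    // in an in-memory stream. Only a File-backed ObjectStore is supported so far.
    let mut batches: Vec<Arc<RecordBatch>> = vec![];
    Self::read_file(
        path,                       // object store location of the parquet file
        Arc::clone(&store),         // must be the ObjectStoreIntegration::File variant for now
        projection.as_slice(),      // column indices selected by the caller
        predicate_builder.as_ref(), // optional row-group pruning
        batch_size,
        &mut batches,               // read_file appends each decoded batch here
        limit,                      // optional cap on total rows
    )?;
    // The whole result set is already in memory, so a SizedRecordBatchStream
    // (rather than a channel-backed stream) satisfies the
    // SendableRecordBatchStream return type.
    Ok(Box::pin(SizedRecordBatchStream::new(schema, batches)))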
@@ -5,7 +5,7 @@ use crate::storage::{self, Storage};
 use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
 use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
 use internal_types::{schema::Schema, selection::Selection};
-use object_store::path::Path;
+use object_store::{path::Path, ObjectStore};
 use query::predicate::Predicate;

 #[derive(Debug, Snafu)]
@@ -32,6 +32,9 @@ pub struct Table {
     /// id>/<tablename>.parquet
     object_store_path: Path,

+    /// Object store of the above relative path to open and read the file
+    object_store: Arc<ObjectStore>,
+
     /// Schema that goes with this table's parquet file
     table_schema: Schema,

@@ -43,12 +46,14 @@ impl Table {
     pub fn new(
         meta: TableSummary,
         path: Path,
+        store: Arc<ObjectStore>,
         schema: Schema,
         range: Option<TimestampRange>,
     ) -> Self {
         Self {
             table_summary: meta,
             object_store_path: path,
+            object_store: store,
             table_schema: schema,
             timestamp_range: range,
         }
@@ -130,7 +135,8 @@ impl Table {
             predicate,
             selection,
             Arc::clone(&self.table_schema.as_arrow()),
-            &self.object_store_path,
+            self.object_store_path.clone(),
+            Arc::clone(&self.object_store),
         )
         .context(ReadParquet)
     }
@@ -35,6 +35,7 @@ serde = "1.0"
 serde_json = "1.0"
 snafu = "0.6"
 snap = "1.0.0"
+tempfile = "3.1.0"
 tokio = { version = "1.0", features = ["macros", "time"] }
 tokio-util = { version = "0.6.3" }
 tracker = { path = "../tracker" }
@@ -43,7 +44,6 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
 [dev-dependencies] # In alphabetical order
 criterion = { version = "0.3.4", features = ["async_tokio"] }
 flate2 = "1.0.20"
-tempfile = "3.1.0"
 test_helpers = { path = "../test_helpers" }

 [features]
@@ -722,7 +722,13 @@ impl Db {
                 .try_into()
                 .context(SchemaConversion)?;
             let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end));
-            parquet_chunk.add_table(stats, path, schema, table_time_range);
+            parquet_chunk.add_table(
+                stats,
+                path,
+                Arc::clone(&self.store),
+                schema,
+                table_time_range,
+            );
         }

         // Relock the chunk again (nothing else should have been able
@@ -752,7 +758,8 @@ impl Db {
         self.metrics.update_chunk_state(chunk.state());
         debug!(%partition_key, %table_name, %chunk_id, "chunk marked MOVED. Persisting to object store complete");

-        Ok(DbChunk::snapshot(&chunk))
+        // We know this chunk is ParquetFile type
+        Ok(DbChunk::parquet_file_snapshot(&chunk))
     }

     /// Spawns a task to perform
@@ -1528,7 +1535,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(1889.0)
+            .sample_sum_eq(1897.0)
             .unwrap();

         // it should be the same chunk!
@@ -1886,6 +1893,10 @@ mod tests {
             .await
             .unwrap();

+        db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0)
+            .await
+            .unwrap();
+
         print!("Partitions2: {:?}", db.partition_keys().unwrap());

         db.rollover_partition("1970-01-05T15", "cpu").await.unwrap();
@@ -1903,8 +1914,8 @@
                 to_arc("1970-01-01T00"),
                 to_arc("cpu"),
                 0,
-                ChunkStorage::ReadBuffer,
-                1213,
+                ChunkStorage::ReadBufferAndObjectStore,
+                1213 + 675, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -1941,6 +1952,7 @@

         assert_eq!(db.memory_registries.mutable_buffer.bytes(), 100 + 129 + 131);
         assert_eq!(db.memory_registries.read_buffer.bytes(), 1213);
+        assert_eq!(db.memory_registries.parquet.bytes(), 89); // TODO: This 89 must be replaced with 675. Ticket #1311
     }

     #[tokio::test]
@@ -1963,6 +1975,11 @@
             .await
             .unwrap();

+        // write the read buffer chunk to object store
+        db.write_chunk_to_object_store("1970-01-01T00", "cpu", chunk_id)
+            .await
+            .unwrap();
+
         // write into a separate partitiion
         write_lp(&db, "cpu bar=1 400000000000000");
         write_lp(&db, "mem frob=3 400000000000001");
@@ -41,6 +41,8 @@ pub enum ChunkState {

     // Chunk has been completely written into object store
     WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
+    // Todo: There must be another state for a chunk that is only in object store
+    // ticket #1314
 }

 impl ChunkState {
@@ -231,8 +233,8 @@ impl Chunk {
                 chunk.rows() as usize,
                 ChunkStorage::ReadBufferAndObjectStore,
             ),
-            ChunkState::WrittenToObjectStore(chunk, _) => (
-                chunk.size(),
+            ChunkState::WrittenToObjectStore(chunk, parquet_chunk) => (
+                chunk.size() + parquet_chunk.size(),
                 chunk.rows() as usize,
                 ChunkStorage::ReadBufferAndObjectStore,
             ),
@@ -107,10 +107,37 @@ impl DbChunk {
                 chunk: Arc::clone(chunk),
                 partition_key,
             },
             ChunkState::WrittenToObjectStore(chunk, _) => Self::ReadBuffer {
                 chunk: Arc::clone(chunk),
                 partition_key,
+                // Since data exists in both read buffer and object store, we should
+                // snapshot the chunk of read buffer
             },
+            // Todo: Turn this on when we have this state
+            // ChunkState::ObjectStoreOnly(chunk) => {
+            //     let chunk = Arc::clone(chunk);
+            //     Self::ParquetFile { chunk }
+            // }
         };
         Arc::new(db_chunk)
     }

+    /// Return the snapshot of the chunk with type ParquetFile
+    /// This function should be only invoked when you know your chunk
+    /// is ParquetFile type whose state is WrittenToObjectStore. The
+    /// reason we have this function is because the above snapshot
+    /// function always returns the read buffer one for the same state
+    pub fn parquet_file_snapshot(chunk: &super::catalog::chunk::Chunk) -> Arc<Self> {
+        use super::catalog::chunk::ChunkState;
+
+        let db_chunk = match chunk.state() {
+            ChunkState::WrittenToObjectStore(_, chunk) => {
+                let chunk = Arc::clone(chunk);
+                Self::ParquetFile { chunk }
+            }
+            _ => {
+                panic!("Internal error: This chunk's state is not WrittenToObjectStore");
+            }
+        };
+        Arc::new(db_chunk)
+    }
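For context, a small sketch of how the two snapshot constructors differ for a chunk in the `WrittenToObjectStore` state. Everything referenced here comes from the hunk above; the surrounding function is hypothetical and only illustrates the distinction.

    // Hypothetical illustration: for a chunk whose state is
    // WrittenToObjectStore(read_buffer_chunk, parquet_chunk), the two
    // constructors pick different sides of that pair.
    fn snapshots_of_persisted_chunk(chunk: &super::catalog::chunk::Chunk) {
        // snapshot() prefers the read buffer data for this state.
        let rb_view = DbChunk::snapshot(chunk);
        // parquet_file_snapshot() explicitly returns the parquet file side;
        // it panics for any other state, so callers must already know the state.
        let os_view = DbChunk::parquet_file_snapshot(chunk);
        let _ = (rb_view, os_view);
    }

This is why `write_chunk_to_object_store` (earlier in this diff) now calls `parquet_file_snapshot`: its caller should observe the newly persisted parquet chunk rather than the read buffer copy.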
@@ -184,6 +184,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
         let db = make_db().db;
         let data = lp_lines.join("\n");
         write_lp(&db, &data);
+        // roll over and load chunks into both RUB and OS
         rollover_and_load(&db, "2020-03-01T00", "h2o").await;
         rollover_and_load(&db, "2020-03-02T00", "h2o").await;
         rollover_and_load(&db, "2020-04-01T00", "h2o").await;
@@ -193,6 +194,8 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
             db,
         };

+        // TODO: Add a scenario for OS only in #1342
+
         vec![scenario1, scenario2, scenario3]
     }
 }
@@ -5,9 +5,11 @@ use query::PartitionChunk;

 use async_trait::async_trait;

-use crate::db::{test_helpers::write_lp, Db};
+use crate::{db::test_helpers::write_lp, Db};

-use super::utils::{count_mutable_buffer_chunks, count_read_buffer_chunks, make_db};
+use super::utils::{
+    count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
+};

 /// Holds a database and a description of how its data was configured
 #[derive(Debug)]
@@ -31,24 +33,29 @@ impl DbSetup for NoData {
     async fn make(&self) -> Vec<DbScenario> {
         let partition_key = "1970-01-01T00";
         let table_name = "cpu";

+        // Scenario 1: No data in the DB yet
+        //
         let db = make_db().db;
         let scenario1 = DbScenario {
             scenario_name: "New, Empty Database".into(),
             db,
         };

-        // listing partitions (which may create an entry in a map)
+        // Scenario 2: listing partitions (which may create an entry in a map)
         // in an empty database
+        //
         let db = make_db().db;
+        assert_eq!(count_mutable_buffer_chunks(&db), 0);
+        assert_eq!(count_read_buffer_chunks(&db), 0);
+        assert_eq!(count_object_store_chunks(&db), 0);
         let scenario2 = DbScenario {
             scenario_name: "New, Empty Database after partitions are listed".into(),
             db,
         };

-        // a scenario where the database has had data loaded and then deleted
+        // Scenario 3: the database has had data loaded into RB and then deleted
+        //
         let db = make_db().db;
         let data = "cpu,region=west user=23.2 100";
         write_lp(&db, data);
@@ -61,28 +68,80 @@ impl DbSetup for NoData {
                 .id(),
             0
         );
+        assert_eq!(count_mutable_buffer_chunks(&db), 1); //
+        assert_eq!(count_read_buffer_chunks(&db), 0); // nothing yet
+        assert_eq!(count_object_store_chunks(&db), 0); // nothing yet

-        assert_eq!(count_mutable_buffer_chunks(&db), 1);
-        assert_eq!(count_read_buffer_chunks(&db), 0); // only open chunk
-
         // Now load the closed chunk into the RB
         db.load_chunk_to_read_buffer(partition_key, table_name, 0)
             .await
             .unwrap();
+        assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
+        assert_eq!(count_read_buffer_chunks(&db), 1); // close chunk only
+        assert_eq!(count_object_store_chunks(&db), 0); // nothing yet

-        assert_eq!(count_mutable_buffer_chunks(&db), 0);
-        assert_eq!(count_read_buffer_chunks(&db), 1); // only open chunk
         // drop chunk 0
         db.drop_chunk(partition_key, table_name, 0).unwrap();

+        assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
+        assert_eq!(count_read_buffer_chunks(&db), 0); // nothing after dropping chunk 0
+        assert_eq!(count_object_store_chunks(&db), 0); // still nothing
+
+        let scenario3 = DbScenario {
+            scenario_name: "Empty Database after drop chunk that is in read buffer".into(),
+            db,
+        };
+
+        // Scenario 4: the database has had data loaded into RB & Object Store and then deleted
+        //
+        let db = make_db().db;
+        let data = "cpu,region=west user=23.2 100";
+        write_lp(&db, data);
+        // move data out of open chunk
+        assert_eq!(
+            db.rollover_partition(partition_key, table_name)
+                .await
+                .unwrap()
+                .unwrap()
+                .id(),
+            0
+        );
+        assert_eq!(count_mutable_buffer_chunks(&db), 1); // 1 open chunk
+        assert_eq!(count_read_buffer_chunks(&db), 0); // nothing yet
+        assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
+
+        // Now load the closed chunk into the RB
+        db.load_chunk_to_read_buffer(partition_key, table_name, 0)
+            .await
+            .unwrap();
+        assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
+        assert_eq!(count_read_buffer_chunks(&db), 1); // close chunk only
+        assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
+
+        // Now write the data in RB to object store but keep it in RB
+        db.write_chunk_to_object_store(partition_key, "cpu", 0)
+            .await
+            .unwrap();
+        // it should be the same chunk!
+        assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
+        assert_eq!(count_read_buffer_chunks(&db), 1); // closed chunk only
+        assert_eq!(count_object_store_chunks(&db), 1); // close chunk only
+
+        // drop chunk 0
+        db.drop_chunk(partition_key, table_name, 0).unwrap();
+
+        assert_eq!(count_mutable_buffer_chunks(&db), 0);
+        assert_eq!(count_read_buffer_chunks(&db), 0);
+        assert_eq!(count_object_store_chunks(&db), 0);
+
-        let scenario3 = DbScenario {
-            scenario_name: "Empty Database after drop chunk".into(),
+        let scenario4 = DbScenario {
+            scenario_name:
+                "Empty Database after drop chunk that is in both read buffer and object store"
+                    .into(),
             db,
         };

-        vec![scenario1, scenario2, scenario3]
+        vec![scenario1, scenario2, scenario3, scenario4]
     }
 }
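The scenario above exercises the full chunk lifecycle that this change enables. As a compact reference, the sequence of calls (all taken from the diff; the partition key, table name, and chunk id are simply the test's example values, and this runs inside an async test body) is:

    // Chunk lifecycle exercised by the NoData scenarios (sketch):
    // mutable buffer -> read buffer -> read buffer + object store -> dropped.
    let db = make_db().db;
    write_lp(&db, "cpu,region=west user=23.2 100");

    // close the open mutable buffer chunk
    db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
    // move the closed chunk into the read buffer
    db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0).await.unwrap();
    // persist it to the object store while keeping the read buffer copy
    db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0).await.unwrap();
    // finally drop it from the catalog entirely
    db.drop_chunk("1970-01-01T00", "cpu", 0).unwrap();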
@@ -356,7 +415,28 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db,
     };

-    vec![scenario1, scenario2, scenario3]
+    let db = make_db().db;
+    let table_names = write_lp(&db, data);
+    for table_name in &table_names {
+        db.rollover_partition(partition_key, &table_name)
+            .await
+            .unwrap();
+        db.load_chunk_to_read_buffer(partition_key, &table_name, 0)
+            .await
+            .unwrap();
+        db.write_chunk_to_object_store(partition_key, &table_name, 0)
+            .await
+            .unwrap();
+    }
+    let scenario4 = DbScenario {
+        scenario_name: "Data in both read buffer and object store".into(),
+        db,
+    };
+
+    // TODO: Add scenario 5 where data is in object store only
+    // go with #1342
+
+    vec![scenario1, scenario2, scenario3, scenario4]
 }

 /// This function loads two chunks of lp data into 4 different scenarios
@@ -436,10 +516,45 @@ pub async fn make_two_chunk_scenarios(
         db,
     };

-    vec![scenario1, scenario2, scenario3, scenario4]
+    // in 2 read buffer chunks that are also loaded into object store
+    let db = make_db().db;
+    let table_names = write_lp(&db, data1);
+    for table_name in &table_names {
+        db.rollover_partition(partition_key, &table_name)
+            .await
+            .unwrap();
+    }
+    let table_names = write_lp(&db, data2);
+    for table_name in &table_names {
+        db.rollover_partition(partition_key, &table_name)
+            .await
+            .unwrap();
+
+        db.load_chunk_to_read_buffer(partition_key, &table_name, 0)
+            .await
+            .unwrap();
+
+        db.load_chunk_to_read_buffer(partition_key, &table_name, 1)
+            .await
+            .unwrap();
+
+        db.write_chunk_to_object_store(partition_key, &table_name, 0)
+            .await
+            .unwrap();
+
+        db.write_chunk_to_object_store(partition_key, &table_name, 1)
+            .await
+            .unwrap();
+    }
+    let scenario5 = DbScenario {
+        scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(),
+        db,
+    };
+
+    vec![scenario1, scenario2, scenario3, scenario4, scenario5]
 }

-/// Rollover the mutable buffer and load chunk 0 to the read bufer
+/// Rollover the mutable buffer and load chunk 0 to the read buffer and object store
 pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
     db.rollover_partition(partition_key, table_name)
         .await
@@ -447,4 +562,7 @@ pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
     db.load_chunk_to_read_buffer(partition_key, table_name, 0)
         .await
         .unwrap();
+    db.write_chunk_to_object_store(partition_key, table_name, 0)
+        .await
+        .unwrap();
 }
@@ -4,11 +4,12 @@ use data_types::{
     server_id::ServerId,
     DatabaseName,
 };
-use object_store::{memory::InMemory, ObjectStore};
+use object_store::{disk::File, ObjectStore};
 use query::{exec::Executor, Database};

 use crate::{db::Db, JobRegistry};
 use std::{convert::TryFrom, sync::Arc};
+use tempfile::TempDir;

 // A wrapper around a Db and a metrics registry allowing for isolated testing
 // of a Db and its metrics.
@@ -21,7 +22,13 @@ pub struct TestDb {
 /// Used for testing: create a Database with a local store
 pub fn make_db() -> TestDb {
     let server_id = ServerId::try_from(1).unwrap();
-    let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+    // TODO: When we support parquet file in memory, we will either turn this test back to memory
+    // or have both tests: local disk and memory
+    //let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+    //
+    // Create an object store with a specified location in a local disk
+    let root = TempDir::new().unwrap();
+    let object_store = Arc::new(ObjectStore::new_file(File::new(root.path())));
     let exec = Arc::new(Executor::new(1));
     let metrics_registry = Arc::new(metrics::MetricRegistry::new());
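This is also where the first hunk in this commit (making `File::path` public) pays off: the test databases now write real parquet files under a temporary directory, and the storage code resolves their full on-disk location through the file-backed store. A rough sketch combining the pieces shown in this diff (the match shape comes from `read_file`; variable names are illustrative):

    // Build a file-backed object store rooted at a temp dir, as make_db now does.
    let root = TempDir::new().unwrap();
    let store = Arc::new(ObjectStore::new_file(File::new(root.path())));

    // Later, given an object store Path of the File variant, the storage code
    // recovers the full local filesystem path via the now-public File::path.
    if let (ObjectStoreIntegration::File(file_root), Path::File(location)) = (&store.0, path) {
        let full_path = file_root.path(&location); // PathBuf of the parquet file on disk
        let _ = full_path;
    }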
@@ -77,6 +84,19 @@ pub fn count_mutable_buffer_chunks(db: &Db) -> usize {
 /// Returns the number of read buffer chunks in the specified database
 pub fn count_read_buffer_chunks(db: &Db) -> usize {
     chunk_summary_iter(db)
-        .filter(|s| s.storage == ChunkStorage::ReadBuffer)
+        .filter(|s| {
+            s.storage == ChunkStorage::ReadBuffer
+                || s.storage == ChunkStorage::ReadBufferAndObjectStore
+        })
+        .count()
+}
+
+/// Returns the number of object store chunks in the specified database
+pub fn count_object_store_chunks(db: &Db) -> usize {
+    chunk_summary_iter(db)
+        .filter(|s| {
+            s.storage == ChunkStorage::ReadBufferAndObjectStore
+                || s.storage == ChunkStorage::ObjectStoreOnly
+        })
         .count()
 }
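A short usage sketch of the new counter alongside the existing ones, mirroring the assertions added in the NoData scenarios above (the `db` is assumed to come from `make_db`, inside an async test):

    // After persisting chunk 0 of "cpu" to the object store, the same chunk is
    // counted by both helpers, because its storage is
    // ChunkStorage::ReadBufferAndObjectStore.
    db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0)
        .await
        .unwrap();
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1); // still queryable from the read buffer
    assert_eq!(count_object_store_chunks(&db), 1); // and now backed by a parquet file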