refactor: Address comments and add more tests

pull/24376/head
Nga Tran 2021-04-12 11:46:31 -04:00
parent 453aeaf1a0
commit e86a02efa7
2 changed files with 116 additions and 18 deletions


@@ -29,7 +29,6 @@ serde = "1.0"
serde_json = "1.0"
snafu = "0.6"
snap = "1.0.0"
tempfile = "3.1.0"
tokio = { version = "1.0", features = ["macros", "time"] }
tokio-util = { version = "0.6.3" }
tracker = { path = "../tracker" }
@@ -37,3 +36,4 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
tempfile = "3.1.0"
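Moving tempfile from [dependencies] to [dev-dependencies] is safe because the crate only touches it from test code, so release builds stop pulling it in. A minimal sketch of the usage pattern that justifies the move (illustrative test, not from this commit):

// tempfile is only reachable from #[cfg(test)] code, so it can live in
// [dev-dependencies] without affecting release builds.
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    #[test]
    fn scratch_dir_is_cleaned_up() {
        // TempDir::new() creates a directory that is removed on drop.
        let root = TempDir::new().unwrap();
        let file = root.path().join("data.bin");
        std::fs::write(&file, b"bytes").unwrap();
        assert!(file.exists());
    } // `root` drops here and the directory disappears
}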


@@ -721,7 +721,7 @@ mod tests {
use arrow_deps::{
arrow::record_batch::RecordBatch,
assert_table_eq,
datafusion::{self, execution::context, physical_plan::collect},
datafusion::{execution::context, physical_plan::collect},
};
use chrono::Utc;
use data_types::{
@@ -918,20 +918,8 @@ mod tests {
.await
}
async fn datafusion_plan_and_collect(
ctx: &mut context::ExecutionContext,
sql: &str,
) -> Result<Vec<RecordBatch>, datafusion::error::DataFusionError> {
let logical_plan = ctx.create_logical_plan(sql).expect("Create logical plan");
let logical_plan = ctx.optimize(&logical_plan).expect("Optimize Logical Plan");
let physical_plan = ctx
.create_physical_plan(&logical_plan)
.expect("Create Physical Plan");
collect(physical_plan).await
}
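The helper removed above hand-rolled the plan/optimize/execute pipeline that DataFusion's DataFrame API performs internally, which is why the call site below switches to ctx.sql(). A sketch of the equivalence, assuming the DataFusion version this commit builds against:

// Old: build, optimize, and execute the query plan by hand.
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
let physical = ctx.create_physical_plan(&plan)?;
let batches = collect(physical).await?;

// New: ctx.sql() runs the same pipeline and returns a DataFrame.
let batches = ctx.sql(sql)?.collect().await?;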
#[tokio::test]
async fn write_to_parquet_file() {
async fn write_one_chunk_one_table_to_parquet_file() {
// Test that data can be written into parquet files
// Create an object store with a specified location on local disk
@@ -986,8 +974,6 @@ mod tests {
assert_eq!(path_list, paths.clone());
// Get full string path
// Todo: it is better if we can get this full path from a function
// in object_store::path::Path (will talk with mkm (aka Marko))
let root_path = format!("{:?}", root.path());
let root_path = root_path.trim_matches('"');
let path = format!("{}/{}", root_path, paths[0].display());
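Formatting the path with {:?} and trimming the quotes works, but (as the Todo removed above hints) there is a more direct route when the temp directory path is valid UTF-8. A sketch, not part of this commit:

// Convert the temp dir path straight to &str instead of round-tripping
// through the Debug formatter and trimming its quotes.
let root_path = root.path().to_str().expect("temp dir path is valid UTF-8");
let path = format!("{}/{}", root_path, paths[0].display());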
@@ -995,6 +981,7 @@ mod tests {
// Create an external table from this parquet file to get its content in a
// human-readable form
// Note: We do not care about escaping quotes here because it is just a test
let sql = format!(
"CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
path
@@ -1006,7 +993,7 @@ mod tests {
// Select data from that table
let sql = "SELECT * FROM parquet_table";
let content = datafusion_plan_and_collect(&mut ctx, &sql).await.unwrap();
let content = ctx.sql(&sql).unwrap().collect().await.unwrap();
println!("Content: {:?}", content);
let expected = vec![
"+-----+------+",
@@ -1019,6 +1006,117 @@ mod tests {
assert_table_eq!(expected, &content);
}
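CREATE EXTERNAL TABLE is one way to read the file back; DataFusion can also register a parquet file on the context directly and skip the DDL round trip. A sketch of that alternative, assuming the same DataFusion version:

// Register the parquet file as a queryable table without issuing DDL.
let mut ctx = context::ExecutionContext::new();
ctx.register_parquet("parquet_table", &path).unwrap();
let content = ctx
    .sql("SELECT * FROM parquet_table")
    .unwrap()
    .collect()
    .await
    .unwrap();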
#[tokio::test]
async fn write_one_chunk_many_tables_to_parquet_files() {
// Test that a chunk containing multiple tables is written as one parquet file per table
// Create an object store with a specified location on local disk
let root = TempDir::new().unwrap();
let object_store = Arc::new(ObjectStore::new_file(File::new(root.path())));
// Create a DB given a server id, an object store and a db name
let server_id: NonZeroU32 = NonZeroU32::new(10).unwrap();
let db_name = "parquet_test_db";
let db = Arc::new(make_database(server_id, Arc::clone(&object_store), db_name));
// Write some line protocol data into the mutable buffer of the DB
let mut writer = TestLPWriter::default();
writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap();
writer
.write_lp_string(db.as_ref(), "disk ops=1 20")
.unwrap();
writer.write_lp_string(db.as_ref(), "cpu bar=2 20").unwrap();
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition(partition_key).await.unwrap();
// Move that MB chunk to an RB chunk and drop it from the MB
let rb_chunk = db
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
.await
.unwrap();
// Write the RB chunk to the object store but keep it in the RB
let pq_chunk = db
.load_chunk_to_object_store(partition_key, mb_chunk.id())
.await
.unwrap();
// it should be the same chunk!
assert_eq!(mb_chunk.id(), rb_chunk.id());
assert_eq!(mb_chunk.id(), pq_chunk.id());
// we should have chunks in the mutable buffer, read buffer, and object store
// (Note the currently open chunk is not listed)
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]);
assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
// Verify the data written to the parquet files in the object store
// First, there must be 2 object store paths in the catalog
// that represent the 2 files
let paths = pq_chunk.object_store_paths();
assert_eq!(paths.len(), 2);
// Check that both paths exist in the object store
let prefix = object_store.new_path();
let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&prefix))
.await
.unwrap();
println!("path_list: {:#?}", path_list);
assert_eq!(path_list.len(), 2);
assert_eq!(path_list, paths.clone());
// Check the content of each path
// Root path
let root_path = format!("{:?}", root.path());
let root_path = root_path.trim_matches('"');
for i in 0..2 {
// Get full string path
let path = format!("{}/{}", root_path, paths[i].display());
println!("path: {}", path);
// Create an external table from this parquet file to get its content in a
// human-readable form
// Note: We do not care about escaping quotes here because it is just a test
let sql = format!(
"CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
path
);
let mut ctx = context::ExecutionContext::new();
let df = ctx.sql(&sql).unwrap();
df.collect().await.unwrap();
// Select data from that table
let sql = "SELECT * FROM parquet_table";
let content = ctx.sql(&sql).unwrap().collect().await.unwrap();
println!("Content: {:?}", content);
let mut expected = vec![
"+-----+------+",
"| bar | time |",
"+-----+------+",
"| 1 | 10 |",
"| 2 | 20 |",
"+-----+------+",
];
if i == 1 {
expected = vec![
"+-----+------+",
"| ops | time |",
"+-----+------+",
"| 1 | 20 |",
"+-----+------+",
];
}
assert_table_eq!(expected, &content);
}
}
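Since the loop above only varies the path and the expected output, a table-driven variant makes the pairing explicit. An illustrative sketch, not part of the commit:

// Zip each object store path with the table it should contain and run
// the same create-query-assert cycle for every pair.
let expected_tables = vec![
    vec![
        "+-----+------+",
        "| bar | time |",
        "+-----+------+",
        "| 1 | 10 |",
        "| 2 | 20 |",
        "+-----+------+",
    ],
    vec![
        "+-----+------+",
        "| ops | time |",
        "+-----+------+",
        "| 1 | 20 |",
        "+-----+------+",
    ],
];
for (p, expected) in paths.iter().zip(expected_tables) {
    let path = format!("{}/{}", root_path, p.display());
    let mut ctx = context::ExecutionContext::new();
    let sql = format!(
        "CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
        path
    );
    ctx.sql(&sql).unwrap().collect().await.unwrap();
    let content = ctx
        .sql("SELECT * FROM parquet_table")
        .unwrap()
        .collect()
        .await
        .unwrap();
    assert_table_eq!(expected, &content);
}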
#[tokio::test]
async fn write_updates_last_write_at() {
let db = make_db();