diff --git a/server/Cargo.toml b/server/Cargo.toml index 85d6ec8fad..d27f1ac029 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -29,7 +29,6 @@ serde = "1.0" serde_json = "1.0" snafu = "0.6" snap = "1.0.0" -tempfile = "3.1.0" tokio = { version = "1.0", features = ["macros", "time"] } tokio-util = { version = "0.6.3" } tracker = { path = "../tracker" } @@ -37,3 +36,4 @@ uuid = { version = "0.8", features = ["serde", "v4"] } [dev-dependencies] # In alphabetical order test_helpers = { path = "../test_helpers" } +tempfile = "3.1.0" diff --git a/server/src/db.rs b/server/src/db.rs index 5673bd0364..d358c4766b 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -721,7 +721,7 @@ mod tests { use arrow_deps::{ arrow::record_batch::RecordBatch, assert_table_eq, - datafusion::{self, execution::context, physical_plan::collect}, + datafusion::{execution::context, physical_plan::collect}, }; use chrono::Utc; use data_types::{ @@ -918,20 +918,8 @@ mod tests { .await } - async fn datafusion_plan_and_collect( - ctx: &mut context::ExecutionContext, - sql: &str, - ) -> Result, datafusion::error::DataFusionError> { - let logical_plan = ctx.create_logical_plan(sql).expect("Create logical plan"); - let logical_plan = ctx.optimize(&logical_plan).expect("Optimize Logical Plan"); - let physical_plan = ctx - .create_physical_plan(&logical_plan) - .expect("Create Physical Plan"); - collect(physical_plan).await - } - #[tokio::test] - async fn write_to_parquet_file() { + async fn write_one_chunk_one_table_to_parquet_file() { // Test that data can be written into parquet files // Create an object store with a specified location in a local disk @@ -986,8 +974,6 @@ mod tests { assert_eq!(path_list, paths.clone()); // Get full string path - // Todo: it is better if we can get this full path from a function - // in object_store::path::Path (will talk with mkm (aka Marko)) let root_path = format!("{:?}", root.path()); let root_path = root_path.trim_matches('"'); let path = format!("{}/{}", root_path, paths[0].display()); @@ -995,6 +981,7 @@ mod tests { // Create External table of this parquet file to get its content in a human // readable form + // Note: We do not care about escaping quotes here because it is just a test let sql = format!( "CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'", path @@ -1006,7 +993,7 @@ mod tests { // Select data from that table let sql = "SELECT * FROM parquet_table"; - let content = datafusion_plan_and_collect(&mut ctx, &sql).await.unwrap(); + let content = ctx.sql(&sql).unwrap().collect().await.unwrap(); println!("Content: {:?}", content); let expected = vec![ "+-----+------+", @@ -1019,6 +1006,117 @@ mod tests { assert_table_eq!(expected, &content); } + #[tokio::test] + async fn write_one_chunk_many_tables_to_parquet_files() { + // Test that data can be written into parquet files + + // Create an object store with a specified location in a local disk + let root = TempDir::new().unwrap(); + let object_store = Arc::new(ObjectStore::new_file(File::new(root.path()))); + + // Create a DB given a server id, an object store and a db name + let server_id: NonZeroU32 = NonZeroU32::new(10).unwrap(); + let db_name = "parquet_test_db"; + let db = Arc::new(make_database(server_id, Arc::clone(&object_store), db_name)); + + // Write some line protocols in Mutable buffer of the DB + let mut writer = TestLPWriter::default(); + writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap(); + writer + .write_lp_string(db.as_ref(), "disk ops=1 20") + .unwrap(); + writer.write_lp_string(db.as_ref(), "cpu bar=2 20").unwrap(); + + //Now mark the MB chunk close + let partition_key = "1970-01-01T00"; + let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + // Move that MB chunk to RB chunk and drop it from MB + let rb_chunk = db + .load_chunk_to_read_buffer(partition_key, mb_chunk.id()) + .await + .unwrap(); + // Write the RB chunk to Object Store but keep it in RB + let pq_chunk = db + .load_chunk_to_object_store(partition_key, mb_chunk.id()) + .await + .unwrap(); + + // it should be the same chunk! + assert_eq!(mb_chunk.id(), rb_chunk.id()); + assert_eq!(mb_chunk.id(), pq_chunk.id()); + + // we should have chunks in the mutable buffer, read buffer, and object store + // (Note the currently open chunk is not listed) + assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]); + assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]); + + // Verify data written to the parquet files in object store + // First, there must be 2 paths of object store in the catalog + // that represents 2 files + let paths = pq_chunk.object_store_paths(); + assert_eq!(paths.len(), 2); + + // Check that the path must exist in the object store + let prefix = object_store.new_path(); + let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&prefix)) + .await + .unwrap(); + println!("path_list: {:#?}", path_list); + assert_eq!(path_list.len(), 2); + assert_eq!(path_list, paths.clone()); + + // Check the content of each path + + // Root path + let root_path = format!("{:?}", root.path()); + let root_path = root_path.trim_matches('"'); + + let mut i = 0; + while i < 2 { + // Get full string path + let path = format!("{}/{}", root_path, paths[i].display()); + println!("path: {}", path); + + // Create External table of this parquet file to get its content in a human + // readable form + // Note: We do not care about escaping quotes here because it is just a test + let sql = format!( + "CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'", + path + ); + + let mut ctx = context::ExecutionContext::new(); + let df = ctx.sql(&sql).unwrap(); + df.collect().await.unwrap(); + + // Select data from that table + let sql = "SELECT * FROM parquet_table"; + let content = ctx.sql(&sql).unwrap().collect().await.unwrap(); + println!("Content: {:?}", content); + let mut expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "| 2 | 20 |", + "+-----+------+", + ]; + if i == 1 { + expected = vec![ + "+-----+------+", + "| ops | time |", + "+-----+------+", + "| 1 | 20 |", + "+-----+------+", + ]; + } + + assert_table_eq!(expected, &content); + i += 1; + } + } + #[tokio::test] async fn write_updates_last_write_at() { let db = make_db();