From 7969808f09973640dc996c8391356b26d13016fb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 19 Jan 2021 13:28:26 -0500 Subject: [PATCH] feat: Chunk Migration APIs and query data in the read buffer via SQL (#668) * feat: Chunk Migration APIs and query data in the read buffer via SQL * fix: Make code more consistent * fix: fmt / clippy * chore: Apply suggestions from code review Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * refactor: Remove unecessary Result and make chunks() infallable * chore: Apply more suggestions from code review Co-authored-by: Edd Robinson Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Edd Robinson --- Cargo.lock | 1 + mutable_buffer/src/database.rs | 7 +- mutable_buffer/src/table.rs | 6 +- query/src/frontend/sql.rs | 2 +- query/src/lib.rs | 2 +- query/src/test.rs | 2 +- read_buffer/src/lib.rs | 31 ++- read_buffer/src/table.rs | 1 + server/Cargo.toml | 3 + server/src/config.rs | 4 +- server/src/db.rs | 368 +++++++++++++++++++++++++++++++-- server/src/db/chunk.rs | 94 +++++++-- server/src/lib.rs | 2 +- 13 files changed, 483 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0983b54488..705e0965cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3298,6 +3298,7 @@ dependencies = [ "serde_json", "snafu", "snap", + "test_helpers", "tokio", "tracing", "uuid", diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs index 2e0b9e4d3f..067fece9c0 100644 --- a/mutable_buffer/src/database.rs +++ b/mutable_buffer/src/database.rs @@ -313,13 +313,12 @@ impl Database for MutableBufferDb { /// Return the list of chunks, in order of id, for the specified /// partition_key - async fn chunks(&self, partition_key: &str) -> Result>> { - Ok(self - .get_partition(partition_key) + async fn chunks(&self, partition_key: &str) -> Vec> { + self.get_partition(partition_key) .await .read() .await - .chunks()) + .chunks() } } diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index f78a4d4781..c0914481ce 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -921,7 +921,11 @@ impl Table { Arc::new(builder.finish()) } Column::I64(vals, _) => { - schema_builder = schema_builder.field(column_name, ArrowDataType::Int64); + schema_builder = if column_name == TIME_COLUMN_NAME { + schema_builder.timestamp() + } else { + schema_builder.field(column_name, ArrowDataType::Int64) + }; let mut builder = Int64Builder::new(vals.len()); for v in vals { diff --git a/query/src/frontend/sql.rs b/query/src/frontend/sql.rs index ab2ff96d74..551b61602b 100644 --- a/query/src/frontend/sql.rs +++ b/query/src/frontend/sql.rs @@ -69,7 +69,7 @@ impl SQLQueryPlanner { for table in &table_names { let mut data = Vec::new(); for partition_key in &partition_keys { - for chunk in database.chunks(partition_key).await.unwrap() { + for chunk in database.chunks(partition_key).await { chunk .table_to_arrow(&mut data, &table, &[]) .map_err(|e| Box::new(e) as _) diff --git a/query/src/lib.rs b/query/src/lib.rs index 256dbd0e2c..a62f070870 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -50,7 +50,7 @@ pub trait Database: Debug + Send + Sync { /// Returns a covering set of chunks in the specified partition. A /// covering set means that together the chunks make up a single /// complete copy of the data being queried. - async fn chunks(&self, partition_key: &str) -> Result>, Self::Error>; + async fn chunks(&self, partition_key: &str) -> Vec>; // ---------- // The functions below are slated for removal (migration into a gRPC query diff --git a/query/src/test.rs b/query/src/test.rs index eac73c5484..c32ca08a9d 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -423,7 +423,7 @@ impl Database for TestDatabase { unimplemented!("table_names_for_partition not implemented for test database"); } - async fn chunks(&self, _partition_key: &str) -> Result>, Self::Error> { + async fn chunks(&self, _partition_key: &str) -> Vec> { unimplemented!("query_chunks for test database"); } } diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 5ce2e9ee3a..fecfa4c68b 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -144,8 +144,17 @@ impl Database { } // Lists all partition keys with data for this database. - pub fn partition_keys(&mut self) -> Vec<&String> { - self.partitions.keys().collect::>() + pub fn partition_keys(&self) -> Vec<&String> { + self.partitions.keys().collect() + } + + /// Lists all chunk ids in the given partition key. Returns empty + /// `Vec` if no partition with the given key exists + pub fn chunk_ids(&self, partition_key: &str) -> Vec { + self.partitions + .get(partition_key) + .map(|partition| partition.chunk_ids()) + .unwrap_or_default() } pub fn size(&self) -> u64 { @@ -452,6 +461,11 @@ impl Partition { }; } + /// Return the chunk ids stored in this partition, in order of id + fn chunk_ids(&self) -> Vec { + self.chunks.keys().cloned().collect() + } + fn chunks_by_ids(&self, ids: &[u32]) -> Result> { let mut chunks = vec![]; for chunk_id in ids { @@ -492,6 +506,19 @@ pub struct ReadFilterResults<'input, 'chunk> { select_columns: table::ColumnSelection<'input>, } +impl<'input, 'chunk> fmt::Debug for ReadFilterResults<'input, 'chunk> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadFilterResults") + .field("chunks.len", &self.chunks.len()) + .field("next_i", &self.next_i) + .field("curr_table_results", &"") + .field("table_name", &self.table_name) + .field("predicate", &self.predicate) + .field("select_columns", &self.select_columns) + .finish() + } +} + impl<'input, 'chunk> ReadFilterResults<'input, 'chunk> { fn new( chunks: Vec<&'chunk Chunk>, diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 89c9a53ad5..8e49e417ff 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -522,6 +522,7 @@ impl MetaData { /// /// The `All` variant denotes that the caller wishes to include all table /// columns in the results. +#[derive(Debug)] pub enum ColumnSelection<'a> { All, Some(&'a [&'a str]), diff --git a/server/Cargo.toml b/server/Cargo.toml index f418759ebc..5479350071 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,3 +28,6 @@ uuid = { version = "0.8", features = ["serde", "v4"]} flatbuffers = "0.6" crc32fast = "1.2.0" snap = "1.0.0" + +[dev-dependencies] +test_helpers = { path = "../test_helpers" } diff --git a/server/src/config.rs b/server/src/config.rs index 4bd3753dc7..7a23177dec 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -37,12 +37,12 @@ impl Config { } let mutable_buffer = if rules.store_locally { - Some(Arc::new(MutableBufferDb::new(name.to_string()))) + Some(MutableBufferDb::new(name.to_string())) } else { None }; - let read_buffer = Arc::new(ReadBufferDb::new()); + let read_buffer = ReadBufferDb::new(); let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into); let db = Arc::new(Db::new(rules, mutable_buffer, read_buffer, wal_buffer)); diff --git a/server/src/db.rs b/server/src/db.rs index a48472a531..ceca3db788 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -5,7 +5,7 @@ use std::{ collections::BTreeMap, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }, }; @@ -30,12 +30,20 @@ pub enum Error { source: mutable_buffer::chunk::Error, }, + #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))] + UnknownMutableBufferChunk { chunk_id: u32 }, + #[snafu(display("Cannot write to this database: no mutable buffer configured"))] DatatbaseNotWriteable {}, #[snafu(display("Cannot read to this database: no mutable buffer configured"))] DatabaseNotReadable {}, + #[snafu(display("Error dropping data from mutable buffer: {}", source))] + MutableBufferDrop { + source: mutable_buffer::database::Error, + }, + #[snafu(display("Error rolling partition: {}", source))] RollingPartition { source: mutable_buffer::database::Error, @@ -50,6 +58,9 @@ pub enum Error { MutableBufferWrite { source: mutable_buffer::database::Error, }, + + #[snafu(display("Error dropping data from read buffer: {}", source))] + ReadBufferDrop { source: read_buffer::Error }, } pub type Result = std::result::Result; @@ -66,12 +77,14 @@ pub struct Db { /// The (optional) mutable buffer stores incoming writes. If a /// database does not have a mutable buffer it can not accept /// writes (it is a read replica) - pub mutable_buffer: Option>, + pub mutable_buffer: Option, #[serde(skip)] /// The read buffer holds chunk data in an in-memory optimized /// format. - pub read_buffer: Arc, + /// + /// TODO: finer grained locking see ticket https://github.com/influxdata/influxdb_iox/issues/669 + pub read_buffer: Arc>, #[serde(skip)] /// The wal buffer holds replicated writes in an append in-memory @@ -85,11 +98,12 @@ pub struct Db { impl Db { pub fn new( rules: DatabaseRules, - mutable_buffer: Option>, - read_buffer: Arc, + mutable_buffer: Option, + read_buffer: ReadBufferDb, wal_buffer: Option, ) -> Self { let wal_buffer = wal_buffer.map(Mutex::new); + let read_buffer = Arc::new(RwLock::new(read_buffer)); Self { rules, mutable_buffer, @@ -106,7 +120,7 @@ impl Db { .rollover_partition(partition_key) .await .context(RollingPartition) - .map(|c| Arc::new(DBChunk::MutableBuffer(c))) + .map(DBChunk::new_mb) } else { DatatbaseNotWriteable {}.fail() } @@ -114,19 +128,112 @@ impl Db { // Return a list of all chunks in the mutable_buffer (that can // potentially be migrated into the read buffer or object store) - pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Result>> { + pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec> { let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() { mutable_buffer .chunks(partition_key) .await - .context(MutableBufferRead)? .into_iter() - .map(|c| Arc::new(DBChunk::MutableBuffer(c))) + .map(DBChunk::new_mb) .collect() } else { vec![] }; - Ok(chunks) + chunks + } + + /// List chunks that are currently in the read buffer + pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec> { + self.read_buffer + .read() + .expect("mutex poisoned") + .chunk_ids(partition_key) + .into_iter() + .map(|chunk_id| DBChunk::new_rb(self.read_buffer.clone(), partition_key, chunk_id)) + .collect() + } + + /// Drops the specified chunk from the mutable buffer, returning + /// the dropped chunk. + pub async fn drop_mutable_buffer_chunk( + &self, + partition_key: &str, + chunk_id: u32, + ) -> Result> { + self.mutable_buffer + .as_ref() + .context(DatatbaseNotWriteable)? + .drop_chunk(partition_key, chunk_id) + .await + .map(DBChunk::new_mb) + .context(MutableBufferDrop) + } + + /// Drops the specified chunk from the read buffer, returning + /// the dropped chunk. + pub async fn drop_read_buffer_chunk( + &self, + partition_key: &str, + chunk_id: u32, + ) -> Result> { + self.read_buffer + .write() + .expect("mutex poisoned") + .drop_chunk(partition_key, chunk_id) + .context(ReadBufferDrop)?; + + Ok(DBChunk::new_rb( + self.read_buffer.clone(), + partition_key, + chunk_id, + )) + } + + /// Loads a chunk into the ReadBuffer. + /// + /// If the chunk is present in the mutable_buffer then it is + /// loaded from there. Otherwise, the chunk must be fetched from the + /// object store (Not yet implemented) + /// + /// Also uncontemplated as of yet is ensuring the read buffer does + /// not exceed a memory limit) + /// + /// This (async) function returns when this process is complete, + /// but the process may take a long time + /// + /// Returns a reference to the newly loaded chunk in the read buffer + pub async fn load_chunk_to_read_buffer( + &self, + partition_key: &str, + chunk_id: u32, + ) -> Result> { + let mb_chunk = self + .mutable_buffer + .as_ref() + .context(DatatbaseNotWriteable)? + .get_chunk(partition_key, chunk_id) + .await + .context(UnknownMutableBufferChunk { chunk_id })?; + + let mut batches = Vec::new(); + for stats in mb_chunk.table_stats().unwrap() { + mb_chunk + .table_to_arrow(&mut batches, &stats.name, &[]) + .unwrap(); + for batch in batches.drain(..) { + // As implemented now, taking this write lock will wait + // until all reads to the read buffer to complete and + // then will block all reads while the insert is occuring + let mut read_buffer = self.read_buffer.write().expect("mutex poisoned"); + read_buffer.upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch) + } + } + + Ok(DBChunk::new_rb( + self.read_buffer.clone(), + partition_key, + mb_chunk.id, + )) } /// Returns the next write sequence number @@ -148,19 +255,22 @@ impl Database for Db { type Chunk = DBChunk; /// Return a covering set of chunks for a particular partition - async fn chunks(&self, partition_key: &str) -> Result>, Self::Error> { + async fn chunks(&self, partition_key: &str) -> Vec> { // return a coverting set of chunks. TODO include read buffer // chunks and take them preferentially from the read buffer. - let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await?.into_iter(); + // returns a coverting set of chunks -- aka take chunks from read buffer + // preferentially + let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await.into_iter(); + + let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).await.into_iter(); let chunks: BTreeMap<_, _> = mutable_chunk_iter + .chain(read_buffer_chunk_iter) .map(|chunk| (chunk.id(), chunk)) .collect(); // inserting into the map will have removed any dupes - let chunks: Vec<_> = chunks.into_iter().map(|(_id, chunk)| chunk).collect(); - - Ok(chunks) + chunks.into_iter().map(|(_id, chunk)| chunk).collect() } // Note that most of the functions below will eventually be removed from @@ -270,3 +380,231 @@ impl Database for Db { .context(MutableBufferRead) } } + +#[cfg(test)] +mod tests { + use arrow_deps::{ + arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect, + }; + use query::{ + exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk, + }; + use test_helpers::assert_contains; + + use super::*; + + /// Create a Database with a local store + fn make_db() -> Db { + let name = "test_db"; + Db::new( + DatabaseRules::default(), + Some(MutableBufferDb::new(name)), + ReadBufferDb::new(), + None, // wal buffer + ) + } + + #[tokio::test] + async fn write_no_mutable_buffer() { + // Validate that writes are rejected if there is no mutable buffer + let mutable_buffer = None; + let db = make_db(); + let db = Db { + mutable_buffer, + ..db + }; + + let mut writer = TestLPWriter::default(); + let res = writer.write_lp_string(&db, "cpu bar=1 10").await; + assert_contains!( + res.unwrap_err().to_string(), + "Cannot write to this database: no mutable buffer configured" + ); + } + + #[tokio::test] + async fn read_write() { + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); + + let batches = run_query(&db, "select * from cpu").await; + + let expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "+-----+------+", + ]; + assert_table_eq!(expected, &batches); + } + + #[tokio::test] + async fn write_with_rollover() { + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); + assert_eq!(vec!["1970-01-01T00"], db.partition_keys().await.unwrap()); + + let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + assert_eq!(mb_chunk.id(), 0); + + let expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "+-----+------+", + ]; + let batches = run_query(&db, "select * from cpu").await; + assert_table_eq!(expected, &batches); + + // add new data + writer.write_lp_string(&db, "cpu bar=2 20").await.unwrap(); + let expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "| 2 | 20 |", + "+-----+------+", + ]; + let batches = run_query(&db, "select * from cpu").await; + assert_table_eq!(expected, &batches); + + // And expect that we still get the same thing when data is rolled over again + let chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + assert_eq!(chunk.id(), 1); + + let batches = run_query(&db, "select * from cpu").await; + assert_table_eq!(expected, &batches); + } + + #[tokio::test] + async fn read_from_read_buffer() { + // Test that data can be loaded into the ReadBuffer + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); + writer.write_lp_string(&db, "cpu bar=2 20").await.unwrap(); + + let partition_key = "1970-01-01T00"; + let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + let rb_chunk = db + .load_chunk_to_read_buffer(partition_key, mb_chunk.id()) + .await + .unwrap(); + + // it should be the same chunk! + assert_eq!(mb_chunk.id(), rb_chunk.id()); + + // we should have chunks in both the mutable buffer and read buffer + // (Note the currently open chunk is not listed) + assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]); + + // data should be readable + let expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "| 2 | 20 |", + "+-----+------+", + ]; + let batches = run_query(&db, "select * from cpu").await; + assert_table_eq!(expected, &batches); + + // now, drop the mutable buffer chunk and results should still be the same + db.drop_mutable_buffer_chunk(partition_key, mb_chunk.id()) + .await + .unwrap(); + + assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![1]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]); + + let batches = run_query(&db, "select * from cpu").await; + assert_table_eq!(expected, &batches); + + // drop, the chunk from the read buffer + db.drop_read_buffer_chunk(partition_key, mb_chunk.id()) + .await + .unwrap(); + assert_eq!( + read_buffer_chunk_ids(&db, partition_key).await, + vec![] as Vec + ); + + // Currently this doesn't work (as we need to teach the stores how to + // purge tables after data bas beend dropped println!("running + // query after all data dropped!"); let expected = vec![] as + // Vec<&str>; let batches = run_query(&db, "select * from + // cpu").await; assert_table_eq!(expected, &batches); + } + + #[tokio::test] + async fn chunk_id_listing() { + // Test that chunk id listing is hooked up + let db = make_db(); + let partition_key = "1970-01-01T00"; + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); + writer.write_lp_string(&db, "cpu bar=1 20").await.unwrap(); + + assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0]); + assert_eq!( + read_buffer_chunk_ids(&db, partition_key).await, + vec![] as Vec + ); + + let partition_key = "1970-01-01T00"; + let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + assert_eq!(mb_chunk.id(), 0); + + // add a new chunk in immutable buffer, and move chunk1 (but + // not chunk 0) to read buffer + writer.write_lp_string(&db, "cpu bar=1 30").await.unwrap(); + let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); + db.load_chunk_to_read_buffer(partition_key, mb_chunk.id()) + .await + .unwrap(); + + writer.write_lp_string(&db, "cpu bar=1 40").await.unwrap(); + + assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1, 2]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![1]); + } + + // run a sql query against the database, returning the results as record batches + async fn run_query(db: &Db, query: &str) -> Vec { + let planner = SQLQueryPlanner::default(); + let executor = Executor::new(); + + let physical_plan = planner.query(db, query, &executor).await.unwrap(); + + collect(physical_plan).await.unwrap() + } + + async fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec { + let mut chunk_ids: Vec = db + .mutable_buffer_chunks(partition_key) + .await + .iter() + .map(|chunk| chunk.id()) + .collect(); + chunk_ids.sort_unstable(); + chunk_ids + } + + async fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec { + let mut chunk_ids: Vec = db + .read_buffer_chunks(partition_key) + .await + .iter() + .map(|chunk| chunk.id()) + .collect(); + chunk_ids.sort_unstable(); + chunk_ids + } +} diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index fcfa91a5e1..646d7b566e 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -1,7 +1,10 @@ -use query::PartitionChunk; +use query::{predicate::PredicateBuilder, PartitionChunk}; +use read_buffer::{ColumnSelection, Database as ReadBufferDb}; use snafu::{ResultExt, Snafu}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; + +use super::pred::to_read_buffer_predicate; #[derive(Debug, Snafu)] pub enum Error { @@ -9,6 +12,12 @@ pub enum Error { MutableBufferChunk { source: mutable_buffer::chunk::Error, }, + + #[snafu(display("Read Buffer Chunk Error: {}", source))] + ReadBufferChunk { source: read_buffer::Error }, + + #[snafu(display("Internal Predicate Conversion Error: {}", source))] + InternalPredicateConversion { source: super::pred::Error }, } pub type Result = std::result::Result; @@ -16,26 +25,53 @@ pub type Result = std::result::Result; /// MutableBuffer, ReadBuffer, or a ParquetFile #[derive(Debug)] pub enum DBChunk { - MutableBuffer(Arc), - ReadBuffer, // TODO add appropriate type here + MutableBuffer { + chunk: Arc, + }, + ReadBuffer { + db: Arc>, + partition_key: String, + chunk_id: u32, + }, ParquetFile, // TODO add appropriate type here } +impl DBChunk { + /// Create a new mutable buffer chunk + pub fn new_mb(chunk: Arc) -> Arc { + Arc::new(Self::MutableBuffer { chunk }) + } + + /// create a new read buffer chunk + pub fn new_rb( + db: Arc>, + partition_key: impl Into, + chunk_id: u32, + ) -> Arc { + let partition_key = partition_key.into(); + Arc::new(Self::ReadBuffer { + db, + chunk_id, + partition_key, + }) + } +} + impl PartitionChunk for DBChunk { type Error = Error; fn id(&self) -> u32 { match self { - Self::MutableBuffer(chunk) => chunk.id(), - Self::ReadBuffer => unimplemented!("read buffer not implemented"), + Self::MutableBuffer { chunk } => chunk.id(), + Self::ReadBuffer { chunk_id, .. } => *chunk_id, Self::ParquetFile => unimplemented!("parquet file not implemented"), } } fn table_stats(&self) -> Result, Self::Error> { match self { - Self::MutableBuffer(chunk) => chunk.table_stats().context(MutableBufferChunk), - Self::ReadBuffer => unimplemented!("read buffer not implemented"), + Self::MutableBuffer { chunk } => chunk.table_stats().context(MutableBufferChunk), + Self::ReadBuffer { .. } => unimplemented!("read buffer not implemented"), Self::ParquetFile => unimplemented!("parquet file not implemented"), } } @@ -47,11 +83,45 @@ impl PartitionChunk for DBChunk { columns: &[&str], ) -> Result<(), Self::Error> { match self { - Self::MutableBuffer(chunk) => chunk - .table_to_arrow(dst, table_name, columns) - .context(MutableBufferChunk), - Self::ReadBuffer => unimplemented!("read buffer not implemented"), + Self::MutableBuffer { chunk } => { + chunk + .table_to_arrow(dst, table_name, columns) + .context(MutableBufferChunk)?; + } + Self::ReadBuffer { + db, + partition_key, + chunk_id, + } => { + // Translate the predicates to ReadBuffer style + let predicate = PredicateBuilder::default().build(); + let predicate = + to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?; + + // translate column selection + let column_selection = if columns.is_empty() { + ColumnSelection::All + } else { + ColumnSelection::Some(columns) + }; + + // run the query + let db = db.read().unwrap(); + let read_result = db + .read_filter( + partition_key, + table_name, + &[*chunk_id], + predicate, + column_selection, + ) + .context(ReadBufferChunk)?; + + // copy the RecordBatches into dst + dst.extend(read_result); + } Self::ParquetFile => unimplemented!("parquet file not implemented"), } + Ok(()) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 7b46580d98..61004972a7 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -661,7 +661,7 @@ mod tests { let planner = SQLQueryPlanner::default(); let executor = server.executor(); let physical_plan = planner - .query(buff.as_ref(), "select * from cpu", executor.as_ref()) + .query(buff, "select * from cpu", executor.as_ref()) .await .unwrap();