feat: Chunk Migration APIs and query data in the read buffer via SQL (#668)
* feat: Chunk Migration APIs and query data in the read buffer via SQL * fix: Make code more consistent * fix: fmt / clippy * chore: Apply suggestions from code review Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * refactor: Remove unecessary Result and make chunks() infallable * chore: Apply more suggestions from code review Co-authored-by: Edd Robinson <me@edd.io> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Edd Robinson <me@edd.io>pull/24376/head
parent
5f30d4b2fc
commit
7969808f09
|
@ -3298,6 +3298,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"snafu",
|
||||
"snap",
|
||||
"test_helpers",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
|
|
|
@ -313,13 +313,12 @@ impl Database for MutableBufferDb {
|
|||
|
||||
/// Return the list of chunks, in order of id, for the specified
|
||||
/// partition_key
|
||||
async fn chunks(&self, partition_key: &str) -> Result<Vec<Arc<Chunk>>> {
|
||||
Ok(self
|
||||
.get_partition(partition_key)
|
||||
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
|
||||
self.get_partition(partition_key)
|
||||
.await
|
||||
.read()
|
||||
.await
|
||||
.chunks())
|
||||
.chunks()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -921,7 +921,11 @@ impl Table {
|
|||
Arc::new(builder.finish())
|
||||
}
|
||||
Column::I64(vals, _) => {
|
||||
schema_builder = schema_builder.field(column_name, ArrowDataType::Int64);
|
||||
schema_builder = if column_name == TIME_COLUMN_NAME {
|
||||
schema_builder.timestamp()
|
||||
} else {
|
||||
schema_builder.field(column_name, ArrowDataType::Int64)
|
||||
};
|
||||
let mut builder = Int64Builder::new(vals.len());
|
||||
|
||||
for v in vals {
|
||||
|
|
|
@ -69,7 +69,7 @@ impl SQLQueryPlanner {
|
|||
for table in &table_names {
|
||||
let mut data = Vec::new();
|
||||
for partition_key in &partition_keys {
|
||||
for chunk in database.chunks(partition_key).await.unwrap() {
|
||||
for chunk in database.chunks(partition_key).await {
|
||||
chunk
|
||||
.table_to_arrow(&mut data, &table, &[])
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
|
|
|
@ -50,7 +50,7 @@ pub trait Database: Debug + Send + Sync {
|
|||
/// Returns a covering set of chunks in the specified partition. A
|
||||
/// covering set means that together the chunks make up a single
|
||||
/// complete copy of the data being queried.
|
||||
async fn chunks(&self, partition_key: &str) -> Result<Vec<Arc<Self::Chunk>>, Self::Error>;
|
||||
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
|
||||
|
||||
// ----------
|
||||
// The functions below are slated for removal (migration into a gRPC query
|
||||
|
|
|
@ -423,7 +423,7 @@ impl Database for TestDatabase {
|
|||
unimplemented!("table_names_for_partition not implemented for test database");
|
||||
}
|
||||
|
||||
async fn chunks(&self, _partition_key: &str) -> Result<Vec<Arc<Self::Chunk>>, Self::Error> {
|
||||
async fn chunks(&self, _partition_key: &str) -> Vec<Arc<Self::Chunk>> {
|
||||
unimplemented!("query_chunks for test database");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,8 +144,17 @@ impl Database {
|
|||
}
|
||||
|
||||
// Lists all partition keys with data for this database.
|
||||
pub fn partition_keys(&mut self) -> Vec<&String> {
|
||||
self.partitions.keys().collect::<Vec<_>>()
|
||||
pub fn partition_keys(&self) -> Vec<&String> {
|
||||
self.partitions.keys().collect()
|
||||
}
|
||||
|
||||
/// Lists all chunk ids in the given partition key. Returns empty
|
||||
/// `Vec` if no partition with the given key exists
|
||||
pub fn chunk_ids(&self, partition_key: &str) -> Vec<u32> {
|
||||
self.partitions
|
||||
.get(partition_key)
|
||||
.map(|partition| partition.chunk_ids())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
|
@ -452,6 +461,11 @@ impl Partition {
|
|||
};
|
||||
}
|
||||
|
||||
/// Return the chunk ids stored in this partition, in order of id
|
||||
fn chunk_ids(&self) -> Vec<u32> {
|
||||
self.chunks.keys().cloned().collect()
|
||||
}
|
||||
|
||||
fn chunks_by_ids(&self, ids: &[u32]) -> Result<Vec<&Chunk>> {
|
||||
let mut chunks = vec![];
|
||||
for chunk_id in ids {
|
||||
|
@ -492,6 +506,19 @@ pub struct ReadFilterResults<'input, 'chunk> {
|
|||
select_columns: table::ColumnSelection<'input>,
|
||||
}
|
||||
|
||||
impl<'input, 'chunk> fmt::Debug for ReadFilterResults<'input, 'chunk> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("ReadFilterResults")
|
||||
.field("chunks.len", &self.chunks.len())
|
||||
.field("next_i", &self.next_i)
|
||||
.field("curr_table_results", &"<OPAQUE>")
|
||||
.field("table_name", &self.table_name)
|
||||
.field("predicate", &self.predicate)
|
||||
.field("select_columns", &self.select_columns)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'input, 'chunk> ReadFilterResults<'input, 'chunk> {
|
||||
fn new(
|
||||
chunks: Vec<&'chunk Chunk>,
|
||||
|
|
|
@ -522,6 +522,7 @@ impl MetaData {
|
|||
///
|
||||
/// The `All` variant denotes that the caller wishes to include all table
|
||||
/// columns in the results.
|
||||
#[derive(Debug)]
|
||||
pub enum ColumnSelection<'a> {
|
||||
All,
|
||||
Some(&'a [&'a str]),
|
||||
|
|
|
@ -28,3 +28,6 @@ uuid = { version = "0.8", features = ["serde", "v4"]}
|
|||
flatbuffers = "0.6"
|
||||
crc32fast = "1.2.0"
|
||||
snap = "1.0.0"
|
||||
|
||||
[dev-dependencies]
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
|
|
@ -37,12 +37,12 @@ impl Config {
|
|||
}
|
||||
|
||||
let mutable_buffer = if rules.store_locally {
|
||||
Some(Arc::new(MutableBufferDb::new(name.to_string())))
|
||||
Some(MutableBufferDb::new(name.to_string()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let read_buffer = Arc::new(ReadBufferDb::new());
|
||||
let read_buffer = ReadBufferDb::new();
|
||||
|
||||
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
|
||||
let db = Arc::new(Db::new(rules, mutable_buffer, read_buffer, wal_buffer));
|
||||
|
|
368
server/src/db.rs
368
server/src/db.rs
|
@ -5,7 +5,7 @@ use std::{
|
|||
collections::BTreeMap,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc, Mutex,
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -30,12 +30,20 @@ pub enum Error {
|
|||
source: mutable_buffer::chunk::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))]
|
||||
UnknownMutableBufferChunk { chunk_id: u32 },
|
||||
|
||||
#[snafu(display("Cannot write to this database: no mutable buffer configured"))]
|
||||
DatatbaseNotWriteable {},
|
||||
|
||||
#[snafu(display("Cannot read to this database: no mutable buffer configured"))]
|
||||
DatabaseNotReadable {},
|
||||
|
||||
#[snafu(display("Error dropping data from mutable buffer: {}", source))]
|
||||
MutableBufferDrop {
|
||||
source: mutable_buffer::database::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error rolling partition: {}", source))]
|
||||
RollingPartition {
|
||||
source: mutable_buffer::database::Error,
|
||||
|
@ -50,6 +58,9 @@ pub enum Error {
|
|||
MutableBufferWrite {
|
||||
source: mutable_buffer::database::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error dropping data from read buffer: {}", source))]
|
||||
ReadBufferDrop { source: read_buffer::Error },
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
|
@ -66,12 +77,14 @@ pub struct Db {
|
|||
/// The (optional) mutable buffer stores incoming writes. If a
|
||||
/// database does not have a mutable buffer it can not accept
|
||||
/// writes (it is a read replica)
|
||||
pub mutable_buffer: Option<Arc<MutableBufferDb>>,
|
||||
pub mutable_buffer: Option<MutableBufferDb>,
|
||||
|
||||
#[serde(skip)]
|
||||
/// The read buffer holds chunk data in an in-memory optimized
|
||||
/// format.
|
||||
pub read_buffer: Arc<ReadBufferDb>,
|
||||
///
|
||||
/// TODO: finer grained locking see ticket https://github.com/influxdata/influxdb_iox/issues/669
|
||||
pub read_buffer: Arc<RwLock<ReadBufferDb>>,
|
||||
|
||||
#[serde(skip)]
|
||||
/// The wal buffer holds replicated writes in an append in-memory
|
||||
|
@ -85,11 +98,12 @@ pub struct Db {
|
|||
impl Db {
|
||||
pub fn new(
|
||||
rules: DatabaseRules,
|
||||
mutable_buffer: Option<Arc<MutableBufferDb>>,
|
||||
read_buffer: Arc<ReadBufferDb>,
|
||||
mutable_buffer: Option<MutableBufferDb>,
|
||||
read_buffer: ReadBufferDb,
|
||||
wal_buffer: Option<Buffer>,
|
||||
) -> Self {
|
||||
let wal_buffer = wal_buffer.map(Mutex::new);
|
||||
let read_buffer = Arc::new(RwLock::new(read_buffer));
|
||||
Self {
|
||||
rules,
|
||||
mutable_buffer,
|
||||
|
@ -106,7 +120,7 @@ impl Db {
|
|||
.rollover_partition(partition_key)
|
||||
.await
|
||||
.context(RollingPartition)
|
||||
.map(|c| Arc::new(DBChunk::MutableBuffer(c)))
|
||||
.map(DBChunk::new_mb)
|
||||
} else {
|
||||
DatatbaseNotWriteable {}.fail()
|
||||
}
|
||||
|
@ -114,19 +128,112 @@ impl Db {
|
|||
|
||||
// Return a list of all chunks in the mutable_buffer (that can
|
||||
// potentially be migrated into the read buffer or object store)
|
||||
pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Result<Vec<Arc<DBChunk>>> {
|
||||
pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
|
||||
let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() {
|
||||
mutable_buffer
|
||||
.chunks(partition_key)
|
||||
.await
|
||||
.context(MutableBufferRead)?
|
||||
.into_iter()
|
||||
.map(|c| Arc::new(DBChunk::MutableBuffer(c)))
|
||||
.map(DBChunk::new_mb)
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
Ok(chunks)
|
||||
chunks
|
||||
}
|
||||
|
||||
/// List chunks that are currently in the read buffer
|
||||
pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
|
||||
self.read_buffer
|
||||
.read()
|
||||
.expect("mutex poisoned")
|
||||
.chunk_ids(partition_key)
|
||||
.into_iter()
|
||||
.map(|chunk_id| DBChunk::new_rb(self.read_buffer.clone(), partition_key, chunk_id))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Drops the specified chunk from the mutable buffer, returning
|
||||
/// the dropped chunk.
|
||||
pub async fn drop_mutable_buffer_chunk(
|
||||
&self,
|
||||
partition_key: &str,
|
||||
chunk_id: u32,
|
||||
) -> Result<Arc<DBChunk>> {
|
||||
self.mutable_buffer
|
||||
.as_ref()
|
||||
.context(DatatbaseNotWriteable)?
|
||||
.drop_chunk(partition_key, chunk_id)
|
||||
.await
|
||||
.map(DBChunk::new_mb)
|
||||
.context(MutableBufferDrop)
|
||||
}
|
||||
|
||||
/// Drops the specified chunk from the read buffer, returning
|
||||
/// the dropped chunk.
|
||||
pub async fn drop_read_buffer_chunk(
|
||||
&self,
|
||||
partition_key: &str,
|
||||
chunk_id: u32,
|
||||
) -> Result<Arc<DBChunk>> {
|
||||
self.read_buffer
|
||||
.write()
|
||||
.expect("mutex poisoned")
|
||||
.drop_chunk(partition_key, chunk_id)
|
||||
.context(ReadBufferDrop)?;
|
||||
|
||||
Ok(DBChunk::new_rb(
|
||||
self.read_buffer.clone(),
|
||||
partition_key,
|
||||
chunk_id,
|
||||
))
|
||||
}
|
||||
|
||||
/// Loads a chunk into the ReadBuffer.
|
||||
///
|
||||
/// If the chunk is present in the mutable_buffer then it is
|
||||
/// loaded from there. Otherwise, the chunk must be fetched from the
|
||||
/// object store (Not yet implemented)
|
||||
///
|
||||
/// Also uncontemplated as of yet is ensuring the read buffer does
|
||||
/// not exceed a memory limit)
|
||||
///
|
||||
/// This (async) function returns when this process is complete,
|
||||
/// but the process may take a long time
|
||||
///
|
||||
/// Returns a reference to the newly loaded chunk in the read buffer
|
||||
pub async fn load_chunk_to_read_buffer(
|
||||
&self,
|
||||
partition_key: &str,
|
||||
chunk_id: u32,
|
||||
) -> Result<Arc<DBChunk>> {
|
||||
let mb_chunk = self
|
||||
.mutable_buffer
|
||||
.as_ref()
|
||||
.context(DatatbaseNotWriteable)?
|
||||
.get_chunk(partition_key, chunk_id)
|
||||
.await
|
||||
.context(UnknownMutableBufferChunk { chunk_id })?;
|
||||
|
||||
let mut batches = Vec::new();
|
||||
for stats in mb_chunk.table_stats().unwrap() {
|
||||
mb_chunk
|
||||
.table_to_arrow(&mut batches, &stats.name, &[])
|
||||
.unwrap();
|
||||
for batch in batches.drain(..) {
|
||||
// As implemented now, taking this write lock will wait
|
||||
// until all reads to the read buffer to complete and
|
||||
// then will block all reads while the insert is occuring
|
||||
let mut read_buffer = self.read_buffer.write().expect("mutex poisoned");
|
||||
read_buffer.upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DBChunk::new_rb(
|
||||
self.read_buffer.clone(),
|
||||
partition_key,
|
||||
mb_chunk.id,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns the next write sequence number
|
||||
|
@ -148,19 +255,22 @@ impl Database for Db {
|
|||
type Chunk = DBChunk;
|
||||
|
||||
/// Return a covering set of chunks for a particular partition
|
||||
async fn chunks(&self, partition_key: &str) -> Result<Vec<Arc<Self::Chunk>>, Self::Error> {
|
||||
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>> {
|
||||
// return a coverting set of chunks. TODO include read buffer
|
||||
// chunks and take them preferentially from the read buffer.
|
||||
let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await?.into_iter();
|
||||
// returns a coverting set of chunks -- aka take chunks from read buffer
|
||||
// preferentially
|
||||
let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await.into_iter();
|
||||
|
||||
let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).await.into_iter();
|
||||
|
||||
let chunks: BTreeMap<_, _> = mutable_chunk_iter
|
||||
.chain(read_buffer_chunk_iter)
|
||||
.map(|chunk| (chunk.id(), chunk))
|
||||
.collect();
|
||||
|
||||
// inserting into the map will have removed any dupes
|
||||
let chunks: Vec<_> = chunks.into_iter().map(|(_id, chunk)| chunk).collect();
|
||||
|
||||
Ok(chunks)
|
||||
chunks.into_iter().map(|(_id, chunk)| chunk).collect()
|
||||
}
|
||||
|
||||
// Note that most of the functions below will eventually be removed from
|
||||
|
@ -270,3 +380,231 @@ impl Database for Db {
|
|||
.context(MutableBufferRead)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_deps::{
|
||||
arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect,
|
||||
};
|
||||
use query::{
|
||||
exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk,
|
||||
};
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
use super::*;
|
||||
|
||||
/// Create a Database with a local store
|
||||
fn make_db() -> Db {
|
||||
let name = "test_db";
|
||||
Db::new(
|
||||
DatabaseRules::default(),
|
||||
Some(MutableBufferDb::new(name)),
|
||||
ReadBufferDb::new(),
|
||||
None, // wal buffer
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_no_mutable_buffer() {
|
||||
// Validate that writes are rejected if there is no mutable buffer
|
||||
let mutable_buffer = None;
|
||||
let db = make_db();
|
||||
let db = Db {
|
||||
mutable_buffer,
|
||||
..db
|
||||
};
|
||||
|
||||
let mut writer = TestLPWriter::default();
|
||||
let res = writer.write_lp_string(&db, "cpu bar=1 10").await;
|
||||
assert_contains!(
|
||||
res.unwrap_err().to_string(),
|
||||
"Cannot write to this database: no mutable buffer configured"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_write() {
|
||||
let db = make_db();
|
||||
let mut writer = TestLPWriter::default();
|
||||
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
|
||||
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+------+",
|
||||
"| bar | time |",
|
||||
"+-----+------+",
|
||||
"| 1 | 10 |",
|
||||
"+-----+------+",
|
||||
];
|
||||
assert_table_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_with_rollover() {
|
||||
let db = make_db();
|
||||
let mut writer = TestLPWriter::default();
|
||||
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
|
||||
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().await.unwrap());
|
||||
|
||||
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
|
||||
assert_eq!(mb_chunk.id(), 0);
|
||||
|
||||
let expected = vec![
|
||||
"+-----+------+",
|
||||
"| bar | time |",
|
||||
"+-----+------+",
|
||||
"| 1 | 10 |",
|
||||
"+-----+------+",
|
||||
];
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
// add new data
|
||||
writer.write_lp_string(&db, "cpu bar=2 20").await.unwrap();
|
||||
let expected = vec![
|
||||
"+-----+------+",
|
||||
"| bar | time |",
|
||||
"+-----+------+",
|
||||
"| 1 | 10 |",
|
||||
"| 2 | 20 |",
|
||||
"+-----+------+",
|
||||
];
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
// And expect that we still get the same thing when data is rolled over again
|
||||
let chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
|
||||
assert_eq!(chunk.id(), 1);
|
||||
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
assert_table_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_from_read_buffer() {
|
||||
// Test that data can be loaded into the ReadBuffer
|
||||
let db = make_db();
|
||||
let mut writer = TestLPWriter::default();
|
||||
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
|
||||
writer.write_lp_string(&db, "cpu bar=2 20").await.unwrap();
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
|
||||
let rb_chunk = db
|
||||
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// it should be the same chunk!
|
||||
assert_eq!(mb_chunk.id(), rb_chunk.id());
|
||||
|
||||
// we should have chunks in both the mutable buffer and read buffer
|
||||
// (Note the currently open chunk is not listed)
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1]);
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
|
||||
|
||||
// data should be readable
|
||||
let expected = vec![
|
||||
"+-----+------+",
|
||||
"| bar | time |",
|
||||
"+-----+------+",
|
||||
"| 1 | 10 |",
|
||||
"| 2 | 20 |",
|
||||
"+-----+------+",
|
||||
];
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
// now, drop the mutable buffer chunk and results should still be the same
|
||||
db.drop_mutable_buffer_chunk(partition_key, mb_chunk.id())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![1]);
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
|
||||
|
||||
let batches = run_query(&db, "select * from cpu").await;
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
// drop, the chunk from the read buffer
|
||||
db.drop_read_buffer_chunk(partition_key, mb_chunk.id())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
read_buffer_chunk_ids(&db, partition_key).await,
|
||||
vec![] as Vec<u32>
|
||||
);
|
||||
|
||||
// Currently this doesn't work (as we need to teach the stores how to
|
||||
// purge tables after data bas beend dropped println!("running
|
||||
// query after all data dropped!"); let expected = vec![] as
|
||||
// Vec<&str>; let batches = run_query(&db, "select * from
|
||||
// cpu").await; assert_table_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_id_listing() {
|
||||
// Test that chunk id listing is hooked up
|
||||
let db = make_db();
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mut writer = TestLPWriter::default();
|
||||
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
|
||||
writer.write_lp_string(&db, "cpu bar=1 20").await.unwrap();
|
||||
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0]);
|
||||
assert_eq!(
|
||||
read_buffer_chunk_ids(&db, partition_key).await,
|
||||
vec![] as Vec<u32>
|
||||
);
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
|
||||
assert_eq!(mb_chunk.id(), 0);
|
||||
|
||||
// add a new chunk in immutable buffer, and move chunk1 (but
|
||||
// not chunk 0) to read buffer
|
||||
writer.write_lp_string(&db, "cpu bar=1 30").await.unwrap();
|
||||
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
|
||||
db.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
writer.write_lp_string(&db, "cpu bar=1 40").await.unwrap();
|
||||
|
||||
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1, 2]);
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![1]);
|
||||
}
|
||||
|
||||
// run a sql query against the database, returning the results as record batches
|
||||
async fn run_query(db: &Db, query: &str) -> Vec<RecordBatch> {
|
||||
let planner = SQLQueryPlanner::default();
|
||||
let executor = Executor::new();
|
||||
|
||||
let physical_plan = planner.query(db, query, &executor).await.unwrap();
|
||||
|
||||
collect(physical_plan).await.unwrap()
|
||||
}
|
||||
|
||||
async fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
|
||||
let mut chunk_ids: Vec<u32> = db
|
||||
.mutable_buffer_chunks(partition_key)
|
||||
.await
|
||||
.iter()
|
||||
.map(|chunk| chunk.id())
|
||||
.collect();
|
||||
chunk_ids.sort_unstable();
|
||||
chunk_ids
|
||||
}
|
||||
|
||||
async fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
|
||||
let mut chunk_ids: Vec<u32> = db
|
||||
.read_buffer_chunks(partition_key)
|
||||
.await
|
||||
.iter()
|
||||
.map(|chunk| chunk.id())
|
||||
.collect();
|
||||
chunk_ids.sort_unstable();
|
||||
chunk_ids
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
use query::PartitionChunk;
|
||||
use query::{predicate::PredicateBuilder, PartitionChunk};
|
||||
use read_buffer::{ColumnSelection, Database as ReadBufferDb};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::pred::to_read_buffer_predicate;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
|
@ -9,6 +12,12 @@ pub enum Error {
|
|||
MutableBufferChunk {
|
||||
source: mutable_buffer::chunk::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Read Buffer Chunk Error: {}", source))]
|
||||
ReadBufferChunk { source: read_buffer::Error },
|
||||
|
||||
#[snafu(display("Internal Predicate Conversion Error: {}", source))]
|
||||
InternalPredicateConversion { source: super::pred::Error },
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
|
@ -16,26 +25,53 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// MutableBuffer, ReadBuffer, or a ParquetFile
|
||||
#[derive(Debug)]
|
||||
pub enum DBChunk {
|
||||
MutableBuffer(Arc<mutable_buffer::chunk::Chunk>),
|
||||
ReadBuffer, // TODO add appropriate type here
|
||||
MutableBuffer {
|
||||
chunk: Arc<mutable_buffer::chunk::Chunk>,
|
||||
},
|
||||
ReadBuffer {
|
||||
db: Arc<RwLock<ReadBufferDb>>,
|
||||
partition_key: String,
|
||||
chunk_id: u32,
|
||||
},
|
||||
ParquetFile, // TODO add appropriate type here
|
||||
}
|
||||
|
||||
impl DBChunk {
|
||||
/// Create a new mutable buffer chunk
|
||||
pub fn new_mb(chunk: Arc<mutable_buffer::chunk::Chunk>) -> Arc<Self> {
|
||||
Arc::new(Self::MutableBuffer { chunk })
|
||||
}
|
||||
|
||||
/// create a new read buffer chunk
|
||||
pub fn new_rb(
|
||||
db: Arc<RwLock<ReadBufferDb>>,
|
||||
partition_key: impl Into<String>,
|
||||
chunk_id: u32,
|
||||
) -> Arc<Self> {
|
||||
let partition_key = partition_key.into();
|
||||
Arc::new(Self::ReadBuffer {
|
||||
db,
|
||||
chunk_id,
|
||||
partition_key,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PartitionChunk for DBChunk {
|
||||
type Error = Error;
|
||||
|
||||
fn id(&self) -> u32 {
|
||||
match self {
|
||||
Self::MutableBuffer(chunk) => chunk.id(),
|
||||
Self::ReadBuffer => unimplemented!("read buffer not implemented"),
|
||||
Self::MutableBuffer { chunk } => chunk.id(),
|
||||
Self::ReadBuffer { chunk_id, .. } => *chunk_id,
|
||||
Self::ParquetFile => unimplemented!("parquet file not implemented"),
|
||||
}
|
||||
}
|
||||
|
||||
fn table_stats(&self) -> Result<Vec<data_types::partition_metadata::Table>, Self::Error> {
|
||||
match self {
|
||||
Self::MutableBuffer(chunk) => chunk.table_stats().context(MutableBufferChunk),
|
||||
Self::ReadBuffer => unimplemented!("read buffer not implemented"),
|
||||
Self::MutableBuffer { chunk } => chunk.table_stats().context(MutableBufferChunk),
|
||||
Self::ReadBuffer { .. } => unimplemented!("read buffer not implemented"),
|
||||
Self::ParquetFile => unimplemented!("parquet file not implemented"),
|
||||
}
|
||||
}
|
||||
|
@ -47,11 +83,45 @@ impl PartitionChunk for DBChunk {
|
|||
columns: &[&str],
|
||||
) -> Result<(), Self::Error> {
|
||||
match self {
|
||||
Self::MutableBuffer(chunk) => chunk
|
||||
.table_to_arrow(dst, table_name, columns)
|
||||
.context(MutableBufferChunk),
|
||||
Self::ReadBuffer => unimplemented!("read buffer not implemented"),
|
||||
Self::MutableBuffer { chunk } => {
|
||||
chunk
|
||||
.table_to_arrow(dst, table_name, columns)
|
||||
.context(MutableBufferChunk)?;
|
||||
}
|
||||
Self::ReadBuffer {
|
||||
db,
|
||||
partition_key,
|
||||
chunk_id,
|
||||
} => {
|
||||
// Translate the predicates to ReadBuffer style
|
||||
let predicate = PredicateBuilder::default().build();
|
||||
let predicate =
|
||||
to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
|
||||
|
||||
// translate column selection
|
||||
let column_selection = if columns.is_empty() {
|
||||
ColumnSelection::All
|
||||
} else {
|
||||
ColumnSelection::Some(columns)
|
||||
};
|
||||
|
||||
// run the query
|
||||
let db = db.read().unwrap();
|
||||
let read_result = db
|
||||
.read_filter(
|
||||
partition_key,
|
||||
table_name,
|
||||
&[*chunk_id],
|
||||
predicate,
|
||||
column_selection,
|
||||
)
|
||||
.context(ReadBufferChunk)?;
|
||||
|
||||
// copy the RecordBatches into dst
|
||||
dst.extend(read_result);
|
||||
}
|
||||
Self::ParquetFile => unimplemented!("parquet file not implemented"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -661,7 +661,7 @@ mod tests {
|
|||
let planner = SQLQueryPlanner::default();
|
||||
let executor = server.executor();
|
||||
let physical_plan = planner
|
||||
.query(buff.as_ref(), "select * from cpu", executor.as_ref())
|
||||
.query(buff, "select * from cpu", executor.as_ref())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
Loading…
Reference in New Issue