refactor: Remove impl of `query::Database` in mutable_buffer (#914)
parent
945d2f8d45
commit
3be5c26f92
|
@ -3,14 +3,12 @@ use data_types::{
|
|||
database_rules::{PartitionSort, PartitionSortRules},
|
||||
};
|
||||
use generated_types::wal;
|
||||
use query::Database;
|
||||
|
||||
use crate::{chunk::Chunk, partition::Partition};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::database_rules::Order;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::sync::RwLock;
|
||||
|
@ -177,14 +175,8 @@ impl MutableBufferDb {
|
|||
|
||||
partitions
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for MutableBufferDb {
|
||||
type Error = Error;
|
||||
type Chunk = Chunk;
|
||||
|
||||
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
|
||||
pub async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<()> {
|
||||
match write.write_buffer_batch() {
|
||||
Some(b) => self.write_entries_to_partitions(&b)?,
|
||||
None => {
|
||||
|
@ -199,7 +191,7 @@ impl Database for MutableBufferDb {
|
|||
}
|
||||
|
||||
/// Return the partition keys for data in this DB
|
||||
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
|
||||
pub fn partition_keys(&self) -> Result<Vec<String>> {
|
||||
let partitions = self.partitions.read().expect("mutex poisoned");
|
||||
let keys = partitions.keys().cloned().collect();
|
||||
Ok(keys)
|
||||
|
@ -207,7 +199,7 @@ impl Database for MutableBufferDb {
|
|||
|
||||
/// Return the list of chunks, in order of id, for the specified
|
||||
/// partition_key
|
||||
fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
|
||||
pub fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
|
||||
let partition = self.get_partition(partition_key);
|
||||
let partition = partition.read().expect("mutex poisoned");
|
||||
partition.chunks()
|
||||
|
@ -247,7 +239,10 @@ impl MutableBufferDb {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use data_types::selection::Selection;
|
||||
use chrono::{DateTime, Utc};
|
||||
use data_types::{
|
||||
data::lines_to_replicated_write, database_rules::Partitioner, selection::Selection,
|
||||
};
|
||||
|
||||
use arrow_deps::arrow::array::{Array, StringArray};
|
||||
use data_types::database_rules::Order;
|
||||
|
@ -264,17 +259,15 @@ mod tests {
|
|||
// the values in prior rows for the region column are
|
||||
// null. Likewise the `core` tag is introduced in the third
|
||||
// line so the prior columns are null
|
||||
let lines: Vec<_> = parse_lines(
|
||||
"cpu,region=west user=23.2 10\n\
|
||||
cpu, user=10.0 11\n\
|
||||
cpu,core=one user=10.0 11\n",
|
||||
)
|
||||
.map(|l| l.unwrap())
|
||||
.collect();
|
||||
write_lines(&db, &lines).await;
|
||||
|
||||
let lines = vec![
|
||||
"cpu,region=west user=23.2 10",
|
||||
"cpu, user=10.0 11",
|
||||
"cpu,core=one user=10.0 11",
|
||||
];
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
||||
write_lines_to_partition(&db, &lines, partition_key).await;
|
||||
|
||||
let chunk = db.get_chunk(partition_key, 0).unwrap();
|
||||
let mut batches = Vec::new();
|
||||
let selection = Selection::Some(&["region", "core"]);
|
||||
|
@ -328,7 +321,7 @@ mod tests {
|
|||
.join("\n");
|
||||
|
||||
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
|
||||
write_lines(&db, &lines).await;
|
||||
write_lp(&db, &lines).await;
|
||||
|
||||
assert_eq!(429, db.size());
|
||||
}
|
||||
|
@ -336,10 +329,10 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn partitions_sorted_by_times() {
|
||||
let db = MutableBufferDb::new("foo");
|
||||
write_lp_to_partition(&db, &["cpu val=1 2"], "p1").await;
|
||||
write_lp_to_partition(&db, &["mem val=2 1"], "p2").await;
|
||||
write_lp_to_partition(&db, &["cpu val=1 2"], "p1").await;
|
||||
write_lp_to_partition(&db, &["mem val=2 1"], "p2").await;
|
||||
write_lines_to_partition(&db, &["cpu val=1 2"], "p1").await;
|
||||
write_lines_to_partition(&db, &["mem val=2 1"], "p2").await;
|
||||
write_lines_to_partition(&db, &["cpu val=1 2"], "p1").await;
|
||||
write_lines_to_partition(&db, &["mem val=2 1"], "p2").await;
|
||||
|
||||
let sort_rules = PartitionSortRules {
|
||||
order: Order::Desc,
|
||||
|
@ -359,21 +352,53 @@ mod tests {
|
|||
}
|
||||
|
||||
/// write lines into this database
|
||||
async fn write_lines(database: &MutableBufferDb, lines: &[ParsedLine<'_>]) {
|
||||
let mut writer = query::test::TestLPWriter::default();
|
||||
writer.write_lines(database, lines).await.unwrap()
|
||||
async fn write_lp(database: &MutableBufferDb, lp: &[ParsedLine<'_>]) {
|
||||
write_lp_to_partition(database, lp, "test_partition_key").await
|
||||
}
|
||||
|
||||
async fn write_lp_to_partition(
|
||||
async fn write_lines_to_partition(
|
||||
database: &MutableBufferDb,
|
||||
lp: &[&str],
|
||||
lines: &[&str],
|
||||
partition_key: impl Into<String>,
|
||||
) {
|
||||
let lp_data = lp.join("\n");
|
||||
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
|
||||
let mut writer = query::test::TestLPWriter::default();
|
||||
writer
|
||||
.write_lines_to_partition(database, partition_key, &lines)
|
||||
.await;
|
||||
let lines_string = lines.join("\n");
|
||||
let lp: Vec<_> = parse_lines(&lines_string).map(|l| l.unwrap()).collect();
|
||||
write_lp_to_partition(database, &lp, partition_key).await
|
||||
}
|
||||
|
||||
/// Writes lines the the given partition
|
||||
async fn write_lp_to_partition(
|
||||
database: &MutableBufferDb,
|
||||
lines: &[ParsedLine<'_>],
|
||||
partition_key: impl Into<String>,
|
||||
) {
|
||||
let writer_id = 0;
|
||||
let sequence_number = 0;
|
||||
let partitioner = TestPartitioner {
|
||||
key: partition_key.into(),
|
||||
};
|
||||
let replicated_write =
|
||||
lines_to_replicated_write(writer_id, sequence_number, &lines, &partitioner);
|
||||
|
||||
database
|
||||
.store_replicated_write(&replicated_write)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// Outputs a set partition key for testing. Used for parsing line protocol into
|
||||
// ReplicatedWrite and setting an explicit partition key for all writes therein.
|
||||
struct TestPartitioner {
|
||||
key: String,
|
||||
}
|
||||
|
||||
impl Partitioner for TestPartitioner {
|
||||
fn partition_key(
|
||||
&self,
|
||||
_line: &ParsedLine<'_>,
|
||||
_default_time: &DateTime<Utc>,
|
||||
) -> data_types::database_rules::Result<String> {
|
||||
Ok(self.key.clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ use data_types::{
|
|||
};
|
||||
use influxdb_line_protocol::ParsedLine;
|
||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||
use query::{exec::Executor, Database, DatabaseStore};
|
||||
use query::{exec::Executor, DatabaseStore};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
|
|
Loading…
Reference in New Issue