test: traits for database and tests for http handler (#284)

* test: traits for database and tests for http handler

* refactor: Use generics and trait bounds instead of trait objects
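
  A minimal sketch of the shape this refactor moves toward, using a
  stripped-down stand-in for the `Database` trait (the real handlers
  take `Arc<T>`, as the diff below shows):

      trait Database {
          fn name(&self) -> String;
      }

      // Before: dynamic dispatch through a trait object.
      fn describe_dyn(db: &dyn Database) -> String {
          db.name()
      }

      // After: static dispatch; the compiler monomorphizes one copy per
      // concrete database type, and `T` can carry further bounds.
      fn describe<T: Database>(db: &T) -> String {
          db.name()
      }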

* refactor: Replace trait objects with an associated type
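
  The associated-type shape it lands on looks roughly like this (the
  trait names mirror the `DatabaseStore` trait added in this PR; the
  bodies are illustrative):

      trait Database {}

      trait DatabaseStore {
          // Each store names the single concrete database type it
          // serves, so callers work with `Arc<Self::Database>` rather
          // than `Arc<dyn Database>`.
          type Database: Database;
      }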

* refactor: Extract an associated Error type on the Database traits
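
  Roughly, each implementation now picks its own concrete error type
  (the `check` method here is hypothetical; the real trait methods are
  async and appear in the diff):

      trait Database {
          // The bounds let generic callers box the error as
          // `Box<dyn std::error::Error + Send + Sync>` when needed.
          type Error: std::error::Error + Send + Sync + 'static;

          fn check(&self) -> Result<(), Self::Error>;
      }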

* refactor: Remove some explicit conversions to_string that Snafu takes care of
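
  The `.to_string()` calls can go because Snafu's generated context
  selectors convert their fields with `Into`, so a `&str` fills a
  `String` field on its own. A self-contained sketch (this error shape
  is illustrative, not the one in `write_buffer_database.rs`):

      use snafu::{ResultExt, Snafu};

      #[derive(Debug, Snafu)]
      enum Error {
          #[snafu(display("error running query '{}': {}", query, source))]
          QueryError {
              query: String,
              source: std::io::Error,
          },
      }

      fn run(query: &str) -> Result<(), Error> {
          let outcome: Result<(), std::io::Error> = Ok(());
          // `query` is a `&str`; the generated `QueryError` selector
          // calls `.into()` on it, so no `query.to_string()` is needed.
          outcome.context(QueryError { query })
      }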

* docs: add comments

* refactor: move traits into storage module

Co-authored-by: Carol (Nichols || Goulding) <carol.nichols@integer32.com>
Andrew Lamb 2020-09-11 17:42:00 -04:00 committed by GitHub
parent 44bb230b2d
commit 82d5f485c3
6 changed files with 346 additions and 47 deletions

Cargo.lock (generated)

@@ -151,9 +151,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.33"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1c13101a3224fb178860ae372a031ce350bbd92d39968518f016744dde0bf7"
checksum = "687c230d85c0a52504709705fc8a53e4a692b83a2184f03dae73e38e1e93a783"
dependencies = [
"proc-macro2",
"quote",
@@ -628,6 +628,7 @@ version = "0.1.0"
dependencies = [
"arrow",
"assert_cmd",
"async-trait",
"byteorder",
"bytes",
"chrono",


@@ -40,6 +40,8 @@ integer-encoding = "1.0.7"
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
async-trait = "0.1.40"
clap = "2.33.1"
dotenv = "0.15.0"
dirs = "2.0.2"


@@ -26,7 +26,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
};
debug!("Delorean Server using database directory: {:?}", db_dir);
let storage = WriteBufferDatabases::new(&db_dir);
let storage = Arc::new(WriteBufferDatabases::new(&db_dir));
let dirs = storage.wal_dirs()?;
// TODO: make recovery of multiple databases multi-threaded
@@ -45,7 +45,6 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
};
let storage = Arc::new(storage);
let make_svc = make_service_fn(move |_conn| {
let storage = storage.clone();
async move {


@@ -9,7 +9,7 @@
use http::header::CONTENT_ENCODING;
use tracing::{debug, error, info};
use delorean::storage::write_buffer_database::{Error as DatabaseError, WriteBufferDatabases};
use delorean::storage::{Database, DatabaseStore};
use delorean_line_parser::parse_lines;
use bytes::{Bytes, BytesMut};
@@ -32,7 +32,7 @@ pub enum ApplicationError {
BucketByName {
org: String,
bucket_name: String,
source: DatabaseError,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
@@ -44,7 +44,7 @@ pub enum ApplicationError {
WritingPoints {
org: String,
bucket_name: String,
source: DatabaseError,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
@@ -54,7 +54,7 @@ pub enum ApplicationError {
))]
Query {
database: String,
source: DatabaseError,
source: Box<dyn std::error::Error + Send + Sync>,
},
// Application level errors
@@ -74,7 +74,9 @@ pub enum ApplicationError {
},
#[snafu(display("Query error: {}", source))]
QueryError { source: DatabaseError },
QueryError {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Invalid request body '{}': {}", request_body, source))]
InvalidRequestBody {
@@ -196,9 +198,9 @@ async fn parse_body(req: hyper::Request<Body>) -> Result<Bytes, ApplicationError
}
#[tracing::instrument(level = "debug")]
async fn write(
async fn write<T: DatabaseStore>(
req: hyper::Request<Body>,
storage: Arc<WriteBufferDatabases>,
storage: Arc<T>,
) -> Result<Option<Body>, ApplicationError> {
let query = req.uri().query().context(ExpectedQueryString)?;
@@ -209,6 +211,7 @@ async fn write(
let db = storage
.db_or_create(&write_info.org, &write_info.bucket)
.await
.map_err(|e| Box::new(e) as _)
.context(BucketByName {
org: write_info.org.clone(),
bucket_name: write_info.bucket.clone(),
@@ -218,16 +221,19 @@ async fn write(
let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
let lines: Vec<_> = parse_lines(body)
let lines = parse_lines(body)
.collect::<Result<Vec<_>, delorean_line_parser::Error>>()
.context(ParsingLineProtocol)?;
debug!("Parsed {} lines", lines.len());
db.write_lines(&lines).await.context(WritingPoints {
org: write_info.org.clone(),
bucket_name: write_info.bucket.clone(),
})?;
db.write_lines(&lines)
.await
.map_err(|e| Box::new(e) as _)
.context(WritingPoints {
org: write_info.org.clone(),
bucket_name: write_info.bucket.clone(),
})?;
Ok(None)
}
@@ -244,9 +250,9 @@ struct ReadInfo {
// TODO: figure out how to stream read results out rather than rendering the whole thing in mem
#[tracing::instrument(level = "debug")]
async fn read(
async fn read<T: DatabaseStore>(
req: hyper::Request<Body>,
storage: Arc<WriteBufferDatabases>,
storage: Arc<T>,
) -> Result<Option<Body>, ApplicationError> {
let query = req.uri().query().context(ExpectedQueryString {})?;
@@ -262,7 +268,11 @@ async fn read(
bucket: read_info.bucket.clone(),
})?;
let results = db.query(&read_info.query).await.context(QueryError {})?;
let results = db
.query(&read_info.query)
.await
.map_err(|e| Box::new(e) as _)
.context(QueryError {})?;
let results = arrow::util::pretty::pretty_format_batches(&results).unwrap();
Ok(Some(results.into_bytes().into()))
@@ -280,9 +290,9 @@ fn no_op(name: &str) -> Result<Option<Body>, ApplicationError> {
Ok(None)
}
pub async fn service(
pub async fn service<T: DatabaseStore>(
req: hyper::Request<Body>,
storage: Arc<WriteBufferDatabases>,
storage: Arc<T>,
) -> http::Result<hyper::Response<Body>> {
let method = req.method().clone();
let uri = req.uri().clone();
@@ -318,3 +328,216 @@ pub async fn service(
info!(method = ?method, uri = ?uri, status = ?result.status(), "Handled request");
Ok(result)
}
#[cfg(test)]
mod tests {
use super::*;
use std::{collections::BTreeMap, net::SocketAddr};
use arrow::record_batch::RecordBatch;
use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore};
use delorean_line_parser::ParsedLine;
use reqwest::{Client, Response};
use tonic::async_trait;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use tokio::sync::Mutex;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
#[tokio::test]
async fn test_write() -> Result<()> {
let test_storage = Arc::new(TestDatabaseStore::new());
let server_url = test_server(test_storage.clone());
println!("Started server at {}", server_url);
// now, make a http client and send some requests
let client = Client::new();
let response = client
.request(Method::GET, &format!("{}/ping", server_url))
.send()
.await;
// Print the response so if the test fails, we have a log of what went wrong
check_response("ping", response, StatusCode::OK, "PONG").await;
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.request(
Method::POST,
&format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
),
)
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, "").await;
// Check that the data got into the right bucket
let test_db = test_storage
.db("MyOrg", "MyBucket")
.await
.expect("Database exists");
// Ensure the same line protocol data gets through
assert_eq!(test_db.get_lines().await, vec![lp_data]);
Ok(())
}
/// checks a http response against expected results
async fn check_response(
description: &str,
response: Result<Response, reqwest::Error>,
expected_status: StatusCode,
expected_body: &str,
) {
// Print the response so if the test fails, we have a log of
// what went wrong
println!("{} response: {:?}", description, response);
if let Ok(response) = response {
let status = response.status();
let body = response
.text()
.await
.expect("Converting request body to string");
assert_eq!(status, expected_status);
assert_eq!(body, expected_body);
} else {
panic!("Unexpected error response: {:?}", response);
}
}
/// creates an instance of the http service backed by an in-memory
/// testable database. Returns the URL of the server
fn test_server(storage: Arc<TestDatabaseStore>) -> String {
let make_svc = make_service_fn(move |_conn| {
let storage = storage.clone();
async move {
Ok::<_, http::Error>(service_fn(move |req| {
let state = storage.clone();
super::service(req, state)
}))
}
});
// TODO pick the port dynamically and return it
let bind_addr: SocketAddr = "127.0.0.1:18080".parse().unwrap();
let server = Server::bind(&bind_addr).serve(make_svc);
let server_url = format!("http://{}", bind_addr);
tokio::task::spawn(server);
server_url
}
#[derive(Debug)]
struct TestDatabase {
// lines which have been written to this database, in order
saved_lines: Mutex<Vec<String>>,
}
#[derive(Snafu, Debug)]
enum TestError {}
impl TestDatabase {
fn new() -> Self {
Self {
saved_lines: Mutex::new(Vec::new()),
}
}
/// Get all lines written to this database
async fn get_lines(&self) -> Vec<String> {
self.saved_lines.lock().await.clone()
}
}
#[async_trait]
impl Database for TestDatabase {
type Error = TestError;
/// writes parsed lines into this database
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
let mut saved_lines = self.saved_lines.lock().await;
for line in lines {
saved_lines.push(line.to_string())
}
Ok(())
}
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, _query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("query Not yet implemented");
}
/// Fetch the specified table names and columns as Arrow RecordBatches
async fn table_to_arrow(
&self,
_table_name: &str,
_columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("table_to_arrow Not yet implemented");
}
}
#[derive(Debug)]
struct TestDatabaseStore {
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
}
impl TestDatabaseStore {
fn new() -> Self {
Self {
databases: Mutex::new(BTreeMap::new()),
}
}
}
#[async_trait]
impl DatabaseStore for TestDatabaseStore {
type Database = TestDatabase;
type Error = TestError;
/// Retrieve the database specified by the org and bucket name,
/// returning None if no such database exists
///
/// TODO: change this to take a single database name, and move the
/// computation of org/bucket to the callers
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
let db_name = org_and_bucket_to_database(org, bucket);
let databases = self.databases.lock().await;
databases.get(&db_name).cloned()
}
/// Retrieve the database specified by the org and bucket name,
/// creating it if it doesn't exist.
///
/// TODO: change this to take a single database name, and move the computation of org/bucket
/// to the callers
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error> {
let db_name = org_and_bucket_to_database(org, bucket);
let mut databases = self.databases.lock().await;
if let Some(db) = databases.get(&db_name) {
Ok(db.clone())
} else {
let new_db = Arc::new(TestDatabase::new());
databases.insert(db_name, new_db.clone());
Ok(new_db)
}
}
}
}


@@ -1,3 +1,8 @@
//! This module defines the traits by which the rest of Delorean
//! interacts with the storage system. The goal is to define a clear
//! interface and to make it possible to test other parts of Delorean
//! using mock implementations that conform to these traits.
use std::convert::TryFrom;
pub mod block;
@@ -9,6 +14,12 @@ pub mod remote_partition;
pub mod s3_partition;
pub mod write_buffer_database;
use std::{fmt::Debug, sync::Arc};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use delorean_line_parser::ParsedLine;
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ReadPoint<T: Clone> {
pub time: i64,
@@ -59,3 +70,57 @@ impl TryFrom<u8> for SeriesDataType {
}
}
}
#[async_trait]
/// A `Database` stores data and provides an interface to query that data.
pub trait Database: Debug + Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
/// writes parsed lines into this database
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error>;
/// Execute the specified query and return arrow record batches with the result
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error>;
/// Fetch the specified table names and columns as Arrow RecordBatches
async fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>, Self::Error>;
}
#[async_trait]
/// Storage for `Databases` which can be retrieved by name
pub trait DatabaseStore: Debug + Send + Sync {
/// The type of database that is stored by this DatabaseStore
type Database: Database;
/// The type of error this `DatabaseStore` generates
type Error: std::error::Error + Send + Sync + 'static;
/// Retrieve the database specified by the org and bucket name,
/// returning None if no such database exists
///
/// TODO: change this to take a single database name, and move the
/// computation of org/bucket to the callers
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>>;
/// Retrieve the database specified by the org and bucket name,
/// creating it if it doesn't exist.
///
/// TODO: change this to take a single database name, and move the computation of org/bucket
/// to the callers
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error>;
}
/// return the database name to use for the specified org and bucket name.
///
/// TODO move to somewhere else / change the traits to take the database name directly
pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {
org.to_owned() + "_" + bucket
}


@@ -1,3 +1,5 @@
use super::org_and_bucket_to_database;
use tonic::async_trait;
use tracing::info;
use crate::generated_types::wal as wb;
@@ -193,8 +195,14 @@ impl WriteBufferDatabases {
let mut databases = self.databases.write().await;
databases.insert(db.name.clone(), Arc::new(db));
}
}
pub async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Db>> {
#[async_trait]
impl super::DatabaseStore for WriteBufferDatabases {
type Database = Db;
type Error = Error;
async fn db(&self, org: &str, bucket: &str) -> Option<Arc<Self::Database>> {
let databases = self.databases.read().await;
databases
@@ -202,7 +210,11 @@ impl WriteBufferDatabases {
.cloned()
}
pub async fn db_or_create(&self, org: &str, bucket: &str) -> Result<Arc<Db>> {
async fn db_or_create(
&self,
org: &str,
bucket: &str,
) -> Result<Arc<Self::Database>, Self::Error> {
let db_name = org_and_bucket_to_database(org, bucket);
// get it through a read lock first if we can
@@ -230,10 +242,6 @@ impl WriteBufferDatabases {
}
}
fn org_and_bucket_to_database(org: &str, bucket: &str) -> String {
org.to_owned() + "_" + bucket
}
#[derive(Debug)]
pub struct Db {
name: String,
@@ -384,10 +392,15 @@ impl Db {
wal_details: Some(wal_details),
})
}
}
#[async_trait]
impl super::Database for Db {
type Error = Error;
// TODO: write_lines creates a column named "time" for the timestamp data. If
// we keep this we need to validate that no tag or field has the same name.
pub async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<()> {
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
let partition_keys: Vec<_> = lines.iter().map(|l| (l, self.partition_key(l))).collect();
let mut partitions = self.partitions.write().await;
@@ -429,20 +442,20 @@ impl Db {
Ok(())
}
pub async fn table_to_arrow(
async fn table_to_arrow(
&self,
table_name: &str,
_columns: &[&str],
) -> Result<Vec<RecordBatch>> {
) -> Result<Vec<RecordBatch>, Self::Error> {
let partitions = self.partitions.read().await;
partitions
.iter()
.map(|p| p.table_to_arrow(table_name))
.collect()
.collect::<Result<Vec<_>>>()
}
pub async fn query(&self, query: &str) -> Result<Vec<RecordBatch>> {
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
let mut tables = vec![];
let dialect = GenericDialect {};
@@ -478,29 +491,24 @@ impl Db {
let mut ctx = ExecutionContext::new();
for table in tables {
let provider = MemTable::new(table.schema, vec![table.data]).context(QueryError {
query: query.to_string(),
})?;
let provider =
MemTable::new(table.schema, vec![table.data]).context(QueryError { query })?;
ctx.register_table(&table.name, Box::new(provider));
}
let plan = ctx.create_logical_plan(&query).context(QueryError {
query: query.to_string(),
})?;
let plan = ctx.optimize(&plan).context(QueryError {
query: query.to_string(),
})?;
let plan = ctx
.create_logical_plan(&query)
.context(QueryError { query })?;
let plan = ctx.optimize(&plan).context(QueryError { query })?;
let plan = ctx
.create_physical_plan(&plan, 1024 * 1024)
.context(QueryError {
query: query.to_string(),
})?;
.context(QueryError { query })?;
ctx.collect(plan.as_ref()).context(QueryError {
query: query.to_string(),
})
ctx.collect(plan.as_ref()).context(QueryError { query })
}
}
impl Db {
// partition_key returns the partition key for the given line. The key will be the prefix of a
// partition name (multiple partitions can exist for each key). It uses the user defined
// partitioning rules to construct this key
@@ -1264,6 +1272,7 @@ impl Column {
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Database;
use arrow::util::pretty::pretty_format_batches;
use delorean_line_parser::parse_lines;