diff --git a/Cargo.lock b/Cargo.lock index be9d3ae399..926f985bf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.33" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1c13101a3224fb178860ae372a031ce350bbd92d39968518f016744dde0bf7" +checksum = "687c230d85c0a52504709705fc8a53e4a692b83a2184f03dae73e38e1e93a783" dependencies = [ "proc-macro2", "quote", @@ -628,6 +628,7 @@ version = "0.1.0" dependencies = [ "arrow", "assert_cmd", + "async-trait", "byteorder", "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index a56229ee19..2416220fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,8 @@ integer-encoding = "1.0.7" hyper = "0.13" tokio = { version = "0.2", features = ["full"] } +async-trait = "0.1.40" + clap = "2.33.1" dotenv = "0.15.0" dirs = "2.0.2" diff --git a/src/commands/write_buffer.rs b/src/commands/write_buffer.rs index 8801c47082..82eb717fac 100644 --- a/src/commands/write_buffer.rs +++ b/src/commands/write_buffer.rs @@ -26,7 +26,7 @@ pub async fn main() -> Result<(), Box> { }; debug!("Delorean Server using database directory: {:?}", db_dir); - let storage = WriteBufferDatabases::new(&db_dir); + let storage = Arc::new(WriteBufferDatabases::new(&db_dir)); let dirs = storage.wal_dirs()?; // TODO: make recovery of multiple databases multi-threaded @@ -45,7 +45,6 @@ pub async fn main() -> Result<(), Box> { } }; - let storage = Arc::new(storage); let make_svc = make_service_fn(move |_conn| { let storage = storage.clone(); async move { diff --git a/src/server/write_buffer_routes.rs b/src/server/write_buffer_routes.rs index dd24ed9ca5..3390bbf6bf 100644 --- a/src/server/write_buffer_routes.rs +++ b/src/server/write_buffer_routes.rs @@ -9,7 +9,7 @@ use http::header::CONTENT_ENCODING; use tracing::{debug, error, info}; -use delorean::storage::write_buffer_database::{Error as DatabaseError, WriteBufferDatabases}; +use delorean::storage::{Database, DatabaseStore}; use delorean_line_parser::parse_lines; use bytes::{Bytes, BytesMut}; @@ -32,7 +32,7 @@ pub enum ApplicationError { BucketByName { org: String, bucket_name: String, - source: DatabaseError, + source: Box, }, #[snafu(display( @@ -44,7 +44,7 @@ pub enum ApplicationError { WritingPoints { org: String, bucket_name: String, - source: DatabaseError, + source: Box, }, #[snafu(display( @@ -54,7 +54,7 @@ pub enum ApplicationError { ))] Query { database: String, - source: DatabaseError, + source: Box, }, // Application level errors @@ -74,7 +74,9 @@ pub enum ApplicationError { }, #[snafu(display("Query error: {}", source))] - QueryError { source: DatabaseError }, + QueryError { + source: Box, + }, #[snafu(display("Invalid request body '{}': {}", request_body, source))] InvalidRequestBody { @@ -196,9 +198,9 @@ async fn parse_body(req: hyper::Request) -> Result( req: hyper::Request, - storage: Arc, + storage: Arc, ) -> Result, ApplicationError> { let query = req.uri().query().context(ExpectedQueryString)?; @@ -209,6 +211,7 @@ async fn write( let db = storage .db_or_create(&write_info.org, &write_info.bucket) .await + .map_err(|e| Box::new(e) as _) .context(BucketByName { org: write_info.org.clone(), bucket_name: write_info.bucket.clone(), @@ -218,16 +221,19 @@ async fn write( let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?; - let lines: Vec<_> = parse_lines(body) + let lines = parse_lines(body) .collect::, delorean_line_parser::Error>>() .context(ParsingLineProtocol)?; debug!("Parsed {} lines", lines.len()); - db.write_lines(&lines).await.context(WritingPoints { - org: write_info.org.clone(), - bucket_name: write_info.bucket.clone(), - })?; + db.write_lines(&lines) + .await + .map_err(|e| Box::new(e) as _) + .context(WritingPoints { + org: write_info.org.clone(), + bucket_name: write_info.bucket.clone(), + })?; Ok(None) } @@ -244,9 +250,9 @@ struct ReadInfo { // TODO: figure out how to stream read results out rather than rendering the whole thing in mem #[tracing::instrument(level = "debug")] -async fn read( +async fn read( req: hyper::Request, - storage: Arc, + storage: Arc, ) -> Result, ApplicationError> { let query = req.uri().query().context(ExpectedQueryString {})?; @@ -262,7 +268,11 @@ async fn read( bucket: read_info.bucket.clone(), })?; - let results = db.query(&read_info.query).await.context(QueryError {})?; + let results = db + .query(&read_info.query) + .await + .map_err(|e| Box::new(e) as _) + .context(QueryError {})?; let results = arrow::util::pretty::pretty_format_batches(&results).unwrap(); Ok(Some(results.into_bytes().into())) @@ -280,9 +290,9 @@ fn no_op(name: &str) -> Result, ApplicationError> { Ok(None) } -pub async fn service( +pub async fn service( req: hyper::Request, - storage: Arc, + storage: Arc, ) -> http::Result> { let method = req.method().clone(); let uri = req.uri().clone(); @@ -318,3 +328,216 @@ pub async fn service( info!(method = ?method, uri = ?uri, status = ?result.status(), "Handled request"); Ok(result) } + +#[cfg(test)] +mod tests { + use super::*; + use std::{collections::BTreeMap, net::SocketAddr}; + + use arrow::record_batch::RecordBatch; + use delorean::storage::{org_and_bucket_to_database, Database, DatabaseStore}; + use delorean_line_parser::ParsedLine; + use reqwest::{Client, Response}; + use tonic::async_trait; + + use hyper::service::{make_service_fn, service_fn}; + use hyper::Server; + use tokio::sync::Mutex; + + type Error = Box; + type Result = std::result::Result; + + #[tokio::test] + async fn test_write() -> Result<()> { + let test_storage = Arc::new(TestDatabaseStore::new()); + let server_url = test_server(test_storage.clone()); + println!("Started server at {}", server_url); + // now, make a http client and send some requests + + let client = Client::new(); + let response = client + .request(Method::GET, &format!("{}/ping", server_url)) + .send() + .await; + + // Print the response so if the test fails, we have a log of what went wrong + check_response("ping", response, StatusCode::OK, "PONG").await; + + let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160"; + + // send write data + let bucket_name = "MyBucket"; + let org_name = "MyOrg"; + let response = client + .request( + Method::POST, + &format!( + "{}/api/v2/write?bucket={}&org={}", + server_url, bucket_name, org_name + ), + ) + .body(lp_data) + .send() + .await; + + check_response("write", response, StatusCode::NO_CONTENT, "").await; + + // Check that the data got into the right bucket + let test_db = test_storage + .db("MyOrg", "MyBucket") + .await + .expect("Database exists"); + + // Ensure the same line protocol data gets through + assert_eq!(test_db.get_lines().await, vec![lp_data]); + Ok(()) + } + + /// checks a http response against expected results + async fn check_response( + description: &str, + response: Result, + expected_status: StatusCode, + expected_body: &str, + ) { + // Print the response so if the test fails, we have a log of + // what went wrong + println!("{} response: {:?}", description, response); + + if let Ok(response) = response { + let status = response.status(); + let body = response + .text() + .await + .expect("Converting request body to string"); + + assert_eq!(status, expected_status); + assert_eq!(body, expected_body); + } else { + panic!("Unexpected error response: {:?}", response); + } + } + + /// creates an instance of the http service backed by a in-memory + /// testable database. Returns the url of the server + fn test_server(storage: Arc) -> String { + let make_svc = make_service_fn(move |_conn| { + let storage = storage.clone(); + async move { + Ok::<_, http::Error>(service_fn(move |req| { + let state = storage.clone(); + super::service(req, state) + })) + } + }); + + // TODO pick the port dynamically and return it + let bind_addr: SocketAddr = "127.0.0.1:18080".parse().unwrap(); + let server = Server::bind(&bind_addr).serve(make_svc); + let server_url = format!("http://{}", bind_addr); + tokio::task::spawn(server); + server_url + } + + #[derive(Debug)] + struct TestDatabase { + // lines which have been written to this database, in order + saved_lines: Mutex>, + } + + #[derive(Snafu, Debug)] + enum TestError {} + + impl TestDatabase { + fn new() -> Self { + Self { + saved_lines: Mutex::new(Vec::new()), + } + } + + /// Get all lines written to this database + async fn get_lines(&self) -> Vec { + self.saved_lines.lock().await.clone() + } + } + + #[async_trait] + impl Database for TestDatabase { + type Error = TestError; + + /// writes parsed lines into this database + async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> { + let mut saved_lines = self.saved_lines.lock().await; + for line in lines { + saved_lines.push(line.to_string()) + } + Ok(()) + } + + /// Execute the specified query and return arrow record batches with the result + async fn query(&self, _query: &str) -> Result, Self::Error> { + unimplemented!("query Not yet implemented"); + } + + /// Fetch the specified table names and columns as Arrow RecordBatches + async fn table_to_arrow( + &self, + _table_name: &str, + _columns: &[&str], + ) -> Result, Self::Error> { + unimplemented!("table_to_arrow Not yet implemented"); + } + } + + #[derive(Debug)] + struct TestDatabaseStore { + databases: Mutex>>, + } + + impl TestDatabaseStore { + fn new() -> Self { + Self { + databases: Mutex::new(BTreeMap::new()), + } + } + } + + #[async_trait] + impl DatabaseStore for TestDatabaseStore { + type Database = TestDatabase; + type Error = TestError; + /// Retrieve the database specified by the org and bucket name, + /// returning None if no such database exists + /// + /// TODO: change this to take a single database name, and move the + /// computation of org/bucket to the callers + async fn db(&self, org: &str, bucket: &str) -> Option> { + let db_name = org_and_bucket_to_database(org, bucket); + let databases = self.databases.lock().await; + + databases.get(&db_name).cloned() + } + + /// Retrieve the database specified by the org and bucket name, + /// creating it if it doesn't exist. + /// + /// TODO: change this to take a single database name, and move the computation of org/bucket + /// to the callers + async fn db_or_create( + &self, + org: &str, + bucket: &str, + ) -> Result, Self::Error> { + let db_name = org_and_bucket_to_database(org, bucket); + let mut databases = self.databases.lock().await; + + if let Some(db) = databases.get(&db_name) { + Ok(db.clone()) + } else { + let new_db = Arc::new(TestDatabase::new()); + databases.insert(db_name, new_db.clone()); + Ok(new_db) + } + } + } +} diff --git a/src/storage.rs b/src/storage.rs index d23a22f92f..a4ef5d6841 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,3 +1,8 @@ +//! This module defines the traits by which the rest of Delorean +//! interacts with the storage system. The goal is to define a clear +//! interface as well as being able to test other parts of Delorean +//! using mockups that conform to these traits + use std::convert::TryFrom; pub mod block; @@ -9,6 +14,12 @@ pub mod remote_partition; pub mod s3_partition; pub mod write_buffer_database; +use std::{fmt::Debug, sync::Arc}; + +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use delorean_line_parser::ParsedLine; + #[derive(Debug, Eq, PartialEq, Clone)] pub struct ReadPoint { pub time: i64, @@ -59,3 +70,57 @@ impl TryFrom for SeriesDataType { } } } + +#[async_trait] +/// A `Database` stores data and provides an interface to query that data. +pub trait Database: Debug + Send + Sync { + type Error: std::error::Error + Send + Sync + 'static; + + /// writes parsed lines into this database + async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error>; + + /// Execute the specified query and return arrow record batches with the result + async fn query(&self, query: &str) -> Result, Self::Error>; + + /// Fetch the specified table names and columns as Arrow RecordBatches + async fn table_to_arrow( + &self, + table_name: &str, + columns: &[&str], + ) -> Result, Self::Error>; +} + +#[async_trait] +/// Storage for `Databases` which can be retrieved by name +pub trait DatabaseStore: Debug + Send + Sync { + /// The type of database that is stored by this DatabaseStore + type Database: Database; + + /// The type of error this DataBase store generates + type Error: std::error::Error + Send + Sync + 'static; + + /// Retrieve the database specified by the org and bucket name, + /// returning None if no such database exists + /// + /// TODO: change this to take a single database name, and move the + /// computation of org/bucket to the callers + async fn db(&self, org: &str, bucket: &str) -> Option>; + + /// Retrieve the database specified by the org and bucket name, + /// creating it if it doesn't exist. + /// + /// TODO: change this to take a single database name, and move the computation of org/bucket + /// to the callers + async fn db_or_create( + &self, + org: &str, + bucket: &str, + ) -> Result, Self::Error>; +} + +/// return the database name to use for the specified org and bucket name. +/// +/// TODO move to somewhere else / change the traits to take the database name directly +pub fn org_and_bucket_to_database(org: &str, bucket: &str) -> String { + org.to_owned() + "_" + bucket +} diff --git a/src/storage/write_buffer_database.rs b/src/storage/write_buffer_database.rs index 22961f8e82..74b264b32c 100644 --- a/src/storage/write_buffer_database.rs +++ b/src/storage/write_buffer_database.rs @@ -1,3 +1,5 @@ +use super::org_and_bucket_to_database; +use tonic::async_trait; use tracing::info; use crate::generated_types::wal as wb; @@ -193,8 +195,14 @@ impl WriteBufferDatabases { let mut databases = self.databases.write().await; databases.insert(db.name.clone(), Arc::new(db)); } +} - pub async fn db(&self, org: &str, bucket: &str) -> Option> { +#[async_trait] +impl super::DatabaseStore for WriteBufferDatabases { + type Database = Db; + type Error = Error; + + async fn db(&self, org: &str, bucket: &str) -> Option> { let databases = self.databases.read().await; databases @@ -202,7 +210,11 @@ impl WriteBufferDatabases { .cloned() } - pub async fn db_or_create(&self, org: &str, bucket: &str) -> Result> { + async fn db_or_create( + &self, + org: &str, + bucket: &str, + ) -> Result, Self::Error> { let db_name = org_and_bucket_to_database(org, bucket); // get it through a read lock first if we can @@ -230,10 +242,6 @@ impl WriteBufferDatabases { } } -fn org_and_bucket_to_database(org: &str, bucket: &str) -> String { - org.to_owned() + "_" + bucket -} - #[derive(Debug)] pub struct Db { name: String, @@ -384,10 +392,15 @@ impl Db { wal_details: Some(wal_details), }) } +} + +#[async_trait] +impl super::Database for Db { + type Error = Error; // TODO: writes lines creates a column named "time" for the timestmap data. If // we keep this we need to validate that no tag or field has the same name. - pub async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<()> { + async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> { let partition_keys: Vec<_> = lines.iter().map(|l| (l, self.partition_key(l))).collect(); let mut partitions = self.partitions.write().await; @@ -429,20 +442,20 @@ impl Db { Ok(()) } - pub async fn table_to_arrow( + async fn table_to_arrow( &self, table_name: &str, _columns: &[&str], - ) -> Result> { + ) -> Result, Self::Error> { let partitions = self.partitions.read().await; partitions .iter() .map(|p| p.table_to_arrow(table_name)) - .collect() + .collect::>>() } - pub async fn query(&self, query: &str) -> Result> { + async fn query(&self, query: &str) -> Result, Self::Error> { let mut tables = vec![]; let dialect = GenericDialect {}; @@ -478,29 +491,24 @@ impl Db { let mut ctx = ExecutionContext::new(); for table in tables { - let provider = MemTable::new(table.schema, vec![table.data]).context(QueryError { - query: query.to_string(), - })?; + let provider = + MemTable::new(table.schema, vec![table.data]).context(QueryError { query })?; ctx.register_table(&table.name, Box::new(provider)); } - let plan = ctx.create_logical_plan(&query).context(QueryError { - query: query.to_string(), - })?; - let plan = ctx.optimize(&plan).context(QueryError { - query: query.to_string(), - })?; + let plan = ctx + .create_logical_plan(&query) + .context(QueryError { query })?; + let plan = ctx.optimize(&plan).context(QueryError { query })?; let plan = ctx .create_physical_plan(&plan, 1024 * 1024) - .context(QueryError { - query: query.to_string(), - })?; + .context(QueryError { query })?; - ctx.collect(plan.as_ref()).context(QueryError { - query: query.to_string(), - }) + ctx.collect(plan.as_ref()).context(QueryError { query }) } +} +impl Db { // partition_key returns the partition key for the given line. The key will be the prefix of a // partition name (multiple partitions can exist for each key). It uses the user defined // partitioning rules to construct this key @@ -1264,6 +1272,7 @@ impl Column { #[cfg(test)] mod tests { use super::*; + use crate::storage::Database; use arrow::util::pretty::pretty_format_batches; use delorean_line_parser::parse_lines;