diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs
index 1172b5ab6d..32279b4b95 100644
--- a/internal_types/src/entry.rs
+++ b/internal_types/src/entry.rs
@@ -1202,7 +1202,7 @@ enum InnerClockValueError {
     ValueMayNotBeZero,
 }
 
-pub trait SequencedEntry {
+pub trait SequencedEntry: Send + Sync + std::fmt::Debug {
     fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>>;
 
     fn fb(&self) -> &entry_fb::SequencedEntry<'_>;
@@ -1392,7 +1392,7 @@ impl Segment {
         segment_id: u64,
         server_id: ServerId,
         clock_value: Option<ClockValue>,
-        entries: &[Arc<OwnedSequencedEntry>],
+        entries: &[Arc<dyn SequencedEntry>],
     ) -> Self {
         let mut fbb = FlatBufferBuilder::new_with_capacity(1024);
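The `entry.rs` change above is what makes the rest of the diff possible: adding `Send + Sync + std::fmt::Debug` as supertraits lets `SequencedEntry` be used as a trait object that can be shared across threads and logged. A minimal, self-contained sketch of the pattern; the `Entry`/`OwnedEntry` names are illustrative stand-ins, not IOx types:

```rust
use std::{fmt::Debug, sync::Arc, thread};

// Illustrative stand-ins for `SequencedEntry` / `OwnedSequencedEntry`.
trait Entry: Send + Sync + Debug {
    fn size(&self) -> usize;
}

#[derive(Debug)]
struct OwnedEntry(Vec<u8>);

impl Entry for OwnedEntry {
    fn size(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    // `Arc<OwnedEntry>` coerces to `Arc<dyn Entry>` at the call site.
    let entries: Vec<Arc<dyn Entry>> = vec![Arc::new(OwnedEntry(vec![1, 2, 3]))];

    // Without the `Send + Sync` supertraits, `Arc<dyn Entry>` could not be
    // moved to another thread; without `Debug`, callers couldn't log it.
    let shared = Arc::clone(&entries[0]);
    thread::spawn(move || println!("{:?} ({} bytes)", shared, shared.size()))
        .join()
        .unwrap();
}
```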
diff --git a/server/src/buffer.rs b/server/src/buffer.rs
index 63c7c9e39c..ffc16202fd 100644
--- a/server/src/buffer.rs
+++ b/server/src/buffer.rs
@@ -1,9 +1,7 @@
 //! This module contains code for managing the Write Buffer
 
 use data_types::{database_rules::WriteBufferRollover, server_id::ServerId, DatabaseName};
-use internal_types::entry::{
-    ClockValue, OwnedSequencedEntry, Segment as EntrySegment, SequencedEntry,
-};
+use internal_types::entry::{ClockValue, Segment as EntrySegment, SequencedEntry};
 use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
 use std::{convert::TryInto, mem, sync::Arc};
 
@@ -128,7 +126,7 @@ impl Buffer {
     /// has been closed out. If the max size of the buffer would be exceeded
     /// by accepting the write, the oldest (first) of the closed segments
     /// will be dropped, if it is persisted. Otherwise, an error is returned.
-    pub fn append(&mut self, write: Arc<OwnedSequencedEntry>) -> Result<Option<Arc<Segment>>> {
+    pub fn append(&mut self, write: Arc<dyn SequencedEntry>) -> Result<Option<Arc<Segment>>> {
         let write_size = write.size();
 
         while self.current_size + write_size > self.max_size {
@@ -193,7 +191,7 @@ impl Buffer {
     }
 
     /// Returns any writes after the passed in `ClockValue`
-    pub fn writes_since(&self, since: ClockValue) -> Vec<Arc<OwnedSequencedEntry>> {
+    pub fn writes_since(&self, since: ClockValue) -> Vec<Arc<dyn SequencedEntry>> {
         let mut writes = Vec::new();
 
         // start with the newest writes and go back. Hopefully they're asking for
@@ -203,7 +201,7 @@ impl Buffer {
                 writes.reverse();
                 return writes;
             }
-            writes.push(Arc::clone(&w));
+            writes.push(Arc::clone(w));
         }
 
         for s in self.closed_segments.iter().rev() {
@@ -212,7 +210,7 @@ impl Buffer {
                     writes.reverse();
                     return writes;
                 }
-                writes.push(Arc::clone(&w));
+                writes.push(Arc::clone(w));
             }
         }
 
@@ -234,7 +232,7 @@
 pub struct Segment {
     pub(crate) id: u64,
     size: usize,
-    pub writes: Vec<Arc<OwnedSequencedEntry>>,
+    pub writes: Vec<Arc<dyn SequencedEntry>>,
     // Time this segment was initialized
     created_at: DateTime<Utc>,
     // Persistence metadata if segment is persisted
@@ -257,7 +255,7 @@ impl Segment {
     }
 
     // appends the write to the segment
-    fn append(&mut self, write: Arc<OwnedSequencedEntry>) {
+    fn append(&mut self, write: Arc<dyn SequencedEntry>) {
         self.size += write.size();
         self.writes.push(write);
     }
@@ -416,7 +414,7 @@ mod tests {
     use super::*;
     use internal_types::entry::{test_helpers::lp_to_sequenced_entry as lp_2_se, SequencedEntry};
     use object_store::memory::InMemory;
-    use std::convert::TryFrom;
+    use std::{convert::TryFrom, ops::Deref};
 
     #[test]
     fn append_increments_current_size_and_uses_existing_segment() {
@@ -617,7 +615,7 @@ mod tests {
     }
 
     fn equal_to_server_id_and_clock_value(
-        sequenced_entry: &OwnedSequencedEntry,
+        sequenced_entry: &dyn SequencedEntry,
         expected_server_id: ServerId,
         expected_clock_value: u64,
     ) {
@@ -666,9 +664,9 @@ mod tests {
         let writes = buf.writes_since(ClockValue::try_from(1).unwrap());
         assert_eq!(3, writes.len());
 
-        equal_to_server_id_and_clock_value(&writes[0], server_id1, 2);
-        equal_to_server_id_and_clock_value(&writes[1], server_id1, 3);
-        equal_to_server_id_and_clock_value(&writes[2], server_id2, 2);
+        equal_to_server_id_and_clock_value(writes[0].deref(), server_id1, 2);
+        equal_to_server_id_and_clock_value(writes[1].deref(), server_id1, 3);
+        equal_to_server_id_and_clock_value(writes[2].deref(), server_id2, 2);
     }
 
     #[test]
@@ -745,7 +743,7 @@ mod tests {
         server_id: ServerId,
         clock_value: u64,
         line_protocol: &str,
-    ) -> Arc<OwnedSequencedEntry> {
+    ) -> Arc<dyn SequencedEntry> {
         Arc::new(lp_2_se(line_protocol, server_id.get_u32(), clock_value))
     }
 }
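With `writes_since` now returning `Vec<Arc<dyn SequencedEntry>>` and the test helper taking `&dyn SequencedEntry`, the tests reach the inner trait object explicitly through `Deref` (hence the new `std::ops::Deref` import). A standalone sketch of the same conversion, again with illustrative stand-in types:

```rust
use std::{ops::Deref, sync::Arc};

trait Entry {
    fn clock_value(&self) -> u64;
}

struct OwnedEntry(u64);

impl Entry for OwnedEntry {
    fn clock_value(&self) -> u64 {
        self.0
    }
}

// Mirrors the updated helper's `&dyn SequencedEntry` parameter.
fn check(entry: &dyn Entry, expected: u64) {
    assert_eq!(entry.clock_value(), expected);
}

fn main() {
    let writes: Vec<Arc<dyn Entry>> = vec![Arc::new(OwnedEntry(2))];

    // `&writes[0]` is a `&Arc<dyn Entry>`; these all hand `check` the
    // inner `&dyn Entry` explicitly:
    check(writes[0].deref(), 2); // what the diff's tests use
    check(&*writes[0], 2); // equivalent reborrow
    check(writes[0].as_ref(), 2); // equivalent via `AsRef`
}
```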
diff --git a/server/src/db.rs b/server/src/db.rs
index 5106ac5400..1584e37cad 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1,7 +1,10 @@
 //! This module contains the main IOx Database object which has the
 //! instances of the mutable buffer, read buffer, and object store
 
-use super::{buffer::Buffer, JobRegistry};
+use super::{
+    buffer::{self, Buffer},
+    JobRegistry,
+};
 use arrow_deps::{
     arrow::datatypes::SchemaRef as ArrowSchemaRef,
     datafusion::{
@@ -202,6 +205,9 @@ pub enum Error {
 
     #[snafu(display("Invalid Clock Value: {}", source))]
     InvalidClockValue { source: ClockValueError },
+
+    #[snafu(display("Error sending Sequenced Entry to Write Buffer: {}", source))]
+    WriteBufferError { source: buffer::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1070,28 +1076,42 @@ impl Db {
     }
 
     /// Stores an entry based on the configuration. The Entry will first be
-    /// converted into a Sequenced Entry with the logical clock assigned
-    /// from the database. If the write buffer is configured, the sequenced
-    /// entry is written into the buffer and replicated based on the
-    /// configured rules. If the mutable buffer is configured, the sequenced
-    /// entry is then written into the mutable buffer.
+    /// converted into a `SequencedEntry` with the logical clock assigned
+    /// from the database, and then the `SequencedEntry` will be passed to
+    /// `store_sequenced_entry`.
     pub fn store_entry(&self, entry: Entry) -> Result<()> {
-        // TODO: build this based on either this or on the write buffer, if configured
-        let sequenced_entry = OwnedSequencedEntry::new_from_entry_bytes(
-            ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?,
-            self.server_id,
-            entry.data(),
-        )
-        .context(SequencedEntryError)?;
-
-        if self.rules.read().write_buffer_config.is_some() {
-            todo!("route to the Write Buffer. TODO: carols10cents #1157")
-        }
+        let sequenced_entry = Arc::new(
+            OwnedSequencedEntry::new_from_entry_bytes(
+                ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?,
+                self.server_id,
+                entry.data(),
+            )
+            .context(SequencedEntryError)?,
+        );
 
         self.store_sequenced_entry(sequenced_entry)
     }
 
-    pub fn store_sequenced_entry(&self, sequenced_entry: impl SequencedEntry) -> Result<()> {
+    /// Given a `SequencedEntry`:
+    ///
+    /// - If the write buffer is configured, write the `SequencedEntry` into the buffer, which
+    ///   will replicate the `SequencedEntry` based on the configured rules.
+    /// - If the mutable buffer is configured, the `SequencedEntry` is then written into the
+    ///   mutable buffer.
+    ///
+    /// Note that if the write buffer is configured but there is an error storing the
+    /// `SequencedEntry` in the write buffer, the `SequencedEntry` will *not* reach the mutable
+    /// buffer.
+    pub fn store_sequenced_entry(&self, sequenced_entry: Arc<dyn SequencedEntry>) -> Result<()> {
+        // Send to the write buffer, if configured
+        if let Some(wb) = &self.write_buffer {
+            wb.lock()
+                .append(Arc::clone(&sequenced_entry))
+                .context(WriteBufferError)?;
+        }
+
+        // Send to the mutable buffer
+
         let rules = self.rules.read();
         let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
         if rules.lifecycle_rules.immutable {
@@ -1283,7 +1303,11 @@ pub mod test_helpers {
 
 #[cfg(test)]
 mod tests {
-    use crate::query_tests::utils::{make_database, make_db};
+    use super::{
+        test_helpers::{try_write_lp, write_lp},
+        *,
+    };
+    use crate::query_tests::utils::{make_db, TestDb};
     use ::test_helpers::assert_contains;
     use arrow_deps::{
         arrow::record_batch::RecordBatch, assert_batches_eq, assert_batches_sorted_eq,
@@ -1295,20 +1319,11 @@ mod tests {
         database_rules::{Order, Sort, SortOrder},
         partition_metadata::{ColumnSummary, StatValues, Statistics, TableSummary},
    };
-    use object_store::{
-        disk::File, path::ObjectStorePath, path::Path, ObjectStore, ObjectStoreApi,
-    };
-    use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
-
-    use super::*;
-    use futures::stream;
-    use futures::{StreamExt, TryStreamExt};
-    use std::{convert::TryFrom, iter::Iterator};
-
-    use super::test_helpers::{try_write_lp, write_lp};
+    use futures::{stream, StreamExt, TryStreamExt};
     use internal_types::entry::test_helpers::lp_to_entry;
-    use std::num::NonZeroUsize;
-    use std::str;
+    use object_store::{disk::File, path::Path, ObjectStore, ObjectStoreApi};
+    use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
+    use std::{convert::TryFrom, iter::Iterator, num::NonZeroUsize, str};
     use tempfile::TempDir;
 
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -1761,7 +1776,12 @@
         // Create a DB given a server id, an object store and a db name
         let server_id = ServerId::try_from(10).unwrap();
         let db_name = "parquet_test_db";
-        let test_db = make_database(server_id, Arc::clone(&object_store), db_name);
+        let test_db = TestDb::builder()
+            .server_id(server_id)
+            .object_store(Arc::clone(&object_store))
+            .db_name(db_name)
+            .build();
+        let db = Arc::new(test_db.db);
 
         // Write some line protocols in Mutable buffer of the DB
@@ -1822,17 +1842,21 @@
         assert_eq!(path_list, paths.clone());
 
         // Get full string path
-        let root_path = format!("{:?}", root.path());
-        let root_path = root_path.trim_matches('"');
-        let path = format!("{}/{}", root_path, paths[0].display());
-        println!("path: {}", path);
+        let path0 = match &paths[0] {
+            Path::File(file_path) => file_path.to_raw(),
+            other => panic!("expected `Path::File`, got: {:?}", other),
+        };
+
+        let mut path = root.path().to_path_buf();
+        path.push(&path0);
+        println!("path: {}", path.display());
 
         // Create External table of this parquet file to get its content in a human
         // readable form
         // Note: We do not care about escaping quotes here because it is just a test
         let sql = format!(
             "CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
-            path
+            path.display()
         );
 
         let mut ctx = context::ExecutionContext::new();
@@ -1867,7 +1891,12 @@
         // Create a DB given a server id, an object store and a db name
         let server_id = ServerId::try_from(10).unwrap();
         let db_name = "unload_read_buffer_test_db";
-        let test_db = make_database(server_id, Arc::clone(&object_store), db_name);
+        let test_db = TestDb::builder()
+            .server_id(server_id)
+            .object_store(Arc::clone(&object_store))
+            .db_name(db_name)
+            .build();
+        let db = Arc::new(test_db.db);
 
         // Write some line protocols in Mutable buffer of the DB
@@ -1956,17 +1985,21 @@
         assert_eq!(path_list, paths.clone());
 
         // Get full string path
-        let root_path = format!("{:?}", root.path());
-        let root_path = root_path.trim_matches('"');
-        let path = format!("{}/{}", root_path, paths[0].display());
-        println!("path: {}", path);
+        let path0 = match &paths[0] {
+            Path::File(file_path) => file_path.to_raw(),
+            other => panic!("expected `Path::File`, got: {:?}", other),
+        };
+
+        let mut path = root.path().to_path_buf();
+        path.push(&path0);
+        println!("path: {}", path.display());
 
         // Create External table of this parquet file to get its content in a human
         // readable form
         // Note: We do not care about escaping quotes here because it is just a test
         let sql = format!(
             "CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
-            path
+            path.display()
        );
 
         let mut ctx = context::ExecutionContext::new();
@@ -2609,4 +2642,13 @@
             Err(super::Error::HardLimitReached {})
         ));
     }
+
+    #[tokio::test]
+    async fn write_goes_to_write_buffer_if_configured() {
+        let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
+
+        assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
+        write_lp(db.as_ref(), "cpu bar=1 10");
+        assert_ne!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
+    }
 }
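The core behavioral change is the two-stage flow in `store_sequenced_entry`: the entry goes to the write buffer first, if one is configured, and an error there short-circuits before the mutable buffer is touched, exactly as the new doc comment warns. A condensed sketch of that control flow under simplifying assumptions: `Arc<String>` stands in for `Arc<dyn SequencedEntry>`, and `std::sync::Mutex` stands in for the lock used in IOx:

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug)]
struct BufferFull; // stand-in for `buffer::Error`

#[derive(Default)]
struct WriteBuffer {
    entries: Vec<Arc<String>>,
}

impl WriteBuffer {
    fn append(&mut self, entry: Arc<String>) -> Result<(), BufferFull> {
        self.entries.push(entry);
        Ok(())
    }
}

#[derive(Default)]
struct Db {
    write_buffer: Option<Mutex<WriteBuffer>>,
    mutable_buffer: Mutex<Vec<Arc<String>>>,
}

impl Db {
    fn store_sequenced_entry(&self, entry: Arc<String>) -> Result<(), BufferFull> {
        // Send to the write buffer, if configured. The `?` means a
        // write-buffer error keeps the entry out of the mutable buffer.
        if let Some(wb) = &self.write_buffer {
            wb.lock().unwrap().append(Arc::clone(&entry))?;
        }

        // Send to the mutable buffer.
        self.mutable_buffer.lock().unwrap().push(entry);
        Ok(())
    }
}

fn main() {
    let db = Db {
        write_buffer: Some(Mutex::new(WriteBuffer::default())),
        ..Default::default()
    };
    db.store_sequenced_entry(Arc::new("cpu bar=1 10".into())).unwrap();

    // The entry reached both buffers.
    assert_eq!(db.write_buffer.as_ref().unwrap().lock().unwrap().entries.len(), 1);
    assert_eq!(db.mutable_buffer.lock().unwrap().len(), 1);
}
```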
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 87556bea27..a6ef4a5319 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -666,7 +666,7 @@ impl Server {
         db: &Db,
         sequenced_entry: OwnedSequencedEntry,
     ) -> Result<()> {
-        db.store_sequenced_entry(sequenced_entry)
+        db.store_sequenced_entry(Arc::new(sequenced_entry))
             .map_err(|e| Error::UnknownDatabaseError {
                 source: Box::new(e),
             })?;
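Note that the `lib.rs` caller still owns a concrete `OwnedSequencedEntry`; wrapping it in `Arc::new` is all that is needed, since `Arc<Concrete>` coerces unsized to `Arc<dyn Trait>` wherever the trait-object type is expected. A minimal illustration with stand-in names:

```rust
use std::sync::Arc;

trait Entry {
    fn size(&self) -> usize;
}

struct OwnedEntry(Vec<u8>);

impl Entry for OwnedEntry {
    fn size(&self) -> usize {
        self.0.len()
    }
}

// Mirrors the new `Db::store_sequenced_entry` signature.
fn store(entry: Arc<dyn Entry>) -> usize {
    entry.size()
}

fn main() {
    let concrete = OwnedEntry(vec![0; 16]);
    // `Arc<OwnedEntry>` -> `Arc<dyn Entry>` is an unsized coercion;
    // no cast or re-boxing is needed at the call site.
    assert_eq!(store(Arc::new(concrete)), 16);
}
```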
diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs
index b91588e408..94e764d204 100644
--- a/server/src/query_tests/utils.rs
+++ b/server/src/query_tests/utils.rs
@@ -1,14 +1,14 @@
 use data_types::{
     chunk::{ChunkStorage, ChunkSummary},
-    database_rules::DatabaseRules,
+    database_rules::{DatabaseRules, WriteBufferRollover},
     server_id::ServerId,
     DatabaseName,
 };
 use object_store::{disk::File, ObjectStore};
 use query::{exec::Executor, Database};
 
-use crate::{db::Db, JobRegistry};
-use std::{convert::TryFrom, sync::Arc};
+use crate::{buffer::Buffer, db::Db, JobRegistry};
+use std::{borrow::Cow, convert::TryFrom, sync::Arc};
 use tempfile::TempDir;
 
 // A wrapper around a Db and a metrics registry allowing for isolated testing
@@ -19,49 +19,99 @@ pub struct TestDb {
     pub metric_registry: metrics::TestMetricRegistry,
 }
 
-/// Used for testing: create a Database with a local store
-pub fn make_db() -> TestDb {
-    let server_id = ServerId::try_from(1).unwrap();
-    // TODO: When we support parquet file in memory, we will either turn this test back to memory
-    // or have both tests: local disk and memory
-    //let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
-    //
-    // Create an object store with a specified location in a local disk
-    let root = TempDir::new().unwrap();
-    let object_store = Arc::new(ObjectStore::new_file(File::new(root.path())));
-    let exec = Arc::new(Executor::new(1));
-    let metrics_registry = Arc::new(metrics::MetricRegistry::new());
-
-    TestDb {
-        metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
-        db: Db::new(
-            DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
-            server_id,
-            object_store,
-            exec,
-            None, // write buffer
-            Arc::new(JobRegistry::new()),
-            metrics_registry,
-        ),
+impl TestDb {
+    pub fn builder() -> TestDbBuilder {
+        TestDbBuilder::new()
     }
 }
 
-/// Used for testing: create a Database with a local store and a specified name
-pub fn make_database(server_id: ServerId, object_store: Arc<ObjectStore>, db_name: &str) -> TestDb {
-    let exec = Arc::new(Executor::new(1));
-    let metrics_registry = Arc::new(metrics::MetricRegistry::new());
-    TestDb {
-        metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
-        db: Db::new(
-            DatabaseRules::new(DatabaseName::new(db_name.to_string()).unwrap()),
-            server_id,
-            object_store,
-            exec,
-            None, // write buffer
-            Arc::new(JobRegistry::new()),
-            metrics_registry,
-        ),
+#[derive(Debug, Default)]
+pub struct TestDbBuilder {
+    server_id: Option<ServerId>,
+    object_store: Option<Arc<ObjectStore>>,
+    db_name: Option<DatabaseName<'static>>,
+    write_buffer: bool,
+}
+
+impl TestDbBuilder {
+    pub fn new() -> Self {
+        Self::default()
     }
+
+    pub fn build(self) -> TestDb {
+        let server_id = self
+            .server_id
+            .unwrap_or_else(|| ServerId::try_from(1).unwrap());
+        let db_name = self
+            .db_name
+            .unwrap_or_else(|| DatabaseName::new("placeholder").unwrap());
+
+        // TODO: When we support parquet file in memory, we will either turn this test back to
+        // memory or have both tests: local disk and memory
+        //let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+        //
+        // Unless otherwise specified, create an object store with a specified location in a local
+        // disk
+        let object_store = self.object_store.unwrap_or_else(|| {
+            let root = TempDir::new().unwrap();
+            Arc::new(ObjectStore::new_file(File::new(root.path())))
+        });
+
+        let exec = Arc::new(Executor::new(1));
+        let metrics_registry = Arc::new(metrics::MetricRegistry::new());
+
+        let write_buffer = if self.write_buffer {
+            let max = 1 << 32;
+            let segment = 1 << 16;
+            Some(Buffer::new(
+                max,
+                segment,
+                WriteBufferRollover::ReturnError,
+                false,
+                server_id,
+            ))
+        } else {
+            None
+        };
+
+        TestDb {
+            metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
+            db: Db::new(
+                DatabaseRules::new(db_name),
+                server_id,
+                object_store,
+                exec,
+                write_buffer,
+                Arc::new(JobRegistry::new()),
+                metrics_registry,
+            ),
+        }
+    }
+
+    pub fn server_id(mut self, server_id: ServerId) -> Self {
+        self.server_id = Some(server_id);
+        self
+    }
+
+    pub fn object_store(mut self, object_store: Arc<ObjectStore>) -> Self {
+        self.object_store = Some(object_store);
+        self
+    }
+
+    pub fn db_name<T: Into<Cow<'static, str>>>(mut self, db_name: T) -> Self {
+        self.db_name = Some(DatabaseName::new(db_name).unwrap());
+        self
+    }
+
+    pub fn write_buffer(mut self, enabled: bool) -> Self {
+        self.write_buffer = enabled;
+        self
+    }
+}
+
+/// Used for testing: create a Database with a local store
+pub fn make_db() -> TestDb {
+    TestDb::builder().build()
 }
 
 fn chunk_summary_iter(db: &Db) -> impl Iterator<Item = ChunkSummary> + '_ {
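With the builder in place, each call site sets only the knobs it cares about and inherits defaults for the rest. A sketch of how it composes, assuming it sits alongside the diff's new test in `server/src/db.rs`'s `mod tests` (so `TestDb` is importable and the private `write_buffer` field of `Db` is visible):

```rust
use crate::query_tests::utils::TestDb;
use data_types::server_id::ServerId;
use object_store::{memory::InMemory, ObjectStore};
use std::{convert::TryFrom, sync::Arc};

#[test]
fn builder_demo() {
    // Default everything: equivalent to the old `make_db()`.
    let _default_db = TestDb::builder().build();

    // Override selectively, as the updated parquet tests do.
    let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
    let test_db = TestDb::builder()
        .server_id(ServerId::try_from(10).unwrap())
        .object_store(Arc::clone(&object_store))
        .db_name("builder_demo_db")
        .build();
    let _db = Arc::new(test_db.db);

    // Opt in to a write buffer; this is what backs the new
    // `write_goes_to_write_buffer_if_configured` test.
    let with_wb = TestDb::builder().write_buffer(true).build();
    assert!(with_wb.db.write_buffer.is_some());
}
```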
diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs
index 2e635e2908..be0a111b66 100644
--- a/server/src/snapshot.rs
+++ b/server/src/snapshot.rs
@@ -267,20 +267,15 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::convert::TryFrom;
-
-    use crate::{
-        db::{Db, DbChunk},
-        JobRegistry,
-    };
-
     use super::*;
-    use crate::db::test_helpers::write_lp;
-    use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
+    use crate::{
+        db::{test_helpers::write_lp, DbChunk},
+        query_tests::utils::TestDb,
+    };
     use futures::TryStreamExt;
     use mutable_buffer::chunk::Chunk as ChunkWB;
     use object_store::memory::InMemory;
-    use query::{exec::Executor, predicate::Predicate, Database};
+    use query::{predicate::Predicate, Database};
     use tracker::MemRegistry;
 
     #[tokio::test]
@@ -291,7 +286,10 @@
 cpu,host=A,region=west user=3.2,system=50.1 10
 cpu,host=B,region=east user=10.0,system=74.1 1
 "#;
 
-        let db = make_db();
+        let db = TestDb::builder()
+            .object_store(Arc::new(ObjectStore::new_in_memory(InMemory::new())))
+            .build()
+            .db;
         write_lp(&db, &lp);
         let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
@@ -388,22 +386,4 @@
         snapshot.mark_table_finished(2);
         assert!(snapshot.finished());
     }
-
-    /// Create a Database with a local store
-    pub fn make_db() -> Db {
-        let metrics_registry = Arc::new(metrics::MetricRegistry::new());
-        let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
-        let server_id = ServerId::try_from(1).unwrap();
-        let exec = Arc::new(Executor::new(1));
-
-        Db::new(
-            DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
-            server_id,
-            object_store,
-            exec,
-            None, // write buffer
-            Arc::new(JobRegistry::new()),
-            Arc::clone(&metrics_registry),
-        )
-    }
 }
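One small API note on the builder: `db_name` bounds its parameter by `Into<Cow<'static, str>>`, presumably mirroring what `DatabaseName::new` accepts, so callers can pass either a string literal or an owned `String`. A standalone sketch of why that bound is convenient:

```rust
use std::borrow::Cow;

// Same shape as the builder's `db_name` bound.
fn db_name<T: Into<Cow<'static, str>>>(name: T) -> Cow<'static, str> {
    name.into()
}

fn main() {
    // A literal stays borrowed: no allocation.
    let a = db_name("placeholder");
    // An owned `String` is moved in: no re-allocation either.
    let b = db_name(format!("db_{}", 42));
    assert!(matches!(a, Cow::Borrowed(_)));
    assert!(matches!(b, Cow::Owned(_)));
}
```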