diff --git a/Cargo.lock b/Cargo.lock
index ed81e039f9..a83654c9bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1010,6 +1010,7 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
 name = "entry"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "chrono",
  "data_types",
  "flatbuffers",
@@ -1801,7 +1802,6 @@
 version = "0.1.0"
 dependencies = [
  "futures",
  "parking_lot",
- "snafu",
  "time 0.1.0",
  "tokio",
diff --git a/entry/Cargo.toml b/entry/Cargo.toml
index c4c5178c1d..459f6a59ae 100644
--- a/entry/Cargo.toml
+++ b/entry/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 description = "The entry format used by the write buffer"
 
 [dependencies]
+bytes = "1.0"
 chrono = "0.4"
 data_types = { path = "../data_types" }
 # See docs/regenerating_flatbuffers.md about updating generated code when updating the
diff --git a/entry/src/entry.rs b/entry/src/entry.rs
index f2cbc1a315..5c845f7819 100644
--- a/entry/src/entry.rs
+++ b/entry/src/entry.rs
@@ -1,6 +1,7 @@
 //! This module contains helper code for building `Entry` from line protocol and the
 //! `DatabaseRules` configuration.
 
+use bytes::Bytes;
 use std::{collections::BTreeMap, convert::TryFrom, fmt::Formatter};
 
 use chrono::{DateTime, TimeZone, Utc};
@@ -738,7 +739,7 @@ pub struct ShardedEntry {
 /// iterating through the partitioned writes.
 #[self_referencing]
 pub struct Entry {
-    data: Vec<u8>,
+    data: Bytes,
     #[borrows(data)]
     #[covariant]
     fb: entry_fb::Entry<'this>,
@@ -799,6 +800,14 @@ impl TryFrom<Vec<u8>> for Entry {
     type Error = flatbuffers::InvalidFlatbuffer;
 
     fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
+        Self::try_from(Bytes::from(data))
+    }
+}
+
+impl TryFrom<Bytes> for Entry {
+    type Error = flatbuffers::InvalidFlatbuffer;
+
+    fn try_from(data: Bytes) -> Result<Self, Self::Error> {
         EntryTryBuilder {
             data,
             fb_builder: |data| flatbuffers::root::<entry_fb::Entry<'_>>(data),
@@ -807,7 +816,7 @@ impl TryFrom<Vec<u8>> for Entry {
     }
 }
 
-impl From<Entry> for Vec<u8> {
+impl From<Entry> for Bytes {
     fn from(entry: Entry) -> Self {
         entry.into_heads().data
     }
@@ -815,8 +824,8 @@ impl From<Entry> for Vec<u8> {
 
 impl Clone for Entry {
     fn clone(&self) -> Self {
-        Self::try_from(self.data().to_vec())
-            .expect("flatbuffer was valid, should not be broken now")
+        let bytes: &Bytes = self.borrow_data();
+        Self::try_from(bytes.clone()).expect("flatbuffer was valid, should not be broken now")
     }
 }
diff --git a/generated_types/build.rs b/generated_types/build.rs
index 78450d5406..390e9f73b2 100644
--- a/generated_types/build.rs
+++ b/generated_types/build.rs
@@ -78,6 +78,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
             ".influxdata.iox.management.v1.PersistChunks.chunks",
             ".influxdata.iox.management.v1.WriteChunk.chunk_id",
             ".influxdata.iox.management.v1.UnloadPartitionChunkRequest.chunk_id",
+            ".influxdata.iox.write.v1.WriteEntryRequest.entry",
         ])
         .btree_map(&[
             ".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers",
diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs
index 660fbc76be..7c7f0ac307 100644
--- a/influxdb_iox_client/src/client/write.rs
+++ b/influxdb_iox_client/src/client/write.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
 use thiserror::Error;
 
 use generated_types::influxdata::iox::write::v1 as write;
@@ -88,7 +89,7 @@ impl Client {
     pub async fn write_entry(
         &mut self,
         db_name: impl Into<String> + Send,
-        entry: impl Into<Vec<u8>> + Send,
+        entry: impl Into<Bytes> + Send,
     ) -> Result<(), WriteError> {
         let db_name = db_name.into();
         let entry = entry.into();
diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml
index 89a0e3865e..9d44a6c55e 100644
--- a/internal_types/Cargo.toml
+++ b/internal_types/Cargo.toml
@@ -8,7 +8,6 @@ readme = "README.md"
 
 [dependencies]
 parking_lot = "0.11"
-snafu = "0.6"
 time = { path = "../time" }
 tokio = { version = "1.11", features = ["sync"] }
diff --git a/server/src/application.rs b/server/src/application.rs
index 1268643654..79e74eb203 100644
--- a/server/src/application.rs
+++ b/server/src/application.rs
@@ -48,17 +48,6 @@ impl ApplicationState {
         }
     }
 
-    /// Overrides the write_buffer_factory
-    pub fn with_write_buffer_factory(
-        self,
-        write_buffer_factory: Arc<WriteBufferConfigFactory>,
-    ) -> Self {
-        Self {
-            write_buffer_factory,
-            ..self
-        }
-    }
-
     pub fn object_store(&self) -> &Arc<ObjectStore> {
         &self.object_store
     }
diff --git a/server/src/database.rs b/server/src/database.rs
index ae0269c196..dfa88c281e 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -1226,7 +1226,7 @@ mod tests {
     };
     use time::Time;
    use uuid::Uuid;
-    use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
+    use write_buffer::mock::MockBufferSharedState;
 
     #[tokio::test]
     async fn database_shutdown_waits_for_jobs() {
@@ -1429,12 +1429,13 @@ mod tests {
         ));
 
         // setup application
-        let application = ApplicationState::new(Arc::new(ObjectStore::new_in_memory()), None);
-
-        let mut factory = WriteBufferConfigFactory::new(Arc::clone(application.time_provider()));
-        factory.register_mock("my_mock".to_string(), state.clone());
-
-        let application = Arc::new(application.with_write_buffer_factory(Arc::new(factory)));
+        let application = Arc::new(ApplicationState::new(
+            Arc::new(ObjectStore::new_in_memory()),
+            None,
+        ));
+        application
+            .write_buffer_factory()
+            .register_mock("my_mock".to_string(), state.clone());
 
         let server_id = ServerId::try_from(1).unwrap();
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 5413dd5aae..c8d83e0c84 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -1225,10 +1225,48 @@ where
     }
 }
 
+pub mod test_utils {
+    use super::*;
+    use crate::connection::test_helpers::TestConnectionManager;
+    use object_store::ObjectStore;
+
+    /// Create a new [`ApplicationState`] with an in-memory object store
+    pub fn make_application() -> Arc<ApplicationState> {
+        Arc::new(ApplicationState::new(
+            Arc::new(ObjectStore::new_in_memory()),
+            None,
+        ))
+    }
+
+    /// Creates a new server with the provided [`ApplicationState`]
+    pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
+        Arc::new(Server::new(
+            TestConnectionManager::new(),
+            application,
+            Default::default(),
+        ))
+    }
+
+    /// Creates a new server with the provided [`ApplicationState`]
+    ///
+    /// Sets the `server_id` provided and waits for it to initialize
+    pub async fn make_initialized_server(
+        server_id: ServerId,
+        application: Arc<ApplicationState>,
+    ) -> Arc<Server<TestConnectionManager>> {
+        let server = make_server(application);
+        server.set_id(server_id).unwrap();
+        server.wait_for_init().await.unwrap();
+        server
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use ::write_buffer::config::WriteBufferConfigFactory;
+    use super::{
+        test_utils::{make_application, make_server},
+        *,
+    };
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
     use bytes::Bytes;
@@ -1267,21 +1305,6 @@ mod tests {
 
     const ARBITRARY_DEFAULT_TIME: i64 = 456;
 
-    fn make_application() -> Arc<ApplicationState> {
-        Arc::new(ApplicationState::new(
-            Arc::new(ObjectStore::new_in_memory()),
-            None,
-        ))
-    }
-
-    fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
-        Arc::new(Server::new(
-            TestConnectionManager::new(),
-            application,
-            Default::default(),
-        ))
-    }
-
     #[tokio::test]
     async fn server_api_calls_return_error_with_no_id_set() {
         let server = make_server(make_application());
@@ -2412,12 +2435,11 @@ mod tests {
 
     #[tokio::test]
     async fn write_buffer_errors_propagate() {
-        let application = ApplicationState::new(Arc::new(ObjectStore::new_in_memory()), None);
+        let application = make_application();
 
-        let mut factory = WriteBufferConfigFactory::new(Arc::clone(application.time_provider()));
-        factory.register_always_fail_mock("my_mock".to_string());
-
-        let application = Arc::new(application.with_write_buffer_factory(Arc::new(factory)));
+        application
+            .write_buffer_factory()
+            .register_always_fail_mock("my_mock".to_string());
 
         let server = make_server(application);
         server.set_id(ServerId::try_from(1).unwrap()).unwrap();
diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs
index d0c2e691c0..c00cf172d5 100644
--- a/server/src/write_buffer.rs
+++ b/server/src/write_buffer.rs
@@ -192,18 +192,14 @@ async fn stream_in_sequenced_entries<'a>(
 mod tests {
     use std::collections::BTreeMap;
     use std::convert::TryFrom;
-    use std::num::{NonZeroU32, NonZeroUsize};
+    use std::num::NonZeroU32;
 
-    use ::test_helpers::assert_contains;
     use arrow_util::assert_batches_eq;
-    use data_types::database_rules::{PartitionTemplate, TemplatePart};
     use data_types::sequence::Sequence;
     use entry::test_helpers::lp_to_entry;
     use persistence_windows::min_max_sequence::MinMaxSequence;
     use query::exec::ExecutionContextProvider;
     use query::frontend::sql::SqlQueryPlanner;
-    use query::QueryDatabase;
-    use test_helpers::tracing::TracingCapture;
     use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
 
     use crate::db::test_helpers::run_query;
@@ -435,87 +431,6 @@ mod tests {
         assert_batches_eq!(expected, &batches);
     }
 
-    #[tokio::test]
-    async fn write_buffer_reads_wait_for_compaction() {
-        let tracing_capture = TracingCapture::new();
-
-        // setup write buffer
-        // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
-        let n_entries = 50u64;
-        let write_buffer_state =
-            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
-        for sequence_number in 0..n_entries {
-            let lp = format!(
-                "table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
-                sequence_number / 2
-            );
-            write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
-                Sequence::new(0, sequence_number),
-                Time::from_timestamp_nanos(0),
-                lp_to_entry(&lp),
-            ));
-        }
-        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
-            Sequence::new(0, n_entries),
-            Time::from_timestamp_nanos(0),
-            lp_to_entry("table_2,partition_by=a foo=1 0"),
-        ));
-
-        // create DB
-        let partition_template = PartitionTemplate {
-            parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
-        };
-        let test_db = TestDb::builder()
-            .lifecycle_rules(data_types::database_rules::LifecycleRules {
-                buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
-                mub_row_threshold: NonZeroUsize::new(10).unwrap(),
-                ..Default::default()
-            })
-            .partition_template(partition_template)
-            .build()
-            .await;
-        let db = test_db.db;
-
-        // start background task loop
-        let shutdown: CancellationToken = Default::default();
-        let shutdown_captured = shutdown.clone();
-        let db_captured = Arc::clone(&db);
-        let join_handle =
-            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
-
-        let consumer = WriteBufferConsumer::new(
-            Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
-            Arc::clone(&db),
-            test_db.metric_registry.as_ref(),
-        );
-
-        // after a while the table should exist
-        let t_0 = Instant::now();
-        loop {
-            if db.table_schema("table_2").is_some() {
-                break;
-            }
-
-            assert!(t_0.elapsed() < Duration::from_secs(10));
-            tokio::time::sleep(Duration::from_millis(100)).await;
-        }
-
-        // do: stop background task loop
-        shutdown.cancel();
-        join_handle.await.unwrap();
-
-        consumer.shutdown();
-        consumer.join().await.unwrap();
-
-        // no rows should be dropped
-        let batches = run_query(db, "select sum(bar) as n from table_1").await;
-        let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
-        assert_batches_eq!(expected, &batches);
-
-        // check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
-        assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
-    }
-
     #[tokio::test]
     async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
         let write_buffer_state =
diff --git a/server/tests/delete.rs b/server/tests/delete.rs
index 767d2f296f..8ea5d3cc7c 100644
--- a/server/tests/delete.rs
+++ b/server/tests/delete.rs
@@ -14,42 +14,26 @@ use data_types::{
     DatabaseName,
 };
 use iox_object_store::IoxObjectStore;
-use object_store::ObjectStore;
 use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
 use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
 use server::{
-    connection::test_helpers::TestConnectionManager,
     db::test_helpers::{run_query, write_lp},
     rules::ProvidedDatabaseRules,
-    ApplicationState, Db, Server,
+    test_utils::{make_application, make_initialized_server},
+    Db,
 };
 use test_helpers::maybe_start_logging;
 
-async fn start_server(
-    server_id: ServerId,
-    application: Arc<ApplicationState>,
-) -> Arc<Server<TestConnectionManager>> {
-    let server = Arc::new(Server::new(
-        TestConnectionManager::new(),
-        application,
-        Default::default(),
-    ));
-    server.set_id(server_id).unwrap();
-    server.wait_for_init().await.unwrap();
-    server
-}
-
 #[tokio::test]
 async fn delete_predicate_preservation() {
     maybe_start_logging();
 
     // ==================== setup ====================
-    let object_store = Arc::new(ObjectStore::new_in_memory());
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
 
-    let application = Arc::new(ApplicationState::new(Arc::clone(&object_store), None));
-    let server = start_server(server_id, Arc::clone(&application)).await;
+    let application = make_application();
+    let server = make_initialized_server(server_id, Arc::clone(&application)).await;
 
     // Test that delete predicates are stored within the preserved catalog
diff --git a/server/tests/write_buffer.rs b/server/tests/write_buffer.rs
new file mode 100644
index 0000000000..0889ec39b9
--- /dev/null
+++ b/server/tests/write_buffer.rs
@@ -0,0 +1,110 @@
+use std::num::{NonZeroU32, NonZeroUsize};
+use std::time::{Duration, Instant};
+
+use arrow_util::assert_batches_eq;
+use data_types::database_rules::{
+    DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart, WriteBufferConnection,
+    WriteBufferDirection,
+};
+use data_types::{sequence::Sequence, server_id::ServerId, DatabaseName};
+use entry::{test_helpers::lp_to_entry, SequencedEntry};
+use query::QueryDatabase;
+use server::{
+    db::test_helpers::run_query,
+    rules::ProvidedDatabaseRules,
+    test_utils::{make_application, make_initialized_server},
+};
+use test_helpers::{assert_contains, tracing::TracingCapture};
+use time::Time;
+use write_buffer::mock::MockBufferSharedState;
+
+#[tokio::test]
+async fn write_buffer_reads_wait_for_compaction() {
+    let tracing_capture = TracingCapture::new();
+
+    // ==================== setup ====================
+    let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
+    let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
+
+    // setup write buffer
+    // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
+    let n_entries = 50u64;
+    let write_buffer_state =
+        MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(1).unwrap());
+
+    for sequence_number in 0..n_entries {
+        let lp = format!(
+            "table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
+            sequence_number / 2
+        );
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, sequence_number),
+            Time::from_timestamp_nanos(0),
+            lp_to_entry(&lp),
+        ));
+    }
+
+    write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+        Sequence::new(0, n_entries),
+        Time::from_timestamp_nanos(0),
+        lp_to_entry("table_2,partition_by=a foo=1 0"),
+    ));
+
+    let application = make_application();
+    application
+        .write_buffer_factory()
+        .register_mock("my_mock".to_string(), write_buffer_state);
+
+    let server = make_initialized_server(server_id, application).await;
+
+    // create DB
+    let rules = DatabaseRules {
+        partition_template: PartitionTemplate {
+            parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
+        },
+        lifecycle_rules: LifecycleRules {
+            buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
+            mub_row_threshold: NonZeroUsize::new(10).unwrap(),
+            ..Default::default()
+        },
+        write_buffer_connection: Some(WriteBufferConnection {
+            direction: WriteBufferDirection::Read,
+            type_: "mock".to_string(),
+            connection: "my_mock".to_string(),
+            ..Default::default()
+        }),
+        ..DatabaseRules::new(db_name.clone())
+    };
+
+    let database = server
+        .create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap())
+        .await
+        .unwrap();
+
+    let db = database.initialized_db().unwrap();
+
+    // after a while the table should exist
+    let t_0 = Instant::now();
+    loop {
+        if db.table_schema("table_2").is_some() {
+            break;
+        }
+
+        assert!(t_0.elapsed() < Duration::from_secs(10));
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+
+    // no rows should be dropped
+    let batches = run_query(db, "select sum(bar) as n from table_1").await;
+    let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
+    assert_batches_eq!(expected, &batches);
+
+    // check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
+    assert_contains!(
+        tracing_capture.to_string(),
+        "Hard limit reached while reading from write buffer, waiting for compaction to catch up"
+    );
+
+    server.shutdown();
+    server.join().await.unwrap();
+}
diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs
index d2db41306f..2ef481894d 100644
--- a/tests/end_to_end_cases/write_api.rs
+++ b/tests/end_to_end_cases/write_api.rs
@@ -113,7 +113,7 @@ async fn test_write_entry() {
         lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
             .unwrap();
 
-    let entry: Vec<u8> = sharded_entries.into_iter().next().unwrap().entry.into();
+    let entry = sharded_entries.into_iter().next().unwrap().entry;
 
     write_client.write_entry(&db_name, entry).await.unwrap();
diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs
index cf7cd6c3a3..d05645a241 100644
--- a/write_buffer/src/config.rs
+++ b/write_buffer/src/config.rs
@@ -1,3 +1,4 @@
+use parking_lot::RwLock;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
     sync::Arc,
@@ -35,7 +36,7 @@ enum Mock {
 /// from [`WriteBufferConnection`].
 #[derive(Debug)]
 pub struct WriteBufferConfigFactory {
-    mocks: BTreeMap<String, Mock>,
+    mocks: RwLock<BTreeMap<String, Mock>>,
     time_provider: Arc<dyn TimeProvider>,
 }
@@ -52,7 +53,7 @@ impl WriteBufferConfigFactory {
     ///
     /// # Panics
     /// When mock with identical name is already registered.
-    pub fn register_mock(&mut self, name: String, state: MockBufferSharedState) {
+    pub fn register_mock(&self, name: String, state: MockBufferSharedState) {
         self.set_mock(name, Mock::Normal(state));
     }
 
@@ -60,12 +61,13 @@ impl WriteBufferConfigFactory {
     ///
     /// # Panics
     /// When mock with identical name is already registered.
-    pub fn register_always_fail_mock(&mut self, name: String) {
+    pub fn register_always_fail_mock(&self, name: String) {
         self.set_mock(name, Mock::AlwaysFailing);
     }
 
-    fn set_mock(&mut self, name: String, mock: Mock) {
-        match self.mocks.entry(name) {
+    fn set_mock(&self, name: String, mock: Mock) {
+        let mut mocks = self.mocks.write();
+        match mocks.entry(name) {
             Entry::Vacant(v) => {
                 v.insert(mock);
             }
@@ -77,6 +79,7 @@ impl WriteBufferConfigFactory {
 
     fn get_mock(&self, name: &str) -> Result<Mock, WriteBufferError> {
         self.mocks
+            .read()
             .get(name)
             .cloned()
             .ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {}", name).into())
@@ -237,7 +240,7 @@ mod tests {
     #[tokio::test]
     async fn test_writing_mock() {
         let time = Arc::new(time::SystemProvider::new());
-        let mut factory = WriteBufferConfigFactory::new(time);
+        let factory = WriteBufferConfigFactory::new(time);
 
         let state =
             MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@@ -277,7 +280,7 @@ mod tests {
         let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
         let time = Arc::new(time::SystemProvider::new());
-        let mut factory = WriteBufferConfigFactory::new(time);
+        let factory = WriteBufferConfigFactory::new(time);
 
         let state =
             MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@@ -316,7 +319,7 @@ mod tests {
     #[tokio::test]
     async fn test_writing_mock_failing() {
         let time = Arc::new(time::SystemProvider::new());
-        let mut factory = WriteBufferConfigFactory::new(time);
+        let factory = WriteBufferConfigFactory::new(time);
 
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());
@@ -354,7 +357,7 @@ mod tests {
         let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
         let time = Arc::new(time::SystemProvider::new());
-        let mut factory = WriteBufferConfigFactory::new(time);
+        let factory = WriteBufferConfigFactory::new(time);
 
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());
@@ -393,7 +396,7 @@ mod tests {
     #[should_panic(expected = "Mock with the name 'some_mock' already registered")]
     fn test_register_mock_twice_panics() {
         let time = Arc::new(time::SystemProvider::new());
-        let mut factory = WriteBufferConfigFactory::new(time);
+        let factory = WriteBufferConfigFactory::new(time);
 
         let state =
             MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());