From d427fed9dcd940206830f7f3bfb7bf23269aff6a Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Wed, 14 Jul 2021 13:54:18 +0200 Subject: [PATCH 1/4] fix: Remove bad max.request.size config param --- server/src/write_buffer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs index e68892f5ad..011ce08244 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -126,7 +126,6 @@ impl KafkaBufferProducer { let mut cfg = ClientConfig::new(); cfg.set("bootstrap.servers", &conn); cfg.set("message.timeout.ms", "5000"); - cfg.set("max.request.size", "10000000"); let producer: FutureProducer = cfg.create()?; From 9cb9ae08741a65a2a8ce0e57baf4e19697987dd4 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 14 Jul 2021 11:39:49 +0200 Subject: [PATCH 2/4] chore: move write buffer into its own crate --- Cargo.lock | 13 +- Cargo.toml | 1 + server/Cargo.toml | 2 +- server/src/config.rs | 2 +- server/src/db.rs | 9 +- server/src/init.rs | 2 +- server/src/lib.rs | 16 +- server/src/utils.rs | 2 +- write_buffer/Cargo.toml | 11 ++ write_buffer/src/config.rs | 44 ++++++ write_buffer/src/core.rs | 28 ++++ .../src/kafka.rs | 141 +----------------- write_buffer/src/lib.rs | 14 ++ write_buffer/src/mock.rs | 78 ++++++++++ 14 files changed, 207 insertions(+), 156 deletions(-) create mode 100644 write_buffer/Cargo.toml create mode 100644 write_buffer/src/config.rs create mode 100644 write_buffer/src/core.rs rename server/src/write_buffer.rs => write_buffer/src/kafka.rs (51%) create mode 100644 write_buffer/src/lib.rs create mode 100644 write_buffer/src/mock.rs diff --git a/Cargo.lock b/Cargo.lock index aa0a5b3cb7..5f8042202d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3803,7 +3803,6 @@ dependencies = [ "query", "rand 0.8.4", "rand_distr", - "rdkafka", "read_buffer", "serde", "serde_json", @@ -3816,6 +3815,7 @@ dependencies = [ "tokio-util", "tracker", "uuid", + "write_buffer", ] [[package]] @@ -4964,6 +4964,17 @@ 
dependencies = [ "winapi", ] +[[package]] +name = "write_buffer" +version = "0.1.0" +dependencies = [ + "async-trait", + "data_types", + "entry", + "futures", + "rdkafka", +] + [[package]] name = "wyz" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 1ade40441c..2f49612710 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "trogging", "grpc-router", "grpc-router/grpc-router-test-gen", + "write_buffer", ] [profile.release] diff --git a/server/Cargo.toml b/server/Cargo.toml index 3c303c8a5a..7bf4077e8b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -39,7 +39,6 @@ persistence_windows = { path = "../persistence_windows" } query = { path = "../query" } rand = "0.8.3" rand_distr = "0.4.0" -rdkafka = "0.26.0" read_buffer = { path = "../read_buffer" } serde = "1.0" serde_json = "1.0" @@ -51,6 +50,7 @@ tokio = { version = "1.0", features = ["macros", "time"] } tokio-util = { version = "0.6.3" } tracker = { path = "../tracker" } uuid = { version = "0.8", features = ["serde", "v4"] } +write_buffer = { path = "../write_buffer" } [dev-dependencies] # In alphabetical order arrow_util = { path = "../arrow_util" } diff --git a/server/src/config.rs b/server/src/config.rs index 1674955a37..3fb35941ae 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -11,11 +11,11 @@ use metrics::MetricRegistry; use object_store::{path::ObjectStorePath, ObjectStore}; use parquet_file::catalog::PreservedCatalog; use query::exec::Executor; +use write_buffer::config::WriteBufferConfig; /// This module contains code for managing the configuration of the server. 
use crate::{ db::{catalog::Catalog, DatabaseToCommit, Db}, - write_buffer::WriteBufferConfig, Error, JobRegistry, Result, }; use observability_deps::tracing::{self, error, info, warn, Instrument}; diff --git a/server/src/db.rs b/server/src/db.rs index a01233564f..8d114be11c 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -11,7 +11,6 @@ use crate::{ catalog::{chunk::CatalogChunk, partition::Partition, Catalog, TableNameFilter}, lifecycle::{LockableCatalogChunk, LockableCatalogPartition}, }, - write_buffer::{WriteBufferConfig, WriteBufferError}, JobRegistry, }; use ::lifecycle::{LockableChunk, LockablePartition}; @@ -49,6 +48,8 @@ use std::{ }, time::{Duration, Instant}, }; +use write_buffer::config::WriteBufferConfig; +use write_buffer::core::WriteBufferError; pub mod access; pub mod catalog; @@ -954,9 +955,6 @@ mod tests { test_helpers::{try_write_lp, write_lp}, }, utils::{make_db, TestDb}, - write_buffer::test_helpers::{ - MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, - }, }; use ::test_helpers::assert_contains; use arrow::record_batch::RecordBatch; @@ -991,6 +989,9 @@ mod tests { time::{Duration, Instant}, }; use tokio_util::sync::CancellationToken; + use write_buffer::mock::{ + MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, + }; type TestError = Box; type Result = std::result::Result; diff --git a/server/src/init.rs b/server/src/init.rs index 03da187764..a46d7b008d 100644 --- a/server/src/init.rs +++ b/server/src/init.rs @@ -25,11 +25,11 @@ use std::{ }, }; use tokio::sync::Semaphore; +use write_buffer::config::WriteBufferConfig; use crate::{ config::{object_store_path_for_database_config, Config, DatabaseHandle, DB_RULES_FILE_NAME}, db::load::load_or_create_preserved_catalog, - write_buffer::WriteBufferConfig, DatabaseError, }; diff --git a/server/src/lib.rs b/server/src/lib.rs index 6ee36d1110..c95bca0ba9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -95,6 +95,7 @@ use 
metrics::{KeyValue, MetricObserverBuilder, MetricRegistry}; use object_store::{ObjectStore, ObjectStoreApi}; use query::{exec::Executor, DatabaseStore}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; +use write_buffer::config::WriteBufferConfig; pub use crate::config::RemoteTemplate; use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString}; @@ -109,7 +110,6 @@ use std::collections::HashMap; mod config; pub mod db; mod init; -mod write_buffer; /// Utility modules used by benchmarks and tests pub mod utils; @@ -556,11 +556,9 @@ where .context(CannotCreatePreservedCatalog)?; let write_buffer = - write_buffer::WriteBufferConfig::new(server_id, &rules).map_err(|e| { - Error::CreatingWriteBuffer { - config: rules.write_buffer_connection.clone(), - source: e, - } + WriteBufferConfig::new(server_id, &rules).map_err(|e| Error::CreatingWriteBuffer { + config: rules.write_buffer_connection.clone(), + source: e, })?; info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config"); db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?; @@ -1150,10 +1148,7 @@ impl RemoteServer for RemoteServerImpl { #[cfg(test)] mod tests { use super::*; - use crate::{ - utils::TestDb, - write_buffer::{test_helpers::MockBufferForWritingThatAlwaysErrors, WriteBufferConfig}, - }; + use crate::utils::TestDb; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; use async_trait::async_trait; @@ -1182,6 +1177,7 @@ mod tests { use tempfile::TempDir; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; + use write_buffer::mock::MockBufferForWritingThatAlwaysErrors; const ARBITRARY_DEFAULT_TIME: i64 = 456; diff --git a/server/src/utils.rs b/server/src/utils.rs index 2e210a1dd3..84aa53c56f 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -8,10 +8,10 @@ use data_types::{ }; use object_store::ObjectStore; use 
query::{exec::Executor, QueryDatabase}; +use write_buffer::config::WriteBufferConfig; use crate::{ db::{load::load_or_create_preserved_catalog, DatabaseToCommit, Db}, - write_buffer::WriteBufferConfig, JobRegistry, }; diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml new file mode 100644 index 0000000000..ad5c96969e --- /dev/null +++ b/write_buffer/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "write_buffer" +version = "0.1.0" +edition = "2018" + +[dependencies] +async-trait = "0.1" +data_types = { path = "../data_types" } +entry = { path = "../entry" } +futures = "0.3" +rdkafka = "0.26.0" diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs new file mode 100644 index 0000000000..32fa5656fc --- /dev/null +++ b/write_buffer/src/config.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use data_types::{ + database_rules::{DatabaseRules, WriteBufferConnection}, + server_id::ServerId, +}; + +use crate::{ + core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}, + kafka::{KafkaBufferConsumer, KafkaBufferProducer}, +}; + +#[derive(Debug)] +pub enum WriteBufferConfig { + Writing(Arc), + Reading(Arc), +} + +impl WriteBufferConfig { + pub fn new( + server_id: ServerId, + rules: &DatabaseRules, + ) -> Result, WriteBufferError> { + let name = rules.db_name(); + + // Right now, the Kafka producer and consumers are the only production implementations of the + // `WriteBufferWriting` and `WriteBufferReading` traits. If/when there are other kinds of + // write buffers, additional configuration will be needed to determine what kind of write + // buffer to use here. 
+ match rules.write_buffer_connection.as_ref() { + Some(WriteBufferConnection::Writing(conn)) => { + let kafka_buffer = KafkaBufferProducer::new(conn, name)?; + + Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _))) + } + Some(WriteBufferConnection::Reading(conn)) => { + let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?; + + Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _))) + } + None => Ok(None), + } + } +} diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs new file mode 100644 index 0000000000..7d0e650a12 --- /dev/null +++ b/write_buffer/src/core.rs @@ -0,0 +1,28 @@ +use async_trait::async_trait; +use entry::{Entry, Sequence, SequencedEntry}; +use futures::stream::BoxStream; + +/// Generic boxed error type that is used in this crate. +/// +/// The dynamic boxing makes it easier to deal with errors from different implementations. +pub type WriteBufferError = Box; + +/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading +/// entries from the Write Buffer at a later time. +#[async_trait] +pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static { + /// Send an `Entry` to the write buffer and return information that can be used to restore + /// entries at a later time. + async fn store_entry(&self, entry: &Entry) -> Result; +} + +/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using +/// `Db::stream_in_sequenced_entries`. 
+pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static { + fn stream<'life0, 'async_trait>( + &'life0 self, + ) -> BoxStream<'async_trait, Result> + where + 'life0: 'async_trait, + Self: 'async_trait; +} diff --git a/server/src/write_buffer.rs b/write_buffer/src/kafka.rs similarity index 51% rename from server/src/write_buffer.rs rename to write_buffer/src/kafka.rs index 011ce08244..f01729a500 100644 --- a/server/src/write_buffer.rs +++ b/write_buffer/src/kafka.rs @@ -1,8 +1,7 @@ +use std::convert::{TryFrom, TryInto}; + use async_trait::async_trait; -use data_types::{ - database_rules::{DatabaseRules, WriteBufferConnection}, - server_id::ServerId, -}; +use data_types::server_id::ServerId; use entry::{Entry, Sequence, SequencedEntry}; use futures::{stream::BoxStream, StreamExt}; use rdkafka::{ @@ -11,65 +10,8 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord}, ClientConfig, Message, }; -use std::{ - convert::{TryFrom, TryInto}, - sync::Arc, -}; -pub type WriteBufferError = Box; - -#[derive(Debug)] -pub enum WriteBufferConfig { - Writing(Arc), - Reading(Arc), -} - -impl WriteBufferConfig { - pub fn new( - server_id: ServerId, - rules: &DatabaseRules, - ) -> Result, WriteBufferError> { - let name = rules.db_name(); - - // Right now, the Kafka producer and consumers ar the only production implementations of the - // `WriteBufferWriting` and `WriteBufferReading` traits. If/when there are other kinds of - // write buffers, additional configuration will be needed to determine what kind of write - // buffer to use here. 
- match rules.write_buffer_connection.as_ref() { - Some(WriteBufferConnection::Writing(conn)) => { - let kafka_buffer = KafkaBufferProducer::new(conn, name)?; - - Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _))) - } - Some(WriteBufferConnection::Reading(conn)) => { - let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?; - - Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _))) - } - None => Ok(None), - } - } -} - -/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading -/// entries from the Write Buffer at a later time. -#[async_trait] -pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static { - /// Send an `Entry` to the write buffer and return information that can be used to restore - /// entries at a later time. - async fn store_entry(&self, entry: &Entry) -> Result; -} - -/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using -/// `Db::stream_in_sequenced_entries`. -pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static { - fn stream<'life0, 'async_trait>( - &'life0 self, - ) -> BoxStream<'async_trait, Result> - where - 'life0: 'async_trait, - Self: 'async_trait; -} +use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; pub struct KafkaBufferProducer { conn: String, @@ -210,78 +152,3 @@ impl KafkaBufferConsumer { }) } } - -#[cfg(test)] -pub mod test_helpers { - use super::*; - use futures::stream::{self, StreamExt}; - use std::sync::{Arc, Mutex}; - - #[derive(Debug, Default)] - pub struct MockBufferForWriting { - pub entries: Arc>>, - } - - #[async_trait] - impl WriteBufferWriting for MockBufferForWriting { - async fn store_entry(&self, entry: &Entry) -> Result { - let mut entries = self.entries.lock().unwrap(); - let offset = entries.len() as u64; - entries.push(entry.clone()); - - Ok(Sequence { - id: 0, - number: offset, - }) - } - } - - #[derive(Debug, Default)] - pub struct 
MockBufferForWritingThatAlwaysErrors; - - #[async_trait] - impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { - async fn store_entry(&self, _entry: &Entry) -> Result { - Err(String::from( - "Something bad happened on the way to writing an entry in the write buffer", - ) - .into()) - } - } - - type MoveableEntries = Arc>>>; - pub struct MockBufferForReading { - entries: MoveableEntries, - } - - impl std::fmt::Debug for MockBufferForReading { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MockBufferForReading").finish() - } - } - - impl MockBufferForReading { - pub fn new(entries: Vec>) -> Self { - Self { - entries: Arc::new(Mutex::new(entries)), - } - } - } - - impl WriteBufferReading for MockBufferForReading { - fn stream<'life0, 'async_trait>( - &'life0 self, - ) -> BoxStream<'async_trait, Result> - where - 'life0: 'async_trait, - Self: 'async_trait, - { - // move the entries out of `self` to move them into the stream - let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect(); - - stream::iter(entries.into_iter()) - .chain(stream::pending()) - .boxed() - } - } -} diff --git a/write_buffer/src/lib.rs b/write_buffer/src/lib.rs new file mode 100644 index 0000000000..9e9472940a --- /dev/null +++ b/write_buffer/src/lib.rs @@ -0,0 +1,14 @@ +#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + clippy::explicit_iter_loop, + clippy::future_not_send, + clippy::use_self, + clippy::clone_on_ref_ptr +)] + +pub mod config; +pub mod core; +pub mod kafka; +pub mod mock; diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs new file mode 100644 index 0000000000..046ca2334a --- /dev/null +++ b/write_buffer/src/mock.rs @@ -0,0 +1,78 @@ +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use entry::{Entry, Sequence, SequencedEntry}; +use futures::{ + stream::{self, BoxStream}, + StreamExt, +}; + 
+use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; + +#[derive(Debug, Default)] +pub struct MockBufferForWriting { + pub entries: Arc>>, +} + +#[async_trait] +impl WriteBufferWriting for MockBufferForWriting { + async fn store_entry(&self, entry: &Entry) -> Result { + let mut entries = self.entries.lock().unwrap(); + let offset = entries.len() as u64; + entries.push(entry.clone()); + + Ok(Sequence { + id: 0, + number: offset, + }) + } +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct MockBufferForWritingThatAlwaysErrors; + +#[async_trait] +impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { + async fn store_entry(&self, _entry: &Entry) -> Result { + Err(String::from( + "Something bad happened on the way to writing an entry in the write buffer", + ) + .into()) + } +} + +type MoveableEntries = Arc>>>; +pub struct MockBufferForReading { + entries: MoveableEntries, +} + +impl std::fmt::Debug for MockBufferForReading { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MockBufferForReading").finish() + } +} + +impl MockBufferForReading { + pub fn new(entries: Vec>) -> Self { + Self { + entries: Arc::new(Mutex::new(entries)), + } + } +} + +impl WriteBufferReading for MockBufferForReading { + fn stream<'life0, 'async_trait>( + &'life0 self, + ) -> BoxStream<'async_trait, Result> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + // move the entries out of `self` to move them into the stream + let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect(); + + stream::iter(entries.into_iter()) + .chain(stream::pending()) + .boxed() + } +} From 886a87f0119b6e5035c55a06249fd1e07b8f7358 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 14 Jul 2021 11:48:02 +0200 Subject: [PATCH 3/4] chore: enable linting for `persistence_windows` crate --- persistence_windows/src/checkpoint.rs | 4 ++-- persistence_windows/src/lib.rs | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 
deletions(-) diff --git a/persistence_windows/src/checkpoint.rs b/persistence_windows/src/checkpoint.rs index ce843df72b..1f8e19e1d9 100644 --- a/persistence_windows/src/checkpoint.rs +++ b/persistence_windows/src/checkpoint.rs @@ -509,8 +509,8 @@ impl ReplayPlanner { return Err(Error::PartitionCheckpointMinimumBeforeDatabase { partition_checkpoint_sequence_number: min_max.min(), database_checkpoint_sequence_number: database_wide_min_max.min(), - table_name: table_name.clone(), - partition_key: partition_key.clone(), + table_name: Arc::clone(table_name), + partition_key: Arc::clone(partition_key), }); } } diff --git a/persistence_windows/src/lib.rs b/persistence_windows/src/lib.rs index 60c50bb0a7..3e65756e4f 100644 --- a/persistence_windows/src/lib.rs +++ b/persistence_windows/src/lib.rs @@ -1,3 +1,13 @@ +#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + clippy::explicit_iter_loop, + clippy::future_not_send, + clippy::use_self, + clippy::clone_on_ref_ptr +)] + pub mod checkpoint; pub mod min_max_sequence; pub mod persistence_windows; From 8d23dd6d6d5797809a1184ea378e4500c038c6ad Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Wed, 14 Jul 2021 14:46:49 +0200 Subject: [PATCH 4/4] fix: Set kafka max message size in client --- write_buffer/src/kafka.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index f01729a500..2f26436f66 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -68,6 +68,7 @@ impl KafkaBufferProducer { let mut cfg = ClientConfig::new(); cfg.set("bootstrap.servers", &conn); cfg.set("message.timeout.ms", "5000"); + cfg.set("message.max.bytes", "10000000"); let producer: FutureProducer = cfg.create()?;