Merge branch 'main' into alamb/go_go_go_go

pull/24376/head
Edd Robinson 2021-07-14 13:56:35 +01:00 committed by GitHub
commit 0e5276ed20
16 changed files with 220 additions and 159 deletions

Cargo.lock (generated)

@@ -3786,7 +3786,6 @@ dependencies = [
 "query",
 "rand 0.8.4",
 "rand_distr",
-"rdkafka",
 "read_buffer",
 "serde",
 "serde_json",
@@ -3799,6 +3798,7 @@ dependencies = [
 "tokio-util",
 "tracker",
 "uuid",
+"write_buffer",
 ]

 [[package]]
@@ -4947,6 +4947,17 @@ dependencies = [
 "winapi",
 ]

+[[package]]
+name = "write_buffer"
+version = "0.1.0"
+dependencies = [
+"async-trait",
+"data_types",
+"entry",
+"futures",
+"rdkafka",
+]
+
 [[package]]
 name = "wyz"
 version = "0.2.0"


@@ -39,6 +39,7 @@ members = [
 "trogging",
 "grpc-router",
 "grpc-router/grpc-router-test-gen",
+"write_buffer",
 ]

 [profile.release]


@@ -509,8 +509,8 @@ impl ReplayPlanner {
 return Err(Error::PartitionCheckpointMinimumBeforeDatabase {
     partition_checkpoint_sequence_number: min_max.min(),
     database_checkpoint_sequence_number: database_wide_min_max.min(),
-    table_name: table_name.clone(),
-    partition_key: partition_key.clone(),
+    table_name: Arc::clone(table_name),
+    partition_key: Arc::clone(partition_key),
 });
 }
 }


@@ -1,3 +1,13 @@
+#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
+#![warn(
+    missing_copy_implementations,
+    missing_debug_implementations,
+    clippy::explicit_iter_loop,
+    clippy::future_not_send,
+    clippy::use_self,
+    clippy::clone_on_ref_ptr
+)]
+
 pub mod checkpoint;
 pub mod min_max_sequence;
 pub mod persistence_windows;


@@ -39,7 +39,6 @@ persistence_windows = { path = "../persistence_windows" }
 query = { path = "../query" }
 rand = "0.8.3"
 rand_distr = "0.4.0"
-rdkafka = "0.26.0"
 read_buffer = { path = "../read_buffer" }
 serde = "1.0"
 serde_json = "1.0"
@@ -51,6 +50,7 @@ tokio = { version = "1.0", features = ["macros", "time"] }
 tokio-util = { version = "0.6.3" }
 tracker = { path = "../tracker" }
 uuid = { version = "0.8", features = ["serde", "v4"] }
+write_buffer = { path = "../write_buffer" }

 [dev-dependencies] # In alphabetical order
 arrow_util = { path = "../arrow_util" }


@@ -11,11 +11,11 @@ use metrics::MetricRegistry;
 use object_store::{path::ObjectStorePath, ObjectStore};
 use parquet_file::catalog::PreservedCatalog;
 use query::exec::Executor;
+use write_buffer::config::WriteBufferConfig;

 /// This module contains code for managing the configuration of the server.
 use crate::{
     db::{catalog::Catalog, DatabaseToCommit, Db},
-    write_buffer::WriteBufferConfig,
     Error, JobRegistry, Result,
 };
 use observability_deps::tracing::{self, error, info, warn, Instrument};


@@ -11,7 +11,6 @@ use crate::{
         catalog::{chunk::CatalogChunk, partition::Partition, Catalog, TableNameFilter},
         lifecycle::{LockableCatalogChunk, LockableCatalogPartition},
     },
-    write_buffer::{WriteBufferConfig, WriteBufferError},
     JobRegistry,
 };
 use ::lifecycle::{LockableChunk, LockablePartition};
@@ -49,6 +48,8 @@ use std::{
     },
     time::{Duration, Instant},
 };
+use write_buffer::config::WriteBufferConfig;
+use write_buffer::core::WriteBufferError;

 pub mod access;
 pub mod catalog;
@@ -954,9 +955,6 @@ mod tests {
             test_helpers::{try_write_lp, write_lp},
         },
         utils::{make_db, TestDb},
-        write_buffer::test_helpers::{
-            MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors,
-        },
     };
     use ::test_helpers::assert_contains;
     use arrow::record_batch::RecordBatch;
@@ -991,6 +989,9 @@ mod tests {
         time::{Duration, Instant},
     };
     use tokio_util::sync::CancellationToken;
+    use write_buffer::mock::{
+        MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors,
+    };

     type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = TestError> = std::result::Result<T, E>;


@@ -25,11 +25,11 @@ use std::{
     },
 };
 use tokio::sync::Semaphore;
+use write_buffer::config::WriteBufferConfig;

 use crate::{
     config::{object_store_path_for_database_config, Config, DatabaseHandle, DB_RULES_FILE_NAME},
     db::load::load_or_create_preserved_catalog,
-    write_buffer::WriteBufferConfig,
     DatabaseError,
 };


@@ -95,6 +95,7 @@ use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
 use object_store::{ObjectStore, ObjectStoreApi};
 use query::{exec::Executor, DatabaseStore};
 use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
+use write_buffer::config::WriteBufferConfig;

 pub use crate::config::RemoteTemplate;
 use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
@@ -109,7 +110,6 @@ use std::collections::HashMap;
 mod config;
 pub mod db;
 mod init;
-mod write_buffer;

 /// Utility modules used by benchmarks and tests
 pub mod utils;
@@ -556,11 +556,9 @@ where
 .context(CannotCreatePreservedCatalog)?;

 let write_buffer =
-    write_buffer::WriteBufferConfig::new(server_id, &rules).map_err(|e| {
-        Error::CreatingWriteBuffer {
-            config: rules.write_buffer_connection.clone(),
-            source: e,
-        }
+    WriteBufferConfig::new(server_id, &rules).map_err(|e| Error::CreatingWriteBuffer {
+        config: rules.write_buffer_connection.clone(),
+        source: e,
 })?;
 info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
 db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
@@ -1150,10 +1148,7 @@ impl RemoteServer for RemoteServerImpl {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        utils::TestDb,
-        write_buffer::{test_helpers::MockBufferForWritingThatAlwaysErrors, WriteBufferConfig},
-    };
+    use crate::utils::TestDb;
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
     use async_trait::async_trait;
@@ -1182,6 +1177,7 @@
     use tempfile::TempDir;
     use tokio::task::JoinHandle;
     use tokio_util::sync::CancellationToken;
+    use write_buffer::mock::MockBufferForWritingThatAlwaysErrors;

     const ARBITRARY_DEFAULT_TIME: i64 = 456;


@@ -8,10 +8,10 @@ use data_types::{
 };
 use object_store::ObjectStore;
 use query::{exec::Executor, QueryDatabase};
+use write_buffer::config::WriteBufferConfig;

 use crate::{
     db::{load::load_or_create_preserved_catalog, DatabaseToCommit, Db},
-    write_buffer::WriteBufferConfig,
     JobRegistry,
 };

write_buffer/Cargo.toml (new file)
@@ -0,0 +1,11 @@
[package]
name = "write_buffer"
version = "0.1.0"
edition = "2018"

[dependencies]
async-trait = "0.1"
data_types = { path = "../data_types" }
entry = { path = "../entry" }
futures = "0.3"
rdkafka = "0.26.0"


@@ -0,0 +1,44 @@
use std::sync::Arc;

use data_types::{
    database_rules::{DatabaseRules, WriteBufferConnection},
    server_id::ServerId,
};

use crate::{
    core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
    kafka::{KafkaBufferConsumer, KafkaBufferProducer},
};

#[derive(Debug)]
pub enum WriteBufferConfig {
    Writing(Arc<dyn WriteBufferWriting>),
    Reading(Arc<dyn WriteBufferReading>),
}

impl WriteBufferConfig {
    pub fn new(
        server_id: ServerId,
        rules: &DatabaseRules,
    ) -> Result<Option<Self>, WriteBufferError> {
        let name = rules.db_name();

        // Right now, the Kafka producer and consumers are the only production implementations of the
        // `WriteBufferWriting` and `WriteBufferReading` traits. If/when there are other kinds of
        // write buffers, additional configuration will be needed to determine what kind of write
        // buffer to use here.
        match rules.write_buffer_connection.as_ref() {
            Some(WriteBufferConnection::Writing(conn)) => {
                let kafka_buffer = KafkaBufferProducer::new(conn, name)?;

                Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
            }
            Some(WriteBufferConnection::Reading(conn)) => {
                let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;

                Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
            }
            None => Ok(None),
        }
    }
}
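
For orientation, here is a minimal sketch (not part of this diff) of how a caller might branch on the value returned by `WriteBufferConfig::new`; the server code earlier in this diff does essentially this before advancing replay. The helper name `write_buffer_mode` is hypothetical.

use data_types::{database_rules::DatabaseRules, server_id::ServerId};
use write_buffer::config::WriteBufferConfig;
use write_buffer::core::WriteBufferError;

// Hypothetical helper: report which mode, if any, the rules select.
fn write_buffer_mode(
    server_id: ServerId,
    rules: &DatabaseRules,
) -> Result<&'static str, WriteBufferError> {
    Ok(match WriteBufferConfig::new(server_id, rules)? {
        Some(WriteBufferConfig::Writing(_)) => "producing entries into the write buffer",
        Some(WriteBufferConfig::Reading(_)) => "consuming entries from the write buffer",
        None => "no write buffer configured",
    })
}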

write_buffer/src/core.rs (new file)
@@ -0,0 +1,28 @@
use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry};
use futures::stream::BoxStream;

/// Generic boxed error type that is used in this crate.
///
/// The dynamic boxing makes it easier to deal with errors from different implementations.
pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;

/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading
/// entries from the Write Buffer at a later time.
#[async_trait]
pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
    /// Send an `Entry` to the write buffer and return information that can be used to restore
    /// entries at a later time.
    async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError>;
}

/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using
/// `Db::stream_in_sequenced_entries`.
pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
    fn stream<'life0, 'async_trait>(
        &'life0 self,
    ) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
    where
        'life0: 'async_trait,
        Self: 'async_trait;
}
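
As a usage illustration (an assumption, not code from this PR), a consumer generic over `WriteBufferReading` can drive the returned stream with `futures::StreamExt`; in the real server the entries would be handed to the database, e.g. via `Db::stream_in_sequenced_entries` as the doc comment above notes. The function name `drain_write_buffer` is hypothetical.

use futures::StreamExt;
use write_buffer::core::WriteBufferReading;

// Hypothetical consumer: pull every entry the buffer yields and log read errors.
async fn drain_write_buffer(buffer: &impl WriteBufferReading) {
    let mut entries = buffer.stream();
    while let Some(result) = entries.next().await {
        match result {
            // A real consumer would forward the `SequencedEntry` to the database.
            Ok(sequenced_entry) => drop(sequenced_entry),
            Err(e) => eprintln!("error reading from write buffer: {}", e),
        }
    }
}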

@@ -1,8 +1,7 @@
+use std::convert::{TryFrom, TryInto};
+
 use async_trait::async_trait;
-use data_types::{
-    database_rules::{DatabaseRules, WriteBufferConnection},
-    server_id::ServerId,
-};
+use data_types::server_id::ServerId;
 use entry::{Entry, Sequence, SequencedEntry};
 use futures::{stream::BoxStream, StreamExt};
 use rdkafka::{
@@ -11,65 +10,8 @@
     producer::{FutureProducer, FutureRecord},
     ClientConfig, Message,
 };
-use std::{
-    convert::{TryFrom, TryInto},
-    sync::Arc,
-};

-pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
+use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};

-#[derive(Debug)]
-pub enum WriteBufferConfig {
-    Writing(Arc<dyn WriteBufferWriting>),
-    Reading(Arc<dyn WriteBufferReading>),
-}
-
-impl WriteBufferConfig {
-    pub fn new(
-        server_id: ServerId,
-        rules: &DatabaseRules,
-    ) -> Result<Option<Self>, WriteBufferError> {
-        let name = rules.db_name();
-
-        // Right now, the Kafka producer and consumers ar the only production implementations of the
-        // `WriteBufferWriting` and `WriteBufferReading` traits. If/when there are other kinds of
-        // write buffers, additional configuration will be needed to determine what kind of write
-        // buffer to use here.
-        match rules.write_buffer_connection.as_ref() {
-            Some(WriteBufferConnection::Writing(conn)) => {
-                let kafka_buffer = KafkaBufferProducer::new(conn, name)?;
-
-                Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
-            }
-            Some(WriteBufferConnection::Reading(conn)) => {
-                let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;
-
-                Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
-            }
-            None => Ok(None),
-        }
-    }
-}
-
-/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading
-/// entries from the Write Buffer at a later time.
-#[async_trait]
-pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
-    /// Send an `Entry` to the write buffer and return information that can be used to restore
-    /// entries at a later time.
-    async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError>;
-}
-
-/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using
-/// `Db::stream_in_sequenced_entries`.
-pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
-    fn stream<'life0, 'async_trait>(
-        &'life0 self,
-    ) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
-    where
-        'life0: 'async_trait,
-        Self: 'async_trait;
-}
 pub struct KafkaBufferProducer {
     conn: String,
@@ -126,7 +68,7 @@
     let mut cfg = ClientConfig::new();
     cfg.set("bootstrap.servers", &conn);
     cfg.set("message.timeout.ms", "5000");
-    cfg.set("max.request.size", "10000000");
+    cfg.set("message.max.bytes", "10000000");

     let producer: FutureProducer = cfg.create()?;
@@ -211,78 +153,3 @@
         })
     }
 }
-#[cfg(test)]
-pub mod test_helpers {
-    use super::*;
-    use futures::stream::{self, StreamExt};
-    use std::sync::{Arc, Mutex};
-
-    #[derive(Debug, Default)]
-    pub struct MockBufferForWriting {
-        pub entries: Arc<Mutex<Vec<Entry>>>,
-    }
-
-    #[async_trait]
-    impl WriteBufferWriting for MockBufferForWriting {
-        async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
-            let mut entries = self.entries.lock().unwrap();
-            let offset = entries.len() as u64;
-            entries.push(entry.clone());
-
-            Ok(Sequence {
-                id: 0,
-                number: offset,
-            })
-        }
-    }
-
-    #[derive(Debug, Default)]
-    pub struct MockBufferForWritingThatAlwaysErrors;
-
-    #[async_trait]
-    impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
-        async fn store_entry(&self, _entry: &Entry) -> Result<Sequence, WriteBufferError> {
-            Err(String::from(
-                "Something bad happened on the way to writing an entry in the write buffer",
-            )
-            .into())
-        }
-    }
-
-    type MoveableEntries = Arc<Mutex<Vec<Result<SequencedEntry, WriteBufferError>>>>;
-
-    pub struct MockBufferForReading {
-        entries: MoveableEntries,
-    }
-
-    impl std::fmt::Debug for MockBufferForReading {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            f.debug_struct("MockBufferForReading").finish()
-        }
-    }
-
-    impl MockBufferForReading {
-        pub fn new(entries: Vec<Result<SequencedEntry, WriteBufferError>>) -> Self {
-            Self {
-                entries: Arc::new(Mutex::new(entries)),
-            }
-        }
-    }
-
-    impl WriteBufferReading for MockBufferForReading {
-        fn stream<'life0, 'async_trait>(
-            &'life0 self,
-        ) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
-        where
-            'life0: 'async_trait,
-            Self: 'async_trait,
-        {
-            // move the entries out of `self` to move them into the stream
-            let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect();
-            stream::iter(entries.into_iter())
-                .chain(stream::pending())
-                .boxed()
-        }
-    }
-}

write_buffer/src/lib.rs (new file)
@@ -0,0 +1,14 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
    missing_copy_implementations,
    missing_debug_implementations,
    clippy::explicit_iter_loop,
    clippy::future_not_send,
    clippy::use_self,
    clippy::clone_on_ref_ptr
)]

pub mod config;
pub mod core;
pub mod kafka;
pub mod mock;

write_buffer/src/mock.rs (new file)
@@ -0,0 +1,78 @@
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry};
use futures::{
    stream::{self, BoxStream},
    StreamExt,
};

use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};

#[derive(Debug, Default)]
pub struct MockBufferForWriting {
    pub entries: Arc<Mutex<Vec<Entry>>>,
}

#[async_trait]
impl WriteBufferWriting for MockBufferForWriting {
    async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
        let mut entries = self.entries.lock().unwrap();
        let offset = entries.len() as u64;
        entries.push(entry.clone());

        Ok(Sequence {
            id: 0,
            number: offset,
        })
    }
}

#[derive(Debug, Default, Clone, Copy)]
pub struct MockBufferForWritingThatAlwaysErrors;

#[async_trait]
impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
    async fn store_entry(&self, _entry: &Entry) -> Result<Sequence, WriteBufferError> {
        Err(String::from(
            "Something bad happened on the way to writing an entry in the write buffer",
        )
        .into())
    }
}

type MoveableEntries = Arc<Mutex<Vec<Result<SequencedEntry, WriteBufferError>>>>;

pub struct MockBufferForReading {
    entries: MoveableEntries,
}

impl std::fmt::Debug for MockBufferForReading {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MockBufferForReading").finish()
    }
}

impl MockBufferForReading {
    pub fn new(entries: Vec<Result<SequencedEntry, WriteBufferError>>) -> Self {
        Self {
            entries: Arc::new(Mutex::new(entries)),
        }
    }
}

impl WriteBufferReading for MockBufferForReading {
    fn stream<'life0, 'async_trait>(
        &'life0 self,
    ) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // move the entries out of `self` to move them into the stream
        let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect();
        stream::iter(entries.into_iter())
            .chain(stream::pending())
            .boxed()
    }
}
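
Finally, a hedged sketch (not part of this change) of how the mock reader might be exercised in a test; it assumes `tokio` as the async test runtime, which other crates in this workspace already use, and the module name `mock_usage` is made up for illustration.

#[cfg(test)]
mod mock_usage {
    use futures::StreamExt;
    use write_buffer::core::WriteBufferReading;
    use write_buffer::mock::MockBufferForReading;

    #[tokio::test]
    async fn seeded_error_is_yielded_first() {
        // Seed the mock reader with a single error result.
        let reading = MockBufferForReading::new(vec![Err("simulated failure".to_string().into())]);

        let mut stream = reading.stream();
        let first = stream.next().await.expect("the seeded item should be yielded");
        assert!(first.is_err());
        // After the seeded items the stream chains into `stream::pending()`,
        // so it is deliberately not polled to completion here.
    }
}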