fix: Move write buffer data types to write_buffer crate

pull/24376/head
Carol (Nichols || Goulding) 2022-05-05 14:00:48 -04:00
parent d7304c1114
commit 44209faa8e
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
8 changed files with 117 additions and 107 deletions

View File

@ -1,11 +1,10 @@
use data_types::write_buffer::{WriteBufferConnection, WriteBufferCreationConfig};
use iox_time::SystemProvider;
use observability_deps::tracing::*;
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf, sync::Arc};
use tempfile::TempDir;
use trace::TraceCollector;
use write_buffer::{
config::WriteBufferConfigFactory,
config::{WriteBufferConfigFactory, WriteBufferConnection, WriteBufferCreationConfig},
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
};

View File

@ -15,5 +15,4 @@ pub mod consistent_hasher;
pub mod job;
pub mod partition_metadata;
pub mod timestamp;
pub mod write_buffer;
pub mod write_summary;

View File

@ -1,64 +0,0 @@
use std::{collections::BTreeMap, num::NonZeroU32};
/// Default number of sequencers, used by [`WriteBufferCreationConfig::default`].
pub const DEFAULT_N_SEQUENCERS: u32 = 1;
/// Configures the use of a write buffer.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferConnection {
    /// Which type should be used (e.g. "kafka", "mock")
    pub type_: String,

    /// Connection string, depends on [`type_`](Self::type_).
    pub connection: String,

    /// Special configs to be applied when establishing the connection.
    ///
    /// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub connection_config: BTreeMap<String, String>,

    /// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically created if they do not
    /// exist prior to reading or writing.
    pub creation_config: Option<WriteBufferCreationConfig>,
}
impl Default for WriteBufferConnection {
fn default() -> Self {
Self {
type_: "unspecified".to_string(),
connection: Default::default(),
connection_config: Default::default(),
creation_config: Default::default(),
}
}
}
/// Configs sequencer auto-creation for write buffers.
///
/// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
/// [`n_sequencers`](Self::n_sequencers) partitions.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferCreationConfig {
    /// Number of sequencers.
    ///
    /// How they are implemented depends on [type](WriteBufferConnection::type_), e.g. for Kafka this is mapped to the
    /// number of partitions.
    pub n_sequencers: NonZeroU32,

    /// Special configs to be applied when sequencers are created.
    ///
    /// This depends on [type](WriteBufferConnection::type_) and can setup parameters like retention policy.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub options: BTreeMap<String, String>,
}
impl Default for WriteBufferCreationConfig {
fn default() -> Self {
Self {
n_sequencers: NonZeroU32::try_from(DEFAULT_N_SEQUENCERS).unwrap(),
options: Default::default(),
}
}
}

View File

@ -7,22 +7,86 @@ use crate::{
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
},
};
use data_types::write_buffer::WriteBufferConnection;
use iox_time::TimeProvider;
use parking_lot::RwLock;
use std::{
collections::{btree_map::Entry, BTreeMap},
num::NonZeroU32,
path::PathBuf,
sync::Arc,
};
use trace::TraceCollector;
/// Default number of sequencers, used by [`WriteBufferCreationConfig::default`].
pub const DEFAULT_N_SEQUENCERS: u32 = 1;
/// Selects which flavor of mock write buffer a registered mock name maps to.
#[derive(Debug, Clone)]
enum Mock {
    /// A mock backed by shared in-memory state.
    Normal(MockBufferSharedState),
    /// A mock whose operations always fail — presumably for error-path testing
    /// (NOTE(review): inferred from the variant name; confirm against usage).
    AlwaysFailing,
}
/// Configures the use of a write buffer.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferConnection {
    /// Which type should be used (e.g. "kafka", "mock")
    pub type_: String,

    /// Connection string, depends on [`type_`](Self::type_).
    pub connection: String,

    /// Special configs to be applied when establishing the connection.
    ///
    /// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub connection_config: BTreeMap<String, String>,

    /// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically
    /// created if they do not exist prior to reading or writing.
    pub creation_config: Option<WriteBufferCreationConfig>,
}
impl Default for WriteBufferConnection {
fn default() -> Self {
Self {
type_: "unspecified".to_string(),
connection: Default::default(),
connection_config: Default::default(),
creation_config: Default::default(),
}
}
}
/// Configs sequencer auto-creation for write buffers.
///
/// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
/// [`n_sequencers`](Self::n_sequencers) partitions.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferCreationConfig {
    /// Number of sequencers.
    ///
    /// How they are implemented depends on [type](WriteBufferConnection::type_), e.g. for Kafka
    /// this is mapped to the number of partitions.
    pub n_sequencers: NonZeroU32,

    /// Special configs to be applied when sequencers are created.
    ///
    /// This depends on [type](WriteBufferConnection::type_) and can setup parameters like
    /// retention policy.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub options: BTreeMap<String, String>,
}
impl Default for WriteBufferCreationConfig {
fn default() -> Self {
Self {
n_sequencers: NonZeroU32::try_from(DEFAULT_N_SEQUENCERS).unwrap(),
options: Default::default(),
}
}
}
/// Factory that creates [`WriteBufferReading`] and [`WriteBufferWriting`]
/// from [`WriteBufferConnection`].
#[derive(Debug)]
@ -193,7 +257,6 @@ mod tests {
core::test_utils::random_topic_name, maybe_skip_kafka_integration,
mock::MockBufferSharedState,
};
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::DatabaseName;
use std::{convert::TryFrom, num::NonZeroU32};
use tempfile::TempDir;

View File

@ -1,11 +1,12 @@
//! Write buffer that uses files to encode messages.
//!
//! This implementation can be used by multiple readers and writers at the same time. It is ideal for local end2end
//! testing. However it might not perform extremely well when dealing with large messages and (currently) does not
//! implement any message pruning.
//! This implementation can be used by multiple readers and writers at the same time. It is ideal
//! for local end2end testing. However it might not perform extremely well when dealing with large
//! messages and (currently) does not implement any message pruning.
//!
//! # Format
//! Given a root path, the database name and the number of sequencers, the directory structure looks like this:
//! Given a root path, the database name and the number of sequencers, the directory structure
//! looks like this:
//!
//! ```text
//! <root>/<db_name>/
@ -54,50 +55,63 @@
//! <payload>
//! ```
//!
//! The payload is binary data. The headers contain metadata about it (like timestamp, format, tracing information).
//!
//! The payload is binary data. The headers contain metadata about it (like timestamp, format,
//! tracing information).
//!
//! # Implementation Notes
//!
//! Some notes about file system functionality that shaped this implementation
//!
//! ## Atomic File Creation
//! It is quite easy to create a file and ensure that it did not exist beforehand using [`open(2)`] together with
//! `O_CREAT` and `O_EXCL`. However writing actual content to that file requires time and a reader could already see an
//! incomplete version of that. A workaround is to use a scratchpad file at a temporary location, write the entire
//! desired content to it and then move the file to the target location. This assumes that the target location and the
//! file content are independent, e.g. that the file itself does not contain the `sequence_number`. Now we need to find
//! a way to make this move operation reliable though.
//!
//! Files can be renamed using [`rename(2)`]. There is the `RENAME_NOREPLACE` flag that prevents that we silently
//! overwrite the target file. This however is only implemented for a handful of filesystems (notably NOT [NFS]). So to
//! use [`rename(2)`] we would need some additional locking.
//! It is quite easy to create a file and ensure that it did not exist beforehand using [`open(2)`]
//! together with `O_CREAT` and `O_EXCL`. However writing actual content to that file requires time
//! and a reader could already see an incomplete version of that. A workaround is to use a
//! scratchpad file at a temporary location, write the entire desired content to it and then move
//! the file to the target location. This assumes that the target location and the file content are
//! independent, e.g. that the file itself does not contain the `sequence_number`. Now we need to
//! find a way to make this move operation reliable though.
//!
//! Then there is [`link(2)`] which creates a new link to an existing file. It explicitly states that the target is
//! NEVER overwritten. According to <https://unix.stackexchange.com/a/125946> this should even work properly on [NFS].
//! We then need to use [`unlink(2)`] to clean the scratchpad file.
//! Files can be renamed using [`rename(2)`]. There is the `RENAME_NOREPLACE` flag that prevents
//! that we silently overwrite the target file. This however is only implemented for a handful of
//! filesystems (notably NOT [NFS]). So to use [`rename(2)`] we would need some additional locking.
//!
//! Then there is [`link(2)`] which creates a new link to an existing file. It explicitly states
//! that the target is NEVER overwritten. According to <https://unix.stackexchange.com/a/125946>
//! this should even work properly on [NFS]. We then need to use [`unlink(2)`] to clean the
//! scratchpad file.
//!
//! ## Atomic Directory Creation
//! To setup a new sequencer config we need to create the directory structure in an atomic way. Hardlinks don't work for
//! directories, but [`symlink(2)`] does and -- like [`link(2)`] -- does not overwrite existing targets.
//!
//! To setup a new sequencer config we need to create the directory structure in an atomic way.
//! Hardlinks don't work for directories, but [`symlink(2)`] does and -- like [`link(2)`] -- does
//! not overwrite existing targets.
//!
//! ## File Locking
//! Instead of atomic operations we could also use file locking. Under Linux there are a few ways this can be achieved:
//!
//! Instead of atomic operations we could also use file locking. Under Linux there are a few ways
//! this can be achieved:
//!
//! - **[`fcntl(2)`] via `F_SETLK`, `F_SETLKW`, `F_GETLK`:** <br />
//! Works on [NFS], but is process-bound (aka if you have multiple writers within the same process, only one can
//! Works on [NFS], but is process-bound (aka if you have multiple writers within the same
//! process, only one can
//! acquire the lock).
//! - **[`fcntl(2)`] via `F_OFD_SETLK`, `F_OFD_SETLKW`, `F_OFD_GETLK`:** <br />
//! Works on [NFS] and is file-descriptor-bound.
//! - **[`flock(2)`]:** <br />
//! Works on [NFS] but is technically emulated via [`fcntl(2)`] so the latter should probably be preferred.
//! Works on [NFS] but is technically emulated via [`fcntl(2)`] so the latter should probably be
//! preferred.
//!
//! The biggest issue with file locking is what happens when an operation fails while a lock is being held. Either the
//! resulting state is obviously unfinished (e.g. due to some checksum or size mismatch, due to some missing marker) or
//! we would need to implement some form of lock poisoning. Since this can get quite tricky, I have decided that atomic
//! file and directory operations are easier to reason about.
//! The biggest issue with file locking is what happens when an operation fails while a lock is
//! being held. Either the resulting state is obviously unfinished (e.g. due to some checksum or
//! size mismatch, due to some missing marker) or we would need to implement some form of lock
//! poisoning. Since this can get quite tricky, I have decided that atomic file and directory
//! operations are easier to reason about.
//!
//! ## Message Metadata
//! We are NOT using any file-based metadata (like `mtime` or extended attributes) because they are often broken.
//!
//! We are NOT using any file-based metadata (like `mtime` or extended attributes) because they are
//! often broken.
//!
//!
//! [`fcntl(2)`]: https://www.man7.org/linux/man-pages/man2/fcntl.2.html
@ -108,12 +122,13 @@
//! [`rename(2)`]: https://man7.org/linux/man-pages/man2/rename.2.html
//! [`symlink(2)`]: https://man7.org/linux/man-pages/man2/symlink.2.html
//! [`unlink(2)`]: https://man7.org/linux/man-pages/man2/unlink.2.html
use crate::{
codec::{ContentType, IoxHeaders},
config::WriteBufferCreationConfig,
core::{WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting},
};
use async_trait::async_trait;
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, Stream, StreamExt};

View File

@ -1,10 +1,8 @@
use crate::{config::WriteBufferCreationConfig, core::WriteBufferError};
use std::{collections::BTreeMap, fmt::Display, str::FromStr, time::Duration};
use data_types::write_buffer::WriteBufferCreationConfig;
use crate::core::WriteBufferError;
/// Generic client config that is used for consumers, producers as well as admin operations (like "create topic").
/// Generic client config that is used for consumers, producers as well as admin operations (like
/// "create topic").
#[derive(Debug, PartialEq, Eq)]
pub struct ClientConfig {
/// Maximum message size in bytes.

View File

@ -4,13 +4,13 @@ use self::{
};
use crate::{
codec::IoxHeaders,
config::WriteBufferCreationConfig,
core::{
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
WriteBufferWriting,
},
};
use async_trait::async_trait;
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, StreamExt};

View File

@ -1,8 +1,8 @@
use crate::core::{
WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting,
use crate::{
config::WriteBufferCreationConfig,
core::{WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting},
};
use async_trait::async_trait;
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::{stream::BoxStream, StreamExt};