Merge branch 'main' into cn/cold-compaction-selection

pull/24376/head
kodiakhq[bot] 2023-04-12 15:19:06 +00:00 committed by GitHub
commit 14d7098ddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 0 additions and 6505 deletions

76
Cargo.lock generated
View File

@ -447,17 +447,6 @@ dependencies = [
"zstd-safe 5.0.2+zstd.1.5.2",
]
[[package]]
name = "async-socks5"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f634add2445eb2c1f785642a67ca1073fedd71e73dc3ca69435ef9b9bdedc7"
dependencies = [
"async-trait",
"thiserror",
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -908,7 +897,6 @@ dependencies = [
"trogging",
"uuid",
"workspace-hack",
"write_buffer",
]
[[package]]
@ -1210,15 +1198,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32c"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e"
dependencies = [
"rustc_version",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -4863,27 +4842,6 @@ dependencies = [
"write_summary",
]
[[package]]
name = "rskafka"
version = "0.3.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=abb2a28cff5ce39d186e814a0c5012267b9690a4#abb2a28cff5ce39d186e814a0c5012267b9690a4"
dependencies = [
"async-socks5",
"async-trait",
"bytes",
"chrono",
"crc32c",
"futures",
"integer-encoding",
"parking_lot 0.12.1",
"pin-project-lite",
"rand",
"thiserror",
"tokio",
"tracing",
"zstd 0.12.3+zstd.1.5.2",
]
[[package]]
name = "rustc-demangle"
version = "0.1.22"
@ -6908,40 +6866,6 @@ dependencies = [
"zstd-sys",
]
[[package]]
name = "write_buffer"
version = "0.1.0"
dependencies = [
"async-trait",
"data_types",
"dml",
"dotenvy",
"futures",
"generated_types",
"hashbrown 0.13.2",
"http",
"httparse",
"iox_time",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
"parking_lot 0.12.1",
"pin-project",
"prost",
"rskafka",
"schema",
"tempfile",
"test_helpers",
"tokio",
"tokio-util",
"trace",
"trace_http",
"uuid",
"workspace-hack",
]
[[package]]
name = "write_summary"
version = "0.1.0"

View File

@ -81,7 +81,6 @@ members = [
"trogging",
"wal",
"workspace-hack",
"write_buffer",
"write_summary",
]
default-members = ["influxdb_iox"]

View File

@ -26,7 +26,6 @@ trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
write_buffer = { path = "../write_buffer" }
[dev-dependencies]
test_helpers = { path = "../test_helpers" }

View File

@ -24,4 +24,3 @@ pub mod querier;
pub mod router2;
pub mod run_config;
pub mod socket_addr;
pub mod write_buffer;

View File

@ -1,205 +0,0 @@
//! Config for [`write_buffer`].
use iox_time::SystemProvider;
use observability_deps::tracing::*;
use std::{collections::BTreeMap, num::NonZeroU32, ops::Range, path::PathBuf, sync::Arc};
use tempfile::TempDir;
use trace::TraceCollector;
use write_buffer::{
config::{WriteBufferConfigFactory, WriteBufferConnection, WriteBufferCreationConfig},
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
};
/// Config for [`write_buffer`].
///
/// Parsed from the command line / environment via `clap`; selects and
/// configures the write buffer backend a server connects to.
#[derive(Debug, clap::Parser)]
pub struct WriteBufferConfig {
    /// The type of write buffer to use.
    ///
    /// Valid options are: file, kafka
    #[clap(long = "write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE", action)]
    pub(crate) type_: String,

    /// The address to the write buffer.
    #[clap(
        long = "write-buffer-addr",
        env = "INFLUXDB_IOX_WRITE_BUFFER_ADDR",
        action
    )]
    pub(crate) connection_string: String,

    /// Write buffer topic/database that should be used.
    #[clap(
        long = "write-buffer-topic",
        env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
        default_value = "iox-shared",
        action
    )]
    pub(crate) topic: String,

    /// Write buffer connection config.
    ///
    /// The concrete options depend on the write buffer type.
    ///
    /// Command line arguments are passed as
    /// `--write-buffer-connection-config key1=value1,key2=value2`.
    ///
    /// Environment variables are passed as `key1=value1,key2=value2,...`.
    // `use_value_delimiter` splits comma-separated values; `Append` collects
    // repeated flag occurrences into one Vec.
    #[clap(
        long = "write-buffer-connection-config",
        env = "INFLUXDB_IOX_WRITE_BUFFER_CONNECTION_CONFIG",
        default_value = "",
        use_value_delimiter = true,
        action = clap::ArgAction::Append
    )]
    pub(crate) connection_config: Vec<String>,

    /// The number of topics to create automatically, if any. Default is to not create any topics.
    #[clap(
        long = "write-buffer-auto-create-topics",
        env = "INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS"
    )]
    pub(crate) auto_create_topics: Option<NonZeroU32>,
}
impl WriteBufferConfig {
    /// Create a new instance for all-in-one mode, only allowing some arguments.
    /// If `database_directory` is not specified, creates a new temporary directory.
    pub fn new(topic: &str, database_directory: Option<PathBuf>) -> Self {
        let connection_string = match database_directory {
            Some(dir) => dir.display().to_string(),
            None => TempDir::new()
                .expect("Creating a temporary directory should work")
                .into_path()
                .display()
                .to_string(),
        };

        info!("Write buffer: File-based in `{}`", connection_string);

        Self {
            type_: String::from("file"),
            connection_string,
            topic: topic.to_owned(),
            connection_config: Vec::new(),
            // NonZeroU32::new(1) is always Some.
            auto_create_topics: NonZeroU32::new(1),
        }
    }

    /// Initialize a [`WriteBufferWriting`].
    pub async fn writing(
        &self,
        metrics: Arc<metric::Registry>,
        partitions: Option<Range<i32>>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
        Self::factory(metrics)
            .new_config_write(
                &self.topic,
                partitions,
                trace_collector.as_ref(),
                &self.conn(),
            )
            .await
    }

    /// Initialize a [`WriteBufferReading`].
    pub async fn reading(
        &self,
        metrics: Arc<metric::Registry>,
        partitions: Option<Range<i32>>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
        Self::factory(metrics)
            .new_config_read(
                &self.topic,
                partitions,
                trace_collector.as_ref(),
                &self.conn(),
            )
            .await
    }

    /// Parse the raw `key=value` CLI entries into a map.
    ///
    /// Empty entries are dropped, entries without `=` map to an empty value,
    /// only the first `=` splits key from value, and later duplicates of a
    /// key override earlier ones.
    fn connection_config(&self) -> BTreeMap<String, String> {
        self.connection_config
            .iter()
            .filter(|entry| !entry.is_empty())
            .map(|entry| match entry.split_once('=') {
                Some((key, value)) => (key.to_owned(), value.to_owned()),
                None => (entry.clone(), String::new()),
            })
            .collect()
    }

    /// Assemble the [`WriteBufferConnection`] described by this config.
    fn conn(&self) -> WriteBufferConnection {
        WriteBufferConnection {
            type_: self.type_.clone(),
            connection: self.connection_string.clone(),
            connection_config: self.connection_config(),
            creation_config: self
                .auto_create_topics
                .map(|n_shards| WriteBufferCreationConfig {
                    n_shards,
                    ..Default::default()
                }),
        }
    }

    /// Build the factory used to construct readers/writers, using the system clock.
    fn factory(metrics: Arc<metric::Registry>) -> WriteBufferConfigFactory {
        WriteBufferConfigFactory::new(Arc::new(SystemProvider::default()), metrics)
    }

    /// Get a reference to the write buffer config's topic.
    pub fn topic(&self) -> &str {
        self.topic.as_str()
    }

    /// Get the write buffer config's auto create topics.
    pub fn auto_create_topics(&self) -> Option<NonZeroU32> {
        self.auto_create_topics
    }

    /// Set the write buffer config's auto create topics.
    pub fn set_auto_create_topics(&mut self, auto_create_topics: Option<NonZeroU32>) {
        self.auto_create_topics = auto_create_topics;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    // Exercises `connection_config()` parsing of repeated
    // `--write-buffer-connection-config` flags: empty entries are dropped,
    // a bare key maps to an empty value, repeated keys keep the last value,
    // and only the first `=` splits key from value.
    #[test]
    fn test_connection_config() {
        let cfg = WriteBufferConfig::try_parse_from([
            "my_binary",
            "--write-buffer",
            "kafka",
            "--write-buffer-addr",
            "localhost:1234",
            "--write-buffer-connection-config",
            "foo=bar",
            "--write-buffer-connection-config",
            "",
            "--write-buffer-connection-config",
            "x=",
            "--write-buffer-connection-config",
            "y",
            "--write-buffer-connection-config",
            "foo=baz",
            "--write-buffer-connection-config",
            "so=many=args",
        ])
        .unwrap();
        let actual = cfg.connection_config();
        let expected = BTreeMap::from([
            (String::from("foo"), String::from("baz")),
            (String::from("x"), String::from("")),
            (String::from("y"), String::from("")),
            (String::from("so"), String::from("many=args")),
        ]);
        assert_eq!(actual, expected);
    }
}

View File

@ -1,42 +0,0 @@
[package]
name = "write_buffer"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1"
data_types = { path = "../data_types" }
dml = { path = "../dml" }
dotenvy = "0.15.7"
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown = { workspace = true }
http = "0.2"
httparse = "1.8"
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.11"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="abb2a28cff5ce39d186e814a0c5012267b9690a4", default-features = false, features = ["compression-zstd", "transport-socks5"] }
schema = { path = "../schema" }
tokio = { version = "1.27", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.7.7"
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tempfile = "3.5.0"
test_helpers = { path = "../test_helpers" }
[package.metadata.cargo-udeps.ignore]
# used within the `maybe_skip_kafka_integration` macro and cannot be detected by a normal analysis pass
normal = ["dotenvy"]

View File

@ -1,347 +0,0 @@
//! Encode/Decode for messages
use crate::core::WriteBufferError;
use data_types::{NamespaceId, NonEmptyString, PartitionKey, Sequence, TableId};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use generated_types::{
google::FromOptionalField,
influxdata::iox::{
delete::v1::DeletePayload,
write_buffer::v1::{write_buffer_payload::Payload, WriteBufferPayload},
},
};
use http::{HeaderMap, HeaderValue};
use iox_time::Time;
use mutable_batch_pb::decode::decode_database_batch;
use prost::Message;
use std::{borrow::Cow, sync::Arc};
use trace::{ctx::SpanContext, TraceCollector};
use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
/// Pbdata based content type
pub const CONTENT_TYPE_PROTOBUF: &str =
    r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#;

/// Message header that determines message content type.
pub const HEADER_CONTENT_TYPE: &str = "content-type";

/// Message header for tracing context.
pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";

/// Supported encodings for the message payload.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ContentType {
    /// Payload is a protobuf-encoded `WriteBufferPayload`
    /// (see [`CONTENT_TYPE_PROTOBUF`]).
    Protobuf,
}
/// IOx-specific headers attached to every write buffer message.
#[derive(Debug)]
pub struct IoxHeaders {
    // Encoding of the payload bytes (currently protobuf only).
    content_type: ContentType,
    // Optional tracing context propagated alongside the message.
    span_context: Option<SpanContext>,
}

impl IoxHeaders {
    /// Create new headers with sane default values and given span context.
    pub fn new(content_type: ContentType, span_context: Option<SpanContext>) -> Self {
        Self {
            content_type,
            span_context,
        }
    }

    /// Creates a new IoxHeaders from an iterator of headers
    ///
    /// Header names are matched case-insensitively; when a header appears
    /// multiple times, the last occurrence wins. Errors when the content type
    /// is missing, unknown, or not valid UTF-8, or when a trace context
    /// header is present but cannot be parsed.
    pub fn from_headers(
        headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<[u8]>)>,
        trace_collector: Option<&Arc<dyn TraceCollector>>,
    ) -> Result<Self, WriteBufferError> {
        let mut span_context = None;
        let mut content_type = None;

        for (name, value) in headers {
            let name = name.as_ref();

            if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
                content_type = match std::str::from_utf8(value.as_ref()) {
                    Ok(CONTENT_TYPE_PROTOBUF) => Some(ContentType::Protobuf),
                    Ok(c) => {
                        return Err(WriteBufferError::invalid_data(format!(
                            "Unknown message format: {c}"
                        )))
                    }
                    Err(e) => {
                        return Err(WriteBufferError::invalid_data(format!(
                            "Error decoding content type header: {e}"
                        )))
                    }
                };
            }

            // Trace headers are only parsed when a collector is available;
            // without one the resulting context could not be used anyway.
            if let Some(trace_collector) = trace_collector {
                if name.eq_ignore_ascii_case(HEADER_TRACE_CONTEXT) {
                    if let Ok(header_value) = HeaderValue::from_bytes(value.as_ref()) {
                        let mut headers = HeaderMap::new();
                        headers.insert(HEADER_TRACE_CONTEXT, header_value);

                        let parser = TraceHeaderParser::new()
                            .with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
                        // Unsampled contexts are discarded (treated as absent).
                        span_context = match parser.parse(Some(trace_collector), &headers) {
                            Ok(None) => None,
                            Ok(Some(ctx)) => ctx.sampled.then_some(ctx),
                            Err(e) => {
                                return Err(WriteBufferError::invalid_data(format!(
                                    "Error decoding trace context: {e}"
                                )))
                            }
                        };
                    }
                }
            }
        }

        let content_type =
            content_type.ok_or_else(|| WriteBufferError::invalid_data("No content type header"))?;

        Ok(Self {
            content_type,
            span_context,
        })
    }

    /// Gets the content type
    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
    pub fn content_type(&self) -> ContentType {
        self.content_type
    }

    /// Gets the span context if any
    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
    pub fn span_context(&self) -> Option<&SpanContext> {
        self.span_context.as_ref()
    }

    /// Returns the header map to encode
    ///
    /// Yields the content-type header first, followed by the trace context
    /// header when a span context is present.
    pub fn headers(&self) -> impl Iterator<Item = (&str, Cow<'static, str>)> + '_ {
        let content_type = match self.content_type {
            ContentType::Protobuf => CONTENT_TYPE_PROTOBUF.into(),
        };

        std::iter::once((HEADER_CONTENT_TYPE, content_type)).chain(
            self.span_context
                .as_ref()
                .map(|ctx| {
                    (
                        HEADER_TRACE_CONTEXT,
                        format_jaeger_trace_context(ctx).into(),
                    )
                })
                .into_iter(),
        )
    }
}
/// Decode a message payload
///
/// Turns raw message bytes plus the already-parsed [`IoxHeaders`] and the
/// message's position metadata (`sequence`, `producer_ts`, `bytes_read`)
/// into a [`DmlOperation`].
///
/// # Errors
/// Returns an error when the protobuf envelope cannot be decoded, when the
/// payload is missing, when a write's database batch cannot be decoded, when
/// a write lacks a partition key, or when a delete lacks a predicate.
pub fn decode(
    data: &[u8],
    headers: IoxHeaders,
    sequence: Sequence,
    producer_ts: Time,
    bytes_read: usize,
) -> Result<DmlOperation, WriteBufferError> {
    match headers.content_type {
        ContentType::Protobuf => {
            // Attach sequencing/tracing metadata so consumers can track
            // replay position and continue the span.
            let meta = DmlMeta::sequenced(sequence, producer_ts, headers.span_context, bytes_read);

            let payload: WriteBufferPayload = prost::Message::decode(data)
                .map_err(|e| format!("failed to decode WriteBufferPayload: {e}"))?;
            let payload = payload.payload.ok_or_else(|| "no payload".to_string())?;

            match payload {
                Payload::Write(write) => {
                    let tables = decode_database_batch(&write).map_err(|e| {
                        WriteBufferError::invalid_data(format!(
                            "failed to decode database batch: {e}"
                        ))
                    })?;

                    // An empty partition key is invalid wire data.
                    let partition_key = if write.partition_key.is_empty() {
                        return Err(WriteBufferError::invalid_data(
                            "write contains no partition key",
                        ));
                    } else {
                        PartitionKey::from(write.partition_key)
                    };

                    Ok(DmlOperation::Write(DmlWrite::new(
                        NamespaceId::new(write.database_id),
                        tables
                            .into_iter()
                            .map(|(k, v)| (TableId::new(k), v))
                            .collect(),
                        partition_key,
                        meta,
                    )))
                }
                Payload::Delete(delete) => {
                    let predicate = delete
                        .predicate
                        .required("predicate")
                        .map_err(WriteBufferError::invalid_data)?;

                    Ok(DmlOperation::Delete(DmlDelete::new(
                        NamespaceId::new(delete.database_id),
                        predicate,
                        NonEmptyString::new(delete.table_name),
                        meta,
                    )))
                }
            }
        }
    }
}
/// Encodes a [`DmlOperation`] as a protobuf [`WriteBufferPayload`]
///
/// The encoded bytes are appended to `buf`. Fails only when protobuf
/// encoding itself fails.
pub fn encode_operation(
    operation: &DmlOperation,
    buf: &mut Vec<u8>,
) -> Result<(), WriteBufferError> {
    // Map the DML operation onto the wire-format payload variant.
    let payload = Some(match operation {
        DmlOperation::Write(write) => Payload::Write(mutable_batch_pb::encode::encode_write(
            write.namespace_id().get(),
            write,
        )),
        DmlOperation::Delete(delete) => {
            let table_name = delete
                .table_name()
                .map(ToString::to_string)
                .unwrap_or_default();
            Payload::Delete(DeletePayload {
                database_id: delete.namespace_id().get(),
                table_name,
                predicate: Some(delete.predicate().clone().into()),
            })
        }
    });

    WriteBufferPayload { payload }
        .encode(buf)
        .map_err(WriteBufferError::invalid_input)
}
#[cfg(test)]
mod tests {
    use data_types::{SequenceNumber, ShardIndex};
    use iox_time::{SystemProvider, TimeProvider};
    use trace::RingBufferTraceCollector;

    use crate::core::test_utils::{assert_span_context_eq_or_linked, lp_to_batches};

    use super::*;

    // Headers survive an encode/decode round trip, including the span context.
    #[test]
    fn headers_roundtrip() {
        let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

        let span_context_parent = SpanContext::new(Arc::clone(&collector));
        let span_context = span_context_parent.child("foo").ctx;
        let iox_headers1 = IoxHeaders::new(ContentType::Protobuf, Some(span_context));

        let encoded: Vec<_> = iox_headers1
            .headers()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        let iox_headers2 = IoxHeaders::from_headers(encoded, Some(&collector)).unwrap();

        assert_eq!(iox_headers1.content_type, iox_headers2.content_type);
        assert_span_context_eq_or_linked(
            iox_headers1.span_context.as_ref().unwrap(),
            iox_headers2.span_context.as_ref().unwrap(),
            vec![],
        );
    }

    // Header names are matched case-insensitively and the last occurrence
    // of a repeated header wins.
    #[test]
    fn headers_case_handling() {
        let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

        let headers = vec![
            ("conTent-Type", CONTENT_TYPE_PROTOBUF),
            ("uber-trace-id", "1:2:3:1"),
            ("uber-trace-ID", "5:6:7:1"),
            // Namespace is no longer used; test that specifying it doesn't cause errors
            ("iOx-Namespace", "namespace"),
        ];

        let actual = IoxHeaders::from_headers(headers.into_iter(), Some(&collector)).unwrap();
        assert_eq!(actual.content_type, ContentType::Protobuf);
        let span_context = actual.span_context.unwrap();
        assert_eq!(span_context.trace_id.get(), 5);
        assert_eq!(span_context.span_id.get(), 6);
    }

    // Without a trace collector, any encoded span context is ignored.
    #[test]
    fn headers_no_trace_collector_on_consumer_side() {
        let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

        let span_context = SpanContext::new(Arc::clone(&collector));
        let iox_headers1 = IoxHeaders::new(ContentType::Protobuf, Some(span_context));

        let encoded: Vec<_> = iox_headers1
            .headers()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        let iox_headers2 = IoxHeaders::from_headers(encoded, None).unwrap();

        assert!(iox_headers2.span_context.is_none());
    }

    // A DmlWrite survives encode_operation() + decode() with its namespace,
    // tables, timestamps, and table IDs intact.
    #[test]
    fn test_dml_write_round_trip() {
        let data = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100");

        let w = DmlWrite::new(
            NamespaceId::new(42),
            data,
            PartitionKey::from("2022-01-01"),
            DmlMeta::default(),
        );

        let mut buf = Vec::new();
        encode_operation(&DmlOperation::Write(w.clone()), &mut buf)
            .expect("should encode valid DmlWrite successfully");

        let time = SystemProvider::new().now();

        let got = decode(
            &buf,
            IoxHeaders::new(ContentType::Protobuf, None),
            Sequence::new(ShardIndex::new(1), SequenceNumber::new(42)),
            time,
            424242,
        )
        .expect("failed to decode valid wire format");

        let got = match got {
            DmlOperation::Write(w) => w,
            _ => panic!("wrong op type"),
        };

        assert_eq!(w.namespace_id(), got.namespace_id());
        assert_eq!(w.table_count(), got.table_count());
        assert_eq!(w.min_timestamp(), got.min_timestamp());
        assert_eq!(w.max_timestamp(), got.max_timestamp());

        // Validate the table IDs all appear in the DML writes.
        let mut a = w.tables().map(|(id, _)| id).collect::<Vec<_>>();
        a.sort_unstable();
        let mut b = got.tables().map(|(id, _)| id).collect::<Vec<_>>();
        b.sort_unstable();
        assert_eq!(a, b);
    }
}

View File

@ -1,498 +0,0 @@
use crate::{
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
file::{FileBufferConsumer, FileBufferProducer},
kafka::{RSKafkaConsumer, RSKafkaProducer},
mock::{
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
},
};
use iox_time::TimeProvider;
use parking_lot::RwLock;
use std::{
collections::{btree_map::Entry, BTreeMap},
num::NonZeroU32,
ops::Range,
path::PathBuf,
sync::Arc,
};
use trace::TraceCollector;
/// Default number of shards used when shards are auto-created.
pub const DEFAULT_N_SHARDS: u32 = 1;

/// A registered mock write buffer: either a working one backed by shared
/// state, or one whose operations always fail (for error-path testing).
#[derive(Debug, Clone)]
enum Mock {
    Normal(MockBufferSharedState),
    AlwaysFailing,
}
/// Configures the use of a write buffer.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferConnection {
    /// Which type should be used (e.g. "kafka", "mock")
    pub type_: String,

    /// Connection string, depends on [`type_`](Self::type_).
    ///
    /// When the Kafka type is selected, multiple bootstrap brokers can be separated by commas.
    pub connection: String,

    /// Special configs to be applied when establishing the connection.
    ///
    /// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub connection_config: BTreeMap<String, String>,

    /// Specifies if the shards (e.g. for Kafka in form of a topic) should be automatically
    /// created if they do not exist prior to reading or writing.
    pub creation_config: Option<WriteBufferCreationConfig>,
}

impl Default for WriteBufferConnection {
    /// Returns a connection with an "unspecified" type and otherwise empty settings.
    fn default() -> Self {
        Self {
            type_: "unspecified".to_string(),
            connection: Default::default(),
            connection_config: Default::default(),
            creation_config: Default::default(),
        }
    }
}
/// Configs shard auto-creation for write buffers.
///
/// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
/// [`n_shards`](Self::n_shards) partitions.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferCreationConfig {
    /// Number of shards.
    ///
    /// How they are implemented depends on [type](WriteBufferConnection::type_), e.g. for Kafka
    /// this is mapped to the number of partitions.
    pub n_shards: NonZeroU32,

    /// Special configs to be applied when shards are created.
    ///
    /// This depends on [type](WriteBufferConnection::type_) and can setup parameters like
    /// retention policy.
    ///
    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
    pub options: BTreeMap<String, String>,
}

impl Default for WriteBufferCreationConfig {
    /// Returns a config with [`DEFAULT_N_SHARDS`] shards and no extra options.
    fn default() -> Self {
        Self {
            n_shards: NonZeroU32::try_from(DEFAULT_N_SHARDS).unwrap(),
            options: Default::default(),
        }
    }
}
/// Factory that creates [`WriteBufferReading`] and [`WriteBufferWriting`]
/// from [`WriteBufferConnection`].
#[derive(Debug)]
pub struct WriteBufferConfigFactory {
    // Registered mocks, looked up by name for the "mock" connection type.
    mocks: RwLock<BTreeMap<String, Mock>>,
    // Time provider handed to the file and Kafka producers.
    time_provider: Arc<dyn TimeProvider>,
    #[allow(dead_code)] // this field is only used in optionally-compiled kafka code
    metric_registry: Arc<metric::Registry>,
}
impl WriteBufferConfigFactory {
    /// Create new factory w/o any mocks.
    pub fn new(
        time_provider: Arc<dyn TimeProvider>,
        metric_registry: Arc<metric::Registry>,
    ) -> Self {
        Self {
            mocks: Default::default(),
            time_provider,
            metric_registry,
        }
    }

    /// Registers new mock.
    ///
    /// # Panics
    /// When mock with identical name is already registered.
    pub fn register_mock(&self, name: String, state: MockBufferSharedState) {
        self.set_mock(name, Mock::Normal(state));
    }

    /// Registers new mock that always fail.
    ///
    /// # Panics
    /// When mock with identical name is already registered.
    pub fn register_always_fail_mock(&self, name: String) {
        self.set_mock(name, Mock::AlwaysFailing);
    }

    /// Store a mock under `name`, panicking on duplicates so tests fail
    /// loudly instead of silently sharing state.
    fn set_mock(&self, name: String, mock: Mock) {
        let mut mocks = self.mocks.write();
        match mocks.entry(name) {
            Entry::Vacant(v) => {
                v.insert(mock);
            }
            Entry::Occupied(o) => {
                panic!("Mock with the name '{}' already registered", o.key());
            }
        }
    }

    /// Look up a previously registered mock by name.
    fn get_mock(&self, name: &str) -> Result<Mock, WriteBufferError> {
        self.mocks
            .read()
            .get(name)
            .cloned()
            .ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {name}").into())
    }

    /// Returns a new [`WriteBufferWriting`] for the provided [`WriteBufferConnection`]
    ///
    /// # Errors
    /// Fails when the backend cannot be initialized, when a "mock" connection
    /// references an unregistered mock, or when the type is unknown.
    pub async fn new_config_write(
        &self,
        db_name: &str,
        partitions: Option<Range<i32>>,
        trace_collector: Option<&Arc<dyn TraceCollector>>,
        cfg: &WriteBufferConnection,
    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
        let writer = match &cfg.type_[..] {
            "file" => {
                let root = PathBuf::from(&cfg.connection);
                let file_buffer = FileBufferProducer::new(
                    &root,
                    db_name,
                    cfg.creation_config.as_ref(),
                    Arc::clone(&self.time_provider),
                )
                .await?;
                Arc::new(file_buffer) as _
            }
            "kafka" => {
                // Renamed from the misspelled `rskafa_buffer` for consistency
                // with the reader path below.
                let rskafka_buffer = RSKafkaProducer::new(
                    cfg.connection.clone(),
                    db_name.to_owned(),
                    &cfg.connection_config,
                    Arc::clone(&self.time_provider),
                    cfg.creation_config.as_ref(),
                    partitions,
                    trace_collector.map(Arc::clone),
                    &self.metric_registry,
                )
                .await?;
                Arc::new(rskafka_buffer) as _
            }
            "mock" => match self.get_mock(&cfg.connection)? {
                Mock::Normal(state) => {
                    let mock_buffer = MockBufferForWriting::new(
                        state,
                        cfg.creation_config.as_ref(),
                        Arc::clone(&self.time_provider),
                    )?;
                    Arc::new(mock_buffer) as _
                }
                Mock::AlwaysFailing => {
                    let mock_buffer = MockBufferForWritingThatAlwaysErrors {};
                    Arc::new(mock_buffer) as _
                }
            },
            other => {
                return Err(format!("Unknown write buffer type: {other}").into());
            }
        };

        Ok(writer)
    }

    /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
    ///
    /// # Errors
    /// Fails when the backend cannot be initialized, when a "mock" connection
    /// references an unregistered mock, or when the type is unknown.
    pub async fn new_config_read(
        &self,
        db_name: &str,
        partitions: Option<Range<i32>>,
        trace_collector: Option<&Arc<dyn TraceCollector>>,
        cfg: &WriteBufferConnection,
    ) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
        let reader = match &cfg.type_[..] {
            "file" => {
                let root = PathBuf::from(&cfg.connection);
                let file_buffer = FileBufferConsumer::new(
                    &root,
                    db_name,
                    cfg.creation_config.as_ref(),
                    trace_collector,
                )
                .await?;
                Arc::new(file_buffer) as _
            }
            "kafka" => {
                let rskafka_buffer = RSKafkaConsumer::new(
                    cfg.connection.clone(),
                    db_name.to_owned(),
                    &cfg.connection_config,
                    cfg.creation_config.as_ref(),
                    partitions,
                    trace_collector.map(Arc::clone),
                )
                .await?;
                Arc::new(rskafka_buffer) as _
            }
            "mock" => match self.get_mock(&cfg.connection)? {
                Mock::Normal(state) => {
                    let mock_buffer =
                        MockBufferForReading::new(state, cfg.creation_config.as_ref())?;
                    Arc::new(mock_buffer) as _
                }
                Mock::AlwaysFailing => {
                    let mock_buffer = MockBufferForReadingThatAlwaysErrors {};
                    Arc::new(mock_buffer) as _
                }
            },
            other => {
                return Err(format!("Unknown write buffer type: {other}").into());
            }
        };

        Ok(reader)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        core::test_utils::random_topic_name, maybe_skip_kafka_integration,
        mock::MockBufferSharedState,
    };
    use data_types::NamespaceName;
    use std::{convert::TryFrom, num::NonZeroU32};
    use tempfile::TempDir;

    // "file" type produces a file-backed writer.
    #[tokio::test]
    async fn test_writing_file() {
        let root = TempDir::new().unwrap();
        let factory = factory();
        let db_name = NamespaceName::try_from("foo").unwrap();
        let cfg = WriteBufferConnection {
            type_: "file".to_string(),
            connection: root.path().display().to_string(),
            creation_config: Some(WriteBufferCreationConfig::default()),
            ..Default::default()
        };

        let conn = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "file");
    }

    // "file" type produces a file-backed reader.
    #[tokio::test]
    async fn test_reading_file() {
        let root = TempDir::new().unwrap();
        let factory = factory();
        let db_name = NamespaceName::try_from("foo").unwrap();
        let cfg = WriteBufferConnection {
            type_: "file".to_string(),
            connection: root.path().display().to_string(),
            creation_config: Some(WriteBufferCreationConfig::default()),
            ..Default::default()
        };

        let conn = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "file");
    }

    // "mock" type resolves registered mocks by connection name and errors
    // for unregistered names.
    #[tokio::test]
    async fn test_writing_mock() {
        let factory = factory();

        let state = MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
        let mock_name = "some_mock";
        factory.register_mock(mock_name.to_string(), state);

        let db_name = NamespaceName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: mock_name.to_string(),
            ..Default::default()
        };

        let conn = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "mock");

        // will error when state is unknown
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: "bar".to_string(),
            ..Default::default()
        };
        let err = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Unknown mock ID:"));
    }

    // Reader counterpart of test_writing_mock.
    #[tokio::test]
    async fn test_reading_mock() {
        let factory = factory();

        let state = MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
        let mock_name = "some_mock";
        factory.register_mock(mock_name.to_string(), state);

        let db_name = NamespaceName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: mock_name.to_string(),
            ..Default::default()
        };

        let conn = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "mock");

        // will error when state is unknown
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: "bar".to_string(),
            ..Default::default()
        };
        let err = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Unknown mock ID:"));
    }

    // Always-failing mocks still construct successfully; only operations fail.
    #[tokio::test]
    async fn test_writing_mock_failing() {
        let factory = factory();

        let mock_name = "some_mock";
        factory.register_always_fail_mock(mock_name.to_string());

        let db_name = NamespaceName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: mock_name.to_string(),
            ..Default::default()
        };

        let conn = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "mock_failing");

        // will error when state is unknown
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: "bar".to_string(),
            ..Default::default()
        };
        let err = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Unknown mock ID:"));
    }

    // Reader counterpart of test_writing_mock_failing.
    #[tokio::test]
    async fn test_reading_mock_failing() {
        let factory = factory();

        let mock_name = "some_mock";
        factory.register_always_fail_mock(mock_name.to_string());

        let db_name = NamespaceName::new("foo").unwrap();
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: mock_name.to_string(),
            ..Default::default()
        };

        let conn = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "mock_failing");

        // will error when state is unknown
        let cfg = WriteBufferConnection {
            type_: "mock".to_string(),
            connection: "bar".to_string(),
            ..Default::default()
        };
        let err = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Unknown mock ID:"));
    }

    // Registering two mocks under the same name is a programming error.
    #[test]
    #[should_panic(expected = "Mock with the name 'some_mock' already registered")]
    fn test_register_mock_twice_panics() {
        let factory = factory();

        let state = MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
        let mock_name = "some_mock";
        factory.register_always_fail_mock(mock_name.to_string());
        factory.register_mock(mock_name.to_string(), state);
    }

    // Helper: factory with the system clock and a fresh metric registry.
    fn factory() -> WriteBufferConfigFactory {
        let time = Arc::new(iox_time::SystemProvider::new());
        let registry = Arc::new(metric::Registry::new());
        WriteBufferConfigFactory::new(time, registry)
    }

    // Requires a running Kafka broker; skipped unless the integration env is set.
    #[tokio::test]
    async fn test_writing_kafka() {
        let conn = maybe_skip_kafka_integration!();
        let factory = factory();
        let db_name = NamespaceName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
            type_: "kafka".to_string(),
            connection: conn,
            creation_config: Some(WriteBufferCreationConfig::default()),
            ..Default::default()
        };

        let conn = factory
            .new_config_write(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "kafka");
    }

    // Reader counterpart of test_writing_kafka.
    #[tokio::test]
    async fn test_reading_kafka() {
        let conn = maybe_skip_kafka_integration!();
        let factory = factory();
        let db_name = NamespaceName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
            type_: "kafka".to_string(),
            connection: conn,
            creation_config: Some(WriteBufferCreationConfig::default()),
            ..Default::default()
        };

        let conn = factory
            .new_config_read(db_name.as_str(), None, None, &cfg)
            .await
            .unwrap();
        assert_eq!(conn.type_name(), "kafka");
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -1,968 +0,0 @@
//! Write buffer that uses files to encode messages.
//!
//! This implementation can be used by multiple readers and writers at the same time. It is ideal
//! for local end2end testing. However it might not perform extremely well when dealing with large
//! messages and (currently) does not implement any message pruning.
//!
//! # Format
//! Given a root path, the database name and the number of shards, the directory structure
//! looks like this:
//!
//! ```text
//! <root>/<db_name>/
//! /active | Location of current state
//! : |
//! : |
//! : | symlink
//! : |
//! : +--------+
//! : |
//! : |
//! : |
//! /version/ V
//! /<uuid>/
//! : /0/
//! : : /committed/0 \
//! : : : /1 | Message files
//! : : : /2 | (finished)
//! : : : ... /
//! : : :
//! : : :
//! : : /temp/<uuid> \
//! : : /<uuid> | Message files
//! : : /<uuid> | (to be committed)
//! : : ... /
//! : :
//! : :
//! : /1/... \
//! : /2/... | More shards
//! : ... /
//! :
//! :
//! /<uuid>/ \
//! /<uuid>/ | Incomplete initialization attempts
//! ... /
//! ```
//!
//! Every message file then uses an HTTP-inspired format:
//!
//! ```text
//! <header_1>: <value_1>
//! <header_2>: <value_2>
//! ...
//! <header_n>: <value_n>
//!
//! <payload>
//! ```
//!
//! The payload is binary data. The headers contain metadata about it (like timestamp, format,
//! tracing information).
//!
//! # Implementation Notes
//!
//! Some notes about file system functionality that shaped this implementation
//!
//! ## Atomic File Creation
//!
//! It is quite easy to create a file and ensure that it did not exist beforehand using [`open(2)`]
//! together with `O_CREAT` and `O_EXCL`. However writing actual content to that file requires time
//! and a reader could already see an incomplete version of that. A workaround is to use a
//! scratchpad file at a temporary location, write the entire desired content to it and then move
//! the file to the target location. This assumes that the target location and the file content are
//! independent, e.g. that the file itself does not contain the `sequence_number`. Now we need to
//! find a way to make this move operation reliable though.
//!
//! Files can be renamed using [`rename(2)`]. There is the `RENAME_NOREPLACE` flag that prevents
//! that we silently overwrite the target file. This however is only implemented for a handful of
//! filesystems (notably NOT [NFS]). So to use [`rename(2)`] we would need some additional locking.
//!
//! Then there is [`link(2)`] which creates a new link to an existing file. It explicitly states
//! that the target is NEVER overwritten. According to <https://unix.stackexchange.com/a/125946>
//! this should even work properly on [NFS]. We then need to use [`unlink(2)`] to clean the
//! scratchpad file.
//!
//! ## Atomic Directory Creation
//!
//! To setup a new shard config we need to create the directory structure in an atomic way.
//! Hardlinks don't work for directories, but [`symlink(2)`] does and -- like [`link(2)`] -- does
//! not overwrite existing targets.
//!
//! ## File Locking
//!
//! Instead of atomic operations we could also use file locking. Under Linux there are a few ways
//! this can be achieved:
//!
//! - **[`fcntl(2)`] via `F_SETLK`, `F_SETLKW`, `F_GETLK`:** <br />
//! Works on [NFS], but is process-bound (aka if you have multiple writers within the same
//! process, only one can
//! acquire the lock).
//! - **[`fcntl(2)`] via `F_OFD_SETLK`, `F_OFD_SETLKW`, `F_OFD_GETLK`:** <br />
//! Works on [NFS] and is file-descriptor-bound.
//! - **[`flock(2)`]:** <br />
//! Works on [NFS] but is technically emulated via [`fcntl(2)`] so the latter should probably be
//! preferred.
//!
//! The biggest issue with file locking is what happens when an operation fails while a lock is
//! being held. Either the resulting state is obviously unfinished (e.g. due to some checksum or
//! size mismatch, due to some missing marker) or we would need to implement some form of lock
//! poisoning. Since this can get quite tricky, I have decided that atomic file and directory
//! operations are easier to reason about.
//!
//! ## Message Metadata
//!
//! We are NOT using any file-based metadata (like `mtime` or extended attributes) because they are
//! often broken.
//!
//!
//! [`fcntl(2)`]: https://www.man7.org/linux/man-pages/man2/fcntl.2.html
//! [`flock(2)`]: https://www.man7.org/linux/man-pages/man2/flock.2.html
//! [`link(2)`]: https://man7.org/linux/man-pages/man2/link.2.html
//! [NFS]: https://en.wikipedia.org/wiki/Network_File_System
//! [`open(2)`]: https://man7.org/linux/man-pages/man2/open.2.html
//! [`rename(2)`]: https://man7.org/linux/man-pages/man2/rename.2.html
//! [`symlink(2)`]: https://man7.org/linux/man-pages/man2/symlink.2.html
//! [`unlink(2)`]: https://man7.org/linux/man-pages/man2/unlink.2.html
use crate::{
codec::{ContentType, IoxHeaders},
config::WriteBufferCreationConfig,
core::{WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting},
};
use async_trait::async_trait;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, Stream, StreamExt};
use iox_time::{Time, TimeProvider};
use pin_project::pin_project;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
pin::Pin,
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering},
Arc,
},
};
use tokio_util::sync::ReusableBoxFuture;
use trace::TraceCollector;
use uuid::Uuid;
/// Header used to declare the creation time of the message.
///
/// The value is encoded as RFC 3339 (see `store_operation` / `decode_file`).
pub const HEADER_TIME: &str = "last-modified";

/// File-based write buffer writer.
#[derive(Debug)]
pub struct FileBufferProducer {
    /// Per-shard path to the shard's directory under `<root>/<db_name>/active`.
    dirs: BTreeMap<ShardIndex, PathBuf>,
    /// Clock used to stamp operations that carry no producer timestamp.
    time_provider: Arc<dyn TimeProvider>,
}

impl FileBufferProducer {
    /// Create new writer.
    ///
    /// Creates the shard directory structure under `<root>/<database_name>` if
    /// `creation_config` is given and no active configuration exists yet;
    /// errors if there is neither an active configuration nor a creation config.
    pub async fn new(
        root: &Path,
        database_name: &str,
        creation_config: Option<&WriteBufferCreationConfig>,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Result<Self, WriteBufferError> {
        let root = root.join(database_name);
        let dirs = maybe_auto_create_directories(&root, creation_config).await?;
        Ok(Self {
            dirs,
            time_provider,
        })
    }
}
#[async_trait]
impl WriteBufferWriting for FileBufferProducer {
    /// Shard indexes this producer can write to (one per shard directory).
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.dirs.keys().cloned().collect()
    }

    /// Write `operation` to the given shard.
    ///
    /// The message is fully written to a scratchpad file under `temp/` first
    /// and then published into `committed/` via a hard link (which never
    /// overwrites an existing target), so readers never observe a
    /// partially-written message. Returns the metadata (shard/sequence
    /// number, timestamp, size) assigned to this write.
    async fn store_operation(
        &self,
        shard_index: ShardIndex,
        operation: DmlOperation,
    ) -> Result<DmlMeta, WriteBufferError> {
        let shard_path = self
            .dirs
            .get(&shard_index)
            .ok_or_else::<WriteBufferError, _>(|| {
                format!("Unknown shard index: {shard_index}").into()
            })?;

        // measure time: prefer the producer-supplied timestamp, fall back to "now"
        let now = operation
            .meta()
            .producer_ts()
            .unwrap_or_else(|| self.time_provider.now());

        // assemble message: HTTP-style headers (timestamp, content type, trace
        // context) followed by a blank line and the encoded payload
        let mut message: Vec<u8> = format!("{}: {}\n", HEADER_TIME, now.to_rfc3339()).into_bytes();
        let iox_headers = IoxHeaders::new(
            ContentType::Protobuf,
            operation.meta().span_context().cloned(),
        );
        for (name, value) in iox_headers.headers() {
            message.extend(format!("{name}: {value}\n").into_bytes())
        }
        message.extend(b"\n");
        crate::codec::encode_operation(&operation, &mut message)?;

        // write data to scratchpad file in temp directory
        let temp_file = shard_path.join("temp").join(Uuid::new_v4().to_string());
        tokio::fs::write(&temp_file, &message).await?;

        // scan existing files to figure out new sequence number
        let committed = shard_path.join("committed");
        let existing_files = scan_dir::<i64>(&committed, FileType::File).await?;
        let mut sequence_number = if let Some(max) = existing_files.keys().max() {
            max.checked_add(1).ok_or_else::<WriteBufferError, _>(|| {
                "Overflow during sequence number calculation"
                    .to_string()
                    .into()
            })?
        } else {
            0
        };

        // try to link scratchpad file to "current" dir; a failed `hard_link`
        // means a concurrent writer claimed this sequence number, so retry
        // with the next one
        loop {
            let committed_file = committed.join(sequence_number.to_string());
            if tokio::fs::hard_link(&temp_file, &committed_file)
                .await
                .is_ok()
            {
                break;
            }
            sequence_number = sequence_number
                .checked_add(1)
                .ok_or_else::<WriteBufferError, _>(|| {
                    "Overflow during sequence number calculation"
                        .to_string()
                        .into()
                })?;
        }

        // unlink scratchpad file (and ignore error)
        tokio::fs::remove_file(&temp_file).await.ok();

        Ok(DmlMeta::sequenced(
            Sequence::new(shard_index, SequenceNumber::new(sequence_number)),
            now,
            operation.meta().span_context().cloned(),
            message.len(),
        ))
    }

    /// No-op: this implementation does not buffer writes.
    async fn flush(&self) -> Result<(), WriteBufferError> {
        // no buffer
        Ok(())
    }

    fn type_name(&self) -> &'static str {
        "file"
    }
}
/// Stream handler for a single shard of a file-based write buffer.
#[derive(Debug)]
pub struct FileBufferStreamHandler {
    shard_index: ShardIndex,
    /// This shard's directory (containing `committed/` and `temp/`).
    path: PathBuf,
    /// Next sequence number the stream will try to read; shared with the
    /// stream created by [`Self::stream`].
    next_sequence_number: Arc<AtomicI64>,
    /// Set when the stream terminated; cleared by `seek`/`reset_to_earliest`.
    terminated: Arc<AtomicBool>,
    trace_collector: Option<Arc<dyn TraceCollector>>,
}

#[async_trait]
impl WriteBufferStreamHandler for FileBufferStreamHandler {
    /// Stream messages from the shard's `committed/` directory, starting at
    /// the current `next_sequence_number`.
    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
        let committed = self.path.join("committed");

        ConsumerStream::new(
            self.shard_index,
            committed,
            Arc::clone(&self.next_sequence_number),
            Arc::clone(&self.terminated),
            self.trace_collector.clone(),
        )
        .boxed()
    }

    /// Position the stream at `sequence_number`.
    ///
    /// Errors if the requested offset lies beyond the shard's current high
    /// watermark; otherwise clears the terminated flag.
    async fn seek(&mut self, sequence_number: SequenceNumber) -> Result<(), WriteBufferError> {
        let offset = sequence_number.get();

        // Find the current high watermark
        let committed = self.path.join("committed");
        let existing_files = scan_dir::<i64>(&committed, FileType::File).await?;
        let current = existing_files.keys().max().cloned().unwrap_or_default();

        if offset > current {
            return Err(WriteBufferError::sequence_number_after_watermark(format!(
                "attempted to seek to offset {offset}, but current high \
                watermark for partition {p} is {current}",
                p = self.shard_index
            )));
        }

        self.next_sequence_number
            .store(sequence_number.get(), Ordering::SeqCst);
        self.terminated.store(false, Ordering::SeqCst);
        Ok(())
    }

    /// Rewind the stream to sequence number 0 and clear the terminated flag.
    fn reset_to_earliest(&mut self) {
        self.next_sequence_number.store(0, Ordering::SeqCst);
        self.terminated.store(false, Ordering::SeqCst);
    }
}
/// File-based write buffer reader.
#[derive(Debug)]
pub struct FileBufferConsumer {
    /// Per-shard directory plus a read-position counter (the counter is
    /// currently unused -- both call sites ignore it).
    dirs: BTreeMap<ShardIndex, (PathBuf, Arc<AtomicU64>)>,
    trace_collector: Option<Arc<dyn TraceCollector>>,
}

impl FileBufferConsumer {
    /// Create new reader.
    ///
    /// Creates the shard directory structure under `<root>/<database_name>` if
    /// `creation_config` is given and no active configuration exists yet.
    pub async fn new(
        root: &Path,
        database_name: &str,
        creation_config: Option<&WriteBufferCreationConfig>,
        // `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033
        trace_collector: Option<&Arc<dyn TraceCollector>>,
    ) -> Result<Self, WriteBufferError> {
        let root = root.join(database_name);
        let dirs = maybe_auto_create_directories(&root, creation_config)
            .await?
            .into_iter()
            .map(|(shard_index, path)| (shard_index, (path, Arc::new(AtomicU64::new(0)))))
            .collect();
        Ok(Self {
            dirs,
            trace_collector: trace_collector.map(Arc::clone),
        })
    }
}
#[async_trait]
impl WriteBufferReading for FileBufferConsumer {
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.dirs.keys().copied().collect()
    }

    /// Create a stream handler for one shard, positioned at sequence number 0.
    async fn stream_handler(
        &self,
        shard_index: ShardIndex,
    ) -> Result<Box<dyn WriteBufferStreamHandler>, WriteBufferError> {
        let (path, _next_sequence_number) = self
            .dirs
            .get(&shard_index)
            .ok_or_else::<WriteBufferError, _>(|| {
                format!("Unknown shard index: {shard_index}").into()
            })?;

        Ok(Box::new(FileBufferStreamHandler {
            shard_index,
            path: path.clone(),
            next_sequence_number: Arc::new(AtomicI64::new(0)),
            terminated: Arc::new(AtomicBool::new(false)),
            trace_collector: self.trace_collector.clone(),
        }))
    }

    /// High watermark for a shard: the sequence number the next write would
    /// receive, i.e. "max committed + 1" (0 for an empty shard).
    async fn fetch_high_watermark(
        &self,
        shard_index: ShardIndex,
    ) -> Result<SequenceNumber, WriteBufferError> {
        let (path, _next_sequence_number) = self
            .dirs
            .get(&shard_index)
            .ok_or_else::<WriteBufferError, _>(|| {
                format!("Unknown shard index: {shard_index}").into()
            })?;
        let committed = path.join("committed");

        let sequence_number = watermark(&committed).await?;
        Ok(SequenceNumber::new(sequence_number))
    }

    fn type_name(&self) -> &'static str {
        "file"
    }
}
/// Stream of messages read from one shard's `committed/` directory.
///
/// Wraps the async read loop ([`Self::poll_next_inner`]) in a
/// [`ReusableBoxFuture`] so it can be driven through the synchronous
/// [`Stream`] interface.
#[pin_project]
struct ConsumerStream {
    /// In-flight read of the next message; re-armed after every yielded item.
    fut: ReusableBoxFuture<'static, Option<Result<DmlOperation, WriteBufferError>>>,
    shard_index: ShardIndex,
    path: PathBuf,
    /// Sequence number to read next; shared with the owning stream handler.
    next_sequence_number: Arc<AtomicI64>,
    /// Set when the stream ends; shared with the owning stream handler.
    terminated: Arc<AtomicBool>,
    trace_collector: Option<Arc<dyn TraceCollector>>,
}

impl ConsumerStream {
    fn new(
        shard_index: ShardIndex,
        path: PathBuf,
        next_sequence_number: Arc<AtomicI64>,
        terminated: Arc<AtomicBool>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Self {
        Self {
            fut: ReusableBoxFuture::new(Self::poll_next_inner(
                shard_index,
                path.clone(),
                Arc::clone(&next_sequence_number),
                Arc::clone(&terminated),
                trace_collector.clone(),
            )),
            shard_index,
            path,
            next_sequence_number,
            terminated,
            trace_collector,
        }
    }

    /// Read, decode and return the message at `next_sequence_number`,
    /// advancing the shared counter on success.
    ///
    /// A missing file is either skipped (a gap below the high watermark),
    /// reported as an error (position beyond the watermark, which also
    /// terminates the stream), or simply means no newer data exists yet, in
    /// which case the loop sleeps and retries. Returns `None` once
    /// `terminated` is set.
    async fn poll_next_inner(
        shard_index: ShardIndex,
        path: PathBuf,
        next_sequence_number: Arc<AtomicI64>,
        terminated: Arc<AtomicBool>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Option<Result<DmlOperation, WriteBufferError>> {
        loop {
            let sequence_number = next_sequence_number.load(Ordering::SeqCst);

            if terminated.load(Ordering::SeqCst) {
                return None;
            }

            // read file
            let file_path = path.join(sequence_number.to_string());
            let msg = match tokio::fs::read(&file_path).await {
                Ok(data) => {
                    // decode file
                    let sequence = Sequence {
                        shard_index,
                        sequence_number: SequenceNumber::new(sequence_number),
                    };
                    match Self::decode_file(data, sequence, trace_collector.clone()) {
                        Ok(write) => {
                            // advance the shared position; a failed CAS means a
                            // concurrent seek/reset moved it, so start over at
                            // the new position
                            match next_sequence_number.compare_exchange(
                                sequence_number,
                                sequence_number + 1,
                                Ordering::SeqCst,
                                Ordering::SeqCst,
                            ) {
                                Ok(_) => {
                                    // can send to output
                                    Ok(write)
                                }
                                Err(_) => {
                                    // interleaving change, retry
                                    continue;
                                }
                            }
                        }
                        Err(e) => Err(e),
                    }
                }
                Err(error) => {
                    match error.kind() {
                        std::io::ErrorKind::NotFound => {
                            // figure out watermark and see if there's a gap in the stream
                            if let Ok(watermark) = watermark(&path).await {
                                // watermark is "last sequence number + 1", so subtract 1 before comparing
                                if watermark.saturating_sub(1) > sequence_number {
                                    // while generating the watermark, a writer might have created the file that we've
                                    // tried to read, so we need to double-check
                                    if let Err(std::io::ErrorKind::NotFound) =
                                        tokio::fs::metadata(&file_path).await.map_err(|e| e.kind())
                                    {
                                        // update position
                                        // failures are OK here since we'll re-read this value next round
                                        next_sequence_number
                                            .compare_exchange(
                                                sequence_number,
                                                sequence_number + 1,
                                                Ordering::SeqCst,
                                                Ordering::SeqCst,
                                            )
                                            .ok();
                                        continue;
                                    }
                                } else if sequence_number > watermark {
                                    terminated.store(true, Ordering::SeqCst);
                                    return Some(Err(WriteBufferError::sequence_number_after_watermark(
                                        format!("unknown sequence number, high watermark is {watermark}"),
                                    )));
                                }
                            };

                            // no gap detected, just wait a bit for new data
                            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                            continue;
                        }
                        _ => {
                            // cannot read file => communicate to user
                            Err(error.into())
                        }
                    }
                }
            };

            return Some(msg);
        }
    }

    /// Decode a single message file: HTTP-style headers, blank line, payload.
    ///
    /// The `last-modified` header supplies the message timestamp; decoding
    /// fails if it is missing or unparseable.
    fn decode_file(
        mut data: Vec<u8>,
        sequence: Sequence,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Result<DmlOperation, WriteBufferError> {
        // NOTE(review): more than 16 headers makes `parse_headers` return
        // `Partial`/an error -- assumed sufficient for the headers written by
        // `store_operation`.
        let mut headers = [httparse::EMPTY_HEADER; 16];
        let status =
            httparse::parse_headers(&data, &mut headers).map_err(WriteBufferError::invalid_data)?;

        match status {
            httparse::Status::Complete((offset, headers)) => {
                let iox_headers = IoxHeaders::from_headers(
                    headers.iter().map(|header| (header.name, header.value)),
                    trace_collector.as_ref(),
                )?;

                // parse timestamp
                let mut timestamp = None;
                for header in headers {
                    if header.name.eq_ignore_ascii_case(HEADER_TIME) {
                        if let Ok(value) = String::from_utf8(header.value.to_vec()) {
                            if let Ok(time) = Time::from_rfc3339(&value) {
                                timestamp = Some(time);
                            }
                        }
                    }
                }
                let timestamp = if let Some(timestamp) = timestamp {
                    timestamp
                } else {
                    return Err("Timestamp missing".to_string().into());
                };

                // parse entry; `offset` is the first byte after the header block
                let full_data_length = data.len();
                let entry_data = data.split_off(offset);

                crate::codec::decode(
                    &entry_data,
                    iox_headers,
                    sequence,
                    timestamp,
                    full_data_length,
                )
            }
            httparse::Status::Partial => Err("Too many headers".to_string().into()),
        }
    }
}
impl Stream for ConsumerStream {
    type Item = Result<DmlOperation, WriteBufferError>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();

        match this.fut.poll(cx) {
            std::task::Poll::Ready(res) => {
                // re-arm the reusable future for the next item before
                // yielding the current one
                this.fut.set(Self::poll_next_inner(
                    *this.shard_index,
                    this.path.clone(),
                    Arc::clone(this.next_sequence_number),
                    Arc::clone(this.terminated),
                    this.trace_collector.clone(),
                ));
                std::task::Poll::Ready(res)
            }
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}
/// Return the shard directories for `root`, creating them first if necessary.
///
/// If an `active` configuration already exists it is used as-is. Otherwise,
/// when `creation_config` is given, a fresh version directory is populated
/// and atomically published by symlinking `active` to it; losing the symlink
/// race against a concurrent initializer removes the version directory and
/// retries. Errors if no active config exists and no creation config is
/// given.
async fn maybe_auto_create_directories(
    root: &Path,
    creation_config: Option<&WriteBufferCreationConfig>,
) -> Result<BTreeMap<ShardIndex, PathBuf>, WriteBufferError> {
    loop {
        // figure out if an active version exists
        let active = root.join("active");
        if tokio::fs::metadata(&active).await.is_ok() {
            // Scan for directories
            let directories = scan_dir(&active, FileType::Dir).await?;

            if directories.is_empty() {
                return Err("Active configuration has zero shards.".to_string().into());
            }
            return Ok(directories);
        }

        // no active config exists
        if let Some(creation_config) = creation_config {
            // create version directory
            let version = root.join("version").join(Uuid::new_v4().to_string());
            tokio::fs::create_dir_all(&version).await?;

            let mut directories = BTreeMap::new();
            for shard_index in 0..creation_config.n_shards.get() {
                let shard_path_in_version = version.join(shard_index.to_string());
                tokio::fs::create_dir(&shard_path_in_version).await?;

                let committed = shard_path_in_version.join("committed");
                tokio::fs::create_dir(&committed).await?;

                let temp = shard_path_in_version.join("temp");
                tokio::fs::create_dir(&temp).await?;

                // returned paths point through the (soon-to-exist) symlink
                let shard_path_in_active = active.join(shard_index.to_string());
                directories.insert(ShardIndex::new(shard_index as i32), shard_path_in_active);
            }

            // A symlink target is resolved relative to the parent directory of
            // the link itself.
            let target = version
                .strip_prefix(root)
                .expect("symlink target not in root workspace");

            // symlink active->version
            if tokio::fs::symlink(target, active).await.is_ok() {
                // linking worked
                return Ok(directories);
            } else {
                // linking did not work, assuming a concurrent initialization
                // process. Remove version and try again.
                tokio::fs::remove_dir_all(&version).await?;
                continue;
            }
        } else {
            return Err("no file shards initialized".to_string().into());
        }
    }
}
/// Kind of directory entry [`scan_dir`] expects to find.
#[derive(Debug, Clone, Copy)]
enum FileType {
    /// Every entry must be a directory.
    Dir,
    /// Every entry must be a regular file.
    File,
}
/// List the entries of `dir`, requiring every entry to be of `file_type` and
/// to have a file name parseable as `T`.
///
/// Returns a map from parsed name to full path. Fails on the first entry of
/// the wrong type or with an unparseable name.
async fn scan_dir<T>(
    dir: &Path,
    file_type: FileType,
) -> Result<BTreeMap<T, PathBuf>, WriteBufferError>
where
    T: FromStr + Ord + Send,
{
    let mut results = BTreeMap::new();

    let mut read_dir = tokio::fs::read_dir(dir).await?;
    while let Some(dir_entry) = read_dir.next_entry().await? {
        let path = dir_entry.path();
        let entry_type = dir_entry.file_type().await?;

        // type check first, then name parsing, preserving error precedence
        match file_type {
            FileType::Dir if !entry_type.is_dir() => {
                return Err(format!("'{}' is not a directory", path.display()).into());
            }
            FileType::File if !entry_type.is_file() => {
                return Err(format!("'{}' is not a file", path.display()).into());
            }
            _ => {}
        }

        let parsed = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.parse::<T>().ok());
        match parsed {
            Some(key) => {
                results.insert(key, path);
            }
            None => return Err(format!("Cannot parse '{}'", path.display()).into()),
        }
    }

    Ok(results)
}
/// High watermark of a `committed/` directory: highest committed sequence
/// number plus one, or 0 when the directory holds no message files.
async fn watermark(path: &Path) -> Result<i64, WriteBufferError> {
    let entries = scan_dir::<i64>(path, FileType::File).await?;
    Ok(entries.keys().max().map_or(0, |max| max + 1))
}
pub mod test_utils {
    use std::path::Path;

    use data_types::{SequenceNumber, ShardIndex};

    /// Remove specific entry from write buffer.
    ///
    /// Deletes the committed message file for the given shard and sequence
    /// number; panics if the file cannot be removed.
    pub async fn remove_entry(
        write_buffer_path: &Path,
        database_name: &str,
        shard_index: ShardIndex,
        sequence_number: SequenceNumber,
    ) {
        let committed_file = write_buffer_path
            .join(database_name)
            .join("active")
            .join(shard_index.to_string())
            .join("committed")
            .join(sequence_number.get().to_string());
        tokio::fs::remove_file(&committed_file).await.unwrap()
    }
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroU32, time::Duration};
use data_types::PartitionKey;
use dml::test_util::assert_write_op_eq;
use tempfile::TempDir;
use trace::RingBufferTraceCollector;
use crate::core::test_utils::{perform_generic_tests, write, TestAdapter, TestContext};
use super::test_utils::remove_entry;
use super::*;
/// Test adapter backed by a temporary directory (removed on drop).
struct FileTestAdapter {
    tempdir: TempDir,
}

impl FileTestAdapter {
    fn new() -> Self {
        Self {
            tempdir: TempDir::new().unwrap(),
        }
    }
}

#[async_trait]
impl TestAdapter for FileTestAdapter {
    type Context = FileTestContext;

    /// Create a fresh context with a random database name, so contexts that
    /// share the adapter's tempdir do not interfere with each other.
    async fn new_context_with_time(
        &self,
        n_shards: NonZeroU32,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Self::Context {
        FileTestContext {
            path: self.tempdir.path().to_path_buf(),
            database_name: format!("test_db_{}", Uuid::new_v4()),
            n_shards,
            time_provider,
            trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
        }
    }
}
/// Per-test state: where the write buffer lives and how it is configured.
struct FileTestContext {
    path: PathBuf,
    database_name: String,
    n_shards: NonZeroU32,
    time_provider: Arc<dyn TimeProvider>,
    trace_collector: Arc<RingBufferTraceCollector>,
}

impl FileTestContext {
    /// Creation config for the context's shard count, or `None` when
    /// directory auto-creation should be disabled.
    fn creation_config(&self, value: bool) -> Option<WriteBufferCreationConfig> {
        value.then(|| WriteBufferCreationConfig {
            n_shards: self.n_shards,
            ..Default::default()
        })
    }
}

#[async_trait]
impl TestContext for FileTestContext {
    type Writing = FileBufferProducer;
    type Reading = FileBufferConsumer;

    async fn writing(&self, creation_config: bool) -> Result<Self::Writing, WriteBufferError> {
        FileBufferProducer::new(
            &self.path,
            &self.database_name,
            self.creation_config(creation_config).as_ref(),
            Arc::clone(&self.time_provider),
        )
        .await
    }

    async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError> {
        FileBufferConsumer::new(
            &self.path,
            &self.database_name,
            self.creation_config(creation_config).as_ref(),
            Some(&(self.trace_collector() as Arc<_>)),
        )
        .await
    }

    fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
        Arc::clone(&self.trace_collector)
    }
}
/// Run the shared write-buffer conformance test suite against the file
/// implementation.
#[tokio::test]
async fn test_generic() {
    perform_generic_tests(FileTestAdapter::new()).await;
}

/// Readers must skip over a gap of multiple consecutive removed message
/// files.
#[tokio::test]
async fn test_ignores_missing_files_multi() {
    let adapter = FileTestAdapter::new();
    let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;

    let writer = ctx.writing(true).await.unwrap();
    let shard_index = writer.shard_indexes().into_iter().next().unwrap();
    let entry_1 = "upc,region=east user=1 100";
    let entry_2 = "upc,region=east user=2 200";
    let entry_3 = "upc,region=east user=3 300";
    let entry_4 = "upc,region=east user=4 400";
    let w1 = write(
        &writer,
        entry_1,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;
    let w2 = write(
        &writer,
        entry_2,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;
    let w3 = write(
        &writer,
        entry_3,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;
    let w4 = write(
        &writer,
        entry_4,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;

    // delete the two middle entries to create a multi-file gap
    remove_entry(
        &ctx.path,
        &ctx.database_name,
        shard_index,
        w2.meta().sequence().unwrap().sequence_number,
    )
    .await;
    remove_entry(
        &ctx.path,
        &ctx.database_name,
        shard_index,
        w3.meta().sequence().unwrap().sequence_number,
    )
    .await;

    let reader = ctx.reading(true).await.unwrap();
    let mut handler = reader.stream_handler(shard_index).await.unwrap();
    let mut stream = handler.stream().await;

    // only the surviving first and last writes are yielded
    assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w1);
    assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w4);
}
/// Readers must skip a removed file even when it is the very first entry of
/// the shard.
#[tokio::test]
async fn test_ignores_missing_files_single() {
    let adapter = FileTestAdapter::new();
    let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;

    let writer = ctx.writing(true).await.unwrap();
    let shard_index = writer.shard_indexes().into_iter().next().unwrap();
    let entry_1 = "upc,region=east user=1 100";
    let entry_2 = "upc,region=east user=2 200";
    let w1 = write(
        &writer,
        entry_1,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;
    let w2 = write(
        &writer,
        entry_2,
        shard_index,
        PartitionKey::from("bananas"),
        None,
    )
    .await;

    remove_entry(
        &ctx.path,
        &ctx.database_name,
        shard_index,
        w1.meta().sequence().unwrap().sequence_number,
    )
    .await;

    let reader = ctx.reading(true).await.unwrap();
    let mut handler = reader.stream_handler(shard_index).await.unwrap();
    let mut stream = handler.stream().await;

    assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w2);
}

/// Directory auto-creation works both for a fresh root and for an
/// already-initialized one; bounded by a timeout so a retry loop that never
/// settles fails the test instead of hanging.
#[tokio::test]
async fn test_maybe_auto_create_dirs() {
    let path = Path::new("./test-file-write-buffer");
    let config = WriteBufferCreationConfig::default();

    tokio::time::timeout(Duration::from_secs(5), async {
        maybe_auto_create_directories(path, Some(&config))
            .await
            .expect("failed to create new dir");
        maybe_auto_create_directories(path, Some(&config))
            .await
            .expect("failed to use existing dir");
    })
    .await
    .expect("timeout");

    tokio::fs::remove_dir_all(path)
        .await
        .expect("failed to clean up test dir")
}

View File

@ -1,364 +0,0 @@
use crate::{config::WriteBufferCreationConfig, core::WriteBufferError};
use std::{collections::BTreeMap, fmt::Display, str::FromStr, time::Duration};
/// Generic client config that is used for consumers, producers as well as admin operations (like
/// "create topic").
#[derive(Debug, PartialEq, Eq)]
pub struct ClientConfig {
    /// Client ID.
    pub client_id: Option<String>,

    /// Maximum message size in bytes.
    ///
    /// Extracted from `max_message_size`. Currently defaults to
    /// `Some(10485760)` (10 MiB) rather than `None` (the rskafka default);
    /// see the TODO in the `TryFrom` impl.
    pub max_message_size: Option<usize>,

    /// Optional SOCKS5 proxy to use for connecting to the brokers.
    ///
    /// Extracted from `socks5_proxy`. Defaults to `None`.
    pub socks5_proxy: Option<String>,
}
impl TryFrom<&BTreeMap<String, String>> for ClientConfig {
    type Error = WriteBufferError;

    /// Extract client settings from the free-form write buffer option map.
    ///
    /// Unknown keys are ignored; present keys must parse into their target
    /// type.
    fn try_from(cfg: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
        Ok(Self {
            client_id: parse_key(cfg, "client.id")?,
            // TODO: Revert this back to after we have proper prod config management.
            // See https://github.com/influxdata/influxdb_iox/issues/3723
            //
            // max_message_size: parse_key(cfg, "max_message_size")?,
            max_message_size: Some(parse_key(cfg, "max_message_size")?.unwrap_or(10485760)),
            socks5_proxy: parse_key(cfg, "socks5_proxy")?,
        })
    }
}
/// Config for topic creation.
#[derive(Debug, PartialEq, Eq)]
pub struct TopicCreationConfig {
    /// Number of partitions.
    ///
    /// Taken from the shard count (`n_shards`), not from the option map.
    pub num_partitions: i32,

    /// Replication factor.
    ///
    /// Extracted from `replication_factor` option. Defaults to `1`.
    pub replication_factor: i16,

    /// Timeout in ms.
    ///
    /// Extracted from `timeout_ms` option. Defaults to `5_000`.
    pub timeout_ms: i32,
}

impl TryFrom<&WriteBufferCreationConfig> for TopicCreationConfig {
    type Error = WriteBufferError;

    /// Derive topic creation settings from a creation config.
    ///
    /// Fails if the shard count does not fit into an `i32` or an option
    /// value does not parse.
    fn try_from(cfg: &WriteBufferCreationConfig) -> Result<Self, Self::Error> {
        Ok(Self {
            num_partitions: i32::try_from(cfg.n_shards.get())
                .map_err(WriteBufferError::invalid_input)?,
            replication_factor: parse_key(&cfg.options, "replication_factor")?.unwrap_or(1),
            timeout_ms: parse_key(&cfg.options, "timeout_ms")?.unwrap_or(5_000),
        })
    }
}
/// Config for consumers.
//
// NOTE(review): the original field descriptions were rotated by one field
// (each doc line described the next field); fixed so every description
// matches its own field and `Extracted from` key.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ConsumerConfig {
    /// The maximum amount of time (in ms) to wait for data before returning.
    ///
    /// Extracted from `consumer_max_wait_ms`. Defaults to `None` (rskafka default).
    pub max_wait_ms: Option<i32>,

    /// Will wait for at least `min_batch_size` bytes of data.
    ///
    /// Extracted from `consumer_min_batch_size`. Defaults to `None` (rskafka default).
    pub min_batch_size: Option<i32>,

    /// The maximum amount of data to fetch in a single batch.
    ///
    /// Extracted from `consumer_max_batch_size`. Currently defaults to
    /// `Some(5242880)` (5 MiB) rather than `None` (the rskafka default);
    /// see the TODO in the `TryFrom` impl.
    pub max_batch_size: Option<i32>,
}
impl TryFrom<&BTreeMap<String, String>> for ConsumerConfig {
    type Error = WriteBufferError;

    /// Extract consumer settings from the free-form write buffer option map.
    ///
    /// Unknown keys are ignored; present keys must parse into their target
    /// type.
    fn try_from(cfg: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
        Ok(Self {
            max_wait_ms: parse_key(cfg, "consumer_max_wait_ms")?,
            min_batch_size: parse_key(cfg, "consumer_min_batch_size")?,
            // TODO: Revert this back to after we have proper prod config management.
            // See https://github.com/influxdata/influxdb_iox/issues/3723
            //
            // max_batch_size: parse_key(cfg, "consumer_max_batch_size")?,
            max_batch_size: Some(parse_key(cfg, "consumer_max_batch_size")?.unwrap_or(5242880)),
        })
    }
}
/// Config for producers.
#[derive(Debug, PartialEq, Eq)]
pub struct ProducerConfig {
    /// Linger time.
    ///
    /// Extracted from `producer_linger_ms`. Defaults to `None` (rskafka default).
    pub linger: Option<Duration>,

    /// Maximum batch size in bytes.
    ///
    /// Extracted from `producer_max_batch_size`. Currently defaults to
    /// `2621440` (2.5 MiB) rather than `512 * 1024`; see the TODO in the
    /// `TryFrom` impl.
    pub max_batch_size: usize,
}
impl TryFrom<&BTreeMap<String, String>> for ProducerConfig {
    type Error = WriteBufferError;

    /// Extract producer settings from the free-form write buffer option map.
    ///
    /// Unknown keys are ignored; present keys must parse into their target
    /// type.
    fn try_from(cfg: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
        let linger = parse_key::<u64>(cfg, "producer_linger_ms")?.map(Duration::from_millis);

        // TODO: Revert this back to after we have proper prod config management.
        // See https://github.com/influxdata/influxdb_iox/issues/3723
        //
        // max_batch_size: parse_key(cfg, "producer_max_batch_size")?.unwrap_or(512 * 1024),
        let max_batch_size = parse_key(cfg, "producer_max_batch_size")?.unwrap_or(2621440);

        Ok(Self {
            linger,
            max_batch_size,
        })
    }
}
/// Look up `key` in the option map and parse its value as `T`.
///
/// Returns `Ok(None)` when the key is absent and a descriptive error when
/// the value is present but unparseable.
fn parse_key<T>(cfg: &BTreeMap<String, String>, key: &str) -> Result<Option<T>, WriteBufferError>
where
    T: FromStr,
    T::Err: Display,
{
    match cfg.get(key) {
        None => Ok(None),
        Some(s) => match s.parse() {
            Ok(value) => Ok(Some(value)),
            Err(e) => Err(format!("Cannot parse `{key}` from '{s}': {e}").into()),
        },
    }
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, num::NonZeroU32};
use test_helpers::assert_contains;
use super::*;
/// An empty option map yields the hard-coded defaults (10 MiB max message
/// size, everything else unset).
#[test]
fn test_client_config_default() {
    let actual = ClientConfig::try_from(&BTreeMap::default()).unwrap();
    let expected = ClientConfig {
        client_id: None,
        max_message_size: Some(10485760),
        socks5_proxy: None,
    };
    assert_eq!(actual, expected);
}

/// Known keys are parsed; unknown keys (`foo`) are ignored.
#[test]
fn test_client_config_parse() {
    let actual = ClientConfig::try_from(&BTreeMap::from([
        (String::from("client.id"), String::from("my_id")),
        (String::from("max_message_size"), String::from("1024")),
        (String::from("socks5_proxy"), String::from("my_proxy")),
        (String::from("foo"), String::from("bar")),
    ]))
    .unwrap();
    let expected = ClientConfig {
        client_id: Some(String::from("my_id")),
        max_message_size: Some(1024),
        socks5_proxy: Some(String::from("my_proxy")),
    };
    assert_eq!(actual, expected);
}

/// An unparseable value produces a descriptive error naming key and value.
#[test]
fn test_client_config_error() {
    let err = ClientConfig::try_from(&BTreeMap::from([(
        String::from("max_message_size"),
        String::from("xyz"),
    )]))
    .unwrap_err();
    assert_contains!(
        err.to_string(),
        "Cannot parse `max_message_size` from 'xyz': invalid digit found in string"
    );
}
/// Defaults: partition count equals `n_shards`, replication factor 1,
/// 5-second timeout.
#[test]
fn test_topic_creation_config_default() {
    let actual = TopicCreationConfig::try_from(&WriteBufferCreationConfig {
        n_shards: NonZeroU32::new(2).unwrap(),
        options: BTreeMap::default(),
    })
    .unwrap();
    let expected = TopicCreationConfig {
        num_partitions: 2,
        replication_factor: 1,
        timeout_ms: 5_000,
    };
    assert_eq!(actual, expected);
}

/// Known keys override the defaults; unknown keys (`foo`) are ignored.
#[test]
fn test_topic_creation_config_parse() {
    let actual = TopicCreationConfig::try_from(&WriteBufferCreationConfig {
        n_shards: NonZeroU32::new(2).unwrap(),
        options: BTreeMap::from([
            (String::from("replication_factor"), String::from("3")),
            (String::from("timeout_ms"), String::from("100")),
            (String::from("foo"), String::from("bar")),
        ]),
    })
    .unwrap();
    let expected = TopicCreationConfig {
        num_partitions: 2,
        replication_factor: 3,
        timeout_ms: 100,
    };
    assert_eq!(actual, expected);
}

/// Unparseable values for either key produce descriptive errors.
#[test]
fn test_topic_creation_config_err() {
    let err = TopicCreationConfig::try_from(&WriteBufferCreationConfig {
        n_shards: NonZeroU32::new(2).unwrap(),
        options: BTreeMap::from([(String::from("replication_factor"), String::from("xyz"))]),
    })
    .unwrap_err();
    assert_contains!(
        err.to_string(),
        "Cannot parse `replication_factor` from 'xyz': invalid digit found in string"
    );

    let err = TopicCreationConfig::try_from(&WriteBufferCreationConfig {
        n_shards: NonZeroU32::new(2).unwrap(),
        options: BTreeMap::from([(String::from("timeout_ms"), String::from("xyz"))]),
    })
    .unwrap_err();
    assert_contains!(
        err.to_string(),
        "Cannot parse `timeout_ms` from 'xyz': invalid digit found in string"
    );
}
#[test]
fn test_consumer_config_default() {
let actual = ConsumerConfig::try_from(&BTreeMap::default()).unwrap();
let expected = ConsumerConfig {
max_wait_ms: None,
min_batch_size: None,
max_batch_size: Some(5242880),
};
assert_eq!(actual, expected);
}
#[test]
fn test_consumer_config_parse() {
    // All known consumer options are parsed; unknown keys ("foo") are ignored.
    let options: BTreeMap<_, _> = [
        ("consumer_max_wait_ms", "11"),
        ("consumer_min_batch_size", "22"),
        ("consumer_max_batch_size", "33"),
        ("foo", "bar"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_owned(), v.to_owned()))
    .collect();
    let parsed = ConsumerConfig::try_from(&options).unwrap();
    assert_eq!(
        parsed,
        ConsumerConfig {
            max_wait_ms: Some(11),
            min_batch_size: Some(22),
            max_batch_size: Some(33),
        }
    );
}
#[test]
fn test_consumer_config_err() {
    // Each numeric consumer option must produce a descriptive parse error for
    // non-numeric input. Table-driven to avoid repeating the setup.
    let cases = [
        (
            "consumer_max_wait_ms",
            "Cannot parse `consumer_max_wait_ms` from 'xyz': invalid digit found in string",
        ),
        (
            "consumer_min_batch_size",
            "Cannot parse `consumer_min_batch_size` from 'xyz': invalid digit found in string",
        ),
        (
            "consumer_max_batch_size",
            "Cannot parse `consumer_max_batch_size` from 'xyz': invalid digit found in string",
        ),
    ];
    for (key, expected_msg) in cases {
        let err = ConsumerConfig::try_from(&BTreeMap::from([(
            key.to_owned(),
            String::from("xyz"),
        )]))
        .unwrap_err();
        assert_contains!(err.to_string(), expected_msg);
    }
}
#[test]
fn test_producer_config_default() {
    // With no options set, the producer defaults to a 2.5 MiB batch limit and
    // no linger duration.
    let parsed = ProducerConfig::try_from(&BTreeMap::default()).unwrap();
    assert_eq!(
        parsed,
        ProducerConfig {
            linger: None,
            max_batch_size: 2621440,
        }
    );
}
#[test]
fn test_producer_config_parse() {
    // Known producer options are parsed; unknown keys ("foo") are ignored.
    let options: BTreeMap<_, _> = [
        ("producer_linger_ms", "42"),
        ("producer_max_batch_size", "1337"),
        ("foo", "bar"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_owned(), v.to_owned()))
    .collect();
    let parsed = ProducerConfig::try_from(&options).unwrap();
    assert_eq!(
        parsed,
        ProducerConfig {
            linger: Some(Duration::from_millis(42)),
            max_batch_size: 1337,
        }
    );
}
#[test]
fn test_producer_config_err() {
    // Non-numeric values for numeric producer options must fail with a
    // descriptive error naming the offending key.
    let cases = [
        (
            "producer_linger_ms",
            "Cannot parse `producer_linger_ms` from 'xyz': invalid digit found in string",
        ),
        (
            "producer_max_batch_size",
            "Cannot parse `producer_max_batch_size` from 'xyz': invalid digit found in string",
        ),
    ];
    for (key, expected_msg) in cases {
        let err = ProducerConfig::try_from(&BTreeMap::from([(
            key.to_owned(),
            String::from("xyz"),
        )]))
        .unwrap_err();
        assert_contains!(err.to_string(), expected_msg);
    }
}
}

View File

@ -1,297 +0,0 @@
use std::result::Result;
use data_types::ShardIndex;
use futures::future::BoxFuture;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, U64Histogram, U64HistogramOptions};
use rskafka::{
client::{partition::Compression, producer::ProducerClient},
record::Record,
};
/// An instrumentation layer that decorates a [`ProducerClient`] implementation,
/// recording the latency distribution and success/error result of the
/// underlying [`ProducerClient::produce()`] call, which includes serialisation
/// & protocol overhead, as well as the actual network I/O.
///
/// Captures the approximate, uncompressed size of the resulting Kafka message's
/// payload wrote to the wire by summing the [`Record::approximate_size()`] of
/// the batch. This value reflects the size of the message before client
/// compression, or broker compression - messages on the wire may be
/// significantly smaller.
///
/// The metrics created by this instrumentation are labelled with the kafka
/// topic & partition specified at initialisation.
#[derive(Debug)]
pub struct KafkaProducerMetrics<P = SystemProvider> {
    /// The decorated client that performs the actual produce call.
    inner: Box<dyn ProducerClient>,
    /// Clock used to measure call latency (swapped for a mock in tests).
    time_provider: P,
    /// Latency of [`ProducerClient::produce()`] calls that returned `Ok`.
    enqueue_success: DurationHistogram,
    /// Latency of [`ProducerClient::produce()`] calls that returned `Err`.
    enqueue_error: DurationHistogram,
    /// Approximate, uncompressed payload size per produce call.
    msg_size: U64Histogram,
}
impl KafkaProducerMetrics {
    /// Decorate the specified [`ProducerClient`] implementation with an
    /// instrumentation layer.
    ///
    /// All metrics are registered against `metrics` and labelled with the
    /// given topic name & shard (partition) index.
    pub fn new(
        client: Box<dyn ProducerClient>,
        kafka_topic_name: String,
        shard_index: ShardIndex,
        metrics: &metric::Registry,
    ) -> Self {
        // Base label set shared by all metrics created here.
        let base_attributes = Attributes::from([
            ("kafka_partition", shard_index.to_string().into()),
            ("kafka_topic", kafka_topic_name.into()),
        ]);

        // Histogram of the approximate, uncompressed payload size of each
        // produce call (sum of the Record sizes in the batch).
        let msg_size = metrics
            .register_metric_with_options::<U64Histogram, _>(
                "write_buffer_client_payload_size",
                "distribution of approximate uncompressed message \
                payload size wrote to Kafka",
                || {
                    // 512 bytes to 16MiB buckets.
                    U64HistogramOptions::new([
                        512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144,
                        524288, 1048576, 2097152, 4194304, 8388608, 16777216,
                    ])
                },
            )
            .recorder(base_attributes.clone());

        // Produce-call latency, partitioned into success / error outcomes via
        // the "result" label.
        let enqueue = metrics.register_metric::<DurationHistogram>(
            "write_buffer_client_produce_duration",
            "duration of time taken to push a set of records to kafka \
            - includes codec, protocol, and network overhead",
        );
        let result_recorder = |result| {
            let mut attr = base_attributes.clone();
            attr.insert("result", result);
            enqueue.recorder(attr)
        };
        let enqueue_success = result_recorder("success");
        let enqueue_error = result_recorder("error");

        Self {
            inner: client,
            time_provider: Default::default(),
            enqueue_success,
            enqueue_error,
            msg_size,
        }
    }
}
impl<P> KafkaProducerMetrics<P>
where
    P: TimeProvider,
{
    /// Swap the time source, returning a wrapper that shares all other state
    /// with `self` (test-only).
    #[cfg(test)]
    fn with_time_provider<T>(self, time_provider: T) -> KafkaProducerMetrics<T> {
        let Self {
            inner,
            time_provider: _,
            enqueue_success,
            enqueue_error,
            msg_size,
        } = self;
        KafkaProducerMetrics {
            inner,
            time_provider,
            enqueue_success,
            enqueue_error,
            msg_size,
        }
    }

    /// Call the inner [`ProducerClient`] implementation and record latency in
    /// the appropriate success/error metric depending on the result.
    async fn instrument(
        &self,
        records: Vec<Record>,
        compression: Compression,
    ) -> Result<Vec<i64>, rskafka::client::error::Error> {
        // Record the approximate (uncompressed) payload size of this batch.
        let batch_size: u64 = records.iter().map(|v| v.approximate_size() as u64).sum();
        self.msg_size.record(batch_size);

        let started_at = self.time_provider.now();
        let res = self.inner.produce(records, compression).await;

        // Only record latency when the clock did not move backwards.
        if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) {
            let histogram = match &res {
                Ok(_) => &self.enqueue_success,
                Err(_) => &self.enqueue_error,
            };
            histogram.record(delta);
        }
        res
    }
}
impl<P> rskafka::client::producer::ProducerClient for KafkaProducerMetrics<P>
where
    P: TimeProvider,
{
    /// Delegate to [`Self::instrument()`], which wraps the inner client's
    /// produce call with metric recording.
    fn produce(
        &self,
        records: Vec<Record>,
        compression: Compression,
    ) -> BoxFuture<'_, Result<Vec<i64>, rskafka::client::error::Error>> {
        let instrumented = self.instrument(records, compression);
        Box::pin(instrumented)
    }
}
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};
    use iox_time::Time;
    use metric::Metric;
    use parking_lot::Mutex;
    use rskafka::chrono::{self, Utc};
    use super::*;
    const KAFKA_TOPIC: &str = "bananas";
    const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
    /// The duration of time the MockProducer::produce() takes to "execute"
    const CALL_LATENCY: Duration = Duration::from_secs(1);
    /// A [`ProducerClient`] double that advances a mock clock by
    /// [`CALL_LATENCY`] on every call and returns a pre-configured response
    /// exactly once (panics on a second call).
    #[derive(Debug)]
    pub struct MockProducer {
        // Shared mock clock, advanced on every produce() call.
        clock: Arc<iox_time::MockProvider>,
        // The single response to hand out; `None` once it has been taken.
        ret: Mutex<Option<Result<Vec<i64>, rskafka::client::error::Error>>>,
    }
    impl MockProducer {
        pub fn new(
            clock: Arc<iox_time::MockProvider>,
            ret: Result<Vec<i64>, rskafka::client::error::Error>,
        ) -> Self {
            Self {
                clock,
                ret: Mutex::new(Some(ret)),
            }
        }
    }
    impl rskafka::client::producer::ProducerClient for MockProducer {
        fn produce(
            &self,
            _records: Vec<Record>,
            _compression: Compression,
        ) -> BoxFuture<'_, Result<Vec<i64>, rskafka::client::error::Error>> {
            // Jump the clock by 1s so the metrics to observe a 1s latency
            self.clock.inc(CALL_LATENCY);
            // And return the configured response
            Box::pin(async { self.ret.lock().take().expect("multiple calls to mock") })
        }
    }
    // Happy path: the latency must land in the "success" histogram and the
    // payload-size histogram must record the record's approximate size.
    #[tokio::test]
    async fn test_produce_instrumentation_success() {
        let clock = Arc::new(iox_time::MockProvider::new(Time::MIN));
        let producer = Box::new(MockProducer::new(Arc::clone(&clock), Ok(Vec::default())));
        let metrics = metric::Registry::default();
        let wrapper =
            KafkaProducerMetrics::new(producer, KAFKA_TOPIC.to_string(), SHARD_INDEX, &metrics)
                .with_time_provider(Arc::clone(&clock));
        let record = Record {
            key: Some("bananas".into()),
            value: None,
            headers: Default::default(),
            timestamp: chrono::DateTime::<Utc>::MIN_UTC,
        };
        wrapper
            .produce(vec![record.clone()], Compression::Zstd)
            .await
            .expect("produce call should succeed");
        // Ensure the latency was correctly recorded.
        let histogram = metrics
            .get_instrument::<Metric<DurationHistogram>>("write_buffer_client_produce_duration")
            .expect("failed to read metric")
            .get_observer(&Attributes::from(&[
                ("kafka_topic", KAFKA_TOPIC),
                ("kafka_partition", "42"),
                ("result", "success"),
            ]))
            .expect("failed to get observer")
            .fetch();
        assert_eq!(histogram.sample_count(), 1);
        assert_eq!(histogram.total, CALL_LATENCY);
        // Ensure the size was captured
        let histogram = metrics
            .get_instrument::<Metric<U64Histogram>>("write_buffer_client_payload_size")
            .expect("failed to read metric")
            .get_observer(&Attributes::from(&[
                ("kafka_topic", KAFKA_TOPIC),
                ("kafka_partition", "42"),
            ]))
            .expect("failed to get observer")
            .fetch();
        assert_eq!(histogram.sample_count(), 1);
        assert_eq!(histogram.total, record.approximate_size() as u64);
    }
    // Error path: the latency must land in the "error" histogram; the (empty)
    // payload size is still recorded because size is captured before the call.
    #[tokio::test]
    async fn test_produce_instrumentation_error() {
        let clock = Arc::new(iox_time::MockProvider::new(Time::MIN));
        let producer = Box::new(MockProducer::new(
            Arc::clone(&clock),
            Err(rskafka::client::error::Error::InvalidResponse(
                "bananas".to_string(),
            )),
        ));
        let metrics = metric::Registry::default();
        let wrapper =
            KafkaProducerMetrics::new(producer, KAFKA_TOPIC.to_string(), SHARD_INDEX, &metrics)
                .with_time_provider(Arc::clone(&clock));
        wrapper
            .produce(Vec::new(), Compression::Zstd)
            .await
            .expect_err("produce call should fail");
        // Ensure the latency was correctly recorded.
        let histogram = metrics
            .get_instrument::<Metric<DurationHistogram>>("write_buffer_client_produce_duration")
            .expect("failed to read metric")
            .get_observer(&Attributes::from(&[
                ("kafka_topic", KAFKA_TOPIC),
                ("kafka_partition", "42"),
                ("result", "error"),
            ]))
            .expect("failed to get observer")
            .fetch();
        assert_eq!(histogram.sample_count(), 1);
        assert_eq!(histogram.total, CALL_LATENCY);
        // Ensure the size was captured
        let histogram = metrics
            .get_instrument::<Metric<U64Histogram>>("write_buffer_client_payload_size")
            .expect("failed to read metric")
            .get_observer(&Attributes::from(&[
                ("kafka_topic", KAFKA_TOPIC),
                ("kafka_partition", "42"),
            ]))
            .expect("failed to get observer")
            .fetch();
        assert_eq!(histogram.sample_count(), 1);
        assert_eq!(histogram.total, 0);
    }
}

View File

@ -1,865 +0,0 @@
use self::{
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
instrumentation::KafkaProducerMetrics,
record_aggregator::RecordAggregator,
};
use crate::{
codec::IoxHeaders,
config::WriteBufferCreationConfig,
core::{
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
WriteBufferWriting,
},
};
use async_trait::async_trait;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation};
use futures::{
stream::{self, BoxStream},
StreamExt, TryStreamExt,
};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::warn;
use parking_lot::Mutex;
use rskafka::{
client::{
consumer::{StartOffset, StreamConsumerBuilder},
error::{Error as RSKafkaError, ProtocolError},
partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling},
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
},
record::RecordAndOffset,
};
use std::{
collections::{BTreeMap, BTreeSet},
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use trace::TraceCollector;
// Submodules: option parsing, producer instrumentation, record batching.
mod config;
mod instrumentation;
mod record_aggregator;
/// Maximum number of jobs buffered and decoded concurrently.
const CONCURRENT_DECODE_JOBS: usize = 10;
/// Module-local result alias defaulting the error type to [`WriteBufferError`].
type Result<T, E = WriteBufferError> = std::result::Result<T, E>;
/// Kafka-backed write buffer producer.
#[derive(Debug)]
pub struct RSKafkaProducer {
    /// One batching producer per shard (Kafka partition), keyed by shard index.
    producers: BTreeMap<ShardIndex, BatchProducer<RecordAggregator>>,
}
impl RSKafkaProducer {
    /// Connect to the Kafka cluster at `conn` and build one batching producer
    /// per shard (partition) of `topic_name`.
    ///
    /// If `creation_config` is given, the topic is created when it does not
    /// yet exist; `partitions` optionally restricts which partitions are used.
    #[allow(clippy::too_many_arguments)]
    pub async fn new<'a>(
        conn: String,
        topic_name: String,
        connection_config: &'a BTreeMap<String, String>,
        time_provider: Arc<dyn TimeProvider>,
        creation_config: Option<&'a WriteBufferCreationConfig>,
        partitions: Option<Range<i32>>,
        _trace_collector: Option<Arc<dyn TraceCollector>>,
        metric_registry: &'a metric::Registry,
    ) -> Result<Self> {
        let partition_clients = setup_topic(
            conn,
            topic_name.clone(),
            connection_config,
            creation_config,
            partitions,
        )
        .await?;
        let producer_config = ProducerConfig::try_from(connection_config)?;

        let mut producers = BTreeMap::new();
        for (shard_index, partition_client) in partition_clients {
            // Wrap the raw partition client in a metric-recording decorator.
            let instrumented = KafkaProducerMetrics::new(
                Box::new(partition_client),
                topic_name.clone(),
                shard_index,
                metric_registry,
            );
            let mut builder = BatchProducerBuilder::new_with_client(Arc::new(instrumented))
                .with_compression(Compression::Zstd);
            if let Some(linger) = producer_config.linger {
                builder = builder.with_linger(linger);
            }
            let aggregator = RecordAggregator::new(
                shard_index,
                producer_config.max_batch_size,
                Arc::clone(&time_provider),
            );
            producers.insert(shard_index, builder.build(aggregator));
        }
        Ok(Self { producers })
    }
}
#[async_trait]
impl WriteBufferWriting for RSKafkaProducer {
    /// Return the set of shard indexes this producer can write to.
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.producers.keys().copied().collect()
    }

    /// Enqueue `operation` on the producer responsible for `shard_index`.
    async fn store_operation(
        &self,
        shard_index: ShardIndex,
        operation: DmlOperation,
    ) -> Result<DmlMeta, WriteBufferError> {
        match self.producers.get(&shard_index) {
            Some(producer) => Ok(producer.produce(operation).await?),
            None => Err(format!("Unknown shard index: {shard_index}").into()),
        }
    }

    /// Flush every per-shard producer, failing fast on the first error.
    async fn flush(&self) -> Result<(), WriteBufferError> {
        for producer in self.producers.values() {
            producer.flush().await?;
        }
        Ok(())
    }

    fn type_name(&self) -> &'static str {
        "kafka"
    }
}
/// Stream handler reading from a single Kafka partition (shard).
#[derive(Debug)]
pub struct RSKafkaStreamHandler {
    /// Client for the partition this handler reads from.
    partition_client: Arc<PartitionClient>,
    /// Offset of the next record to read; `None` means "start at earliest".
    next_offset: Arc<Mutex<Option<i64>>>,
    /// Set when the underlying stream hit a fatal read error; while set,
    /// `stream()` returns an empty stream until `seek()` /
    /// `reset_to_earliest()` clears it.
    terminated: Arc<AtomicBool>,
    /// Optional collector for spans carried in record headers.
    trace_collector: Option<Arc<dyn TraceCollector>>,
    /// Parsed consumer tuning options (wait time, batch size limits).
    consumer_config: ConsumerConfig,
    /// The shard (partition) this handler is bound to.
    shard_index: ShardIndex,
}
/// Launch a tokio task that attempts to decode a DmlOperation from a
/// record.
///
/// Returns the offset (if a record was read successfully) and the
/// result of decoding. Note that `Some(offset)` is returned even if
/// there is an error decoding the data in the record, but not if
/// there was an error reading the record in the first place.
async fn try_decode(
    record: Result<RecordAndOffset, WriteBufferError>,
    shard_index: ShardIndex,
    trace_collector: Option<Arc<dyn TraceCollector>>,
) -> (Option<i64>, Result<DmlOperation, WriteBufferError>) {
    // Capture the offset before handing the record to the decode task: it is
    // reported even when decoding later fails (but not when reading failed).
    let offset = record.as_ref().ok().map(|r| r.offset);

    // Decoding is CPU intensive, so run it on a separate tokio task in
    // parallel with further reads.
    let join_result = tokio::task::spawn(async move {
        let record = record?;
        let kafka_read_size = record.record.approximate_size();
        let headers = IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?;
        let sequence = Sequence {
            shard_index,
            sequence_number: SequenceNumber::new(record.offset),
        };
        let timestamp = Time::from_date_time(record.record.timestamp);
        let value = record
            .record
            .value
            .ok_or_else::<WriteBufferError, _>(|| "Value missing".to_string().into())?;
        crate::codec::decode(&value, headers, sequence, timestamp, kafka_read_size)
    })
    .await;

    // A join error means the decode task panicked; surface it as an unknown
    // write buffer error instead of propagating the panic.
    let dml_result = join_result.unwrap_or_else(|e| {
        warn!(%e, "Decode panic");
        Err(WriteBufferError::unknown(e))
    });
    (offset, dml_result)
}
#[async_trait]
impl WriteBufferStreamHandler for RSKafkaStreamHandler {
    /// Build a stream of decoded [`DmlOperation`]s starting at the stored
    /// `next_offset` (or at the earliest record if none is stored yet).
    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
        // A previous stream hit a fatal read error: yield nothing until the
        // handler is re-positioned via `seek()` / `reset_to_earliest()`.
        if self.terminated.load(Ordering::SeqCst) {
            return futures::stream::empty().boxed();
        }
        let trace_collector = self.trace_collector.clone();
        let next_offset = Arc::clone(&self.next_offset);
        let terminated = Arc::clone(&self.terminated);
        let start_offset: Option<i64> = {
            // need to trick a bit to make this async function `Send`
            *next_offset.lock()
        };
        let start_offset = match start_offset {
            Some(x) => StartOffset::At(x),
            None => StartOffset::Earliest,
        };
        let mut stream_builder =
            StreamConsumerBuilder::new(Arc::clone(&self.partition_client), start_offset);
        // Apply the optional consumer tuning knobs.
        if let Some(max_wait_ms) = self.consumer_config.max_wait_ms {
            stream_builder = stream_builder.with_max_wait_ms(max_wait_ms);
        }
        if let Some(min_batch_size) = self.consumer_config.min_batch_size {
            stream_builder = stream_builder.with_min_batch_size(min_batch_size);
        }
        if let Some(max_batch_size) = self.consumer_config.max_batch_size {
            stream_builder = stream_builder.with_max_batch_size(max_batch_size);
        }
        let stream = stream_builder.build();
        let shard_index = self.shard_index;
        // Use buffered streams to pipeline the reading of a message from kafka from with its
        // decoding.
        //
        // ┌─────┬──────┬──────┬─────┬──────┬──────┬─────┬──────┬──────┐
        // │ Read│ Read │ Read │ Read│ Read │ Read │ Read│ Read │ Read │
        // │Kafka│Kafka │Kafka │Kafka│Kafka │Kafka │Kafka│Kafka │Kafka │
        // │     │      │      │     │      │      │     │      │      │
        // └─────┴──────┴──────┴─────┴──────┴──────┴─────┴──────┴──────┘
        //
        //        ┌──────────────────┐
        //        │                  │
        //        │      Decode      │
        //        │                  │
        //        └──────────────────┘
        //            ... up to 10 ..
        //               ┌──────────────────┐
        //               │                  │
        //               │      Decode      │
        //               │                  │
        //               └──────────────────┘
        //
        // ─────────────────────────────────────────────────────────────────────────▶ Time
        // this stream reads `RecordAndOffset` from kafka
        let stream = stream.map(move |res| {
            let (record, _watermark) = match res {
                Ok(x) => x,
                Err(e) => {
                    // A read error is fatal for this stream: mark the handler
                    // terminated so subsequent `stream()` calls yield nothing
                    // until it is re-positioned.
                    terminated.store(true, Ordering::SeqCst);
                    let kind = match e {
                        RSKafkaError::ServerError {
                            protocol_error: ProtocolError::OffsetOutOfRange,
                            // NOTE: the high watermark included in this
                            // response is always -1 when reading before/after
                            // valid offsets.
                            ..
                        } => WriteBufferErrorKind::SequenceNumberNoLongerExists,
                        _ => WriteBufferErrorKind::Unknown,
                    };
                    return Err(WriteBufferError::new(kind, e));
                }
            };
            Ok(record)
        });
        // Now decode the records in a second, parallel step by making
        // a stream of futures and [`FuturesExt::buffered`].
        let stream = stream
            .map(move |record| {
                // appease borrow checker
                let trace_collector = trace_collector.clone();
                try_decode(record, shard_index, trace_collector)
            })
            // the decode jobs in parallel
            // (`buffered` does NOT reorder, so the API user still gets an ordered stream)
            .buffered(CONCURRENT_DECODE_JOBS)
            .map(move |(offset, dml_result)| {
                // but only update the offset when a decoded recorded
                // is actually returned to the consumer of the stream
                // (not when it was decoded or when it was read from
                // kafka). This is to ensure that if a new stream is
                // created, we do not lose records that were never
                // consumed.
                //
                // Note that we update the offset as long as a record was
                // read (even if there was an error decoding) so we don't
                // get stuck on invalid records
                if let Some(offset) = offset {
                    *next_offset.lock() = Some(offset + 1);
                }
                dml_result
            });
        stream.boxed()
    }
    /// Position the handler at `sequence_number`, failing if it lies beyond
    /// the partition's current high watermark.
    async fn seek(&mut self, sequence_number: SequenceNumber) -> Result<(), WriteBufferError> {
        let offset = sequence_number.get();
        let current = self.partition_client.get_offset(OffsetAt::Latest).await?;
        if offset > current {
            return Err(WriteBufferError::sequence_number_after_watermark(format!(
                "attempted to seek to offset {offset}, but current high \
                watermark for partition {p} is {current}",
                p = self.shard_index
            )));
        }
        *self.next_offset.lock() = Some(offset);
        // Seeking clears any previous fatal-error state.
        self.terminated.store(false, Ordering::SeqCst);
        Ok(())
    }
    /// Position the handler at the earliest available record and clear any
    /// fatal-error state.
    fn reset_to_earliest(&mut self) {
        *self.next_offset.lock() = None;
        self.terminated.store(false, Ordering::SeqCst);
    }
}
/// Kafka-backed write buffer consumer.
#[derive(Debug)]
pub struct RSKafkaConsumer {
    /// One shared partition client per shard, keyed by shard index.
    partition_clients: BTreeMap<ShardIndex, Arc<PartitionClient>>,
    /// Optional collector for spans carried in record headers.
    trace_collector: Option<Arc<dyn TraceCollector>>,
    /// Parsed consumer tuning options, cloned into every stream handler.
    consumer_config: ConsumerConfig,
}
impl RSKafkaConsumer {
    /// Connect to the Kafka cluster at `conn` and obtain a partition client
    /// per shard of `topic_name`, creating the topic first when
    /// `creation_config` is given.
    pub async fn new(
        conn: String,
        topic_name: String,
        connection_config: &BTreeMap<String, String>,
        creation_config: Option<&WriteBufferCreationConfig>,
        partitions: Option<Range<i32>>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
    ) -> Result<Self> {
        let raw_clients = setup_topic(
            conn,
            topic_name.clone(),
            connection_config,
            creation_config,
            partitions,
        )
        .await?;

        // Stream handlers share the clients, so wrap each one in an `Arc`.
        let mut partition_clients = BTreeMap::new();
        for (shard_index, client) in raw_clients {
            partition_clients.insert(shard_index, Arc::new(client));
        }

        Ok(Self {
            partition_clients,
            trace_collector,
            consumer_config: ConsumerConfig::try_from(connection_config)?,
        })
    }
}
#[async_trait]
impl WriteBufferReading for RSKafkaConsumer {
    /// Return the set of shard indexes this consumer can read from.
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.partition_clients.keys().copied().collect()
    }

    /// Build a new, independent stream handler for `shard_index`.
    async fn stream_handler(
        &self,
        shard_index: ShardIndex,
    ) -> Result<Box<dyn WriteBufferStreamHandler>, WriteBufferError> {
        let partition_client = match self.partition_clients.get(&shard_index) {
            Some(client) => client,
            None => return Err(format!("Unknown shard index: {shard_index}").into()),
        };
        let handler = RSKafkaStreamHandler {
            partition_client: Arc::clone(partition_client),
            next_offset: Arc::new(Mutex::new(None)),
            terminated: Arc::new(AtomicBool::new(false)),
            trace_collector: self.trace_collector.clone(),
            consumer_config: self.consumer_config.clone(),
            shard_index,
        };
        Ok(Box::new(handler))
    }

    /// Fetch the current high watermark for `shard_index`.
    async fn fetch_high_watermark(
        &self,
        shard_index: ShardIndex,
    ) -> Result<SequenceNumber, WriteBufferError> {
        let partition_client = match self.partition_clients.get(&shard_index) {
            Some(client) => client,
            None => return Err(format!("Unknown shard index: {shard_index}").into()),
        };
        let watermark = partition_client.get_offset(OffsetAt::Latest).await?;
        Ok(SequenceNumber::new(watermark))
    }

    fn type_name(&self) -> &'static str {
        "kafka"
    }
}
/// Connect to the Kafka cluster at `conn` (a comma-separated broker list) and
/// return one [`PartitionClient`] per shard (partition) of `topic_name`.
///
/// If the topic does not exist and `creation_config` is given, the topic is
/// created and the lookup retried; otherwise an error is returned. When
/// `partitions` is given, only clients for those partitions are returned (and
/// it is asserted that all of them could be initialised).
async fn setup_topic(
    conn: String,
    topic_name: String,
    connection_config: &BTreeMap<String, String>,
    creation_config: Option<&WriteBufferCreationConfig>,
    partitions: Option<Range<i32>>,
) -> Result<BTreeMap<ShardIndex, PartitionClient>> {
    let client_config = ClientConfig::try_from(connection_config)?;
    let mut client_builder =
        ClientBuilder::new(conn.split(',').map(|s| s.trim().to_owned()).collect());
    // Apply the optional client tuning knobs.
    if let Some(client_id) = client_config.client_id {
        client_builder = client_builder.client_id(client_id);
    }
    if let Some(max_message_size) = client_config.max_message_size {
        client_builder = client_builder.max_message_size(max_message_size);
    }
    if let Some(sock5_proxy) = client_config.socks5_proxy {
        client_builder = client_builder.socks5_proxy(sock5_proxy);
    }
    let client = client_builder.build().await?;
    let controller_client = client.controller_client()?;
    // Loop: look the topic up, and if missing (and allowed) create it and
    // retry. Creation may race with other processes doing the same.
    loop {
        // check if topic already exists
        let topics = client.list_topics().await?;
        if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) {
            // Instantiate 10 partition clients concurrently until all are ready
            // speed up server init.
            let client_ref = &client;
            let clients = stream::iter(
                topic
                    .partitions
                    .into_iter()
                    .filter(|p| {
                        partitions
                            .as_ref()
                            .map(|want| want.contains(p))
                            .unwrap_or(true)
                    })
                    .map(|p| {
                        let topic_name = topic_name.clone();
                        async move {
                            let shard_index = ShardIndex::new(p);
                            let c = client_ref
                                .partition_client(&topic_name, p, UnknownTopicHandling::Error)
                                .await?;
                            Result::<_, WriteBufferError>::Ok((shard_index, c))
                        }
                    }),
            )
            .buffer_unordered(10)
            .try_collect::<BTreeMap<_, _>>()
            .await?;
            if let Some(p) = partitions {
                assert_eq!(
                    p.len(),
                    clients.len(),
                    "requested partition clients not initialised"
                );
            }
            return Ok(clients);
        }
        // create topic
        if let Some(creation_config) = creation_config {
            let topic_creation_config = TopicCreationConfig::try_from(creation_config)?;
            match controller_client
                .create_topic(
                    &topic_name,
                    topic_creation_config.num_partitions,
                    topic_creation_config.replication_factor,
                    topic_creation_config.timeout_ms,
                )
                .await
            {
                Ok(_) => {}
                // race condition between check and creation action, that's OK
                Err(RSKafkaError::ServerError {
                    protocol_error: ProtocolError::TopicAlreadyExists,
                    ..
                }) => {}
                Err(e) => {
                    return Err(e.into());
                }
            }
        } else {
            return Err("no partitions found and auto-creation not requested"
                .to_string()
                .into());
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
core::test_utils::{
assert_span_context_eq_or_linked, lp_to_batches, perform_generic_tests,
random_topic_name, set_pop_first, TestAdapter, TestContext,
},
maybe_skip_kafka_integration,
};
use data_types::{DeletePredicate, NamespaceId, PartitionKey, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use iox_time::TimeProvider;
use rskafka::{client::partition::Compression, record::Record};
use std::num::NonZeroU32;
use test_helpers::assert_contains;
use trace::{ctx::SpanContext, RingBufferTraceCollector};
struct RSKafkaTestAdapter {
conn: String,
}
impl RSKafkaTestAdapter {
fn new(conn: String) -> Self {
Self { conn }
}
}
#[async_trait]
impl TestAdapter for RSKafkaTestAdapter {
type Context = RSKafkaTestContext;
async fn new_context_with_time(
&self,
n_shards: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
) -> Self::Context {
RSKafkaTestContext {
conn: self.conn.clone(),
topic_name: random_topic_name(),
n_shards,
time_provider,
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
metrics: metric::Registry::default(),
}
}
}
struct RSKafkaTestContext {
conn: String,
topic_name: String,
n_shards: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Arc<RingBufferTraceCollector>,
metrics: metric::Registry,
}
impl RSKafkaTestContext {
fn creation_config(&self, value: bool) -> Option<WriteBufferCreationConfig> {
value.then(|| WriteBufferCreationConfig {
n_shards: self.n_shards,
..Default::default()
})
}
#[allow(dead_code)]
fn metrics(&self) -> &metric::Registry {
&self.metrics
}
}
#[async_trait]
impl TestContext for RSKafkaTestContext {
type Writing = RSKafkaProducer;
type Reading = RSKafkaConsumer;
async fn writing(&self, creation_config: bool) -> Result<Self::Writing, WriteBufferError> {
RSKafkaProducer::new(
self.conn.clone(),
self.topic_name.clone(),
&BTreeMap::default(),
Arc::clone(&self.time_provider),
self.creation_config(creation_config).as_ref(),
None,
Some(self.trace_collector() as Arc<_>),
&self.metrics,
)
.await
}
async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError> {
RSKafkaConsumer::new(
self.conn.clone(),
self.topic_name.clone(),
&BTreeMap::default(),
self.creation_config(creation_config).as_ref(),
None,
Some(self.trace_collector() as Arc<_>),
)
.await
}
fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
Arc::clone(&self.trace_collector)
}
}
#[tokio::test]
async fn test_generic() {
let conn = maybe_skip_kafka_integration!();
perform_generic_tests(RSKafkaTestAdapter::new(conn)).await;
}
#[tokio::test]
async fn test_setup_topic_race() {
let conn = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_shards = NonZeroU32::new(2).unwrap();
let mut jobs: FuturesUnordered<_> = (0..10)
.map(|_| {
let conn = conn.clone();
let topic_name = topic_name.clone();
tokio::spawn(async move {
setup_topic(
conn,
topic_name,
&BTreeMap::default(),
Some(&WriteBufferCreationConfig {
n_shards,
..Default::default()
}),
None,
)
.await
.unwrap();
})
})
.collect();
while jobs.try_next().await.unwrap().is_some() {}
}
#[tokio::test]
async fn test_offset_after_broken_message() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn.clone());
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let producer = ctx.writing(true).await.unwrap();
// write broken message followed by a real one
let shard_index = set_pop_first(&mut producer.shard_indexes()).unwrap();
ClientBuilder::new(vec![conn])
.build()
.await
.unwrap()
.partition_client(
ctx.topic_name.clone(),
shard_index.get(),
UnknownTopicHandling::Retry,
)
.await
.unwrap()
.produce(
vec![Record {
key: None,
value: None,
headers: Default::default(),
timestamp: rskafka::chrono::Utc::now(),
}],
Compression::Zstd,
)
.await
.unwrap();
let w = crate::core::test_utils::write(
&producer,
"table foo=1 1",
shard_index,
"bananas".into(),
None,
)
.await;
let consumer = ctx.reading(true).await.unwrap();
let mut handler = consumer.stream_handler(shard_index).await.unwrap();
// read broken message from stream
let mut stream = handler.stream().await;
let err = stream.next().await.unwrap().unwrap_err();
assert_contains!(err.to_string(), "No content type header");
// re-creating the stream should advance past the broken message
drop(stream);
let mut stream = handler.stream().await;
let op = stream.next().await.unwrap().unwrap();
assert_write_op_eq(&op, &w);
}
#[tokio::test]
async fn test_batching() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let trace_collector = ctx.trace_collector();
let producer = ctx.writing(true).await.unwrap();
let shard_index = set_pop_first(&mut producer.shard_indexes()).unwrap();
let (w1_1, w1_2, w2_1, d1_1, d1_2, w1_3, w1_4, w2_2) = tokio::join!(
// ns1: batch 1
write(&producer, &trace_collector, shard_index, "bananas"),
write(&producer, &trace_collector, shard_index, "bananas"),
// ns2: batch 1, part A
write(&producer, &trace_collector, shard_index, "bananas"),
// ns1: batch 2
delete(&producer, &trace_collector, shard_index),
// ns1: batch 3
delete(&producer, &trace_collector, shard_index),
// ns1: batch 4
write(&producer, &trace_collector, shard_index, "bananas"),
write(&producer, &trace_collector, shard_index, "bananas"),
// ns2: batch 1, part B
write(&producer, &trace_collector, shard_index, "bananas"),
);
// ensure that write operations were NOT fused
assert_ne!(w1_1.sequence().unwrap(), w1_2.sequence().unwrap());
assert_ne!(w1_2.sequence().unwrap(), d1_1.sequence().unwrap());
assert_ne!(d1_1.sequence().unwrap(), d1_2.sequence().unwrap());
assert_ne!(d1_2.sequence().unwrap(), w1_3.sequence().unwrap());
assert_ne!(w1_3.sequence().unwrap(), w1_4.sequence().unwrap());
assert_ne!(w1_4.sequence().unwrap(), w1_1.sequence().unwrap());
assert_ne!(w2_1.sequence().unwrap(), w1_1.sequence().unwrap());
assert_ne!(w2_1.sequence().unwrap(), w2_2.sequence().unwrap());
let consumer = ctx.reading(true).await.unwrap();
let mut handler = consumer.stream_handler(shard_index).await.unwrap();
let mut stream = handler.stream().await;
// get output, note that the write operations were NOT fused
let op_w1_1 = stream.next().await.unwrap().unwrap();
let op_w1_2 = stream.next().await.unwrap().unwrap();
let op_w2_1 = stream.next().await.unwrap().unwrap();
let op_d1_1 = stream.next().await.unwrap().unwrap();
let op_d1_2 = stream.next().await.unwrap().unwrap();
let op_w1_3 = stream.next().await.unwrap().unwrap();
let op_w1_4 = stream.next().await.unwrap().unwrap();
let op_w2_2 = stream.next().await.unwrap().unwrap();
// ensure that sequence numbers map as expected
assert_eq!(op_w1_1.meta().sequence().unwrap(), w1_1.sequence().unwrap(),);
assert_eq!(op_w1_2.meta().sequence().unwrap(), w1_2.sequence().unwrap(),);
assert_eq!(op_d1_1.meta().sequence().unwrap(), d1_1.sequence().unwrap(),);
assert_eq!(op_d1_2.meta().sequence().unwrap(), d1_2.sequence().unwrap(),);
assert_eq!(op_w1_3.meta().sequence().unwrap(), w1_3.sequence().unwrap(),);
assert_eq!(op_w1_4.meta().sequence().unwrap(), w1_4.sequence().unwrap(),);
assert_eq!(op_w2_1.meta().sequence().unwrap(), w2_1.sequence().unwrap(),);
assert_eq!(op_w2_2.meta().sequence().unwrap(), w2_2.sequence().unwrap(),);
// check tracing span links
assert_span_context_eq_or_linked(
w1_1.span_context().unwrap(),
op_w1_1.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_2.span_context().unwrap(),
op_w1_2.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
d1_1.span_context().unwrap(),
op_d1_1.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
d1_2.span_context().unwrap(),
op_d1_2.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_3.span_context().unwrap(),
op_w1_3.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_4.span_context().unwrap(),
op_w1_4.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w2_1.span_context().unwrap(),
op_w2_1.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w2_2.span_context().unwrap(),
op_w2_2.meta().span_context().unwrap(),
trace_collector.spans(),
);
}
/// Test helper: store a single-row `DmlWrite` ("table foo=1") for
/// `shard_index` via `producer`, tagged with a fresh span created against
/// `trace_collector`, and return the resulting [`DmlMeta`].
async fn write(
    producer: &RSKafkaProducer,
    trace_collector: &Arc<RingBufferTraceCollector>,
    shard_index: ShardIndex,
    partition_key: impl Into<PartitionKey> + Send,
) -> DmlMeta {
    // Attach a new span so span propagation through the producer can be
    // asserted by the caller.
    let ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
    let batches = lp_to_batches("table foo=1");
    let dml_write = DmlWrite::new(
        NamespaceId::new(42),
        batches,
        partition_key.into(),
        DmlMeta::unsequenced(Some(ctx)),
    );
    producer
        .store_operation(shard_index, DmlOperation::Write(dml_write))
        .await
        .unwrap()
}
/// Test helper: store a `DmlDelete` covering the timestamp range `[0, 1)`
/// (no expressions, no table) for `shard_index` via `producer`, tagged with
/// a fresh span created against `trace_collector`, and return the resulting
/// [`DmlMeta`].
async fn delete(
    producer: &RSKafkaProducer,
    trace_collector: &Arc<RingBufferTraceCollector>,
    shard_index: ShardIndex,
) -> DmlMeta {
    let ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
    let predicate = DeletePredicate {
        range: TimestampRange::new(0, 1),
        exprs: vec![],
    };
    let dml_delete = DmlDelete::new(
        NamespaceId::new(42),
        predicate,
        None,
        DmlMeta::unsequenced(Some(ctx)),
    );
    producer
        .store_operation(shard_index, DmlOperation::Delete(dml_delete))
        .await
        .unwrap()
}
}

View File

@ -1,311 +0,0 @@
use std::sync::Arc;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::warn;
use rskafka::{
client::producer::aggregator::{
Aggregator, Error, RecordAggregator as RecordAggregatorDelegate,
RecordAggregatorStatusDeaggregator, StatusDeaggregator, TryPush,
},
record::Record,
};
use trace::ctx::SpanContext;
use crate::codec::{ContentType, IoxHeaders};
/// The [`Tag`] is a data-carrying token identifier used to de-aggregate
/// responses from a batch of aggregated requests using the
/// [`DmlMetaDeaggregator`].
#[derive(Debug)]
pub struct Tag {
    /// The tag into the batch returned by the
    /// [`RecordAggregatorDelegate::try_push()`] call.
    idx: usize,

    /// The timestamp assigned to the resulting Kafka [`Record`].
    timestamp: Time,

    /// A span extracted from the original [`DmlOperation`].
    span_ctx: Option<SpanContext>,

    /// The approximate byte size of the serialised [`Record`], as calculated by
    /// [`Record::approximate_size()`].
    approx_kafka_write_size: usize,
}
/// A [`RecordAggregator`] implements [rskafka]'s abstract [`Aggregator`]
/// behaviour to provide batching of requests for a single Kafka partition.
///
/// Specifically the [`RecordAggregator`] maps [`DmlOperation`] instances to
/// Kafka [`Record`] instances, and delegates the batching to the
/// [`RecordAggregatorDelegate`] implementation maintained within [rskafka]
/// itself.
///
/// [rskafka]: https://github.com/influxdata/rskafka
#[derive(Debug)]
pub struct RecordAggregator {
    /// Source of producer timestamps for ops that do not carry their own.
    time_provider: Arc<dyn TimeProvider>,

    /// The shard index (Kafka partition number) this aggregator batches ops for (from Kafka,
    /// not the catalog).
    shard_index: ShardIndex,

    /// The underlying record aggregator the non-IOx-specific batching is
    /// delegated to.
    aggregator: RecordAggregatorDelegate,
}
impl RecordAggregator {
    /// Initialise a new [`RecordAggregator`] that batches ops destined for
    /// `shard_index` into messages of at most `max_batch_size` bytes, using
    /// `time_provider` to assign producer timestamps where needed.
    pub fn new(
        shard_index: ShardIndex,
        max_batch_size: usize,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Self {
        let aggregator = RecordAggregatorDelegate::new(max_batch_size);
        Self {
            time_provider,
            shard_index,
            aggregator,
        }
    }
}
impl RecordAggregator {
    /// Serialise `op` into a Kafka [`Record`], returning it together with the
    /// producer timestamp assigned to it.
    ///
    /// The timestamp is taken from the op's own metadata when present,
    /// otherwise from `self.time_provider`.
    fn to_record(&self, op: &DmlOperation) -> Result<(Record, Time), Error> {
        let timestamp = match op.meta().producer_ts() {
            Some(ts) => ts,
            None => self.time_provider.now(),
        };

        // Encode the op payload, then drop any excess buffer capacity left
        // over from encoding.
        let mut payload = Vec::new();
        crate::codec::encode_operation(op, &mut payload)?;
        payload.shrink_to_fit();

        let iox_headers =
            IoxHeaders::new(ContentType::Protobuf, op.meta().span_context().cloned());
        let kafka_headers = iox_headers
            .headers()
            .map(|(name, value)| (name.to_owned(), value.as_bytes().to_vec()))
            .collect();

        let record = Record {
            key: None,
            value: Some(payload),
            headers: kafka_headers,
            timestamp: timestamp.date_time(),
        };

        Ok((record, timestamp))
    }
}
impl Aggregator for RecordAggregator {
    type Input = DmlOperation;
    type Tag = <DmlMetaDeaggregator as StatusDeaggregator>::Tag;
    type StatusDeaggregator = DmlMetaDeaggregator;

    /// Attempt to add `op` to the in-progress batch.
    ///
    /// Callers should retain the returned [`Tag`] in order to de-aggregate the
    /// [`DmlMeta`] from the request response.
    fn try_push(&mut self, op: Self::Input) -> Result<TryPush<Self::Input, Self::Tag>, Error> {
        // Serialise the DML op into a Kafka record.
        let (record, timestamp) = self.to_record(&op)?;

        // Snapshot the metadata needed to build the Tag/DmlMeta once a batch
        // has been flushed - this must happen before `record` is moved into
        // the delegate aggregator below.
        let span_ctx = op.meta().span_context().cloned();
        let approx_kafka_write_size = record.approximate_size();

        // Delegate the actual batching to rskafka's RecordAggregator.
        let outcome = match self.aggregator.try_push(record)? {
            TryPush::NoCapacity(_) => {
                // The delegate hands back the encoded Record, but the caller
                // of this fn expects the original op - substitute it and
                // discard the encoded form.
                //
                // Log a warning when this occurs - this allows an operator to
                // increase the maximum Kafka message size, or lower the linger
                // time to minimise latency while still producing large enough
                // batches for it to be worth while.
                warn!("aggregated batch reached maximum capacity");
                TryPush::NoCapacity(op)
            }
            TryPush::Aggregated(idx) => {
                // A successful delegate aggregation returns the tag for offset
                // de-aggregation later. For simplicity, the tag this layer
                // returns also carries the various (small) metadata elements
                // needed to construct the DmlMeta at de-aggregation time.
                TryPush::Aggregated(Tag {
                    idx,
                    timestamp,
                    span_ctx,
                    approx_kafka_write_size,
                })
            }
        };

        Ok(outcome)
    }

    /// Flush the current batch, returning the accumulated [`Record`]s and the
    /// de-aggregator that maps response offsets back to [`DmlMeta`].
    fn flush(&mut self) -> Result<(Vec<Record>, Self::StatusDeaggregator), Error> {
        let records = self.aggregator.flush()?.0;
        Ok((records, DmlMetaDeaggregator::new(self.shard_index)))
    }
}
/// The de-aggregation half of the [`RecordAggregator`], this type consumes the
/// caller's [`Tag`] obtained from the aggregator to return the corresponding
/// [`DmlMeta`] from the batched response.
///
/// The [`DmlMetaDeaggregator`] is a stateless wrapper over the (also stateless)
/// [`RecordAggregatorStatusDeaggregator`] delegate, with most of the metadata
/// elements carried in the [`Tag`] itself.
#[derive(Debug)]
pub struct DmlMetaDeaggregator {
    /// The shard index (Kafka partition number) stamped into each
    /// de-aggregated [`DmlMeta`]'s sequence.
    shard_index: ShardIndex,
}
impl DmlMetaDeaggregator {
    /// Create a [`DmlMetaDeaggregator`] for responses to ops batched for
    /// `shard_index`.
    pub fn new(shard_index: ShardIndex) -> Self {
        Self { shard_index }
    }
}
impl StatusDeaggregator for DmlMetaDeaggregator {
    type Status = DmlMeta;
    type Tag = Tag;

    /// Combine the batched response offsets in `input` with the caller's
    /// `tag` to produce the sequenced [`DmlMeta`] for that single operation.
    fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result<Self::Status, Error> {
        // Forward the index-based offset lookup to the (stateless) record
        // batch de-aggregator for forwards compatibility.
        let offset = RecordAggregatorStatusDeaggregator::default()
            .deaggregate(input, tag.idx)
            .expect("invalid de-aggregation index");

        let sequence = Sequence::new(self.shard_index, SequenceNumber::new(offset));
        Ok(DmlMeta::sequenced(
            sequence,
            tag.timestamp,
            tag.span_ctx,
            tag.approx_kafka_write_size,
        ))
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::{NamespaceId, TableId};
use dml::DmlWrite;
use hashbrown::HashMap;
use iox_time::MockProvider;
use mutable_batch::{writer::Writer, MutableBatch};
use trace::LogTraceCollector;
use crate::codec::{CONTENT_TYPE_PROTOBUF, HEADER_CONTENT_TYPE, HEADER_TRACE_CONTEXT};
use super::*;
const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
const TIMESTAMP_MILLIS: i64 = 1659990497000;
/// Build a representative write op: one row ("time"=42, "A"=1) against
/// table id 24 in namespace 42, carrying a span context.
fn test_op() -> DmlOperation {
    let mut batch = MutableBatch::new();
    {
        let mut writer = Writer::new(&mut batch, 1);
        // Date: "1970-01-01"
        writer.write_time("time", [42].into_iter()).unwrap();
        writer
            .write_i64("A", Some(&[0b00000001]), [1].into_iter())
            .unwrap();
        writer.commit();
    }

    let mut tables = HashMap::default();
    tables.insert(TableId::new(24), batch);

    let span = SpanContext::new(Arc::new(LogTraceCollector::new()));
    let write = DmlWrite::new(
        NamespaceId::new(42),
        tables,
        "1970-01-01".into(),
        DmlMeta::unsequenced(Some(span)),
    );
    DmlOperation::Write(write)
}
#[test]
fn test_record_aggregate() {
    let clock = Arc::new(MockProvider::new(
        Time::from_timestamp_millis(TIMESTAMP_MILLIS).unwrap(),
    ));
    let mut aggregator = RecordAggregator::new(SHARD_INDEX, usize::MAX, clock);

    // Push a single op; with unlimited capacity it must be aggregated.
    let tag = match aggregator
        .try_push(test_op())
        .expect("aggregate call should succeed")
    {
        TryPush::Aggregated(tag) => tag,
        TryPush::NoCapacity(_) => panic!("unexpected no capacity"),
    };

    // Flush the aggregator to acquire the records
    let (records, deagg) = aggregator.flush().expect("should flush");
    assert_eq!(records.len(), 1);

    // Another flush should not yield the same records
    let (leftover, _) = aggregator.flush().expect("should flush");
    assert!(leftover.is_empty());

    // Assert properties of the resulting record
    let record = records[0].clone();
    assert_eq!(record.key, None);
    assert!(record.value.is_some());
    assert_eq!(
        *record
            .headers
            .get(HEADER_CONTENT_TYPE)
            .expect("no content type"),
        Vec::<u8>::from(CONTENT_TYPE_PROTOBUF),
    );
    assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some());
    assert_eq!(record.timestamp.timestamp(), 1659990497);

    // Extract the DmlMeta from the de-aggregator
    let got = deagg
        .deaggregate(&[4242], tag)
        .expect("de-aggregate should succeed");

    // Assert the metadata properties
    assert!(got.span_context().is_some());
    assert_eq!(
        *got.sequence().expect("should be sequenced"),
        Sequence::new(SHARD_INDEX, SequenceNumber::new(4242))
    );
    assert_eq!(
        got.producer_ts().expect("no producer timestamp"),
        Time::from_timestamp_millis(TIMESTAMP_MILLIS).unwrap(),
    );
    assert_eq!(
        got.bytes_read().expect("no approx size"),
        record.approximate_size()
    );
}
#[test]
fn test_record_aggregate_no_capacity() {
    let clock = Arc::new(MockProvider::new(
        Time::from_timestamp_millis(TIMESTAMP_MILLIS).unwrap(),
    ));
    // A zero-byte capacity aggregator can never accept a record.
    let mut aggregator = RecordAggregator::new(SHARD_INDEX, usize::MIN, clock);

    let write = test_op();
    match aggregator
        .try_push(write.clone())
        .expect("aggregate call should succeed")
    {
        // The original op (not the encoded record) must be handed back.
        TryPush::NoCapacity(returned) => {
            assert_eq!(returned.namespace_id(), write.namespace_id())
        }
        TryPush::Aggregated(_) => panic!("expected no capacity"),
    };
}
}

View File

@ -1,18 +0,0 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
    missing_copy_implementations,
    missing_debug_implementations,
    clippy::todo,
    clippy::dbg_macro,
    clippy::explicit_iter_loop,
    clippy::future_not_send,
    clippy::use_self,
    clippy::clone_on_ref_ptr
)]

// Crate-internal DML-operation <-> record encoding (content types, headers).
pub(crate) mod codec;
// NOTE(review): remaining module purposes are inferred from their names -
// confirm against each module's own `//!` docs.
pub mod config;
pub mod core;
pub mod file;
pub mod kafka;
pub mod mock;

File diff suppressed because it is too large Load Diff