Merge branch 'main' into dom/codec-object-store

pull/24376/head
Dom 2022-05-23 15:39:54 +01:00 committed by GitHub
commit f0d0f1ba0c
28 changed files with 915 additions and 262 deletions

View File

@ -131,6 +131,7 @@ jobs:
doc:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: medium+ # a smaller executor runs out of memory
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"

Cargo.lock generated
View File

@ -2008,9 +2008,9 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
@ -4418,9 +4418,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.5.5"
version = "1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1"
dependencies = [
"aho-corasick",
"memchr",
@ -4438,9 +4438,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.6.25"
version = "0.6.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64"
[[package]]
name = "remove_dir_all"

View File

@ -5,6 +5,7 @@ pub mod addressable_heap;
pub mod hash_map;
pub mod lru;
pub mod resource_consumption;
pub mod shared;
pub mod ttl;
#[cfg(test)]

View File

@ -0,0 +1,156 @@
//! Backend that supports custom removal / expiry of keys
use parking_lot::Mutex;
use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc};
use super::CacheBackend;
/// Cache backend that allows another backend to be shared by managing
/// a mutex internally.
///
/// This allows explicitly removing entries from the cache, for
/// example, based on a policy.
#[derive(Debug, Clone)]
pub struct SharedBackend<K, V>
where
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
V: Clone + Debug + Send + 'static,
{
inner_backend: Arc<Mutex<Box<dyn CacheBackend<K = K, V = V>>>>,
}
impl<K, V> SharedBackend<K, V>
where
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
V: Clone + Debug + Send + 'static,
{
/// Create new backend around the inner backend
pub fn new(inner_backend: Box<dyn CacheBackend<K = K, V = V>>) -> Self {
Self {
inner_backend: Arc::new(Mutex::new(inner_backend)),
}
}
/// "remove" a key (aka remove it from the shared backend) if the
/// specified predicate is true. If the key is removed return
/// true, otherwise return false
///
/// Note that the predicate function is called while the lock is
/// held (and thus the inner backend can't be concurrently accessed
pub fn remove_if<P>(&self, k: &K, predicate: P) -> bool
where
P: Fn(V) -> bool,
{
let mut inner_backend = self.inner_backend.lock();
if let Some(v) = inner_backend.get(k) {
if predicate(v) {
inner_backend.remove(k);
return true;
}
}
false
}
}
impl<K, V> CacheBackend for SharedBackend<K, V>
where
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
V: Clone + Debug + Send + 'static,
{
type K = K;
type V = V;
fn get(&mut self, k: &Self::K) -> Option<Self::V> {
self.inner_backend.lock().get(k)
}
fn set(&mut self, k: Self::K, v: Self::V) {
self.inner_backend.lock().set(k, v);
}
fn remove(&mut self, k: &Self::K) {
self.inner_backend.lock().remove(k)
}
fn is_empty(&self) -> bool {
self.inner_backend.lock().is_empty()
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
#[test]
fn test_generic() {
crate::backend::test_util::test_generic(|| SharedBackend::new(test_backend()))
}
#[test]
fn test_is_shared() {
let mut backend1 = SharedBackend::new(test_backend());
let mut backend2 = backend1.clone();
// test that a shared backend is really shared
backend1.set(1, "foo".into());
backend2.set(2, "bar".into());
assert_eq!(backend1.get(&1), Some("foo".into()));
assert_eq!(backend2.get(&1), Some("foo".into()));
assert_eq!(backend1.get(&2), Some("bar".into()));
assert_eq!(backend2.get(&2), Some("bar".into()));
// make a third backend and it should also modify the previous ones
let mut backend3 = backend1.clone();
assert_eq!(backend3.get(&1), Some("foo".into()));
assert_eq!(backend3.get(&2), Some("bar".into()));
// update key 2
backend3.set(2, "baz".into());
assert_eq!(backend1.get(&2), Some("baz".into()));
assert_eq!(backend2.get(&2), Some("baz".into()));
assert_eq!(backend3.get(&2), Some("baz".into()));
}
#[test]
fn test_remove_if() {
let mut backend = SharedBackend::new(test_backend());
backend.set(1, "foo".into());
backend.set(2, "bar".into());
backend.remove_if(&1, |v| v == "zzz");
assert_eq!(backend.get(&1), Some("foo".into()));
assert_eq!(backend.get(&2), Some("bar".into()));
backend.remove_if(&1, |v| v == "foo");
assert_eq!(backend.get(&1), None);
assert_eq!(backend.get(&2), Some("bar".into()));
backend.remove_if(&1, |v| v == "bar");
assert_eq!(backend.get(&1), None);
assert_eq!(backend.get(&2), Some("bar".into()));
}
#[test]
fn test_remove_if_shared() {
let mut backend = SharedBackend::new(test_backend());
backend.set(1, "foo".into());
backend.set(2, "bar".into());
let backend2 = backend.clone();
backend2.remove_if(&1, |v| v == "foo");
// original backend should reflect the changes
assert_eq!(backend.get(&1), None);
assert_eq!(backend.get(&2), Some("bar".into()));
}
fn test_backend() -> Box<dyn CacheBackend<K = u8, V = String>> {
Box::new(HashMap::new())
}
}
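A hedged usage sketch of the API above (it leans on the `CacheBackend` impl for `HashMap` that `test_backend()` uses; the function name is illustrative, not part of this PR). Clones share one underlying store, and `remove_if` evicts an entry only when the predicate matches:

use std::collections::HashMap;

fn shared_backend_example() {
    // Two handles over the same underlying store.
    let mut backend: SharedBackend<u8, String> = SharedBackend::new(Box::new(HashMap::new()));
    let shared = backend.clone();

    backend.set(1, "foo".into());

    // Evict only when the stored value matches the predicate. The predicate
    // runs while the internal lock is held, so it should stay cheap.
    assert!(shared.remove_if(&1, |v| v == "foo"));
    assert_eq!(backend.get(&1), None);
}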

View File

@ -15,19 +15,18 @@ pub struct IngesterConfig {
)]
pub write_buffer_partition_range_end: i32,
/// The ingester will continue to pull data and buffer it from Kafka
/// as long as it is below this size. If it hits this size it will pause
/// ingest from Kafka until persistence goes below this threshold.
/// The ingester will continue to pull data and buffer it from the write buffer as long as the
/// ingester buffer is below this size. If the ingester buffer hits this size, ingest from the
/// write buffer will pause until the ingester buffer goes below this threshold.
#[clap(
long = "--pause-ingest-size-bytes",
env = "INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES"
)]
pub pause_ingest_size_bytes: usize,
/// Once the ingester crosses this threshold of data buffered across
/// all sequencers, it will pick the largest partitions and persist
/// them until it falls below this threshold. An ingester running in
/// a steady state is expected to take up this much memory.
/// Once the ingester crosses this threshold of data buffered across all sequencers, it will
/// pick the largest partitions and persist them until it falls below this threshold. An
/// ingester running in a steady state is expected to take up this much memory.
#[clap(
long = "--persist-memory-threshold-bytes",
env = "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES"
@ -43,10 +42,9 @@ pub struct IngesterConfig {
)]
pub persist_partition_size_threshold_bytes: usize,
/// If a partition has had data buffered for longer than this period of time
/// it will be persisted. This puts an upper bound on how far back the
/// ingester may need to read in Kafka on restart or recovery. The default value
/// is 30 minutes (in seconds).
/// If a partition has had data buffered for longer than this period of time, it will be
/// persisted. This puts an upper bound on how far back the ingester may need to read from the
/// write buffer on restart or recovery. The default value is 30 minutes (in seconds).
#[clap(
long = "--persist-partition-age-threshold-seconds",
env = "INFLUXDB_IOX_PERSIST_PARTITION_AGE_THRESHOLD_SECONDS",
@ -62,4 +60,14 @@ pub struct IngesterConfig {
default_value = "300"
)]
pub persist_partition_cold_threshold_seconds: u64,
/// If the catalog's max sequence number for the partition is no longer available in the write
/// buffer due to the retention policy, by default the ingester will panic. If this flag is
/// specified, the ingester will skip any sequence numbers that have not been retained in the
/// write buffer and will start up successfully with the oldest available data.
#[clap(
long = "--skip-to-oldest-available",
env = "INFLUXDB_IOX_SKIP_TO_OLDEST_AVAILABLE"
)]
pub skip_to_oldest_available: bool,
}
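A minimal sketch of the decision this flag drives (the real handling lives in `SequencedStreamHandler::run`, further down in this diff; `should_reset_stream` is a hypothetical helper, not code from the PR):

fn should_reset_stream(skip_to_oldest_available: bool, already_reset: bool) -> bool {
    // Reset the stream at most once per error episode; otherwise surface
    // the unknown-sequence-number error.
    skip_to_oldest_available && !already_reset
}

#[test]
fn skip_flag_semantics() {
    assert!(!should_reset_stream(false, false)); // default: never reset
    assert!(should_reset_stream(true, false)); // flag set: reset once
    assert!(!should_reset_stream(true, true)); // but only once
}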

View File

@ -1852,8 +1852,8 @@ mod tests {
// There should be 2 groups
assert_eq!(groups.len(), 2, "There should have been two groups");
groups[0].parquet_files.contains(&pf1);
groups[1].parquet_files.contains(&pf2);
assert!(groups[0].parquet_files.contains(&pf1));
assert!(groups[1].parquet_files.contains(&pf2));
}
#[test]

View File

@ -1,10 +1,11 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
// `clippy::use_self` is deliberately excluded from the lints this crate uses.
// See <https://github.com/rust-lang/rust-clippy/issues/6902>.
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]

View File

@ -71,10 +71,13 @@ impl ServerFixture {
let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new()));
let mut shared_server = shared_server.lock();
let shared_upgraded = {
let locked = shared_server.lock();
locked.upgrade()
};
// is a shared server already present?
let server = match shared_server.upgrade() {
let server = match shared_upgraded {
Some(server) => server,
None => {
// if not, create one
@ -86,11 +89,11 @@ impl ServerFixture {
// save a reference for other threads that may want to
// use this server, but don't prevent it from being
// destroyed when going out of scope
let mut shared_server = shared_server.lock();
*shared_server = Arc::downgrade(&server);
server
}
};
std::mem::drop(shared_server);
Self { server }
}
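Reduced to a hedged standalone sketch (names are illustrative), the pattern this fix establishes is: hold the lock only to upgrade or store the `Weak`, never across server creation:

use parking_lot::Mutex;
use std::sync::{Arc, Weak};

fn get_or_create<T>(slot: &Mutex<Weak<T>>, create: impl FnOnce() -> Arc<T>) -> Arc<T> {
    // Lock only long enough to try upgrading; creating the server (an await
    // point in the real fixture) must not happen while the lock is held.
    let existing = slot.lock().upgrade();
    match existing {
        Some(server) => server,
        None => {
            let fresh = create();
            // Store a Weak so later callers can reuse the live server while
            // still letting it drop once the last Arc goes out of scope.
            *slot.lock() = Arc::downgrade(&fresh);
            fresh
        }
    }
}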

View File

@ -334,6 +334,10 @@ impl Config {
let write_buffer_partition_range_start = 0;
let write_buffer_partition_range_end = 0;
// Use whatever data is available in the write buffer rather than erroring if the sequence
// number has not been retained in the write buffer.
let skip_to_oldest_available = true;
let ingester_config = IngesterConfig {
write_buffer_partition_range_start,
write_buffer_partition_range_end,
@ -342,6 +346,7 @@ impl Config {
persist_partition_size_threshold_bytes,
persist_partition_age_threshold_seconds,
persist_partition_cold_threshold_seconds,
skip_to_oldest_available,
};
// create a CompactorConfig for the all in one server based on

View File

@ -2206,8 +2206,8 @@ mod tests {
.await
.unwrap();
{
let tables = data.tables.read();
let table = tables.get("mem").unwrap().read().await;
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let p = table.partition_data.get("1970-01-01").unwrap();
assert_eq!(
p.data.max_persisted_sequence_number,
@ -2229,8 +2229,8 @@ mod tests {
.await
.unwrap();
let tables = data.tables.read();
let table = tables.get("mem").unwrap().read().await;
let table_data = data.table_data("mem").unwrap();
let table = table_data.read().await;
let partition = table.partition_data.get("1970-01-01").unwrap();
assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequence_number,

View File

@ -13,7 +13,7 @@ use crate::{
};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{KafkaPartition, KafkaTopic, Sequencer};
use data_types::{KafkaPartition, KafkaTopic, SequenceNumber, Sequencer};
use futures::{
future::{BoxFuture, Shared},
stream::FuturesUnordered,
@ -123,6 +123,7 @@ impl IngestHandlerImpl {
write_buffer: Arc<dyn WriteBufferReading>,
exec: Arc<Executor>,
metric_registry: Arc<metric::Registry>,
skip_to_oldest_available: bool,
) -> Result<Self> {
// build the initial ingester data state
let mut sequencers = BTreeMap::new();
@ -216,12 +217,14 @@ impl IngestHandlerImpl {
let kafka_topic_name = kafka_topic_name.clone();
async move {
let handler = SequencedStreamHandler::new(
op_stream.stream().await,
op_stream,
SequenceNumber::new(sequencer.min_unpersisted_sequence_number),
sink,
lifecycle_handle,
kafka_topic_name,
sequencer.kafka_partition,
&*metric_registry,
skip_to_oldest_available,
);
handler.run(shutdown).await
@ -350,6 +353,7 @@ impl<T> Drop for IngestHandlerImpl<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::data::SnapshotBatch;
use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@ -557,8 +561,11 @@ mod tests {
.unwrap();
}
#[tokio::test]
async fn seeks_on_initialization() {
async fn ingester_test_setup(
write_operations: Vec<DmlWrite>,
min_unpersisted_sequence_number: i64,
skip_to_oldest_available: bool,
) -> (IngestHandlerImpl, Sequencer, Namespace) {
let metrics: Arc<metric::Registry> = Default::default();
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
@ -576,11 +583,14 @@ mod tests {
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
// update the min unpersisted so we can verify this was what was seeked to later
sequencer.min_unpersisted_sequence_number = 2;
// update the min unpersisted
sequencer.min_unpersisted_sequence_number = min_unpersisted_sequence_number;
// this probably isn't necessary, but just in case something changes later
txn.sequencers()
.update_min_unpersisted_sequence_number(sequencer.id, SequenceNumber::new(2))
.update_min_unpersisted_sequence_number(
sequencer.id,
SequenceNumber::new(min_unpersisted_sequence_number),
)
.await
.unwrap();
@ -591,25 +601,14 @@ mod tests {
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
let ingest_ts1 = Time::from_timestamp_millis(42);
let ingest_ts2 = Time::from_timestamp_millis(1337);
let w1 = DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150),
);
let _schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
let w2 = DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 30", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(0, 2), ingest_ts2, None, 150),
);
for write_operation in write_operations {
validate_or_insert_schema(write_operation.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
write_buffer_state.push_write(write_operation);
}
txn.commit().await.unwrap();
write_buffer_state.push_write(w1);
write_buffer_state.push_write(w2);
let reading: Arc<dyn WriteBufferReading> =
Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap());
@ -631,27 +630,32 @@ mod tests {
reading,
Arc::new(Executor::new(1)),
Arc::clone(&metrics),
skip_to_oldest_available,
)
.await
.unwrap();
(ingester, sequencer, namespace)
}
async fn verify_ingester_buffer_has_data(
ingester: IngestHandlerImpl,
sequencer: Sequencer,
namespace: Namespace,
custom_batch_verification: impl Fn(&SnapshotBatch) + Send,
) {
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there
tokio::time::timeout(Duration::from_secs(1), async {
tokio::time::timeout(Duration::from_secs(1), async move {
loop {
let mut has_measurement = false;
if let Some(data) = ingester
.data
.sequencer(sequencer.id)
{
if let Some(data) = ingester.data.sequencer(sequencer.id) {
if let Some(data) = data.namespace(&namespace.name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", "1970-01-01").await {
if let Some(b) = b.first() {
if b.min_sequencer_number == SequenceNumber::new(1) {
panic!("initialization did a seek to the beginning rather than the min_unpersisted");
}
custom_batch_verification(b);
if b.data.num_rows() == 1 {
has_measurement = true;
@ -668,8 +672,76 @@ mod tests {
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("timeout");
}
#[tokio::test]
async fn seeks_on_initialization() {
let ingest_ts1 = Time::from_timestamp_millis(42);
let ingest_ts2 = Time::from_timestamp_millis(1337);
let write_operations = vec![
DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150),
),
DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 30", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(0, 2), ingest_ts2, None, 150),
),
];
let (ingester, sequencer, namespace) =
ingester_test_setup(write_operations, 2, false).await;
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
if first_batch.min_sequencer_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
the min_unpersisted"
);
}
})
.await;
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn seeks_on_initialization_unknown_sequence_number() {
// No write operations means the stream will return an unknown sequence number.
// The ingester will panic because skip_to_oldest_available is false.
let (ingester, _sequencer, _namespace) = ingester_test_setup(vec![], 2, false).await;
tokio::time::timeout(Duration::from_millis(1000), ingester.join())
.await
.expect("timeout");
.unwrap();
}
#[tokio::test]
async fn seeks_on_initialization_unknown_sequence_number_skip_to_oldest_available() {
let ingest_ts1 = Time::from_timestamp_millis(42);
let write_operations = vec![DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150),
)];
// Set the min unpersisted to something bigger than the write's sequence number to
// cause an UnknownSequenceNumber error. Skip to oldest available = true, so ingester
// should find data
let (ingester, sequencer, namespace) =
ingester_test_setup(write_operations, 10, true).await;
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
assert_eq!(
first_batch.min_sequencer_number,
SequenceNumber::new(1),
"re-initialization didn't seek to the beginning",
);
})
.await;
}
struct TestIngester {
@ -730,6 +802,7 @@ mod tests {
reading,
Arc::new(Executor::new(1)),
Arc::clone(&metrics),
false,
)
.await
.unwrap();

View File

@ -1,14 +1,14 @@
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
use data_types::KafkaPartition;
use data_types::{KafkaPartition, SequenceNumber};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, Stream, StreamExt};
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, U64Counter, U64Gauge};
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferError, WriteBufferErrorKind};
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
/// When the [`LifecycleManager`] indicates that ingest should be paused because
/// of memory pressure, the sequencer will loop, sleeping this long between
@ -31,8 +31,11 @@ const INGEST_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest()
#[derive(Debug)]
pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
/// An input stream of DML ops
stream: I,
/// Creator/manager of the stream of DML ops
write_buffer_stream_handler: I,
current_sequence_number: SequenceNumber,
/// An output sink that processes DML operations and applies them to
/// in-memory state.
sink: O,
@ -46,17 +49,22 @@ pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
// Metrics
time_provider: T,
time_to_be_readable_ms: U64Gauge,
/// Duration of time ingest is paused at the request of the LifecycleManager
pause_duration_ms: U64Counter,
/// Errors during op stream reading
seq_unknown_sequence_number_count: U64Counter,
seq_invalid_data_count: U64Counter,
seq_unknown_error_count: U64Counter,
sink_apply_error_count: U64Counter,
skipped_sequence_number_amount: U64Counter,
/// Log context fields - otherwise unused.
kafka_topic_name: String,
kafka_partition: KafkaPartition,
skip_to_oldest_available: bool,
}
impl<I, O> SequencedStreamHandler<I, O> {
@ -66,18 +74,22 @@ impl<I, O> SequencedStreamHandler<I, O> {
/// A [`SequencedStreamHandler`] starts actively consuming items from
/// `stream` once [`SequencedStreamHandler::run()`] is called, and
/// gracefully stops when `shutdown` is cancelled.
#[allow(clippy::too_many_arguments)]
pub fn new(
stream: I,
write_buffer_stream_handler: I,
current_sequence_number: SequenceNumber,
sink: O,
lifecycle_handle: LifecycleHandleImpl,
kafka_topic_name: String,
kafka_partition: KafkaPartition,
metrics: &metric::Registry,
skip_to_oldest_available: bool,
) -> Self {
// TTBR
let time_to_be_readable_ms = metrics.register_metric::<U64Gauge>(
"ingester_ttbr_ms",
"duration of time between producer writing to consumer putting into queryable cache in milliseconds",
"duration of time between producer writing to consumer putting into queryable cache in \
milliseconds",
).recorder(metric_attrs(kafka_partition, &kafka_topic_name, None, false));
// Lifecycle-driven ingest pause duration
@ -115,9 +127,16 @@ impl<I, O> SequencedStreamHandler<I, O> {
Some("sink_apply_error"),
true,
));
let skipped_sequence_number_amount = ingest_errors.recorder(metric_attrs(
kafka_partition,
&kafka_topic_name,
Some("skipped_sequence_number_amount"),
true,
));
Self {
stream,
write_buffer_stream_handler,
current_sequence_number,
sink,
lifecycle_handle,
time_provider: SystemProvider::default(),
@ -127,8 +146,10 @@ impl<I, O> SequencedStreamHandler<I, O> {
seq_invalid_data_count,
seq_unknown_error_count,
sink_apply_error_count,
skipped_sequence_number_amount,
kafka_topic_name,
kafka_partition,
skip_to_oldest_available,
}
}
@ -136,7 +157,8 @@ impl<I, O> SequencedStreamHandler<I, O> {
#[cfg(test)]
pub(crate) fn with_time_provider<T>(self, provider: T) -> SequencedStreamHandler<I, O, T> {
SequencedStreamHandler {
stream: self.stream,
write_buffer_stream_handler: self.write_buffer_stream_handler,
current_sequence_number: self.current_sequence_number,
sink: self.sink,
lifecycle_handle: self.lifecycle_handle,
time_provider: provider,
@ -146,38 +168,42 @@ impl<I, O> SequencedStreamHandler<I, O> {
seq_invalid_data_count: self.seq_invalid_data_count,
seq_unknown_error_count: self.seq_unknown_error_count,
sink_apply_error_count: self.sink_apply_error_count,
skipped_sequence_number_amount: self.skipped_sequence_number_amount,
kafka_topic_name: self.kafka_topic_name,
kafka_partition: self.kafka_partition,
skip_to_oldest_available: self.skip_to_oldest_available,
}
}
}
impl<I, O, T> SequencedStreamHandler<I, O, T>
where
I: Stream<Item = Result<DmlOperation, WriteBufferError>> + Unpin + Send,
I: WriteBufferStreamHandler,
O: DmlSink,
T: TimeProvider,
{
/// Run the stream handler, consuming items from [`Stream`] and applying
/// them to the [`DmlSink`].
/// Run the stream handler, consuming items from the stream provided by the
/// [`WriteBufferStreamHandler`] and applying them to the [`DmlSink`].
///
/// This method blocks until gracefully shut down by cancelling the
/// `shutdown` [`CancellationToken`]. Once cancelled, this handler will
/// complete the current operation it is processing before this method
/// returns.
///
/// # Panics
///
/// This method panics if the input stream ends (yields a `None`).
pub async fn run(mut self, shutdown: CancellationToken) {
let shutdown_fut = shutdown.cancelled().fuse();
pin_mut!(shutdown_fut);
let mut stream = self.write_buffer_stream_handler.stream().await;
let mut sequence_number_before_reset: Option<SequenceNumber> = None;
loop {
// Wait for a DML operation from the sequencer, or a graceful stop
// signal.
// Wait for a DML operation from the sequencer, or a graceful stop signal.
let maybe_op = futures::select!(
next = self.stream.next().fuse() => next,
next = stream.next().fuse() => next,
_ = shutdown_fut => {
info!(
kafka_topic=%self.kafka_topic_name,
@ -196,17 +222,44 @@ where
// DmlSink, return None rather than continuing the loop to ensure
// ingest pauses are respected.
let maybe_op = match maybe_op {
Some(Ok(op)) => Some(op),
Some(Ok(op)) => {
if let Some(sequence_number) = op.meta().sequence().map(|s| s.sequence_number) {
let sequence_number = SequenceNumber::new(sequence_number as i64);
if let Some(before_reset) = sequence_number_before_reset {
// We've requested the stream to be reset and we've skipped this many
// sequence numbers. Store in a metric once.
if before_reset != sequence_number {
let difference = sequence_number.get() - before_reset.get();
self.skipped_sequence_number_amount.inc(difference as u64);
}
sequence_number_before_reset = None;
}
self.current_sequence_number = sequence_number;
}
Some(op)
}
Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber => {
error!(
error=%e,
kafka_topic=%self.kafka_topic_name,
kafka_partition=%self.kafka_partition,
potential_data_loss=true,
"unable to read from desired sequencer offset"
);
self.seq_unknown_sequence_number_count.inc(1);
None
// If we get an unknown sequence number, and we're fine potentially having
// missed writes that were too old to be retained, try resetting the stream
// once and getting the next operation again.
// Keep the current sequence number to compare with the first sequence
// number seen after the reset.
if self.skip_to_oldest_available && sequence_number_before_reset.is_none() {
sequence_number_before_reset = Some(self.current_sequence_number);
self.write_buffer_stream_handler.reset_to_earliest();
stream = self.write_buffer_stream_handler.stream().await;
continue;
} else {
error!(
error=%e,
kafka_topic=%self.kafka_topic_name,
kafka_partition=%self.kafka_partition,
potential_data_loss=true,
"unable to read from desired sequencer offset"
);
self.seq_unknown_sequence_number_count.inc(1);
None
}
}
Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => {
warn!(
@ -378,23 +431,24 @@ fn metric_attrs(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{DeletePredicate, Sequence, TimestampRange};
use dml::{DmlDelete, DmlMeta, DmlWrite};
use futures::stream;
use futures::stream::{self, BoxStream};
use iox_time::{SystemProvider, Time};
use metric::Metric;
use mutable_batch_lp::lines_to_batches;
use std::sync::Arc;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use write_buffer::core::WriteBufferError;
lazy_static::lazy_static! {
static ref TEST_TIME: Time = SystemProvider::default().now();
@ -433,6 +487,75 @@ mod tests {
DmlDelete::new(name, pred, None, sequence)
}
#[derive(Debug)]
struct TestWriteBufferStreamHandler {
stream_ops: Vec<Vec<Result<DmlOperation, WriteBufferError>>>,
#[allow(clippy::type_complexity)]
completed_tx:
Option<oneshot::Sender<(mpsc::Sender<Result<DmlOperation, WriteBufferError>>, usize)>>,
}
impl TestWriteBufferStreamHandler {
fn new(
stream_ops: Vec<Vec<Result<DmlOperation, WriteBufferError>>>,
completed_tx: oneshot::Sender<(
mpsc::Sender<Result<DmlOperation, WriteBufferError>>,
usize,
)>,
) -> Self {
Self {
// reverse the order so we can pop off the end
stream_ops: stream_ops.into_iter().rev().collect(),
completed_tx: Some(completed_tx),
}
}
}
#[async_trait]
impl WriteBufferStreamHandler for TestWriteBufferStreamHandler {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
let stream_ops = self.stream_ops.pop().unwrap();
// Create a channel to pass input to the handler, with a
// buffer capacity of the number of operations to send (used to tell if all
// values have been received in the test thread).
let capacity = if stream_ops.is_empty() {
1 // channels can't have capacity 0, even if we're never sending anything
} else {
stream_ops.len()
};
let (tx, rx) = mpsc::channel(capacity);
// Push all inputs
for op in stream_ops {
tx.send(op)
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("early handler exit");
}
// If this is the last expected call to stream, send the transmitter and
// the capacity back to the test thread to wait for completion.
if self.stream_ops.is_empty() {
self.completed_tx
.take()
.unwrap()
.send((tx, capacity))
.unwrap();
}
ReceiverStream::new(rx).boxed()
}
async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> {
Ok(())
}
fn reset_to_earliest(&mut self) {
// Intentionally left blank
}
}
// Generates a test that ensures that the handler given $stream_ops makes
// $want_sink calls.
//
@ -441,20 +564,24 @@ mod tests {
macro_rules! test_stream_handler {
(
$name:ident,
stream_ops = $stream_ops:expr, // An ordered set of stream items to feed to the handler
sink_rets = $sink_ret:expr, // An ordered set of values to return from the mock op sink
// Whether to skip to the oldest available sequence number if UnknownSequenceNumber
skip_to_oldest_available = $skip_to_oldest_available:expr,
stream_ops = $stream_ops:expr, // Ordered set of stream items to feed to the handler
sink_rets = $sink_ret:expr, // Ordered set of values to return from the mock op sink
want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
// Optional set of ingest error metric label / values to assert
want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
want_sink = $($want_sink:tt)+ // A pattern to match against the calls made to the op sink
want_sink = $($want_sink:tt)+ // Pattern to match against calls made to the op sink
) => {
paste::paste! {
#[tokio::test]
async fn [<test_stream_handler_ $name>]() {
let metrics = Arc::new(metric::Registry::default());
let time_provider: Arc< dyn TimeProvider> = Arc::new(SystemProvider::default());
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::default());
let lifecycle = LifecycleManager::new(
LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)),
LifecycleConfig::new(
100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)
),
Arc::clone(&metrics),
time_provider,
);
@ -465,17 +592,21 @@ mod tests {
.with_apply_return($sink_ret)
);
// Create a channel to pass input to the handler, with a
// buffer capacity of 1 (used below).
let (tx, rx) = mpsc::channel(1);
let (completed_tx, completed_rx) = oneshot::channel();
let write_buffer_stream_handler = TestWriteBufferStreamHandler::new(
$stream_ops,
completed_tx
);
let handler = SequencedStreamHandler::new(
ReceiverStream::new(rx),
write_buffer_stream_handler,
SequenceNumber::new(0),
Arc::clone(&sink),
lifecycle.handle(),
TEST_KAFKA_TOPIC.to_string(),
*TEST_KAFKA_PARTITION,
&*metrics,
$skip_to_oldest_available,
).with_time_provider(iox_time::MockProvider::new(*TEST_TIME));
// Run the handler in the background and push inputs to it
@ -485,24 +616,22 @@ mod tests {
handler.run(handler_shutdown).await;
});
// Push the input one at a time, wait for the last
// message to be consumed by the handler (channel capacity
// increases to 1 once the message is read) and then request
// a graceful shutdown.
for op in $stream_ops {
tx.send(op)
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("early handler exit");
}
// Wait for the handler to read the last op, restoring the
// capacity to 1.
let _permit = tx.reserve()
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("early handler exit");
// When all operations have been read through the TestWriteBufferStreamHandler,
let (tx, capacity) = completed_rx.await.unwrap();
// Trigger graceful shutdown
// Wait for the handler to read the last op,
async {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
if tx.capacity() == capacity {
return;
}
}
}.with_timeout_panic(Duration::from_secs(5))
.await;
// Then trigger graceful shutdown
shutdown.cancel();
// And wait for the handler to stop.
@ -548,7 +677,8 @@ mod tests {
test_stream_handler!(
immediate_shutdown,
stream_ops = [],
skip_to_oldest_available = false,
stream_ops = vec![vec![]],
sink_rets = [],
want_ttbr = 0, // No ops, no TTBR
want_err_metrics = [],
@ -558,8 +688,9 @@ mod tests {
// Single write op applies OK, then shutdown.
test_stream_handler!(
write_ok,
stream_ops = [
Ok(DmlOperation::Write(make_write("bananas", 42)))
skip_to_oldest_available = false,
stream_ops = vec![
vec![Ok(DmlOperation::Write(make_write("bananas", 42)))]
],
sink_rets = [Ok(true)],
want_ttbr = 42,
@ -572,8 +703,9 @@ mod tests {
// Single delete op applies OK, then shutdown.
test_stream_handler!(
delete_ok,
stream_ops = [
Ok(DmlOperation::Delete(make_delete("platanos", 24)))
skip_to_oldest_available = false,
stream_ops = vec![
vec![Ok(DmlOperation::Delete(make_delete("platanos", 24)))]
],
sink_rets = [Ok(true)],
want_ttbr = 24,
@ -587,10 +719,11 @@ mod tests {
// affect the next op in the stream.
test_stream_handler!(
non_fatal_stream_io_error,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Err(WriteBufferError::new(WriteBufferErrorKind::IO, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 13)))
],
]],
sink_rets = [Ok(true)],
want_ttbr = 13,
want_err_metrics = [
@ -598,7 +731,8 @@ mod tests {
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 0
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 0
],
want_sink = [DmlOperation::Write(op)] => {
assert_eq!(op.namespace(), "bananas");
@ -606,17 +740,46 @@ mod tests {
);
test_stream_handler!(
non_fatal_stream_offset_error,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Err(WriteBufferError::new(WriteBufferErrorKind::UnknownSequenceNumber, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 31)))
],
]],
sink_rets = [Ok(true)],
want_ttbr = 31,
want_err_metrics = [
"sequencer_unknown_sequence_number" => 1,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 0
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 0
],
want_sink = [DmlOperation::Write(op)] => {
assert_eq!(op.namespace(), "bananas");
}
);
test_stream_handler!(
skip_to_oldest_on_unknown_sequence_number,
skip_to_oldest_available = true,
stream_ops = vec![
vec![
Err(
WriteBufferError::new(
WriteBufferErrorKind::UnknownSequenceNumber,
"explosions"
)
)
],
vec![Ok(DmlOperation::Write(make_write("bananas", 31)))],
],
sink_rets = [Ok(true)],
want_ttbr = 31,
want_err_metrics = [
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 2
],
want_sink = [DmlOperation::Write(op)] => {
assert_eq!(op.namespace(), "bananas");
@ -624,17 +787,19 @@ mod tests {
);
test_stream_handler!(
non_fatal_stream_invalid_data,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Err(WriteBufferError::new(WriteBufferErrorKind::InvalidData, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 50)))
],
]],
sink_rets = [Ok(true)],
want_ttbr = 50,
want_err_metrics = [
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 1,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 0
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 0
],
want_sink = [DmlOperation::Write(op)] => {
assert_eq!(op.namespace(), "bananas");
@ -642,17 +807,19 @@ mod tests {
);
test_stream_handler!(
non_fatal_stream_unknown_error,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Err(WriteBufferError::new(WriteBufferErrorKind::Unknown, "explosions")),
Ok(DmlOperation::Write(make_write("bananas", 60)))
],
]],
sink_rets = [Ok(true)],
want_ttbr = 60,
want_err_metrics = [
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 1,
"sink_apply_error" => 0
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 0
],
want_sink = [DmlOperation::Write(op)] => {
assert_eq!(op.namespace(), "bananas");
@ -662,10 +829,11 @@ mod tests {
// Asserts the TTBR is not set unless an op is successfully sunk.
test_stream_handler!(
no_success_no_ttbr,
stream_ops = [Err(WriteBufferError::new(
skip_to_oldest_available = false,
stream_ops = vec![vec![Err(WriteBufferError::new(
WriteBufferErrorKind::IO,
"explosions"
)),],
))]],
sink_rets = [],
want_ttbr = 0,
want_err_metrics = [],
@ -675,12 +843,13 @@ mod tests {
// Asserts the TTBR uses the last value in the stream.
test_stream_handler!(
reports_last_ttbr,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Ok(DmlOperation::Write(make_write("bananas", 1))),
Ok(DmlOperation::Write(make_write("bananas", 2))),
Ok(DmlOperation::Write(make_write("bananas", 3))),
Ok(DmlOperation::Write(make_write("bananas", 42))),
],
]],
sink_rets = [Ok(true), Ok(false), Ok(true), Ok(false),],
want_ttbr = 42,
want_err_metrics = [
@ -688,7 +857,8 @@ mod tests {
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 0
"sink_apply_error" => 0,
"skipped_sequence_number_amount" => 0
],
want_sink = _
);
@ -697,12 +867,15 @@ mod tests {
// the next op in the stream from being processed.
test_stream_handler!(
non_fatal_sink_error,
stream_ops = [
skip_to_oldest_available = false,
stream_ops = vec![vec![
Ok(DmlOperation::Write(make_write("bad_op", 1))),
Ok(DmlOperation::Write(make_write("good_op", 2)))
],
]],
sink_rets = [
Err(crate::data::Error::Partitioning{source: String::from("Time column not present").into()}),
Err(crate::data::Error::Partitioning {
source: String::from("Time column not present").into()
}),
Ok(true),
],
want_ttbr = 2,
@ -710,7 +883,8 @@ mod tests {
"sequencer_unknown_sequence_number" => 0,
"sequencer_invalid_data" => 0,
"sequencer_unknown_error" => 0,
"sink_apply_error" => 1
"sink_apply_error" => 1,
"skipped_sequence_number_amount" => 0
],
want_sink = [
DmlOperation::Write(_), // First call into sink is bad_op, returning an error
@ -720,10 +894,30 @@ mod tests {
}
);
// An abnormal end to the stream causes a panic, rather than a silent stream
// reader exit.
#[derive(Debug)]
struct EmptyWriteBufferStreamHandler {}
#[async_trait]
impl WriteBufferStreamHandler for EmptyWriteBufferStreamHandler {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
stream::iter([]).boxed()
}
async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> {
Ok(())
}
fn reset_to_earliest(&mut self) {
// Intentionally left blank
}
}
// An abnormal end to the stream causes a panic, rather than a silent stream reader exit.
#[tokio::test]
#[should_panic = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without graceful shutdown"]
#[should_panic(
expected = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without \
graceful shutdown"
)]
async fn test_early_stream_end_panic() {
let metrics = Arc::new(metric::Registry::default());
let time_provider = Arc::new(SystemProvider::default());
@ -734,16 +928,18 @@ mod tests {
);
// An empty stream iter immediately yields none.
let stream = stream::iter([]);
let write_buffer_stream_handler = EmptyWriteBufferStreamHandler {};
let sink = MockDmlSink::default();
let handler = SequencedStreamHandler::new(
stream,
write_buffer_stream_handler,
SequenceNumber::new(0),
sink,
lifecycle.handle(),
"kafka_topic_name".to_string(),
KafkaPartition::new(42),
&*metrics,
false,
);
handler

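A worked example of the `skipped_sequence_number_amount` accounting introduced above (the helper is hypothetical; the numbers match the `skip_to_oldest_on_unknown_sequence_number` expectations): the handler remembers where it was before calling `reset_to_earliest()` and, on the first op after the reset, records how far it jumped.

fn skipped_amount(before_reset: i64, first_sequence_after_reset: i64) -> u64 {
    // Mirrors the run() loop: the difference between the first sequence
    // number observed after the reset and the position before the reset.
    (first_sequence_after_reset - before_reset) as u64
}

#[test]
fn skipped_amount_example() {
    // Paused at sequence number 0 and the oldest retained op is number 2:
    // the metric is incremented by 2.
    assert_eq!(skipped_amount(0, 2), 2);
}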
View File

@ -193,6 +193,7 @@ pub async fn create_ingester_server_type(
write_buffer,
exec,
Arc::clone(&metric_registry),
ingester_config.skip_to_oldest_available,
)
.await?,
);

View File

@ -392,7 +392,7 @@ mod test {
fn quote_not_printable() {
assert_eq!(quote_and_escape("foo\nbar"), r#""foo\nbar""#);
assert_eq!(quote_and_escape("foo\r\nbar"), r#""foo\r\nbar""#);
assert_eq!(quote_and_escape("foo\0bar"), r#""foo\u{0}bar""#);
assert_eq!(quote_and_escape("foo\0bar"), r#""foo\0bar""#);
}
#[test]

View File

@ -13,7 +13,7 @@ itertools = "0.10.2"
lazy_static = "1.4.0"
observability_deps = { path = "../observability_deps" }
regex = "1"
regex-syntax = "0.6.25"
regex-syntax = "0.6.26"
schema = { path = "../schema" }
snafu = "0.7"
workspace-hack = { path = "../workspace-hack"}

View File

@ -0,0 +1,8 @@
-- Test Setup: TwoChunksMissingColumns
-- SQL: SELECT * from "table" order by time;
+--------+--------+--------+------+------+------+--------------------------------+
| field1 | field2 | field3 | tag1 | tag2 | tag3 | time |
+--------+--------+--------+------+------+------+--------------------------------+
| 10 | 11 | | a | b | | 1970-01-01T00:00:00.000000100Z |
| 20 | | 22 | a | | c | 1970-01-01T00:00:00.000000200Z |
+--------+--------+--------+------+------+------+--------------------------------+

View File

@ -0,0 +1,5 @@
-- Basic query tests
-- IOX_SETUP: TwoChunksMissingColumns
-- query data
SELECT * from "table" order by time;

View File

@ -2,7 +2,7 @@
//! This file is auto generated by query_tests/generate.
//! Do not edit manually --> will result in sadness
use std::path::Path;
use crate::runner::{Runner, make_output_path, read_file};
use crate::runner::Runner;
#[tokio::test]
// Tests from "basic.sql",
@ -117,9 +117,9 @@ async fn test_cases_new_sql_system_tables_sql() {
}
#[tokio::test]
// Tests from "two_chunks.sql",
async fn test_cases_two_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks.sql");
// Tests from "pushdown.sql",
async fn test_cases_pushdown_sql() {
let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new();
runner
.run(input_path)
@ -134,28 +134,6 @@ async fn test_cases_two_sql() {
// Tests from "several_chunks.sql",
async fn test_cases_several_chunks_sql() {
let input_path = Path::new("cases").join("in").join("several_chunks.sql");
let output_path = make_output_path(&input_path).unwrap();
let expected_path = input_path.with_extension("expected");
let mut runner = Runner::new();
let result = runner
.run(input_path)
.await;
if result.is_err() {
let output_contents = read_file(&output_path);
let expected_contents = read_file(&expected_path);
pretty_assertions::assert_eq!(expected_contents, output_contents);
} else {
runner
.flush()
.expect("flush worked");
}
}
#[tokio::test]
// Tests from "pushdown.sql",
async fn test_cases_pushdown_sql() {
let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new();
runner
.run(input_path)
@ -192,4 +170,32 @@ async fn test_cases_timestamps_sql() {
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "two_chunks.sql",
async fn test_cases_two_chunks_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "two_chunks_missing_columns.sql",
async fn test_cases_two_chunks_missing_columns_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}

View File

@ -61,6 +61,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(OneMeasurementRealisticTimes),
register_setup!(TwoMeasurementsManyFieldsTwoChunks),
register_setup!(ManyFieldsSeveralChunks),
register_setup!(TwoChunksMissingColumns),
]
.into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))

View File

@ -1322,3 +1322,30 @@ impl DbSetup for MeasurementForDefect2890 {
all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await
}
}
#[derive(Debug)]
pub struct TwoChunksMissingColumns {}
#[async_trait]
impl DbSetup for TwoChunksMissingColumns {
async fn make(&self) -> Vec<DbScenario> {
let partition_key1 = "a";
let partition_key2 = "b";
let lp_lines1 = vec!["table,tag1=a,tag2=b field1=10,field2=11 100"];
let lp_lines2 = vec!["table,tag1=a,tag3=c field1=20,field3=22 200"];
make_n_chunks_scenario(&[
ChunkData {
lp_lines: lp_lines1,
partition_key: partition_key1,
..Default::default()
},
ChunkData {
lp_lines: lp_lines2,
partition_key: partition_key2,
..Default::default()
},
])
.await
}
}

View File

@ -94,7 +94,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// In parquet file.
Parquet,
@ -383,25 +383,17 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScena
let mut scenarios = vec![];
for stages in ChunkStage::all()
'stage_combinations: for stages in ChunkStage::all()
.into_iter()
.combinations_with_replacement(n_stages_unset)
.flat_map(|v| v.into_iter().permutations(n_stages_unset))
.unique()
{
// filter out unordered stages
if !stages.windows(2).all(|stages| {
stages[0]
.partial_cmp(&stages[1])
.map(|o| o.is_le())
.unwrap_or_default()
}) {
continue;
}
let mut scenario_name = format!("{} chunks:", chunks.len());
// combine stages w/ chunks
let chunks_orig = chunks;
let mut chunks = Vec::with_capacity(chunks.len());
let mut stages_it = stages.iter();
let mut mock_ingester = MockIngester::new().await;
for chunk_data in chunks {
for chunk_data in chunks_orig {
let mut chunk_data = chunk_data.clone();
if chunk_data.chunk_stage.is_none() {
@ -411,13 +403,43 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScena
let chunk_data = chunk_data.replace_begin_and_end_delete_times();
chunks.push(chunk_data);
}
assert!(stages_it.next().is_none(), "generated too many stages");
// filter out unordered stages
let mut stage_by_partition = HashMap::<&str, Vec<ChunkStage>>::new();
for chunk_data in &chunks {
stage_by_partition
.entry(chunk_data.partition_key)
.or_default()
.push(
chunk_data
.chunk_stage
.expect("Stage should be initialized by now"),
);
}
for stages in stage_by_partition.values() {
if !stages.windows(2).all(|stages| {
stages[0]
.partial_cmp(&stages[1])
.map(|o| o.is_le())
.unwrap_or_default()
}) {
continue 'stage_combinations;
}
}
// build scenario
let mut scenario_name = format!("{} chunks:", chunks.len());
let mut mock_ingester = MockIngester::new().await;
for chunk_data in chunks {
let name = make_chunk(&mut mock_ingester, chunk_data).await;
write!(&mut scenario_name, ", {}", name).unwrap();
}
assert!(stages_it.next().is_none(), "generated too many stages");
let db = mock_ingester.into_query_namespace().await;
scenarios.push(DbScenario { scenario_name, db });
}
@ -550,7 +572,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
}
}
let mut name = format!("Chunk {}", chunk_stage);
let mut name = format!(
"Chunk stage={} partition={}",
chunk_stage, chunk.partition_key
);
let n_preds = chunk.preds.len();
if n_preds > 0 {
let delete_names: Vec<_> = chunk

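The per-partition filter above reduces to a monotonicity check over `PartialOrd`; as a standalone sketch (the helper name is illustrative, the closure body is the same as in `make_n_chunks_scenario`):

fn is_non_decreasing<T: PartialOrd>(stages: &[T]) -> bool {
    // Incomparable pairs (partial_cmp == None) fail the check, matching the
    // unwrap_or_default() above.
    stages
        .windows(2)
        .all(|w| w[0].partial_cmp(&w[1]).map(|o| o.is_le()).unwrap_or_default())
}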
View File

@ -1,3 +1,3 @@
[toolchain]
channel = "1.60"
channel = "1.61"
components = [ "rustfmt", "clippy" ]

View File

@ -20,7 +20,7 @@ arrow = { version = "14.0.0", features = ["prettyprint"] }
async-trait = "0.1"
futures = "0.3"
prost = "0.10"
regex = "1.5.4"
regex = "1.5.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.81"
snafu = "0.7"

View File

@ -294,9 +294,12 @@ unsafe impl<R: lock_api::RawRwLockUpgrade + Sized> lock_api::RawRwLockUpgrade
#[cfg(test)]
mod tests {
use std::time::Duration;
// Clippy isn't recognizing the explicit drops; none of these locks are actually being held
// across await points. See <https://github.com/rust-lang/rust-clippy/issues/6446>
#![allow(clippy::await_holding_lock)]
use super::*;
use std::time::Duration;
#[test]
fn test_counts() {

View File

@ -136,10 +136,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
///
/// The [`dml::DmlMeta`] will be propagated where applicable
///
/// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations into a single batch.
/// After this method returns the operation was actually written (i.e. it is NOT buffered any longer). You may use
/// [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time expired), which can be
/// helpful for controlled shutdown.
/// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations
/// into a single batch. After this method returns the operation was actually written (i.e. it
/// is NOT buffered any longer). You may use [`flush`](Self::flush) to trigger an early
/// submission (e.g. before some linger time expired), which can be helpful for controlled
/// shutdown.
///
/// Returns the metadata that was written.
async fn store_operation(
@ -168,8 +169,9 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
/// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) /
/// [`store_lp`](Self::store_lp)).
///
/// This call is pending while outstanding data is being submitted and will return AFTER the flush completed.
/// However you still need to poll the store operations to get the metadata for every write.
/// This call is pending while outstanding data is being submitted and will return AFTER the
/// flush completed. However you still need to poll the store operations to get the metadata
/// for every write.
async fn flush(&self) -> Result<(), WriteBufferError>;
/// Return type (like `"mock"` or `"kafka"`) of this writer.
@ -183,21 +185,43 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
pub trait WriteBufferStreamHandler: Sync + Send + Debug + 'static {
/// Stream that produces DML operations.
///
/// Note that due to the mutable borrow, it is not possible to have multiple streams from the same
/// [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and requested again, the last
/// sequence number of the old streams will be the start sequence number for the new streams. If you want to
/// prevent that either create a new [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
/// Note that due to the mutable borrow, it is not possible to have multiple streams from the
/// same [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and
/// requested again, the last sequence number of the old streams will be the start sequence
/// number for the new streams. If you want to prevent that either create a new
/// [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
///
/// If the sequence number that the stream wants to read is unknown (either because it is in the future or because
/// some retention policy removed it already), the stream will return an error with
/// [`WriteBufferErrorKind::UnknownSequenceNumber`] and will end immediately.
async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>>;
/// If the sequence number that the stream wants to read is unknown (either because it is in
/// the future or because some retention policy removed it already), the stream will return an
/// error with [`WriteBufferErrorKind::UnknownSequenceNumber`] and will end immediately.
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>>;
/// Seek sequencer to given sequence number. The next output of related streams will be an entry with at least
/// the given sequence number (the actual sequence number might be skipped due to "holes" in the stream).
/// Seek sequencer to given sequence number. The next output of related streams will be an
/// entry with at least the given sequence number (the actual sequence number might be skipped
/// due to "holes" in the stream).
///
/// Note that due to the mutable borrow, it is not possible to seek while streams exists.
async fn seek(&mut self, sequence_number: u64) -> Result<(), WriteBufferError>;
/// Reset the sequencer to whatever is the earliest number available in the retained write
/// buffer. Useful to restart if [`WriteBufferErrorKind::UnknownSequenceNumber`] is returned
/// from [`stream`](Self::stream) but that isn't a problem.
fn reset_to_earliest(&mut self);
}
#[async_trait]
impl WriteBufferStreamHandler for Box<dyn WriteBufferStreamHandler> {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
self.as_mut().stream().await
}
async fn seek(&mut self, sequence_number: u64) -> Result<(), WriteBufferError> {
self.as_mut().seek(sequence_number).await
}
fn reset_to_earliest(&mut self) {
self.as_mut().reset_to_earliest()
}
}
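A consumer-side sketch of the new `reset_to_earliest` contract (the function and sink handling are illustrative, not part of the PR): a reset only takes effect on the next `stream()` call, and the `'static` lifetime `stream()` now returns makes re-fetching the stream straightforward.

use futures::StreamExt;

async fn consume_from_earliest<H: WriteBufferStreamHandler>(mut handler: H) {
    handler.reset_to_earliest();
    // A fresh stream starts at the earliest retained sequence number.
    let mut stream = handler.stream().await;
    while let Some(res) = stream.next().await {
        match res {
            // Hand the DmlOperation to a DmlSink here.
            Ok(op) => drop(op),
            // e.g. IO or invalid-data errors; surface them and decide
            // whether to continue or abort.
            Err(e) => {
                drop(e);
                break;
            }
        }
    }
}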
/// Produce streams (one per sequencer) of [`DmlWrite`]s.
@ -231,8 +255,9 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
/// Get high watermark (= what we believe is the next sequence number to be added).
///
/// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to be added", it starts
/// at 0 and after the entry with sequence number 0 is added to the buffer, it is 1.
/// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to
/// be added", it starts at 0 and after the entry with sequence number 0 is added to the
/// buffer, it is 1.
async fn fetch_high_watermark(&self, sequencer_id: u32) -> Result<u64, WriteBufferError>;
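Since the watermark is defined as "next sequence number to be added", consumer lag is a simple subtraction; a hedged sketch (the helper is illustrative):

fn consumer_lag(high_watermark: u64, next_to_read: u64) -> u64 {
    // An empty sequencer has watermark 0; after entry 0 is added it is 1,
    // so the count of not-yet-read entries is watermark - next_to_read.
    high_watermark.saturating_sub(next_to_read)
}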
/// Return type (like `"mock"` or `"kafka"`) of this reader.
@ -273,8 +298,8 @@ pub mod test_utils {
/// Create a new context.
///
/// This will be called multiple times during the test suite. Each resulting context must represent an isolated
/// environment.
/// This will be called multiple times during the test suite. Each resulting context must
/// represent an isolated environment.
async fn new_context(&self, n_sequencers: NonZeroU32) -> Self::Context {
self.new_context_with_time(n_sequencers, Arc::new(iox_time::SystemProvider::new()))
.await
@ -289,7 +314,8 @@ pub mod test_utils {
/// Context used during testing.
///
/// Represents an isolated environment. Actions like sequencer creations and writes must not leak across context boundaries.
/// Represents an isolated environment. Actions like sequencer creations and writes must not
/// leak across context boundaries.
#[async_trait]
pub trait TestContext: Send + Sync {
/// Write buffer writer implementation specific to this context and adapter.
@ -310,10 +336,11 @@ pub mod test_utils {
/// Generic test suite that must be passed by all proper write buffer implementations.
///
/// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this test suite.
/// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this
/// test suite.
///
/// Note that you might need more tests on top of this to assert specific implementation behaviors, edge cases, and
/// error handling.
/// Note that you might need more tests on top of this to assert specific implementation
/// behaviors, edge cases, and error handling.
pub async fn perform_generic_tests<T>(adapter: T)
where
T: TestAdapter,
@ -323,6 +350,7 @@ pub mod test_utils {
test_multi_sequencer_io(&adapter).await;
test_multi_writer_multi_reader(&adapter).await;
test_seek(&adapter).await;
test_reset_to_earliest(&adapter).await;
test_watermark(&adapter).await;
test_timestamp(&adapter).await;
test_sequencer_auto_creation(&adapter).await;
@ -366,6 +394,7 @@ pub mod test_utils {
/// Test IO with a single writer and single reader stream.
///
/// This tests that:
///
/// - streams process data in order
/// - readers can handle the "pending" state w/o erroring
/// - readers unblock after being in "pending" state
@ -410,6 +439,7 @@ pub mod test_utils {
/// Tests multiple subsequently created streams from a single [`WriteBufferStreamHandler`].
///
/// This tests that:
///
/// - readers remember their sequence number (and "pending" state) even when streams are dropped
/// - state is not shared between handlers
async fn test_multi_stream_io<T>(adapter: &T)
@ -437,8 +467,8 @@ pub mod test_utils {
let mut stream = stream_handler.stream().await;
assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w1);
// re-creating stream after reading remembers sequence number, but wait a bit to provoke the stream to buffer
// some entries
// re-creating stream after reading remembers sequence number, but wait a bit to provoke
// the stream to buffer some entries
tokio::time::sleep(Duration::from_millis(10)).await;
drop(stream);
let mut stream = stream_handler.stream().await;
@ -460,7 +490,9 @@ pub mod test_utils {
/// Test single reader-writer IO w/ multiple sequencers.
///
/// This tests that:
/// - writes go to and reads come from the right sequencer, aka that sequencers provide a namespace-like isolation
///
/// - writes go to and reads come from the right sequencer, aka that sequencers provide a
/// namespace-like isolation
/// - "pending" states are specific to a sequencer
async fn test_multi_sequencer_io<T>(adapter: &T)
where
@ -512,6 +544,7 @@ pub mod test_utils {
/// Test multiple writers and multiple readers on multiple sequencers.
///
/// This tests that:
///
/// - writers retrieve consistent sequencer IDs
/// - writes go to and reads come from the right sequencer, similar
/// to [`test_multi_sequencer_io`] but less detailed
@ -556,10 +589,11 @@ pub mod test_utils {
/// Test seek implementation of readers.
///
/// This tests that:
///
/// - seeking is specific to the reader AND sequencer
/// - forward and backwards seeking works
/// - seeking past the end of the known content works (results in "pending" status and remembers sequence number and
/// not just "next entry")
/// - seeking past the end of the known content works (results in "pending" status and
/// remembers sequence number and not just "next entry"); see the sketch after this test
async fn test_seek<T>(adapter: &T)
where
T: TestAdapter,
@ -609,7 +643,8 @@ pub mod test_utils {
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
// seek to far end and then add data
// The affected stream should error and then stop. The other streams should still be pending.
// The affected stream should error and then stop. The other streams should still be
// pending.
handler_1_1_a.seek(1_000_000).await.unwrap();
let w_east_3 = write("namespace", &writer, entry_east_3, 0, None).await;
@ -634,9 +669,55 @@ pub mod test_utils {
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2, &w_east_3]).await;
}
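// Seek semantics in miniature (a sketch; `handler` stands for any stream
// handler and the sequence numbers are illustrative):
async fn seek_sketch(handler: &mut Box<dyn WriteBufferStreamHandler>) {
    // forward or backward: the next read starts at the given sequence number
    handler.seek(2).await.unwrap();
    // seeking past the known end is recorded as-is: the handler remembers the
    // requested sequence number rather than clamping to the next entry
    handler.seek(1_000_000).await.unwrap();
}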
/// Test reset-to-earliest implementation of readers.
///
/// This tests that:
///
/// - Calling the function jumps to the earliest available sequence number if the earliest
/// available sequence number is earlier than the current sequence number
/// - Calling the function jumps to the earliest available sequence number if the earliest
/// available sequence number is later than the current sequence number
///
/// A caller-side sketch follows the test body.
async fn test_reset_to_earliest<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(NonZeroU32::try_from(2).unwrap()).await;
let entry_east_1 = "upc,region=east user=1 100";
let entry_east_2 = "upc,region=east user=2 200";
let writer = context.writing(true).await.unwrap();
let mut sequencer_ids = writer.sequencer_ids();
let sequencer_id_1 = set_pop_first(&mut sequencer_ids).unwrap();
let w_east_1 = write("namespace", &writer, entry_east_1, sequencer_id_1, None).await;
let w_east_2 = write("namespace", &writer, entry_east_2, sequencer_id_1, None).await;
let reader_1 = context.reading(true).await.unwrap();
let mut handler_1_1_a = reader_1.stream_handler(sequencer_id_1).await.unwrap();
// forward seek
handler_1_1_a
.seek(w_east_2.meta().sequence().unwrap().sequence_number)
.await
.unwrap();
assert_reader_content(&mut handler_1_1_a, &[&w_east_2]).await;
// reset to earliest goes back to 0; stream re-fetches earliest record
handler_1_1_a.reset_to_earliest();
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
// TODO: https://github.com/influxdata/influxdb_iox/issues/4651
// Remove first write operation to simulate retention policies evicting some records
// reset to earliest goes to whatever's available
}
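// A caller-side sketch of the recovery flow this enables (the helper name
// `recover_after_eviction` is illustrative): when previously readable data
// has been evicted, e.g. by retention, jump to the earliest data still
// available and start a fresh stream.
async fn recover_after_eviction(
    handler: &mut Box<dyn WriteBufferStreamHandler>,
) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
    handler.reset_to_earliest();
    handler.stream().await
}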
/// Test watermark fetching.
///
/// This tests that:
///
/// - the watermark for an empty sequencer is 0
/// - the watermark for a non-empty sequencer is "last sequence ID plus 1" (sketched below)
async fn test_watermark<T>(adapter: &T)
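// The watermark contract above, in miniature (a sketch: it assumes sequencer
// ID 0 exists and reuses the `write` helper and the `fetch_high_watermark`
// trait method shown earlier in this file):
async fn watermark_sketch<T: TestAdapter>(adapter: &T) {
    let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
    let writer = context.writing(true).await.unwrap();
    let reader = context.reading(true).await.unwrap();
    // empty sequencer => watermark is 0
    assert_eq!(reader.fetch_high_watermark(0).await.unwrap(), 0);
    // non-empty sequencer => watermark is the last sequence number plus 1
    let w = write("namespace", &writer, "upc user=1 100", 0, None).await;
    let last = w.meta().sequence().unwrap().sequence_number;
    assert_eq!(reader.fetch_high_watermark(0).await.unwrap(), last + 1);
}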
@ -722,6 +803,7 @@ pub mod test_utils {
/// Test that sequencer auto-creation works.
///
/// This tests that:
///
/// - both writer and reader cannot be constructed when sequencers are missing
/// - both writer and reader can auto-create sequencers
async fn test_sequencer_auto_creation<T>(adapter: &T)
@ -749,6 +831,7 @@ pub mod test_utils {
/// Test sequencer IDs reporting of readers and writers.
///
/// This tests that:
///
/// - all sequencers are reported
async fn test_sequencer_ids<T>(adapter: &T)
where
@ -864,6 +947,7 @@ pub mod test_utils {
/// Test usage w/ multiple namespaces.
///
/// Tests that:
///
/// - namespace names are propagated correctly from writer to reader
/// - all namespaces end up in a single stream
async fn test_multi_namespaces<T>(adapter: &T)
@ -928,14 +1012,16 @@ pub mod test_utils {
/// Assert that the content of the reader is as expected.
///
/// This will read `expected_writes.len()` from the reader and then ensures that the stream is pending.
/// This will read `expected_writes.len()` writes from the reader and then ensure that the
/// stream is pending.
async fn assert_reader_content(
actual_stream_handler: &mut Box<dyn WriteBufferStreamHandler>,
expected_writes: &[&DmlWrite],
) {
let actual_stream = actual_stream_handler.stream().await;
// we need to limit the stream to `expected_writes.len()` elements, otherwise it might be pending forever
// we need to limit the stream to `expected_writes.len()` elements, otherwise it might be
// pending forever
let actual_writes: Vec<_> = actual_stream
.take(expected_writes.len())
.try_collect()
@ -955,6 +1041,7 @@ pub mod test_utils {
/// Asserts that the given span contexts are the same or that `second` links back to `first`.
///
/// "Same" means (see the sketch below):
///
/// - identical trace ID
/// - identical span ID
/// - identical parent span ID
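// The three "identical" checks, written out (a sketch; the field names follow
// the assertions visible in this function, and the link-back case is omitted):
fn span_contexts_same(first: &SpanContext, second: &SpanContext) -> bool {
    first.trace_id == second.trace_id
        && first.span_id == second.span_id
        && first.parent_span_id == second.parent_span_id
}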
@ -983,7 +1070,8 @@ pub mod test_utils {
assert_eq!(first.parent_span_id, second.parent_span_id);
}
/// Assert that all span relations (parents, links) are found within the set of spans or within the set of roots.
/// Assert that all span relations (parents, links) are found within the set of spans or within
/// the set of roots.
fn assert_span_relations_closed(spans: &[Span], roots: &[SpanContext]) {
let all_ids: HashSet<_> = spans
.iter()
@ -1003,7 +1091,8 @@ pub mod test_utils {
/// Assert that given stream is pending.
///
/// This will will try to poll the stream for a bit to ensure that async IO has a chance to catch up.
/// This will try to poll the stream for a bit to ensure that async IO has a chance to
/// catch up. One possible shape is sketched below.
async fn assert_stream_pending<S>(stream: &mut S)
where
S: Stream + Send + Unpin,
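// One possible shape of "poll for a bit" (a sketch, not the actual body):
// poll a handful of times with short sleeps in between so async IO can make
// progress, and fail if the stream ever yields an item.
async fn assert_stream_pending_sketch<S>(stream: &mut S)
where
    S: futures::Stream + Send + Unpin,
{
    use futures::StreamExt;
    use std::task::Poll;
    for _ in 0..10 {
        if let Poll::Ready(item) = futures::poll!(stream.next()) {
            panic!("stream unexpectedly yielded (Some = {})", item.is_some());
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
}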
@ -1063,8 +1152,8 @@ pub mod test_utils {
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT \
to run"
);
return;
}

View File

@ -285,7 +285,7 @@ pub struct FileBufferStreamHandler {
#[async_trait]
impl WriteBufferStreamHandler for FileBufferStreamHandler {
async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
let committed = self.path.join("committed");
ConsumerStream::new(
@ -304,6 +304,11 @@ impl WriteBufferStreamHandler for FileBufferStreamHandler {
self.terminated.store(false, Ordering::SeqCst);
Ok(())
}
fn reset_to_earliest(&mut self) {
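// The file-based buffer numbers entries from 0, so "earliest" is simply
// sequence number 0 here; clearing `terminated` lets a finished stream resume.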
self.next_sequence_number.store(0, Ordering::SeqCst);
self.terminated.store(false, Ordering::SeqCst);
}
}
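// Why the `'_` -> `'static` change in `stream()` matters: the returned stream
// no longer borrows the handler, so it can be consumed on a separate task
// while the handler stays usable, e.g. for `seek` (a sketch):
async fn consume_on_task(mut handler: Box<dyn WriteBufferStreamHandler>) {
    use futures::StreamExt;
    let mut stream = handler.stream().await; // BoxStream<'static, _>
    tokio::spawn(async move {
        while let Some(res) = stream.next().await {
            let _op = res; // a real consumer would handle errors/operations
        }
    });
    // `handler` can still be used here, e.g. `handler.seek(0).await`
}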
/// File-based write buffer reader.

View File

@ -130,7 +130,7 @@ pub struct RSKafkaStreamHandler {
#[async_trait]
impl WriteBufferStreamHandler for RSKafkaStreamHandler {
async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
if self.terminated.load(Ordering::SeqCst) {
return futures::stream::empty().boxed();
}
@ -161,6 +161,8 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
}
let stream = stream_builder.build();
let sequencer_id = self.sequencer_id;
let stream = stream.map(move |res| {
let (record, _watermark) = match res {
Ok(x) => x,
@ -185,7 +187,7 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?;
let sequence = Sequence {
sequencer_id: self.sequencer_id,
sequencer_id,
sequence_number: record
.offset
.try_into()
@ -220,6 +222,11 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
self.terminated.store(false, Ordering::SeqCst);
Ok(())
}
fn reset_to_earliest(&mut self) {
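// Clearing the stored offset (rather than pinning it to 0) lets the next
// `stream()` call start from the broker's earliest retained offset, which
// may be greater than 0 once retention has evicted old records.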
*self.next_offset.lock() = None;
self.terminated.store(false, Ordering::SeqCst);
}
}
#[derive(Debug)]

View File

@ -11,7 +11,10 @@ use parking_lot::Mutex;
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
sync::Arc,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
},
task::{Poll, Waker},
};
@ -70,7 +73,8 @@ pub struct MockBufferSharedState {
impl MockBufferSharedState {
/// Create new shared state w/ N sequencers.
///
/// This is equivalent to [`uninitialized`](Self::uninitialized) followed by [`init`](Self::init).
/// This is equivalent to [`uninitialized`](Self::uninitialized) followed by
/// [`init`](Self::init).
pub fn empty_with_n_sequencers(n_sequencers: NonZeroU32) -> Self {
let state = Self::uninitialized();
state.init(n_sequencers);
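// Usage sketch for the constructor documented above (one step instead of
// calling `uninitialized()` and then `init(...)`):
fn two_sequencer_state() -> MockBufferSharedState {
    MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(2).unwrap())
}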
@ -87,6 +91,7 @@ impl MockBufferSharedState {
/// Initialize shared state w/ N sequencers.
///
/// # Panics
///
/// - when state is already initialized
pub fn init(&self, n_sequencers: NonZeroU32) {
let mut guard = self.writes.lock();
@ -107,6 +112,7 @@ impl MockBufferSharedState {
/// Push a new delete to the specified sequencer
///
/// # Panics
///
/// - when delete is not sequenced
/// - when no sequencer was initialized
/// - when specified sequencer does not exist
@ -118,6 +124,7 @@ impl MockBufferSharedState {
/// Push a new entry to the specified sequencer.
///
/// # Panics
///
/// - when write is not sequenced
/// - when no sequencer was initialized
/// - when specified sequencer does not exist
@ -129,6 +136,7 @@ impl MockBufferSharedState {
/// Push a new operation to the specified sequencer
///
/// # Panics
///
/// - when operation is not sequenced
/// - when no sequencer was initialized
/// - when specified sequencer does not exist
@ -168,6 +176,7 @@ impl MockBufferSharedState {
/// Push error to specified sequencer.
///
/// # Panics
///
/// - when no sequencer was initialized
/// - when sequencer does not exist
pub fn push_error(&self, error: WriteBufferError, sequencer_id: u32) {
@ -183,6 +192,7 @@ impl MockBufferSharedState {
/// Get messages (entries and errors) for specified sequencer.
///
/// # Panics
///
/// - when no sequencer was initialized
/// - when sequencer does not exist
pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<DmlOperation, WriteBufferError>> {
@ -203,6 +213,7 @@ impl MockBufferSharedState {
/// Provides a way to wipe messages (e.g. to simulate retention periods in Kafka)
///
/// # Panics
///
/// - when no sequencer was initialized
/// - when sequencer does not exist
pub fn clear_messages(&self, sequencer_id: u32) {
@ -346,7 +357,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
#[derive(Debug)]
pub struct MockBufferForReading {
shared_state: MockBufferSharedState,
shared_state: Arc<MockBufferSharedState>,
n_sequencers: u32,
}
@ -369,7 +380,7 @@ impl MockBufferForReading {
};
Ok(Self {
shared_state: state,
shared_state: Arc::new(state),
n_sequencers,
})
}
@ -379,45 +390,55 @@ impl MockBufferForReading {
#[derive(Debug)]
pub struct MockBufferStreamHandler {
/// Shared state.
shared_state: MockBufferSharedState,
shared_state: Arc<MockBufferSharedState>,
/// Own sequencer ID.
sequencer_id: u32,
/// Index within the entry vector.
vector_index: usize,
vector_index: Arc<AtomicUsize>,
/// Offset within the sequencer IDs.
offset: u64,
/// Flags if the stream is terminated, e.g. due to "offset out of range"
terminated: bool,
terminated: Arc<AtomicBool>,
}
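// The fields moved behind `Arc`s so the `'static` stream returned by
// `stream()` below can hold cheap clones of these handles instead of
// borrowing `self`. The pattern in miniature (names illustrative):
fn make_counter(shared: &Arc<AtomicUsize>) -> impl FnMut() -> usize + 'static {
    let shared = Arc::clone(shared); // clone the handle, not the data
    move || shared.fetch_add(1, SeqCst)
}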
#[async_trait]
impl WriteBufferStreamHandler for MockBufferStreamHandler {
async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
futures::stream::poll_fn(|cx| {
if self.terminated {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
// Don't reference `self` in the closure, move these instead
let terminated = Arc::clone(&self.terminated);
let shared_state = Arc::clone(&self.shared_state);
let sequencer_id = self.sequencer_id;
let vector_index = Arc::clone(&self.vector_index);
let offset = self.offset;
futures::stream::poll_fn(move |cx| {
if terminated.load(SeqCst) {
return Poll::Ready(None);
}
let mut guard = self.shared_state.writes.lock();
let mut guard = shared_state.writes.lock();
let writes = guard.as_mut().unwrap();
let writes_vec = writes.get_mut(&self.sequencer_id).unwrap();
let writes_vec = writes.get_mut(&sequencer_id).unwrap();
let entries = &writes_vec.writes;
while entries.len() > self.vector_index {
let write_result = &entries[self.vector_index];
let mut vi = vector_index.load(SeqCst);
while entries.len() > vi {
let write_result = &entries[vi];
// consume entry
self.vector_index += 1;
vi = vector_index
.fetch_update(SeqCst, SeqCst, |n| Some(n + 1))
.unwrap()
+ 1; // `fetch_update` returns the *previous* value, so advance by one
match write_result {
Ok(write) => {
// found an entry => need to check if it is within the offset
let sequence = write.meta().sequence().unwrap();
if sequence.sequence_number >= self.offset {
if sequence.sequence_number >= offset {
// within offset => return entry to caller
return Poll::Ready(Some(Ok(write.clone())));
} else {
@ -432,7 +453,7 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
}
}
// check if we have seeked to far
// check if we have seeked too far
let next_offset = entries
.iter()
.filter_map(|write_result| {
@ -446,8 +467,8 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
.max()
.map(|x| x + 1)
.unwrap_or_default();
if self.offset > next_offset {
self.terminated = true;
if offset > next_offset {
terminated.store(true, SeqCst);
return Poll::Ready(Some(Err(WriteBufferError::unknown_sequence_number(
format!("unknown sequence number, high watermark is {next_offset}"),
))));
@ -464,13 +485,19 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
self.offset = sequence_number;
// reset position to start since seeking might go backwards
self.vector_index = 0;
self.vector_index.store(0, SeqCst);
// reset termination state
self.terminated = false;
self.terminated.store(false, SeqCst);
Ok(())
}
fn reset_to_earliest(&mut self) {
self.offset = 0;
self.vector_index.store(0, SeqCst);
self.terminated.store(false, SeqCst);
}
}
#[async_trait]
@ -478,6 +505,7 @@ impl WriteBufferReading for MockBufferForReading {
fn sequencer_ids(&self) -> BTreeSet<u32> {
(0..self.n_sequencers).into_iter().collect()
}
async fn stream_handler(
&self,
sequencer_id: u32,
@ -487,11 +515,11 @@ impl WriteBufferReading for MockBufferForReading {
}
Ok(Box::new(MockBufferStreamHandler {
shared_state: self.shared_state.clone(),
shared_state: Arc::clone(&self.shared_state),
sequencer_id,
vector_index: 0,
vector_index: Arc::new(AtomicUsize::new(0)),
offset: 0,
terminated: false,
terminated: Arc::new(AtomicBool::new(false)),
}))
}
@ -521,7 +549,7 @@ pub struct MockStreamHandlerThatAlwaysErrors;
#[async_trait]
impl WriteBufferStreamHandler for MockStreamHandlerThatAlwaysErrors {
async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
futures::stream::poll_fn(|_cx| {
Poll::Ready(Some(Err(String::from(
"Something bad happened while reading from stream",
@ -534,6 +562,10 @@ impl WriteBufferStreamHandler for MockStreamHandlerThatAlwaysErrors {
async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> {
Err(String::from("Something bad happened while seeking the stream").into())
}
fn reset_to_earliest(&mut self) {
// Intentionally left blank
}
}
#[async_trait]