feat: Add `SequencerProgress` reporting to ingester (#4238)

* feat: Add `SequencerProgress` reporting to ingester

* refactor: Use KafkaPartition in write_summary

* fix: Update docstrings

* refactor: Change ingester to use KafkaPartition everywhere

* refactor: add SequencerProgress::combine

* refactor: return new SequencerProgress rather than updating

* fix: distinguish between yes/no/unknown in WriteSummary

* docs: Update data_types2/src/lib.rs

Co-authored-by: Paul Dix <paul@pauldix.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>

Andrew Lamb 2022-04-06 11:13:21 -04:00 committed by GitHub
commit c244b03281
10 changed files with 680 additions and 55 deletions
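
For orientation before the diff, here is a minimal sketch of how the pieces added in this commit are meant to fit together. The bindings, the free function, and the `ingester::handler` module path are assumptions for illustration, not part of the diff:

```rust
use std::collections::BTreeMap;

use data_types2::KafkaPartition;
use ingester::handler::IngestHandler; // module path assumed for this sketch
use write_summary::{SequencerProgress, WriteSummary};

/// Ok(true): the write is guaranteed readable; Ok(false): guaranteed
/// not yet readable; Err: no determination possible (the "unknown" case).
async fn write_is_readable(
    handler: &impl IngestHandler,
    summary: &WriteSummary,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
    // Ask the ingester how far each kafka partition touched by the
    // write has progressed (buffered / persisted watermarks)
    let progresses: BTreeMap<KafkaPartition, SequencerProgress> =
        handler.progresses(summary.kafka_partitions()).await?;

    // The summary compares its sequence numbers against those watermarks
    Ok(summary.readable(&progresses)?)
}
```

This is the yes/no/unknown distinction from the commit message: `readable` (and `persisted`) return `Err` rather than guessing when the progress map has no entry for a partition in the write.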

Cargo.lock (generated)

@ -2343,6 +2343,7 @@ dependencies = [
"uuid",
"workspace-hack",
"write_buffer",
"write_summary",
]
[[package]]
@ -6999,7 +7000,9 @@ dependencies = [
"data_types2",
"dml",
"generated_types",
"observability_deps",
"serde_json",
"snafu",
"time 0.1.0",
"workspace-hack",
]


@ -1,6 +1,8 @@
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct Sequence {
/// The sequencer id (kafka partition id)
pub sequencer_id: u32,
/// The sequence number (kafka offset)
pub sequence_number: u64,
}


@ -127,7 +127,11 @@ impl ColumnId {
}
}
/// Unique ID for a `Sequencer`
/// Unique ID for a `Sequencer`. Note this is NOT the same as the
/// "sequencer_id" in the `write_buffer`, which currently means
/// "kafka partition".
///
/// https://github.com/influxdata/influxdb_iox/issues/4237
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct SequencerId(i16);


@ -44,6 +44,7 @@ tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
tokio-util = { version = "0.7.1" }
trace = { path = "../trace" }


@ -31,6 +31,7 @@ use std::{
};
use time::SystemProvider;
use uuid::Uuid;
use write_summary::SequencerProgress;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -50,6 +51,12 @@ pub enum Error {
#[snafu(display("Sequencer {} not found in data map", sequencer_id))]
SequencerNotFound { sequencer_id: SequencerId },
#[snafu(display(
"Sequencer not found for kafka partition {} in data map",
kafka_partition
))]
SequencerForPartitionNotFound { kafka_partition: KafkaPartition },
#[snafu(display("Namespace {} not found in catalog", namespace))]
NamespaceNotFound { namespace: String },
@ -154,6 +161,25 @@ impl IngesterData {
let table_data = tables.get(table_name)?;
Some(Arc::clone(table_data))
}
/// Return the ingestion progress for the specified kafka partitions
pub(crate) async fn progresses(
&self,
partitions: Vec<KafkaPartition>,
) -> Result<BTreeMap<KafkaPartition, SequencerProgress>> {
let mut progresses = BTreeMap::new();
for kafka_partition in partitions {
let sequencer_data = self
.sequencers
.iter()
.map(|(_, sequencer_data)| sequencer_data)
.find(|sequencer_data| sequencer_data.kafka_partition == kafka_partition)
.context(SequencerForPartitionNotFoundSnafu { kafka_partition })?;
progresses.insert(kafka_partition, sequencer_data.progress().await);
}
Ok(progresses)
}
}
/// The Persister has a function to persist a given partition ID and to update the
@ -288,6 +314,9 @@ impl Persister for IngesterData {
/// Data of a Shard
#[derive(Debug)]
pub struct SequencerData {
/// The kafka partition for this sequencer
kafka_partition: KafkaPartition,
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<String, Arc<NamespaceData>>>,
@ -297,7 +326,7 @@ pub struct SequencerData {
impl SequencerData {
/// Initialise a new [`SequencerData`] that emits metrics to `metrics`.
pub fn new(metrics: Arc<metric::Registry>) -> Self {
pub fn new(kafka_partition: KafkaPartition, metrics: Arc<metric::Registry>) -> Self {
let namespace_count = metrics
.register_metric::<U64Counter>(
"ingester_namespaces_total",
@ -306,6 +335,7 @@ impl SequencerData {
.recorder(&[]);
Self {
kafka_partition,
namespaces: Default::default(),
metrics,
namespace_count,
@ -314,8 +344,12 @@ impl SequencerData {
/// Initialize a new SequencerData with namespaces, for testing purposes only
#[cfg(test)]
pub fn new_for_test(namespaces: BTreeMap<String, Arc<NamespaceData>>) -> Self {
pub fn new_for_test(
kafka_partition: KafkaPartition,
namespaces: BTreeMap<String, Arc<NamespaceData>>,
) -> Self {
Self {
kafka_partition,
namespaces: RwLock::new(namespaces),
metrics: Default::default(),
namespace_count: Default::default(),
@ -387,6 +421,18 @@ impl SequencerData {
Ok(data)
}
/// Return the progress of this sequencer
async fn progress(&self) -> SequencerProgress {
let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect();
let mut progress = SequencerProgress::new();
for namespace_data in namespaces {
progress = progress.combine(namespace_data.progress().await);
}
progress
}
}
/// Data of a Namespace that belongs to a given Shard
@ -604,6 +650,18 @@ impl NamespaceData {
}
}
}
/// Return progress from this Namespace
async fn progress(&self) -> SequencerProgress {
let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect();
let mut progress = SequencerProgress::new();
for table_data in tables {
progress = progress.combine(table_data.read().await.progress())
}
progress
}
}
/// Data of a Table in a given Namespace that belongs to a given Shard
@ -792,6 +850,21 @@ impl TableData {
Ok(())
}
/// Return progress from this Table
fn progress(&self) -> SequencerProgress {
let progress = SequencerProgress::new();
let progress = match self.parquet_max_sequence_number() {
Some(n) => progress.with_persisted(n),
None => progress,
};
self.partition_data
.values()
.fold(progress, |progress, partition_data| {
progress.combine(partition_data.progress())
})
}
}
/// Data of an IOx Partition of a given Table of a Namespace that belongs to a given Shard
@ -861,7 +934,7 @@ impl PartitionData {
}
None => {
self.data.buffer = Some(BufferBatch {
min_sequencer_number: sequencer_number,
min_sequence_number: sequencer_number,
max_sequence_number: sequencer_number,
data: mb,
})
@ -946,6 +1019,11 @@ impl PartitionData {
self.data.snapshots.push(snapshot);
}
}
/// Return the progress from this Partition
fn progress(&self) -> SequencerProgress {
self.data.progress()
}
}
/// Data of an IOx partition split into batches
@ -1035,7 +1113,7 @@ impl DataBuffer {
) -> Result<Option<Arc<SnapshotBatch>>, mutable_batch::Error> {
if let Some(buf) = &self.buffer {
return Ok(Some(Arc::new(SnapshotBatch {
min_sequencer_number: buf.min_sequencer_number,
min_sequencer_number: buf.min_sequence_number,
max_sequencer_number: buf.max_sequence_number,
data: Arc::new(buf.data.to_arrow(Selection::All)?),
})));
@ -1156,6 +1234,33 @@ impl DataBuffer {
Ok(())
}
/// Return the progress in this DataBuffer
fn progress(&self) -> SequencerProgress {
let progress = SequencerProgress::new();
let progress = if let Some(buffer) = &self.buffer {
progress.combine(buffer.progress())
} else {
progress
};
let progress = self.snapshots.iter().fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
});
if let Some(persisting) = &self.persisting {
persisting
.data
.data
.iter()
.fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
})
} else {
progress
}
}
}
/// BufferBatch is a MutableBatch with its ingesting order, sequencer_number, that helps the
@ -1163,13 +1268,23 @@ impl DataBuffer {
#[derive(Debug)]
pub struct BufferBatch {
/// Sequence number of the first write in this batch
pub(crate) min_sequencer_number: SequenceNumber,
pub(crate) min_sequence_number: SequenceNumber,
/// Sequence number of the last write in this batch
pub(crate) max_sequence_number: SequenceNumber,
/// Ingesting data
pub(crate) data: MutableBatch,
}
impl BufferBatch {
/// Return the progress in this BufferBatch
fn progress(&self) -> SequencerProgress {
SequencerProgress::new()
.with_buffered(self.min_sequence_number)
.with_buffered(self.max_sequence_number)
}
}
/// SnapshotBatch contains data of many contiguous BufferBatches
#[derive(Debug, PartialEq)]
pub struct SnapshotBatch {
@ -1208,6 +1323,13 @@ impl SnapshotBatch {
}
})
}
/// Return progress in this data
fn progress(&self) -> SequencerProgress {
SequencerProgress::new()
.with_buffered(self.min_sequencer_number)
.with_buffered(self.max_sequencer_number)
}
}
/// PersistingBatch contains all needed info and data for creating
@ -1330,7 +1452,7 @@ mod tests {
let (_, mutable_batch1) =
lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
let buffer_batch1 = BufferBatch {
min_sequencer_number: seq_num1,
min_sequence_number: seq_num1,
max_sequence_number: seq_num1,
data: mutable_batch1,
};
@ -1405,7 +1527,11 @@ mod tests {
.unwrap();
let mut sequencers = BTreeMap::new();
sequencers.insert(sequencer1.id, SequencerData::new(Arc::clone(&metrics)));
let kafka_partition = KafkaPartition::new(0);
sequencers.insert(
sequencer1.id,
SequencerData::new(kafka_partition, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
@ -1485,8 +1611,14 @@ mod tests {
.await
.unwrap();
let mut sequencers = BTreeMap::new();
sequencers.insert(sequencer1.id, SequencerData::new(Arc::clone(&metrics)));
sequencers.insert(sequencer2.id, SequencerData::new(Arc::clone(&metrics)));
sequencers.insert(
sequencer1.id,
SequencerData::new(sequencer1.kafka_partition, Arc::clone(&metrics)),
);
sequencers.insert(
sequencer2.id,
SequencerData::new(sequencer2.kafka_partition, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
@ -1546,6 +1678,17 @@ mod tests {
.await
.unwrap();
// check progresses
let progresses = data.progresses(vec![kafka_partition]).await.unwrap();
let mut expected_progresses = BTreeMap::new();
expected_progresses.insert(
kafka_partition,
SequencerProgress::new()
.with_buffered(SequenceNumber::new(1))
.with_buffered(SequenceNumber::new(2)),
);
assert_eq!(progresses, expected_progresses);
let sd = data.sequencers.get(&sequencer1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let partition_id;
@ -1597,6 +1740,17 @@ mod tests {
mem_table.parquet_max_sequence_number(),
Some(SequenceNumber::new(2))
);
// check progresses after persist
let progresses = data.progresses(vec![kafka_partition]).await.unwrap();
let mut expected_progresses = BTreeMap::new();
expected_progresses.insert(
kafka_partition,
SequencerProgress::new()
.with_buffered(SequenceNumber::new(1))
.with_persisted(SequenceNumber::new(2)),
);
assert_eq!(progresses, expected_progresses);
}
// Test deletes mixed with writes on a single partition
@ -1623,7 +1777,7 @@ mod tests {
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequencer_number,
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(1)
);
assert_eq!(
@ -1685,7 +1839,7 @@ mod tests {
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequencer_number,
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(4)
);
assert_eq!(
@ -1782,7 +1936,7 @@ mod tests {
// verify data
assert_eq!(
p.data.buffer.as_ref().unwrap().min_sequencer_number,
p.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(8)
); // 1 newly added mutable batch of 3 rows of data
assert_eq!(p.data.snapshots.len(), 0); // still empty
@ -1991,7 +2145,7 @@ mod tests {
let table = tables.get("mem").unwrap().read().await;
let partition = table.partition_data.get("1970-01-01").unwrap();
assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequencer_number,
partition.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(2)
);


@ -35,6 +35,7 @@ use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use trace::span::SpanRecorder;
use write_buffer::core::{WriteBufferReading, WriteBufferStreamHandler};
use write_summary::SequencerProgress;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -53,6 +54,9 @@ pub enum Error {
WriteBuffer {
source: write_buffer::core::WriteBufferError,
},
#[snafu(display("Data Error: {}", source))]
Data { source: crate::data::Error },
}
/// When the lifecycle manager indicates that ingest should be paused because of
@ -63,7 +67,7 @@ const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// The [`IngestHandler`] handles all ingest from kafka, persistence and queries
/// An [`IngestHandler`] handles all ingest requests from kafka, persistence and queries
#[async_trait]
pub trait IngestHandler: Send + Sync {
/// Return results from the in-memory data that match this query
@ -72,6 +76,12 @@ pub trait IngestHandler: Send + Sync {
request: IngesterQueryRequest,
) -> Result<IngesterQueryResponse, crate::querier_handler::Error>;
/// Return sequencer progress for the requested kafka partitions
async fn progresses(
&self,
sequencers: Vec<KafkaPartition>,
) -> Result<BTreeMap<KafkaPartition, SequencerProgress>>;
/// Wait until the handler has finished shutting down.
///
/// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
@ -127,7 +137,10 @@ impl IngestHandlerImpl {
// build the initial ingester data state
let mut sequencers = BTreeMap::new();
for s in sequencer_states.values() {
sequencers.insert(s.id, SequencerData::new(Arc::clone(&metric_registry)));
sequencers.insert(
s.id,
SequencerData::new(s.kafka_partition, Arc::clone(&metric_registry)),
);
}
let data = Arc::new(IngesterData {
object_store,
@ -242,6 +255,14 @@ impl IngestHandler for IngestHandlerImpl {
self.shutdown.cancel();
self.data.exec.shutdown();
}
/// Return the ingestion progress from each sequencer
async fn progresses(
&self,
partitions: Vec<KafkaPartition>,
) -> Result<BTreeMap<KafkaPartition, SequencerProgress>> {
self.data.progresses(partitions).await.context(DataSnafu)
}
}
impl Drop for IngestHandlerImpl {


@ -9,8 +9,8 @@ use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bitflags::bitflags;
use data_types2::{
NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp, Tombstone,
TombstoneId,
KafkaPartition, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
Tombstone, TombstoneId,
};
use iox_catalog::{
interface::{Catalog, INITIAL_COMPACTION_LEVEL},
@ -650,7 +650,8 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
namespaces.insert(TEST_NAMESPACE.to_string(), data_ns);
// One sequencer/shard that contains 2 namespaces
let seq_data = SequencerData::new_for_test(namespaces);
let kafka_partition = KafkaPartition::new(0);
let seq_data = SequencerData::new_for_test(kafka_partition, namespaces);
let mut sequencers = BTreeMap::new();
sequencers.insert(seq_id, seq_data);
@ -693,7 +694,8 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
namespaces.insert(TEST_NAMESPACE.to_string(), data_ns);
// One sequencer/shard that contains 1 namespace
let seq_data = SequencerData::new_for_test(namespaces);
let kafka_partition = KafkaPartition::new(0);
let seq_data = SequencerData::new_for_test(kafka_partition, namespaces);
let mut sequencers = BTreeMap::new();
sequencers.insert(seq_id, seq_data);


@ -10,12 +10,14 @@ edition = "2021"
data_types2 = { path = "../data_types2" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
workspace-hack = { path = "../workspace-hack"}
# Crates.io dependencies, in alphabetical order
base64 = "0.13"
serde_json = "1.0.79"
workspace-hack = { path = "../workspace-hack"}
snafu = "0.7"
[dev-dependencies]


@ -1,11 +1,25 @@
use std::collections::BTreeMap;
use data_types2::{SequenceNumber, SequencerId};
use data_types2::{KafkaPartition, SequenceNumber};
use observability_deps::tracing::{debug, trace};
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
use dml::DmlMeta;
use snafu::{OptionExt, Snafu};
mod progress;
pub use progress::SequencerProgress;
#[derive(Debug, Snafu, PartialEq)]
pub enum Error {
#[snafu(display("Unknown kafka partition: {}", kafka_partition))]
UnknownKafkaPartition { kafka_partition: KafkaPartition },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Contains information about a single write.
///
/// A single write consisting of multiple lines of line protocol
@ -18,13 +32,17 @@ use dml::DmlMeta;
#[derive(Debug, Default, Clone, PartialEq)]
/// Summary of a Vec<Vec<DmlMeta>>
pub struct WriteSummary {
/// Key is the sequencer, value is the sequence numbers from that sequencer.
/// Key is the sequencer_id from the DmlMeta structure (aka kafka
/// partition id), value is the sequence numbers from that
/// sequencer.
///
/// Note: BTreeMap to ensure the output is in a consistent order
sequencers: BTreeMap<SequencerId, Vec<SequenceNumber>>,
sequencers: BTreeMap<KafkaPartition, Vec<SequenceNumber>>,
}
impl WriteSummary {
pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
debug!(?metas, "Creating write summary");
let sequences = metas
.iter()
.flat_map(|v| v.iter())
@ -32,7 +50,17 @@ impl WriteSummary {
let mut sequencers = BTreeMap::new();
for s in sequences {
let sequencer_id: i16 = s.sequencer_id.try_into().expect("Invalid sequencer id");
let sequencer_id: i32 = s.sequencer_id.try_into().expect("Invalid sequencer id");
// This is super confusing: "sequencer_id" in the router2
// and other parts of the codebase refers to what the
// ingester calls "kafka_partition".
//
// The ingester uses "sequencer_id" to refer to the id of
// the Sequencer catalog type
//
// https://github.com/influxdata/influxdb_iox/issues/4237
let kafka_partition = KafkaPartition::new(sequencer_id);
let sequence_number: i64 = s
.sequence_number
@ -40,7 +68,7 @@ impl WriteSummary {
.expect("Invalid sequencer number");
sequencers
.entry(SequencerId::new(sequencer_id))
.entry(kafka_partition)
.or_insert_with(Vec::new)
.push(SequenceNumber::new(sequence_number))
}
@ -73,10 +101,83 @@ impl WriteSummary {
.map_err(|e| format!("Invalid write token, invalid content: {}", e))
}
/// return what sequencer ids were present in this write summary
pub fn sequencer_ids(&self) -> Vec<SequencerId> {
/// return what kafka partitions (sequencer ids from the write
/// buffer) were present in this write summary
pub fn kafka_partitions(&self) -> Vec<KafkaPartition> {
self.sequencers.keys().cloned().collect()
}
/// Given the write described by this summary and the sequencer's
/// progress, returns:
///
/// 1. `Ok(true)` if the write is guaranteed to be readable
/// 2. `Ok(false)` if the write is guaranteed to NOT be readable
/// 3. `Err` if a determination cannot be made
///
/// The progress may not contain information about the kafka
/// partition of interest, for example.
pub fn readable(
&self,
progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
) -> Result<bool> {
let readable = self.check_progress(progresses, |sequence_number, progress| {
progress.readable(sequence_number)
});
trace!(?readable, ?progresses, ?self, "Checked readable");
readable
}
/// Given the write described by this summary and the sequencer's
/// progress, returns:
///
/// 1. `Ok(true)` if the write is guaranteed to be persisted to parquet
/// 2. `Ok(false)` if the write is guaranteed to NOT be persisted to parquet
/// 3. `Err` if a determination cannot be made
///
/// The progress may not contain information about the kafka
/// partition of interest, for example.
pub fn persisted(
&self,
progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
) -> Result<bool> {
let persisted = self.check_progress(progresses, |sequence_number, progress| {
progress.persisted(sequence_number)
});
trace!(?persisted, ?progresses, ?self, "Checked persisted");
persisted
}
/// Returns `Ok(true)` if `f(sequence_number, progress)` returns
/// true for every sequence number in self against the corresponding
/// progress, `Ok(false)` if `f` ever returns false, and `Err` if
/// `progresses` does not have information about one of the
/// partitions
fn check_progress<F>(
&self,
progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
f: F,
) -> Result<bool>
where
F: Fn(SequenceNumber, &SequencerProgress) -> bool,
{
self.sequencers
.iter()
.map(|(&kafka_partition, sequence_numbers)| {
progresses
.get(&kafka_partition)
.map(|progress| {
sequence_numbers
.iter()
.all(|sequence_number| f(*sequence_number, progress))
})
.context(UnknownKafkaPartitionSnafu { kafka_partition })
})
.collect::<Result<Vec<_>>>()
.map(|all_results| {
// were all invocations of f() true?
all_results.into_iter().all(|v| v)
})
}
}
impl From<WriteSummary> for proto::WriteSummary {
@ -84,10 +185,12 @@ impl From<WriteSummary> for proto::WriteSummary {
let sequencers = summary
.sequencers
.into_iter()
.map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite {
sequencer_id: sequencer_id.get() as i32,
sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(),
})
.map(
|(kafka_partition, sequence_numbers)| proto::SequencerWrite {
sequencer_id: kafka_partition.get(),
sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(),
},
)
.collect();
Self { sequencers }
@ -106,16 +209,12 @@ impl TryFrom<proto::WriteSummary> for WriteSummary {
sequencer_id,
sequence_numbers,
}| {
let sequencer_id = sequencer_id.try_into().map_err(|e| {
format!("Invalid sequencer id {} in proto: {}", sequencer_id, e)
})?;
let sequence_numbers = sequence_numbers
.into_iter()
.map(SequenceNumber::new)
.collect::<Vec<_>>();
Ok((SequencerId::new(sequencer_id), sequence_numbers))
Ok((KafkaPartition::new(sequencer_id), sequence_numbers))
},
)
.collect::<Result<BTreeMap<_, _>, String>>()?;
@ -280,24 +379,6 @@ mod tests {
WriteSummary::try_from_token(&token).unwrap();
}
#[test]
#[should_panic(
expected = "Invalid write token, invalid content: Invalid sequencer id 2147483647 in proto"
)]
fn token_parsing_bad_sequencer() {
// construct a message with sequencer id that can not be converted into i16
let bad_proto = proto::WriteSummary {
sequencers: vec![proto::SequencerWrite {
sequencer_id: i32::MAX,
sequence_numbers: vec![2],
}],
};
let token = base64::encode(serde_json::to_string(&bad_proto).unwrap());
WriteSummary::try_from_token(&token).unwrap();
}
fn make_meta(s: Sequence) -> DmlMeta {
use time::TimeProvider;
let time_provider = time::SystemProvider::new();
@ -306,4 +387,112 @@ mod tests {
let bytes_read = 132;
DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read)
}
#[test]
fn readable() {
let metas = vec![vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(1, 3)),
make_meta(Sequence::new(2, 1)),
]];
let summary = WriteSummary::new(metas);
let progresses = [
(
KafkaPartition::new(1),
SequencerProgress::new().with_buffered(SequenceNumber::new(3)),
),
(
KafkaPartition::new(2),
SequencerProgress::new().with_buffered(SequenceNumber::new(2)),
),
]
.into_iter()
.collect();
assert_eq!(summary.readable(&progresses), Ok(true));
// kafka partition 1 only made it to 2, but write includes 3
let progresses = [
(
KafkaPartition::new(1),
SequencerProgress::new().with_buffered(SequenceNumber::new(2)),
),
(
KafkaPartition::new(2),
SequencerProgress::new().with_buffered(SequenceNumber::new(2)),
),
]
.into_iter()
.collect();
assert_eq!(summary.readable(&progresses), Ok(false));
// No information on kafka partition 1
let progresses = [(
KafkaPartition::new(2),
SequencerProgress::new().with_buffered(SequenceNumber::new(2)),
)]
.into_iter()
.collect();
assert_eq!(
summary.readable(&progresses).unwrap_err().to_string(),
"Unknown kafka partition: 1"
);
}
#[test]
fn persisted() {
let metas = vec![vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(1, 3)),
make_meta(Sequence::new(2, 1)),
]];
let summary = WriteSummary::new(metas);
let progresses = [
(
KafkaPartition::new(1),
SequencerProgress::new().with_persisted(SequenceNumber::new(3)),
),
(
KafkaPartition::new(2),
SequencerProgress::new().with_persisted(SequenceNumber::new(2)),
),
]
.into_iter()
.collect();
assert_eq!(summary.persisted(&progresses), Ok(true));
// kafka partition 1 only made it to 2, but write includes 3
let progresses = [
(
KafkaPartition::new(1),
SequencerProgress::new().with_persisted(SequenceNumber::new(2)),
),
(
KafkaPartition::new(2),
SequencerProgress::new().with_persisted(SequenceNumber::new(2)),
),
]
.into_iter()
.collect();
assert_eq!(summary.persisted(&progresses), Ok(false));
// No information on kafka partition 1
let progresses = [(
KafkaPartition::new(2),
SequencerProgress::new().with_persisted(SequenceNumber::new(2)),
)]
.into_iter()
.collect();
assert_eq!(
summary.persisted(&progresses).unwrap_err().to_string(),
"Unknown kafka partition: 1"
);
}
}


@ -0,0 +1,247 @@
use data_types2::SequenceNumber;
/// Information on how much data a particular sequencer has processed
///
/// ```text
/// Write Lifecycle (compaction not shown):
///
/// Durable --------------> Readable -------------> Persisted
///
/// in sequencer,          in memory,               in parquet
/// not readable           not yet in parquet
/// ```
///
/// Note: min_readable_sequence_number <= min_totally_persisted_sequence_number
#[derive(Clone, Debug, PartialEq, Default)]
pub struct SequencerProgress {
/// Smallest sequence number of data that is buffered in memory
min_buffered: Option<SequenceNumber>,
/// Largest sequence number of data that is buffered in memory
max_buffered: Option<SequenceNumber>,
/// Largest sequence number of data that has been written to parquet
max_persisted: Option<SequenceNumber>,
}
impl SequencerProgress {
pub fn new() -> Self {
Default::default()
}
/// Record that `sequence_number` is buffered
pub fn with_buffered(mut self, sequence_number: SequenceNumber) -> Self {
self.min_buffered = Some(
self.min_buffered
.take()
.map(|cur| cur.min(sequence_number))
.unwrap_or(sequence_number),
);
self.max_buffered = Some(
self.max_buffered
.take()
.map(|cur| cur.max(sequence_number))
.unwrap_or(sequence_number),
);
self
}
/// Record that data with `sequence_number` was persisted. Note this
/// does not mean that all sequence numbers less than `sequence_number`
/// have been persisted.
pub fn with_persisted(mut self, sequence_number: SequenceNumber) -> Self {
self.max_persisted = Some(
self.max_persisted
.take()
.map(|cur| cur.max(sequence_number))
.unwrap_or(sequence_number),
);
self
}
// return true if this sequence number is readable
pub fn readable(&self, sequence_number: SequenceNumber) -> bool {
match (&self.max_buffered, &self.max_persisted) {
(Some(max_buffered), Some(max_persisted)) => {
&sequence_number <= max_buffered || &sequence_number <= max_persisted
}
(None, Some(max_persisted)) => &sequence_number <= max_persisted,
(Some(max_buffered), _) => &sequence_number <= max_buffered,
(None, None) => {
false // data not yet ingested
}
}
}
// return true if this sequence number is persisted
pub fn persisted(&self, sequence_number: SequenceNumber) -> bool {
match (&self.min_buffered, &self.max_persisted) {
(Some(min_buffered), Some(max_persisted)) => {
// with both buffered and persisted data, need to
// ensure that no data is buffered to know that all is
// persisted
&sequence_number < min_buffered && &sequence_number <= max_persisted
}
(None, Some(max_persisted)) => &sequence_number <= max_persisted,
(_, None) => {
false // data not yet persisted
}
}
}
/// Combine the values from `other`
pub fn combine(self, other: Self) -> Self {
let updated = if let Some(min_buffered) = other.min_buffered {
self.with_buffered(min_buffered)
} else {
self
};
let updated = if let Some(max_buffered) = other.max_buffered {
updated.with_buffered(max_buffered)
} else {
updated
};
if let Some(max_persisted) = other.max_persisted {
updated.with_persisted(max_persisted)
} else {
updated
}
}
}
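
As a quick worked illustration of the `readable` and `persisted` rules above (arbitrary values; this mirrors the tests below):

```rust
use data_types2::SequenceNumber;
use write_summary::SequencerProgress;

// Buffered data sits at sequence number 5; number 3 has been persisted.
let progress = SequencerProgress::new()
    .with_buffered(SequenceNumber::new(5))
    .with_persisted(SequenceNumber::new(3));

assert!(progress.readable(SequenceNumber::new(5)));   // <= max_buffered
assert!(!progress.readable(SequenceNumber::new(6)));  // beyond all known data
assert!(progress.persisted(SequenceNumber::new(3)));  // < min_buffered and <= max_persisted
assert!(!progress.persisted(SequenceNumber::new(5))); // still only buffered
```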
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty() {
let progress = SequencerProgress::new();
let sequence_number = SequenceNumber::new(0);
assert!(!progress.readable(sequence_number));
assert!(!progress.persisted(sequence_number));
}
#[test]
fn persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = SequencerProgress::new().with_persisted(eq);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
// persisted implies it is also readable
assert!(progress.readable(eq));
assert!(progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = SequencerProgress::new().with_buffered(eq);
assert!(progress.readable(lt));
assert!(!progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_greater_than_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = SequencerProgress::new()
.with_buffered(eq)
.with_persisted(lt);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_and_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = SequencerProgress::new()
.with_buffered(eq)
.with_persisted(eq);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted here
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_less_than_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
// data buffered between lt and eq
let progress = SequencerProgress::new()
.with_buffered(lt)
.with_buffered(eq)
.with_persisted(eq);
assert!(progress.readable(lt));
assert!(!progress.persisted(lt)); // have buffered data at lt, can't be persisted
assert!(progress.readable(eq));
assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn combine() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress1 = SequencerProgress::new().with_buffered(gt);
let progress2 = SequencerProgress::new()
.with_buffered(lt)
.with_persisted(eq);
let expected = SequencerProgress::new()
.with_buffered(lt)
.with_buffered(gt)
.with_persisted(eq);
assert_eq!(progress1.combine(progress2), expected);
}
}