diff --git a/Cargo.lock b/Cargo.lock index 865a5f716a..cb5e3c3212 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1096,7 +1096,7 @@ dependencies = [ [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=2a4a835bd727b00c58631a1c807bb598a0a12a93#2a4a835bd727b00c58631a1c807bb598a0a12a93" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=b890190a6521ab1e572184dcf6ea0a5afb3a47af#b890190a6521ab1e572184dcf6ea0a5afb3a47af" dependencies = [ "ahash", "arrow", @@ -1128,7 +1128,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=2a4a835bd727b00c58631a1c807bb598a0a12a93#2a4a835bd727b00c58631a1c807bb598a0a12a93" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=b890190a6521ab1e572184dcf6ea0a5afb3a47af#b890190a6521ab1e572184dcf6ea0a5afb3a47af" dependencies = [ "arrow", "ordered-float 2.10.0", @@ -1139,7 +1139,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" version = "1.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=2a4a835bd727b00c58631a1c807bb598a0a12a93#2a4a835bd727b00c58631a1c807bb598a0a12a93" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=b890190a6521ab1e572184dcf6ea0a5afb3a47af#b890190a6521ab1e572184dcf6ea0a5afb3a47af" dependencies = [ "async-trait", "chrono", @@ -1152,7 +1152,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=2a4a835bd727b00c58631a1c807bb598a0a12a93#2a4a835bd727b00c58631a1c807bb598a0a12a93" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=b890190a6521ab1e572184dcf6ea0a5afb3a47af#b890190a6521ab1e572184dcf6ea0a5afb3a47af" dependencies = [ "ahash", "arrow", @@ -1163,7 +1163,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=2a4a835bd727b00c58631a1c807bb598a0a12a93#2a4a835bd727b00c58631a1c807bb598a0a12a93" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=b890190a6521ab1e572184dcf6ea0a5afb3a47af#b890190a6521ab1e572184dcf6ea0a5afb3a47af" dependencies = [ "ahash", "arrow", @@ -2343,6 +2343,7 @@ dependencies = [ "uuid", "workspace-hack", "write_buffer", + "write_summary", ] [[package]] @@ -3810,9 +3811,9 @@ dependencies = [ [[package]] name = "parquet" -version = "11.0.0" +version = "11.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1460c04ede29e7a2d825357cd764c403b5d28ebdabc129876552c8a86a4ee9a8" +checksum = "9e2ba225e8c800adda2da6b0d63a65abf71867868682947ccb8fc6a9c68ba541" dependencies = [ "arrow", "base64 0.13.0", @@ -6999,7 +7000,9 @@ dependencies = [ "data_types2", "dml", "generated_types", + "observability_deps", "serde_json", + "snafu", "time 0.1.0", "workspace-hack", ] diff --git a/data_types/src/sequence.rs b/data_types/src/sequence.rs index 0fa48ed308..2c8bbeac75 100644 --- a/data_types/src/sequence.rs +++ b/data_types/src/sequence.rs @@ -1,6 +1,8 @@ #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct Sequence { + /// The sequencer id (kafka partition id) pub sequencer_id: u32, + /// The sequence number (kafka offset) pub sequence_number: u64, } diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index 4005f431ee..5635d11c19 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -127,7 +127,11 @@ impl ColumnId { } } -/// Unique ID for a `Sequencer` +/// 
Unique ID for a `Sequencer`. Note this is NOT the same as the +/// "sequencer_number" in the `write_buffer` which currently means +/// "kafka partition". +/// +/// https://github.com/influxdata/influxdb_iox/issues/4237 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] pub struct SequencerId(i16); diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index bf4e4afb50..0641845ec2 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="2a4a835bd727b00c58631a1c807bb598a0a12a93", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="b890190a6521ab1e572184dcf6ea0a5afb3a47af", default-features = false, package = "datafusion" } workspace-hack = { path = "../workspace-hack"} diff --git a/db/src/lib.rs b/db/src/lib.rs index 9f06f216e3..f3619f55eb 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -2136,7 +2136,7 @@ mod tests { // Read buffer + Parquet chunk size catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); - catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1235); + catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1236); // All the chunks should have different IDs assert_ne!(mb_chunk.id(), rb_chunk.id()); @@ -2253,7 +2253,7 @@ mod tests { let registry = test_db.metric_registry.as_ref(); // Read buffer + Parquet chunk size - let object_store_bytes = 1235; + let object_store_bytes = 1236; catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); catalog_chunk_size_bytes_metric_eq(registry, "object_store", object_store_bytes); @@ -2732,7 +2732,7 @@ mod tests { id: chunk_summaries[0].id, storage: ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, - memory_bytes: 4095, // size of RB and OS chunks + memory_bytes: 4096, // size of RB and OS chunks object_store_bytes: 1574, // size of parquet file row_count: 2, time_of_last_access: None, @@ -2783,7 +2783,7 @@ mod tests { assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2526 + 1495); assert_eq!(db.catalog.metrics().memory().read_buffer(), 2550); - assert_eq!(db.catalog.metrics().memory().object_store(), 1545); + assert_eq!(db.catalog.metrics().memory().object_store(), 1546); } #[tokio::test] diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 696bcb7f27..43b76a7f70 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -44,6 +44,7 @@ tracker = { path = "../tracker" } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} write_buffer = { path = "../write_buffer" } +write_summary = { path = "../write_summary" } tokio-util = { version = "0.7.1" } trace = { path = "../trace" } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 8d82a77abb..fe438303e3 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -31,6 +31,7 @@ use std::{ }; use time::SystemProvider; use uuid::Uuid; +use write_summary::SequencerProgress; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -50,6 +51,12 @@ pub enum Error { #[snafu(display("Sequencer {} not found in data map", 
sequencer_id))] SequencerNotFound { sequencer_id: SequencerId }, + #[snafu(display( + "Sequencer not found for kafka partition {} in data map", + kafka_partition + ))] + SequencerForPartitionNotFound { kafka_partition: KafkaPartition }, + #[snafu(display("Namespace {} not found in catalog", namespace))] NamespaceNotFound { namespace: String }, @@ -154,6 +161,25 @@ impl IngesterData { let table_data = tables.get(table_name)?; Some(Arc::clone(table_data)) } + + /// Return the ingestion progress for the specified kafka partitions + pub(crate) async fn progresses( + &self, + partitions: Vec, + ) -> Result> { + let mut progresses = BTreeMap::new(); + for kafka_partition in partitions { + let sequencer_data = self + .sequencers + .iter() + .map(|(_, sequencer_data)| sequencer_data) + .find(|sequencer_data| sequencer_data.kafka_partition == kafka_partition) + .context(SequencerForPartitionNotFoundSnafu { kafka_partition })?; + + progresses.insert(kafka_partition, sequencer_data.progress().await); + } + Ok(progresses) + } } /// The Persister has a function to persist a given partition ID and to update the @@ -301,6 +327,9 @@ impl Persister for IngesterData { /// Data of a Shard #[derive(Debug)] pub struct SequencerData { + /// The kafka partition for this sequencer + kafka_partition: KafkaPartition, + // New namespaces can come in at any time so we need to be able to add new ones namespaces: RwLock>>, @@ -310,7 +339,7 @@ pub struct SequencerData { impl SequencerData { /// Initialise a new [`SequencerData`] that emits metrics to `metrics`. - pub fn new(metrics: Arc) -> Self { + pub fn new(kafka_partition: KafkaPartition, metrics: Arc) -> Self { let namespace_count = metrics .register_metric::( "ingester_namespaces_total", @@ -319,6 +348,7 @@ impl SequencerData { .recorder(&[]); Self { + kafka_partition, namespaces: Default::default(), metrics, namespace_count, @@ -327,8 +357,12 @@ impl SequencerData { /// Initialize new SequncerData with namespace for testing purpose only #[cfg(test)] - pub fn new_for_test(namespaces: BTreeMap>) -> Self { + pub fn new_for_test( + kafka_partition: KafkaPartition, + namespaces: BTreeMap>, + ) -> Self { Self { + kafka_partition, namespaces: RwLock::new(namespaces), metrics: Default::default(), namespace_count: Default::default(), @@ -400,6 +434,18 @@ impl SequencerData { Ok(data) } + + /// Return the progress of this sequencer + async fn progress(&self) -> SequencerProgress { + let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect(); + + let mut progress = SequencerProgress::new(); + + for namespace_data in namespaces { + progress = progress.combine(namespace_data.progress().await); + } + progress + } } /// Data of a Namespace that belongs to a given Shard @@ -617,6 +663,18 @@ impl NamespaceData { } } } + + /// Return progress from this Namespace + async fn progress(&self) -> SequencerProgress { + let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect(); + + let mut progress = SequencerProgress::new(); + for table_data in tables { + progress = progress.combine(table_data.read().await.progress()) + } + + progress + } } /// Data of a Table in a given Namesapce that belongs to a given Shard @@ -805,6 +863,21 @@ impl TableData { Ok(()) } + + /// Return progress from this Table + fn progress(&self) -> SequencerProgress { + let progress = SequencerProgress::new(); + let progress = match self.parquet_max_sequence_number() { + Some(n) => progress.with_persisted(n), + None => progress, + }; + + self.partition_data + .values() + 
.fold(progress, |progress, partition_data| { + progress.combine(partition_data.progress()) + }) + } } /// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard @@ -874,7 +947,7 @@ impl PartitionData { } None => { self.data.buffer = Some(BufferBatch { - min_sequencer_number: sequencer_number, + min_sequence_number: sequencer_number, max_sequence_number: sequencer_number, data: mb, }) @@ -959,6 +1032,11 @@ impl PartitionData { self.data.snapshots.push(snapshot); } } + + /// Return the progress from this Partition + fn progress(&self) -> SequencerProgress { + self.data.progress() + } } /// Data of an IOx partition split into batches @@ -1048,7 +1126,7 @@ impl DataBuffer { ) -> Result>, mutable_batch::Error> { if let Some(buf) = &self.buffer { return Ok(Some(Arc::new(SnapshotBatch { - min_sequencer_number: buf.min_sequencer_number, + min_sequencer_number: buf.min_sequence_number, max_sequencer_number: buf.max_sequence_number, data: Arc::new(buf.data.to_arrow(Selection::All)?), }))); @@ -1169,6 +1247,33 @@ impl DataBuffer { Ok(()) } + + /// Return the progress in this DataBuffer + fn progress(&self) -> SequencerProgress { + let progress = SequencerProgress::new(); + + let progress = if let Some(buffer) = &self.buffer { + progress.combine(buffer.progress()) + } else { + progress + }; + + let progress = self.snapshots.iter().fold(progress, |progress, snapshot| { + progress.combine(snapshot.progress()) + }); + + if let Some(persisting) = &self.persisting { + persisting + .data + .data + .iter() + .fold(progress, |progress, snapshot| { + progress.combine(snapshot.progress()) + }) + } else { + progress + } + } } /// BufferBatch is a MutableBatch with its ingesting order, sequencer_number, that helps the @@ -1176,13 +1281,23 @@ impl DataBuffer { #[derive(Debug)] pub struct BufferBatch { /// Sequence number of the first write in this batch - pub(crate) min_sequencer_number: SequenceNumber, + pub(crate) min_sequence_number: SequenceNumber, /// Sequence number of the last write in this batch pub(crate) max_sequence_number: SequenceNumber, /// Ingesting data pub(crate) data: MutableBatch, } +impl BufferBatch { + /// Return the progress in this DataBuffer + + fn progress(&self) -> SequencerProgress { + SequencerProgress::new() + .with_buffered(self.min_sequence_number) + .with_buffered(self.max_sequence_number) + } +} + /// SnapshotBatch contains data of many contiguous BufferBatches #[derive(Debug, PartialEq)] pub struct SnapshotBatch { @@ -1221,6 +1336,13 @@ impl SnapshotBatch { } }) } + + /// Return progress in this data + fn progress(&self) -> SequencerProgress { + SequencerProgress::new() + .with_buffered(self.min_sequencer_number) + .with_buffered(self.max_sequencer_number) + } } /// PersistingBatch contains all needed info and data for creating @@ -1343,7 +1465,7 @@ mod tests { let (_, mutable_batch1) = lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); let buffer_batch1 = BufferBatch { - min_sequencer_number: seq_num1, + min_sequence_number: seq_num1, max_sequence_number: seq_num1, data: mutable_batch1, }; @@ -1418,7 +1540,11 @@ mod tests { .unwrap(); let mut sequencers = BTreeMap::new(); - sequencers.insert(sequencer1.id, SequencerData::new(Arc::clone(&metrics))); + let kafka_partition = KafkaPartition::new(0); + sequencers.insert( + sequencer1.id, + SequencerData::new(kafka_partition, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(ObjectStoreImpl::new_in_memory()); @@ -1498,8 +1624,14 @@ mod tests { .await .unwrap(); let 
mut sequencers = BTreeMap::new(); - sequencers.insert(sequencer1.id, SequencerData::new(Arc::clone(&metrics))); - sequencers.insert(sequencer2.id, SequencerData::new(Arc::clone(&metrics))); + sequencers.insert( + sequencer1.id, + SequencerData::new(sequencer1.kafka_partition, Arc::clone(&metrics)), + ); + sequencers.insert( + sequencer2.id, + SequencerData::new(sequencer2.kafka_partition, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(ObjectStoreImpl::new_in_memory()); @@ -1559,6 +1691,17 @@ mod tests { .await .unwrap(); + // check progresses + let progresses = data.progresses(vec![kafka_partition]).await.unwrap(); + let mut expected_progresses = BTreeMap::new(); + expected_progresses.insert( + kafka_partition, + SequencerProgress::new() + .with_buffered(SequenceNumber::new(1)) + .with_buffered(SequenceNumber::new(2)), + ); + assert_eq!(progresses, expected_progresses); + let sd = data.sequencers.get(&sequencer1.id).unwrap(); let n = sd.namespace("foo").unwrap(); let partition_id; @@ -1631,6 +1774,17 @@ mod tests { mem_table.parquet_max_sequence_number(), Some(SequenceNumber::new(2)) ); + + // check progresses after persist + let progresses = data.progresses(vec![kafka_partition]).await.unwrap(); + let mut expected_progresses = BTreeMap::new(); + expected_progresses.insert( + kafka_partition, + SequencerProgress::new() + .with_buffered(SequenceNumber::new(1)) + .with_persisted(SequenceNumber::new(2)), + ); + assert_eq!(progresses, expected_progresses); } // Test deletes mixed with writes on a single parittion @@ -1657,7 +1811,7 @@ mod tests { // verify data assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequencer_number, + p.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(1) ); assert_eq!( @@ -1719,7 +1873,7 @@ mod tests { // verify data assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequencer_number, + p.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(4) ); assert_eq!( @@ -1816,7 +1970,7 @@ mod tests { // verify data assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequencer_number, + p.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(8) ); // 1 newlly added mutable batch of 3 rows of data assert_eq!(p.data.snapshots.len(), 0); // still empty @@ -2025,7 +2179,7 @@ mod tests { let table = tables.get("mem").unwrap().read().await; let partition = table.partition_data.get("1970-01-01").unwrap(); assert_eq!( - partition.data.buffer.as_ref().unwrap().min_sequencer_number, + partition.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(2) ); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index fb62669501..68f726ffa7 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -35,6 +35,7 @@ use tokio::task::{JoinError, JoinHandle}; use tokio_util::sync::CancellationToken; use trace::span::SpanRecorder; use write_buffer::core::{WriteBufferReading, WriteBufferStreamHandler}; +use write_summary::SequencerProgress; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -53,6 +54,9 @@ pub enum Error { WriteBuffer { source: write_buffer::core::WriteBufferError, }, + + #[snafu(display("Data Error: {}", source))] + Data { source: crate::data::Error }, } /// When the lifecycle manager indicates that ingest should be paused because of @@ -63,7 +67,7 @@ const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100); /// A specialized `Error` for Catalog errors pub type Result = std::result::Result; -/// The [`IngestHandler`] handles all ingest from 
kafka, persistence and queries +/// An [`IngestHandler`] handles all ingest requests from kafka, persistence and queries #[async_trait] pub trait IngestHandler: Send + Sync { /// Return results from the in-memory data that match this query @@ -72,6 +76,12 @@ pub trait IngestHandler: Send + Sync { request: IngesterQueryRequest, ) -> Result; + /// Return sequencer progress for the requested kafka partitions + async fn progresses( + &self, + sequencers: Vec, + ) -> Result>; + /// Wait until the handler finished to shutdown. /// /// Use [`shutdown`](Self::shutdown) to trigger a shutdown. @@ -127,7 +137,10 @@ impl IngestHandlerImpl { // build the initial ingester data state let mut sequencers = BTreeMap::new(); for s in sequencer_states.values() { - sequencers.insert(s.id, SequencerData::new(Arc::clone(&metric_registry))); + sequencers.insert( + s.id, + SequencerData::new(s.kafka_partition, Arc::clone(&metric_registry)), + ); } let data = Arc::new(IngesterData { object_store, @@ -242,6 +255,14 @@ impl IngestHandler for IngestHandlerImpl { self.shutdown.cancel(); self.data.exec.shutdown(); } + + /// Return the ingestion progress from each sequencer + async fn progresses( + &self, + partitions: Vec, + ) -> Result> { + self.data.progresses(partitions).await.context(DataSnafu) + } } impl Drop for IngestHandlerImpl { diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 397436e20d..78b67b4f3d 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -9,8 +9,8 @@ use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; use bitflags::bitflags; use data_types2::{ - NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp, Tombstone, - TombstoneId, + KafkaPartition, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp, + Tombstone, TombstoneId, }; use iox_catalog::{ interface::{Catalog, INITIAL_COMPACTION_LEVEL}, @@ -704,7 +704,8 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One sequencer/shard that contains 2 namespaces - let seq_data = SequencerData::new_for_test(namespaces); + let kafka_partition = KafkaPartition::new(0); + let seq_data = SequencerData::new_for_test(kafka_partition, namespaces); let mut sequencers = BTreeMap::new(); sequencers.insert(seq_id, seq_data); @@ -747,7 +748,8 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One sequencer/shard that contains 1 namespace - let seq_data = SequencerData::new_for_test(namespaces); + let kafka_partition = KafkaPartition::new(0); + let seq_data = SequencerData::new_for_test(kafka_partition, namespaces); let mut sequencers = BTreeMap::new(); sequencers.insert(seq_id, seq_data); diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 1ea6b7bab3..8b44fa36ca 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -1292,7 +1292,7 @@ mod tests { let mut generator = ChunkGenerator::new().await; let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); - assert_eq!(parquet_metadata.size(), 4069); + assert_eq!(parquet_metadata.size(), 4070); } #[test] diff --git a/write_summary/Cargo.toml b/write_summary/Cargo.toml index 351bdfccec..6f1dc5e013 100644 --- a/write_summary/Cargo.toml +++ b/write_summary/Cargo.toml @@ -10,12 +10,14 @@ edition = "2021" data_types2 = { path = "../data_types2" } 
dml = { path = "../dml" } generated_types = { path = "../generated_types" } +observability_deps = { path = "../observability_deps" } +workspace-hack = { path = "../workspace-hack"} # Crates.io dependencies, in alphabetical order base64 = "0.13" serde_json = "1.0.79" -workspace-hack = { path = "../workspace-hack"} +snafu = "0.7" [dev-dependencies] diff --git a/write_summary/src/lib.rs b/write_summary/src/lib.rs index f155b2ac78..d458bd72a5 100644 --- a/write_summary/src/lib.rs +++ b/write_summary/src/lib.rs @@ -1,11 +1,25 @@ use std::collections::BTreeMap; -use data_types2::{SequenceNumber, SequencerId}; +use data_types2::{KafkaPartition, SequenceNumber}; +use observability_deps::tracing::{debug, trace}; + /// Protobuf to/from conversion use generated_types::influxdata::iox::write_summary::v1 as proto; use dml::DmlMeta; +use snafu::{OptionExt, Snafu}; +mod progress; +pub use progress::SequencerProgress; + +#[derive(Debug, Snafu, PartialEq)] +pub enum Error { + #[snafu(display("Unknown kafka partition: {}", kafka_partition))] + UnknownKafkaPartition { kafka_partition: KafkaPartition }, +} + +pub type Result = std::result::Result; + /// Contains information about a single write. /// /// A single write consisting of multiple lines of line protocol @@ -18,13 +32,17 @@ use dml::DmlMeta; #[derive(Debug, Default, Clone, PartialEq)] /// Summary of a Vec> pub struct WriteSummary { - /// Key is the sequencer, value is the sequence numbers from that sequencer. + /// Key is the sequencer_id from the DmlMeta structure (aka kafka + /// partition id), value is the sequence numbers from that + /// sequencer. + /// /// Note: BTreeMap to ensure the output is in a consistent order - sequencers: BTreeMap>, + sequencers: BTreeMap>, } impl WriteSummary { pub fn new(metas: Vec>) -> Self { + debug!(?metas, "Creating write summary"); let sequences = metas .iter() .flat_map(|v| v.iter()) @@ -32,7 +50,17 @@ impl WriteSummary { let mut sequencers = BTreeMap::new(); for s in sequences { - let sequencer_id: i16 = s.sequencer_id.try_into().expect("Invalid sequencer id"); + let sequencer_id: i32 = s.sequencer_id.try_into().expect("Invalid sequencer id"); + + // This is super confusing: "sequencer_id" in the router2 + // and other parts of the codebase refers to what the + // ingester calls "kakfa_partition". + // + // The ingester uses "sequencer_id" to refer to the id of + // the Sequencer catalog type + // + // https://github.com/influxdata/influxdb_iox/issues/4237 + let kafka_partition = KafkaPartition::new(sequencer_id); let sequence_number: i64 = s .sequence_number @@ -40,7 +68,7 @@ impl WriteSummary { .expect("Invalid sequencer number"); sequencers - .entry(SequencerId::new(sequencer_id)) + .entry(kafka_partition) .or_insert_with(Vec::new) .push(SequenceNumber::new(sequence_number)) } @@ -73,10 +101,83 @@ impl WriteSummary { .map_err(|e| format!("Invalid write token, invalid content: {}", e)) } - /// return what sequencer ids were present in this write summary - pub fn sequencer_ids(&self) -> Vec { + /// return what kafka partitions (sequencer ids from the write + /// buffer) were present in this write summary + pub fn kafka_partitions(&self) -> Vec { self.sequencers.keys().cloned().collect() } + + /// Given the write described by this summary and the sequencer's + /// progress, returns: + /// + /// 1. `Ok(true) if the write is guaranteed to be readable + /// 2. `Ok(false) if the write is guaranteed to NOT be readable + /// 3. 
`Err` if a determination cannot be made
+    ///
+    /// The progress may not contain information about the kafka
+    /// partition of interest, for example.
+    pub fn readable(
+        &self,
+        progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
+    ) -> Result<bool> {
+        let readable = self.check_progress(progresses, |sequence_number, progress| {
+            progress.readable(sequence_number)
+        });
+        trace!(?readable, ?progresses, ?self, "Checked readable");
+        readable
+    }
+
+    /// Given the write described by this summary and the sequencer's
+    /// progress, returns:
+    ///
+    /// 1. `Ok(true)` if the write is guaranteed to be persisted to parquet
+    /// 2. `Ok(false)` if the write is guaranteed to NOT be persisted to parquet
+    /// 3. `Err` if a determination cannot be made
+    ///
+    /// The progress may not contain information about the kafka
+    /// partition of interest, for example.
+    pub fn persisted(
+        &self,
+        progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
+    ) -> Result<bool> {
+        let persisted = self.check_progress(progresses, |sequence_number, progress| {
+            progress.persisted(sequence_number)
+        });
+        trace!(?persisted, ?progresses, ?self, "Checked persisted");
+        persisted
+    }
+
+    /// Returns `Ok(true)` if `f(sequence_number, progress)` returns
+    /// true for every sequence number in self against the corresponding
+    /// partition's progress, `Ok(false)` if `f` ever returned false, and an
+    /// error if `progresses` does not have information about one of the
+    /// partitions
+    fn check_progress<F>(
+        &self,
+        progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
+        f: F,
+    ) -> Result<bool>
+    where
+        F: Fn(SequenceNumber, &SequencerProgress) -> bool,
+    {
+        self.sequencers
+            .iter()
+            .map(|(&kafka_partition, sequence_numbers)| {
+                progresses
+                    .get(&kafka_partition)
+                    .map(|progress| {
+                        sequence_numbers
+                            .iter()
+                            .all(|sequence_number| f(*sequence_number, progress))
+                    })
+                    .context(UnknownKafkaPartitionSnafu { kafka_partition })
+            })
+            .collect::<Result<Vec<_>>>()
+            .map(|all_results| {
+                // were all invocations of f() true?
+ all_results.into_iter().all(|v| v) + }) + } } impl From for proto::WriteSummary { @@ -84,10 +185,12 @@ impl From for proto::WriteSummary { let sequencers = summary .sequencers .into_iter() - .map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite { - sequencer_id: sequencer_id.get() as i32, - sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(), - }) + .map( + |(kafka_partition, sequence_numbers)| proto::SequencerWrite { + sequencer_id: kafka_partition.get(), + sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(), + }, + ) .collect(); Self { sequencers } @@ -106,16 +209,12 @@ impl TryFrom for WriteSummary { sequencer_id, sequence_numbers, }| { - let sequencer_id = sequencer_id.try_into().map_err(|e| { - format!("Invalid sequencer id {} in proto: {}", sequencer_id, e) - })?; - let sequence_numbers = sequence_numbers .into_iter() .map(SequenceNumber::new) .collect::>(); - Ok((SequencerId::new(sequencer_id), sequence_numbers)) + Ok((KafkaPartition::new(sequencer_id), sequence_numbers)) }, ) .collect::, String>>()?; @@ -280,24 +379,6 @@ mod tests { WriteSummary::try_from_token(&token).unwrap(); } - #[test] - #[should_panic( - expected = "Invalid write token, invalid content: Invalid sequencer id 2147483647 in proto" - )] - fn token_parsing_bad_sequencer() { - // construct a message with sequencer id that can not be converted into i16 - let bad_proto = proto::WriteSummary { - sequencers: vec![proto::SequencerWrite { - sequencer_id: i32::MAX, - sequence_numbers: vec![2], - }], - }; - - let token = base64::encode(serde_json::to_string(&bad_proto).unwrap()); - - WriteSummary::try_from_token(&token).unwrap(); - } - fn make_meta(s: Sequence) -> DmlMeta { use time::TimeProvider; let time_provider = time::SystemProvider::new(); @@ -306,4 +387,112 @@ mod tests { let bytes_read = 132; DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read) } + + #[test] + fn readable() { + let metas = vec![vec![ + make_meta(Sequence::new(1, 2)), + make_meta(Sequence::new(1, 3)), + make_meta(Sequence::new(2, 1)), + ]]; + let summary = WriteSummary::new(metas); + + let progresses = [ + ( + KafkaPartition::new(1), + SequencerProgress::new().with_buffered(SequenceNumber::new(3)), + ), + ( + KafkaPartition::new(2), + SequencerProgress::new().with_buffered(SequenceNumber::new(2)), + ), + ] + .into_iter() + .collect(); + + assert_eq!(summary.readable(&progresses), Ok(true)); + + // kafka partition 1 only made it to 2, but write includes 3 + let progresses = [ + ( + KafkaPartition::new(1), + SequencerProgress::new().with_buffered(SequenceNumber::new(2)), + ), + ( + KafkaPartition::new(2), + SequencerProgress::new().with_buffered(SequenceNumber::new(2)), + ), + ] + .into_iter() + .collect(); + + assert_eq!(summary.readable(&progresses), Ok(false)); + + // No information on kafka partition 1 + let progresses = [( + KafkaPartition::new(2), + SequencerProgress::new().with_buffered(SequenceNumber::new(2)), + )] + .into_iter() + .collect(); + + assert_eq!( + summary.readable(&progresses).unwrap_err().to_string(), + "Unknown kafka partition: 1" + ); + } + + #[test] + fn persisted() { + let metas = vec![vec![ + make_meta(Sequence::new(1, 2)), + make_meta(Sequence::new(1, 3)), + make_meta(Sequence::new(2, 1)), + ]]; + let summary = WriteSummary::new(metas); + + let progresses = [ + ( + KafkaPartition::new(1), + SequencerProgress::new().with_persisted(SequenceNumber::new(3)), + ), + ( + KafkaPartition::new(2), + SequencerProgress::new().with_persisted(SequenceNumber::new(2)), 
+ ), + ] + .into_iter() + .collect(); + + assert_eq!(summary.persisted(&progresses), Ok(true)); + + // kafka partition 1 only made it to 2, but write includes 3 + let progresses = [ + ( + KafkaPartition::new(1), + SequencerProgress::new().with_persisted(SequenceNumber::new(2)), + ), + ( + KafkaPartition::new(2), + SequencerProgress::new().with_persisted(SequenceNumber::new(2)), + ), + ] + .into_iter() + .collect(); + + assert_eq!(summary.persisted(&progresses), Ok(false)); + + // No information on kafka partition 1 + let progresses = [( + KafkaPartition::new(2), + SequencerProgress::new().with_persisted(SequenceNumber::new(2)), + )] + .into_iter() + .collect(); + + assert_eq!( + summary.persisted(&progresses).unwrap_err().to_string(), + "Unknown kafka partition: 1" + ); + } } diff --git a/write_summary/src/progress.rs b/write_summary/src/progress.rs new file mode 100644 index 0000000000..727fa61f63 --- /dev/null +++ b/write_summary/src/progress.rs @@ -0,0 +1,247 @@ +use data_types2::SequenceNumber; + +/// Information on how much data a particular sequencer has been processed +/// +/// ```text +/// Write Lifecycle (compaction not shown): +/// +/// Durable --------------> Readable -------------> Persisted +/// +/// in sequencer, in memory, not yet in parquet +/// not readable. in parquet +/// ``` +/// +/// Note: min_readable_sequence_number <= min_totally_persisted_sequence_number +#[derive(Clone, Debug, PartialEq, Default)] +pub struct SequencerProgress { + /// Smallest sequence number of data that is buffered in memory + min_buffered: Option, + + /// Largest sequence number of data that is buffered in memory + max_buffered: Option, + + /// Largest sequence number of data that has been written to parquet + max_persisted: Option, +} + +impl SequencerProgress { + pub fn new() -> Self { + Default::default() + } + + /// Note that `sequence_number` is buffered + pub fn with_buffered(mut self, sequence_number: SequenceNumber) -> Self { + self.min_buffered = Some( + self.min_buffered + .take() + .map(|cur| cur.min(sequence_number)) + .unwrap_or(sequence_number), + ); + self.max_buffered = Some( + self.max_buffered + .take() + .map(|cur| cur.max(sequence_number)) + .unwrap_or(sequence_number), + ); + self + } + + /// Note that data with `sequence_number` was persisted; Note this does not + /// mean that all sequence numbers less than `sequence_number` + /// have been persisted. 
+ pub fn with_persisted(mut self, sequence_number: SequenceNumber) -> Self { + self.max_persisted = Some( + self.max_persisted + .take() + .map(|cur| cur.max(sequence_number)) + .unwrap_or(sequence_number), + ); + self + } + + // return true if this sequence number is readable + pub fn readable(&self, sequence_number: SequenceNumber) -> bool { + match (&self.max_buffered, &self.max_persisted) { + (Some(max_buffered), Some(max_persisted)) => { + &sequence_number <= max_buffered || &sequence_number <= max_persisted + } + (None, Some(max_persisted)) => &sequence_number <= max_persisted, + (Some(max_buffered), _) => &sequence_number <= max_buffered, + (None, None) => { + false // data not yet ingested + } + } + } + + // return true if this sequence number is persisted + pub fn persisted(&self, sequence_number: SequenceNumber) -> bool { + // with both buffered and persisted data, need to + // ensure that no data is buffered to know that all is + // persisted + match (&self.min_buffered, &self.max_persisted) { + (Some(min_buffered), Some(max_persisted)) => { + // with both buffered and persisted data, need to + // ensure that no data is buffered to know that all is + // persisted + &sequence_number < min_buffered && &sequence_number <= max_persisted + } + (None, Some(max_persisted)) => &sequence_number <= max_persisted, + (_, None) => { + false // data not yet persisted + } + } + } + + /// Combine the values from other + pub fn combine(self, other: Self) -> Self { + let updated = if let Some(min_buffered) = other.min_buffered { + self.with_buffered(min_buffered) + } else { + self + }; + + let updated = if let Some(max_buffered) = other.max_buffered { + updated.with_buffered(max_buffered) + } else { + updated + }; + + if let Some(max_persisted) = other.max_persisted { + updated.with_persisted(max_persisted) + } else { + updated + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty() { + let progress = SequencerProgress::new(); + let sequence_number = SequenceNumber::new(0); + assert!(!progress.readable(sequence_number)); + assert!(!progress.persisted(sequence_number)); + } + + #[test] + fn persisted() { + let lt = SequenceNumber::new(0); + let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + let progress = SequencerProgress::new().with_persisted(eq); + + assert!(progress.readable(lt)); + assert!(progress.persisted(lt)); + + // persisted implies it is also readable + assert!(progress.readable(eq)); + assert!(progress.persisted(eq)); + + assert!(!progress.readable(gt)); + assert!(!progress.persisted(gt)); + } + + #[test] + fn buffered() { + let lt = SequenceNumber::new(0); + let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + let progress = SequencerProgress::new().with_buffered(eq); + + assert!(progress.readable(lt)); + assert!(!progress.persisted(lt)); + + assert!(progress.readable(eq)); + assert!(!progress.persisted(eq)); + + assert!(!progress.readable(gt)); + assert!(!progress.persisted(gt)); + } + + #[test] + fn buffered_greater_than_persisted() { + let lt = SequenceNumber::new(0); + let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + let progress = SequencerProgress::new() + .with_buffered(eq) + .with_persisted(lt); + + assert!(progress.readable(lt)); + assert!(progress.persisted(lt)); + + assert!(progress.readable(eq)); + assert!(!progress.persisted(eq)); + + assert!(!progress.readable(gt)); + assert!(!progress.persisted(gt)); + } + + #[test] + fn buffered_and_persisted() { + let lt = SequenceNumber::new(0); 
+ let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + let progress = SequencerProgress::new() + .with_buffered(eq) + .with_persisted(eq); + + assert!(progress.readable(lt)); + assert!(progress.persisted(lt)); + + assert!(progress.readable(eq)); + assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted here + + assert!(!progress.readable(gt)); + assert!(!progress.persisted(gt)); + } + + #[test] + fn buffered_less_than_persisted() { + let lt = SequenceNumber::new(0); + let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + // data buffered between lt and eq + let progress = SequencerProgress::new() + .with_buffered(lt) + .with_buffered(eq) + .with_persisted(eq); + + assert!(progress.readable(lt)); + assert!(!progress.persisted(lt)); // have buffered data at lt, can't be persisted + + assert!(progress.readable(eq)); + assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted + + assert!(!progress.readable(gt)); + assert!(!progress.persisted(gt)); + } + + #[test] + fn combine() { + let lt = SequenceNumber::new(0); + let eq = SequenceNumber::new(1); + let gt = SequenceNumber::new(2); + + let progress1 = SequencerProgress::new().with_buffered(gt); + + let progress2 = SequencerProgress::new() + .with_buffered(lt) + .with_persisted(eq); + + let expected = SequencerProgress::new() + .with_buffered(lt) + .with_buffered(gt) + .with_persisted(eq); + + assert_eq!(progress1.combine(progress2), expected); + } +}
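
Reviewer note: a minimal sketch (not part of this patch) of how the new pieces compose from a caller's point of view. A `WriteSummary` decoded from a write token is checked against the per-kafka-partition `SequencerProgress` map that the ingester's new `progresses()` API reports. The `WriteStatus` enum and `classify` helper below are illustrative names only, invented for this example.

```rust
use std::collections::BTreeMap;

use data_types2::KafkaPartition;
use write_summary::{SequencerProgress, WriteSummary};

/// Illustrative only -- not part of this patch.
#[derive(Debug, Clone, Copy)]
enum WriteStatus {
    /// Every sequence number in the write has been persisted to parquet
    Persisted,
    /// Every sequence number is readable (buffered or persisted), but not yet all persisted
    Readable,
    /// At least one sequence number has not been ingested yet
    Pending,
}

/// Classify a write against the progress reported for each kafka partition.
///
/// Fails with `Error::UnknownKafkaPartition` if `progresses` has no entry for
/// one of the partitions the write touched, mirroring `readable`/`persisted`.
fn classify(
    summary: &WriteSummary,
    progresses: &BTreeMap<KafkaPartition, SequencerProgress>,
) -> Result<WriteStatus, write_summary::Error> {
    if summary.persisted(progresses)? {
        Ok(WriteStatus::Persisted)
    } else if summary.readable(progresses)? {
        Ok(WriteStatus::Readable)
    } else {
        Ok(WriteStatus::Pending)
    }
}
```

Progress reported by several sources for the same kafka partition can be merged with `SequencerProgress::combine` before the check, which is how `IngesterData::progresses` folds per-namespace, per-table, and per-partition progress into a single value per sequencer.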