diff --git a/Cargo.lock b/Cargo.lock index 4898400f4e..36eb430f39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,7 +841,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df#9ef4a257cf8b7717df60de56b9e17c6bd7286cd4" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df_2#d201ebf323a532ac858fe33083639df4a8d321ee" dependencies = [ "ahash 0.7.4", "arrow", @@ -1677,6 +1677,7 @@ dependencies = [ "tracker", "trogging", "uuid", + "write_buffer", ] [[package]] @@ -4955,9 +4956,14 @@ version = "0.1.0" dependencies = [ "async-trait", "data_types", + "dotenv", "entry", "futures", + "observability_deps", + "parking_lot", "rdkafka", + "tokio", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c8e0b7879b..b86ddd5c62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight"] } test_helpers = { path = "test_helpers" } synchronized-writer = "1" parking_lot = "0.11.1" +write_buffer = { path = "write_buffer" } # Crates.io dependencies, in alphabetical order assert_cmd = "1.0.0" diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs index afb1b1c9d9..6eebaa6277 100644 --- a/data_types/src/chunk_metadata.rs +++ b/data_types/src/chunk_metadata.rs @@ -126,8 +126,11 @@ pub struct ChunkSummary { /// Is there any outstanding lifecycle action for this chunk? pub lifecycle_action: Option, - /// The total estimated size of this chunk, in bytes - pub estimated_bytes: usize, + /// The number of bytes used to store this chunk in memory + pub memory_bytes: usize, + + /// The number of bytes used to store this chunk in object storage + pub object_store_bytes: usize, /// The total number of rows in this chunk pub row_count: usize, @@ -153,7 +156,7 @@ pub struct ChunkColumnSummary { pub name: Arc, /// Estimated size, in bytes, consumed by this column. - pub estimated_bytes: usize, + pub memory_bytes: usize, } /// Contains additional per-column details about physical storage of a chunk @@ -168,13 +171,15 @@ pub struct DetailedChunkSummary { impl ChunkSummary { /// Construct a ChunkSummary that has None for all timestamps + #[allow(clippy::too_many_arguments)] pub fn new_without_timestamps( partition_key: Arc, table_name: Arc, id: u32, storage: ChunkStorage, lifecycle_action: Option, - estimated_bytes: usize, + memory_bytes: usize, + object_store_bytes: usize, row_count: usize, ) -> Self { Self { @@ -183,11 +188,47 @@ impl ChunkSummary { id, storage, lifecycle_action, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, time_of_first_write: None, time_of_last_write: None, time_closed: None, } } + + /// Return a new ChunkSummary with None for all timestamps + pub fn normalize(self) -> Self { + let ChunkSummary { + partition_key, + table_name, + id, + storage, + lifecycle_action, + memory_bytes, + object_store_bytes, + row_count, + .. 
+ } = self; + Self::new_without_timestamps( + partition_key, + table_name, + id, + storage, + lifecycle_action, + memory_bytes, + object_store_bytes, + row_count, + ) + } + + /// Normalizes a set of ChunkSummaries for comparison by removing timestamps + pub fn normalize_summaries(summaries: Vec) -> Vec { + let mut summaries = summaries + .into_iter() + .map(|summary| summary.normalize()) + .collect::>(); + summaries.sort_unstable(); + summaries + } } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 252893e0c4..e780e1c993 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (function packages) -upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df_2", default-features = false, package = "datafusion" } diff --git a/docker/Dockerfile.ci.integration b/docker/Dockerfile.ci.integration index bab5865bb7..53cef783c8 100644 --- a/docker/Dockerfile.ci.integration +++ b/docker/Dockerfile.ci.integration @@ -21,4 +21,4 @@ ENV TEST_INTEGRATION=1 ENV KAFKA_CONNECT=kafka:9092 # Run the integration tests that connect to Kafka that will be running in another container -CMD ["sh", "-c", "cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture"] +CMD ["sh", "-c", "./docker/integration_test.sh"] diff --git a/docker/integration_test.sh b/docker/integration_test.sh new file mode 100755 index 0000000000..6125a0e2c3 --- /dev/null +++ b/docker/integration_test.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +set -euxo pipefail + +cargo test -p write_buffer kafka -- --nocapture +cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture diff --git a/docs/sql.md b/docs/sql.md index 0c2f7e857b..7d28822c08 100644 --- a/docs/sql.md +++ b/docs/sql.md @@ -86,7 +86,7 @@ OBSERVER; Example SQL to show the total estimated storage size by database: SELECT database_name, storage, count(*) as num_chunks, - sum(estimated_bytes)/1024/1024 as estimated_mb + sum(memory_bytes)/1024/1024 as estimated_mb FROM chunks GROUP BY database_name, storage ORDER BY estimated_mb desc; @@ -164,7 +164,7 @@ Here are some interesting reports you can run when in `OBSERVER` mode: ```sql SELECT database_name, count(*) as num_chunks, - sum(estimated_bytes)/1024/1024 as estimated_mb + sum(memory_bytes)/1024/1024 as estimated_mb FROM chunks GROUP BY database_name ORDER BY estimated_mb desc @@ -175,7 +175,7 @@ LIMIT 20; ```sql SELECT database_name, storage, count(*) as num_chunks, - sum(estimated_bytes)/1024/1024 as estimated_mb + sum(memory_bytes)/1024/1024 as estimated_mb FROM chunks GROUP BY database_name, storage ORDER BY estimated_mb desc @@ -187,7 +187,7 @@ LIMIT 20; ```sql SELECT database_name, table_name, storage, count(*) as num_chunks, - sum(estimated_bytes)/1024/1024 as estimated_mb + sum(memory_bytes)/1024/1024 as estimated_mb FROM chunks GROUP BY database_name, table_name, storage ORDER BY estimated_mb desc @@ -217,7 +217,7 @@ LIMIT 20; ### Time range stored per table -This query provides an estimate, by table, of how long of a time range +This query provides an estimate, by table, of how long of a time range and the estimated number of rows per second it holds in IOx (the `1,000,000,000` is the conversion from nanoseconds) diff --git 
a/entry/src/entry.rs b/entry/src/entry.rs index 2ff58df3c9..6b7c781639 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -1716,7 +1716,7 @@ pub enum SequencedEntryError { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SequencedEntry { entry: Entry, /// The (optional) sequence for this entry. At the time of @@ -1775,6 +1775,10 @@ impl SequencedEntry { pub fn sequence(&self) -> Option<&Sequence> { self.sequence.as_ref() } + + pub fn entry(&self) -> &Entry { + &self.entry + } } pub mod test_helpers { diff --git a/generated_types/protos/influxdata/iox/management/v1/chunk.proto b/generated_types/protos/influxdata/iox/management/v1/chunk.proto index f7be3016bd..5e664623d2 100644 --- a/generated_types/protos/influxdata/iox/management/v1/chunk.proto +++ b/generated_types/protos/influxdata/iox/management/v1/chunk.proto @@ -62,8 +62,11 @@ message Chunk { // Is there any outstanding lifecycle action for this chunk? ChunkLifecycleAction lifecycle_action = 10; - // The total estimated size of this chunk, in bytes - uint64 estimated_bytes = 4; + // The number of bytes used to store this chunk in memory + uint64 memory_bytes = 4; + + // The number of bytes used to store this chunk in object storage + uint64 object_store_bytes = 11; // The number of rows in this chunk uint64 row_count = 9; diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs index f45aec6145..cb93f084eb 100644 --- a/generated_types/src/chunk.rs +++ b/generated_types/src/chunk.rs @@ -13,7 +13,8 @@ impl From for management::Chunk { id, storage, lifecycle_action, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, time_of_first_write, time_of_last_write, @@ -25,7 +26,8 @@ impl From for management::Chunk { let lifecycle_action: management::ChunkLifecycleAction = lifecycle_action.into(); let lifecycle_action = lifecycle_action.into(); // convert to i32 - let estimated_bytes = estimated_bytes as u64; + let memory_bytes = memory_bytes as u64; + let object_store_bytes = object_store_bytes as u64; let row_count = row_count as u64; let partition_key = partition_key.to_string(); @@ -41,7 +43,8 @@ impl From for management::Chunk { id, storage, lifecycle_action, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, time_of_first_write, time_of_last_write, @@ -114,12 +117,14 @@ impl TryFrom for ChunkSummary { partition_key, table_name, id, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, .. 
} = proto; - let estimated_bytes = estimated_bytes as usize; + let memory_bytes = memory_bytes as usize; + let object_store_bytes = object_store_bytes as usize; let row_count = row_count as usize; let partition_key = Arc::from(partition_key.as_str()); let table_name = Arc::from(table_name.as_str()); @@ -130,7 +135,8 @@ impl TryFrom for ChunkSummary { id, storage, lifecycle_action, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, time_of_first_write, time_of_last_write, @@ -184,7 +190,8 @@ mod test { partition_key: "foo".to_string(), table_name: "bar".to_string(), id: 42, - estimated_bytes: 1234, + memory_bytes: 1234, + object_store_bytes: 567, row_count: 321, storage: management::ChunkStorage::ObjectStoreOnly.into(), lifecycle_action: management::ChunkLifecycleAction::Moving.into(), @@ -198,7 +205,8 @@ mod test { partition_key: Arc::from("foo"), table_name: Arc::from("bar"), id: 42, - estimated_bytes: 1234, + memory_bytes: 1234, + object_store_bytes: 567, row_count: 321, storage: ChunkStorage::ObjectStoreOnly, lifecycle_action: Some(ChunkLifecycleAction::Moving), @@ -220,7 +228,8 @@ mod test { partition_key: Arc::from("foo"), table_name: Arc::from("bar"), id: 42, - estimated_bytes: 1234, + memory_bytes: 1234, + object_store_bytes: 567, row_count: 321, storage: ChunkStorage::ObjectStoreOnly, lifecycle_action: Some(ChunkLifecycleAction::Persisting), @@ -235,7 +244,8 @@ mod test { partition_key: "foo".to_string(), table_name: "bar".to_string(), id: 42, - estimated_bytes: 1234, + memory_bytes: 1234, + object_store_bytes: 567, row_count: 321, storage: management::ChunkStorage::ObjectStoreOnly.into(), lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 6e533b34c0..c76c7dbc5a 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -265,16 +265,16 @@ async fn sql_select_from_system_chunks() { // test timestamps, etc) let expected = vec![ - "+----+---------------+------------+-------------------+-----------------+-----------+", - "| id | partition_key | table_name | storage | estimated_bytes | row_count |", - "+----+---------------+------------+-------------------+-----------------+-----------+", - "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |", - "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |", - "+----+---------------+------------+-------------------+-----------------+-----------+", + "+----+---------------+------------+-------------------+--------------+-----------+", + "| id | partition_key | table_name | storage | memory_bytes | row_count |", + "+----+---------------+------------+-------------------+--------------+-----------+", + "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |", + "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |", + "+----+---------------+------------+-------------------+--------------+-----------+", ]; run_sql_test_case!( TwoMeasurementsManyFieldsOneChunk {}, - "SELECT id, partition_key, table_name, storage, estimated_bytes, row_count from system.chunks", + "SELECT id, partition_key, table_name, storage, memory_bytes, row_count from system.chunks", &expected ); } @@ -316,24 +316,24 @@ async fn sql_select_from_system_chunk_columns() { // with different chunk configurations. 
let expected = vec![ - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", - "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | estimated_bytes |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", - "| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | Boston | Boston | 252 |", - "| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", - "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | MA | MA | 240 |", - "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", - "| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 50 | 250 | 51 |", - "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 1 | Boston | Boston | 35 |", - "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 1 | 51 | 51 | 25 |", - "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | CA | MA | 41 |", - "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 53.4 | 79 | 25 |", - "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 50 | 300 | 25 |", - "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | Boston | Boston | 31 |", - "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 72.4 | 72.4 | 17 |", - "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | CA | CA | 27 |", - "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 350 | 350 | 17 |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | Boston | Boston | 252 |", + "| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", + "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | MA | MA | 240 |", + "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", + "| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 50 | 250 | 51 |", + "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 1 | Boston | Boston | 35 |", + "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 1 | 51 | 51 | 25 |", + "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | CA | MA | 41 |", + "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 53.4 | 79 | 25 |", + "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 50 | 300 | 25 |", + "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | Boston | Boston | 31 |", + "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 72.4 | 72.4 | 17 |", + "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | CA | CA | 27 |", + "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 350 | 350 | 17 |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", ]; run_sql_test_case!( TwoMeasurementsManyFieldsTwoChunks {}, diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 344e58bee3..122a447757 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs 
@@ -161,16 +161,16 @@ impl Table { .iter() .flat_map(|rg| rg.column_sizes()) // combine statistics for columns across row groups - .fold(BTreeMap::new(), |mut map, (name, estimated_bytes)| { + .fold(BTreeMap::new(), |mut map, (name, memory_bytes)| { let entry = map.entry(name).or_insert(0); - *entry += estimated_bytes; + *entry += memory_bytes; map }) // Now turn into Vec .into_iter() - .map(|(name, estimated_bytes)| ChunkColumnSummary { + .map(|(name, memory_bytes)| ChunkColumnSummary { name: name.into(), - estimated_bytes, + memory_bytes, }) .collect() } @@ -1116,11 +1116,11 @@ mod test { let expected = vec![ ChunkColumnSummary { name: "count".into(), - estimated_bytes: 110, + memory_bytes: 110, }, ChunkColumnSummary { name: "time".into(), - estimated_bytes: 107, + memory_bytes: 107, }, ]; assert_eq!(table.column_sizes(), expected); diff --git a/server/src/db.rs b/server/src/db.rs index a67fa2b128..26f7d386ad 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -651,14 +651,17 @@ impl Db { let rules = self.rules.read(); rules.lifecycle_rules.immutable }; + debug!(%immutable, has_write_buffer=self.write_buffer.is_some(), "storing entry"); match (self.write_buffer.as_ref(), immutable) { (Some(WriteBufferConfig::Writing(write_buffer)), true) => { // If only the write buffer is configured, this is passing the data through to // the write buffer, and it's not an error. We ignore the returned metadata; it // will get picked up when data is read from the write buffer. + + // TODO: be smarter than always using sequencer 0 let _ = write_buffer - .store_entry(&entry) + .store_entry(&entry, 0) .await .context(WriteBufferWritingError)?; Ok(()) @@ -666,8 +669,10 @@ impl Db { (Some(WriteBufferConfig::Writing(write_buffer)), false) => { // If using both write buffer and mutable buffer, we want to wait for the write // buffer to return success before adding the entry to the mutable buffer. + + // TODO: be smarter than always using sequencer 0 let sequence = write_buffer - .store_entry(&entry) + .store_entry(&entry, 0) .await .context(WriteBufferWritingError)?; let sequenced_entry = Arc::new( @@ -1020,6 +1025,7 @@ mod tests { use tokio_util::sync::CancellationToken; use write_buffer::mock::{ MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, + MockBufferSharedState, }; type TestError = Box; @@ -1042,7 +1048,8 @@ mod tests { async fn write_with_write_buffer_no_mutable_buffer() { // Writes should be forwarded to the write buffer and *not* rejected if the write buffer is // configured and the mutable buffer isn't - let write_buffer = Arc::new(MockBufferForWriting::default()); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone())); let test_db = TestDb::builder() .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .build() @@ -1054,14 +1061,15 @@ mod tests { let entry = lp_to_entry("cpu bar=1 10"); test_db.store_entry(entry).await.unwrap(); - assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); + assert_eq!(write_buffer_state.get_messages(0).len(), 1); } #[tokio::test] async fn write_to_write_buffer_and_mutable_buffer() { // Writes should be forwarded to the write buffer *and* the mutable buffer if both are // configured. 
- let write_buffer = Arc::new(MockBufferForWriting::default()); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone())); let db = TestDb::builder() .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .build() @@ -1071,7 +1079,7 @@ mod tests { let entry = lp_to_entry("cpu bar=1 10"); db.store_entry(entry).await.unwrap(); - assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); + assert_eq!(write_buffer_state.get_messages(0).len(), 1); let batches = run_query(db, "select * from cpu").await; @@ -1109,9 +1117,10 @@ mod tests { #[tokio::test] async fn read_from_write_buffer_write_to_mutable_buffer() { let entry = lp_to_entry("cpu bar=1 10"); - let write_buffer = Arc::new(MockBufferForReading::new(vec![Ok( - SequencedEntry::new_unsequenced(entry), - )])); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + write_buffer_state + .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry).unwrap()); + let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state)); let db = TestDb::builder() .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) @@ -1165,10 +1174,12 @@ mod tests { #[tokio::test] async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { - let write_buffer = Arc::new(MockBufferForReading::new(vec![Err(String::from( - "Something bad happened on the way to creating a SequencedEntry", - ) - .into())])); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + write_buffer_state.push_error( + String::from("Something bad happened on the way to creating a SequencedEntry").into(), + 0, + ); + let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state)); let test_db = TestDb::builder() .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) @@ -2070,7 +2081,8 @@ mod tests { async fn write_updates_persistence_windows() { // Writes should update the persistence windows when there // is a write buffer configured. 
- let write_buffer = Arc::new(MockBufferForWriting::default()); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state)); let db = TestDb::builder() .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .build() @@ -2117,12 +2129,19 @@ mod tests { let entry = lp_to_entry("cpu bar=1 10"); let partition_key = "1970-01-01T00"; - let write_buffer = Arc::new(MockBufferForReading::new(vec![ - Ok(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry.clone()).unwrap()), - Ok(SequencedEntry::new_from_sequence(Sequence::new(1, 0), entry.clone()).unwrap()), - Ok(SequencedEntry::new_from_sequence(Sequence::new(1, 2), entry.clone()).unwrap()), - Ok(SequencedEntry::new_from_sequence(Sequence::new(0, 1), entry).unwrap()), - ])); + let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2); + write_buffer_state.push_entry( + SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry.clone()).unwrap(), + ); + write_buffer_state.push_entry( + SequencedEntry::new_from_sequence(Sequence::new(1, 0), entry.clone()).unwrap(), + ); + write_buffer_state.push_entry( + SequencedEntry::new_from_sequence(Sequence::new(1, 2), entry.clone()).unwrap(), + ); + write_buffer_state + .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 1), entry).unwrap()); + let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state)); let db = TestDb::builder() .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) @@ -2244,36 +2263,6 @@ mod tests { assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]); } - /// Normalizes a set of ChunkSummaries for comparison by removing timestamps - fn normalize_summaries(summaries: Vec) -> Vec { - let mut summaries = summaries - .into_iter() - .map(|summary| { - let ChunkSummary { - partition_key, - table_name, - id, - storage, - lifecycle_action, - estimated_bytes, - row_count, - .. 
- } = summary; - ChunkSummary::new_without_timestamps( - partition_key, - table_name, - id, - storage, - lifecycle_action, - estimated_bytes, - row_count, - ) - }) - .collect::>(); - summaries.sort_unstable(); - summaries - } - #[tokio::test] async fn partition_chunk_summaries() { // Test that chunk id listing is hooked up @@ -2288,7 +2277,7 @@ mod tests { print!("Partitions: {:?}", db.partition_keys().unwrap()); let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15"); - let chunk_summaries = normalize_summaries(chunk_summaries); + let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries); let expected = vec![ChunkSummary::new_without_timestamps( Arc::from("1970-01-05T15"), @@ -2296,7 +2285,8 @@ mod tests { 0, ChunkStorage::OpenMutableBuffer, None, - 70, + 70, // memory_size + 0, // os_size 1, )]; @@ -2304,7 +2294,7 @@ mod tests { .chunk_summaries() .unwrap() .into_iter() - .map(|x| x.estimated_bytes) + .map(|x| x.memory_bytes) .sum(); assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size); @@ -2394,7 +2384,7 @@ mod tests { write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await; let chunk_summaries = db.chunk_summaries().expect("expected summary to return"); - let chunk_summaries = normalize_summaries(chunk_summaries); + let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries); let lifecycle_action = None; @@ -2406,6 +2396,7 @@ mod tests { ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, 2115, // size of RB and OS chunks + 1132, // size of parquet file 1, ), ChunkSummary::new_without_timestamps( @@ -2415,6 +2406,7 @@ mod tests { ChunkStorage::OpenMutableBuffer, lifecycle_action, 64, + 0, // no OS chunks 1, ), ChunkSummary::new_without_timestamps( @@ -2424,6 +2416,7 @@ mod tests { ChunkStorage::ClosedMutableBuffer, lifecycle_action, 2398, + 0, // no OS chunks 1, ), ChunkSummary::new_without_timestamps( @@ -2433,13 +2426,14 @@ mod tests { ChunkStorage::OpenMutableBuffer, lifecycle_action, 87, + 0, // no OS chunks 1, ), ]; assert_eq!( expected, chunk_summaries, - "expected:\n{:#?}\n\nactual:{:#?}\n\n", + "\n\nexpected:\n{:#?}\n\nactual:{:#?}\n\n", expected, chunk_summaries ); diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 3d1cb118eb..638ca9f409 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -476,7 +476,8 @@ impl CatalogChunk { id: self.addr.chunk_id, storage, lifecycle_action, - estimated_bytes: self.size(), + memory_bytes: self.memory_bytes(), + object_store_bytes: self.object_store_bytes(), row_count, time_of_first_write: self.time_of_first_write, time_of_last_write: self.time_of_last_write, @@ -491,7 +492,7 @@ impl CatalogChunk { fn to_summary(v: (&str, usize)) -> ChunkColumnSummary { ChunkColumnSummary { name: v.0.into(), - estimated_bytes: v.1, + memory_bytes: v.1, } } @@ -529,7 +530,7 @@ impl CatalogChunk { } /// Returns an approximation of the amount of process memory consumed by the chunk - pub fn size(&self) -> usize { + pub fn memory_bytes(&self) -> usize { match &self.stage { ChunkStage::Open { mb_chunk, .. } => mb_chunk.size(), ChunkStage::Frozen { representation, .. } => match &representation { @@ -550,6 +551,15 @@ impl CatalogChunk { } } + /// Returns the number of bytes of object storage consumed by this chunk + pub fn object_store_bytes(&self) -> usize { + match &self.stage { + ChunkStage::Open { .. } => 0, + ChunkStage::Frozen { .. } => 0, + ChunkStage::Persisted { parquet, .. 
} => parquet.file_size_bytes(), + } + } + /// Returns a mutable reference to the mutable buffer storage for chunks in the Open state pub fn mutable_buffer(&mut self) -> Result<&mut MBChunk> { match &mut self.stage { diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index fab7c39756..e864fb8926 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -71,6 +71,7 @@ pub(crate) fn compact_chunks( let ctx = db.exec.new_context(ExecutorType::Reorg); let fut = async move { + let fut_now = std::time::Instant::now(); let key = compute_sort_key(query_chunks.iter().map(|x| x.summary())); let key_str = format!("\"{}\"", key); // for logging @@ -108,7 +109,8 @@ pub(crate) fn compact_chunks( info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups, input_rows=input_rows, output_rows=guard.table_summary().count(), - sort_key=%key_str, compaction_took = ?elapsed, rows_per_sec=?throughput, "chunk(s) compacted"); + sort_key=%key_str, compaction_took = ?elapsed, fut_execution_duration= ?fut_now.elapsed(), + rows_per_sec=?throughput, "chunk(s) compacted"); Ok(DbChunk::snapshot(&guard)) }; diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index 733ecc56ae..f80f06b9bc 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -197,7 +197,8 @@ fn chunk_summaries_schema() -> SchemaRef { Field::new("table_name", DataType::Utf8, false), Field::new("storage", DataType::Utf8, false), Field::new("lifecycle_action", DataType::Utf8, true), - Field::new("estimated_bytes", DataType::UInt64, false), + Field::new("memory_bytes", DataType::UInt64, false), + Field::new("object_store_bytes", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false), Field::new("time_of_first_write", ts.clone(), true), Field::new("time_of_last_write", ts.clone(), true), @@ -223,9 +224,13 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< .iter() .map(|c| c.lifecycle_action.map(|a| a.name())) .collect::(); - let estimated_bytes = chunks + let memory_bytes = chunks .iter() - .map(|c| Some(c.estimated_bytes as u64)) + .map(|c| Some(c.memory_bytes as u64)) + .collect::(); + let object_store_bytes = chunks + .iter() + .map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 0)) .collect::(); let row_counts = chunks .iter() @@ -255,7 +260,8 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< Arc::new(table_name), Arc::new(storage), Arc::new(lifecycle_action), - Arc::new(estimated_bytes), + Arc::new(memory_bytes), + Arc::new(object_store_bytes), Arc::new(row_counts), Arc::new(time_of_first_write), Arc::new(time_of_last_write), @@ -380,7 +386,7 @@ fn chunk_columns_schema() -> SchemaRef { Field::new("row_count", DataType::UInt64, true), Field::new("min_value", DataType::Utf8, true), Field::new("max_value", DataType::Utf8, true), - Field::new("estimated_bytes", DataType::UInt64, true), + Field::new("memory_bytes", DataType::UInt64, true), ])) } @@ -396,7 +402,7 @@ fn assemble_chunk_columns( .map(|column_summary| { ( column_summary.name.as_ref(), - column_summary.estimated_bytes as u64, + column_summary.memory_bytes as u64, ) }) .collect() @@ -413,7 +419,7 @@ fn assemble_chunk_columns( let mut row_count = UInt64Builder::new(row_estimate); let mut min_values = StringBuilder::new(row_estimate); let mut max_values = StringBuilder::new(row_estimate); - let mut estimated_bytes = UInt64Builder::new(row_estimate); + let mut memory_bytes = UInt64Builder::new(row_estimate); // 
Note no rows are produced for partitions with no chunks, or // tables with no partitions: There are other tables to list tables @@ -442,7 +448,7 @@ fn assemble_chunk_columns( let size = column_index.remove(column.name.as_str()); - estimated_bytes.append_option(size)?; + memory_bytes.append_option(size)?; } } @@ -457,7 +463,7 @@ fn assemble_chunk_columns( Arc::new(row_count.finish()), Arc::new(min_values.finish()), Arc::new(max_values.finish()), - Arc::new(estimated_bytes.finish()), + Arc::new(memory_bytes.finish()), ], ) } @@ -616,7 +622,8 @@ mod tests { id: 0, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action: None, - estimated_bytes: 23754, + memory_bytes: 23754, + object_store_bytes: 0, row_count: 11, time_of_first_write: Some(DateTime::from_utc( NaiveDateTime::from_timestamp(10, 0), @@ -628,10 +635,11 @@ mod tests { ChunkSummary { partition_key: Arc::from("p1"), table_name: Arc::from("table1"), - id: 0, + id: 1, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action: Some(ChunkLifecycleAction::Persisting), - estimated_bytes: 23454, + memory_bytes: 23455, + object_store_bytes: 0, row_count: 22, time_of_first_write: None, time_of_last_write: Some(DateTime::from_utc( @@ -640,15 +648,35 @@ mod tests { )), time_closed: None, }, + ChunkSummary { + partition_key: Arc::from("p1"), + table_name: Arc::from("table1"), + id: 2, + storage: ChunkStorage::ObjectStoreOnly, + lifecycle_action: None, + memory_bytes: 1234, + object_store_bytes: 5678, + row_count: 33, + time_of_first_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(100, 0), + Utc, + )), + time_of_last_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(200, 0), + Utc, + )), + time_closed: None, + }, ]; let expected = vec![ - "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", - "| id | partition_key | table_name | storage | lifecycle_action | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", - "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", - "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | 11 | 1970-01-01 00:00:10 | | |", - "| 0 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23454 | 22 | | 1970-01-01 00:01:20 | |", - "+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", + "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", + "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", + "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", + "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |", + "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |", + "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |", + 
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", ]; let schema = chunk_summaries_schema(); @@ -825,7 +853,8 @@ mod tests { id: 42, storage: ChunkStorage::ReadBuffer, lifecycle_action, - estimated_bytes: 23754, + memory_bytes: 23754, + object_store_bytes: 0, row_count: 11, time_of_first_write: None, time_of_last_write: None, @@ -834,11 +863,11 @@ mod tests { columns: vec![ ChunkColumnSummary { name: "c1".into(), - estimated_bytes: 11, + memory_bytes: 11, }, ChunkColumnSummary { name: "c2".into(), - estimated_bytes: 12, + memory_bytes: 12, }, ], }, @@ -859,7 +888,8 @@ mod tests { id: 43, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action, - estimated_bytes: 23754, + memory_bytes: 23754, + object_store_bytes: 0, row_count: 11, time_of_first_write: None, time_of_last_write: None, @@ -867,7 +897,7 @@ mod tests { }, columns: vec![ChunkColumnSummary { name: "c1".into(), - estimated_bytes: 100, + memory_bytes: 100, }], }, ), @@ -887,7 +917,8 @@ mod tests { id: 44, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action, - estimated_bytes: 23754, + memory_bytes: 23754, + object_store_bytes: 0, row_count: 11, time_of_first_write: None, time_of_last_write: None, @@ -895,21 +926,21 @@ mod tests { }, columns: vec![ChunkColumnSummary { name: "c3".into(), - estimated_bytes: 200, + memory_bytes: 200, }], }, ), ]; let expected = vec![ - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", - "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | estimated_bytes |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", - "| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |", - "| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |", - "| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |", - "| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |", + "| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |", + "| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |", + "| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", ]; let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap(); diff --git a/src/commands/sql/observer.rs b/src/commands/sql/observer.rs index c3cba4b02d..6224836a66 100644 --- a/src/commands/sql/observer.rs +++ b/src/commands/sql/observer.rs @@ -83,7 +83,7 @@ OBSERVER; Example SQL to show the total estimated storage size by database: SELECT database_name, storage, count(*) as num_chunks, - sum(estimated_bytes)/1024/1024 as estimated_mb + sum(memory_bytes)/1024/1024 
as estimated_mb FROM chunks GROUP BY database_name, storage ORDER BY estimated_mb desc; diff --git a/src/commands/sql/repl_command.rs b/src/commands/sql/repl_command.rs index 9d9dfee54a..f053e222bb 100644 --- a/src/commands/sql/repl_command.rs +++ b/src/commands/sql/repl_command.rs @@ -116,7 +116,7 @@ SHOW COLUMNS FROM my_table; ;; Show columns in the table SELECT partition_key, table_name, storage, count(*) as chunk_count, - sum(estimated_bytes)/(1024*1024) as size_mb + sum(memory_bytes)/(1024*1024) as size_mb FROM system.chunks GROUP BY diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 6fcc3662a2..5028d82318 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -317,7 +317,8 @@ async fn test_chunk_get() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action, - estimated_bytes: 100, + memory_bytes: 100, + object_store_bytes: 0, row_count: 2, time_of_first_write: None, time_of_last_write: None, @@ -329,7 +330,8 @@ async fn test_chunk_get() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action, - estimated_bytes: 82, + memory_bytes: 82, + object_store_bytes: 0, row_count: 1, time_of_first_write: None, time_of_last_write: None, @@ -498,7 +500,8 @@ async fn test_list_partition_chunks() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action: ChunkLifecycleAction::Unspecified.into(), - estimated_bytes: 100, + memory_bytes: 100, + object_store_bytes: 0, row_count: 2, time_of_first_write: None, time_of_last_write: None, @@ -819,7 +822,8 @@ fn normalize_chunks(chunks: Vec) -> Vec { id, storage, lifecycle_action, - estimated_bytes, + memory_bytes, + object_store_bytes, row_count, .. } = summary; @@ -829,11 +833,12 @@ fn normalize_chunks(chunks: Vec) -> Vec { id, storage, lifecycle_action, - estimated_bytes, row_count, time_of_first_write: None, time_of_last_write: None, time_closed: None, + memory_bytes, + object_store_bytes, } }) .collect::>() diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 6eefd6c61b..a09285c695 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -197,7 +197,7 @@ async fn test_get_chunks() { .and(predicate::str::contains( r#""storage": "OpenMutableBuffer","#, )) - .and(predicate::str::contains(r#""estimated_bytes": 100"#)) + .and(predicate::str::contains(r#""memory_bytes": 100"#)) // Check for a non empty timestamp such as // "time_of_first_write": "2021-03-30T17:11:10.723866Z", .and(predicate::str::contains(r#""time_of_first_write": "20"#)); diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs index fa7aee11e1..634c506856 100644 --- a/tests/end_to_end_cases/persistence.rs +++ b/tests/end_to_end_cases/persistence.rs @@ -10,6 +10,7 @@ use crate::{ }; use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder}; +use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks}; #[tokio::test] async fn test_chunk_is_persisted_automatically() { @@ -53,21 +54,25 @@ async fn test_full_lifecycle() { let fixture = ServerFixture::create_shared().await; let mut write_client = fixture.write_client(); - let db_name = rand_name(); - DatabaseBuilder::new(db_name.clone()) - .persist(true) - // wait 2 seconds for the data to arrive (to ensure we compact a single chunk) - .persist_age_threshold_seconds(2) - 
.late_arrive_window_seconds(1) - .build(fixture.grpc_channel()) - .await; - - // write in enough data to exceed the soft limit (512K) and - // expect that it compacts, persists and then unloads the data from memory let num_payloads = 10; let num_duplicates = 2; let payload_size = 1_000; + let total_rows = num_payloads * num_duplicates * payload_size; + + let db_name = rand_name(); + DatabaseBuilder::new(db_name.clone()) + .persist(true) + // Each write should go into a separate chunk to test compaction + .mub_row_threshold(payload_size) + // Only trigger persistence once we've finished writing + .persist_row_threshold(total_rows) + .persist_age_threshold_seconds(1000) + // A low late arrival time to speed up the test + .late_arrive_window_seconds(1) + .build(fixture.grpc_channel()) + .await; + let payloads: Vec<_> = (0..num_payloads) .map(|x| { (0..payload_size) @@ -76,24 +81,17 @@ async fn test_full_lifecycle() { }) .collect(); - for payload in payloads.iter().take(num_payloads - 1) { + for payload in &payloads { // Writing the same data multiple times should be compacted away for _ in 0..num_duplicates { let num_lines_written = write_client .write(&db_name, payload) .await .expect("successful write"); - assert_eq!(num_lines_written, payload_size); + assert_eq!(num_lines_written, payload_size as usize); } } - // Don't duplicate last write as it is what crosses the persist row threshold - let num_lines_written = write_client - .write(&db_name, payloads.last().unwrap()) - .await - .expect("successful write"); - assert_eq!(num_lines_written, payload_size); - wait_for_exact_chunk_states( &fixture, &db_name, @@ -102,11 +100,27 @@ async fn test_full_lifecycle() { ) .await; + // Expect compaction to have occurred + let performed_compaction = fixture + .operations_client() + .list_operations() + .await + .unwrap() + .iter() + .any(|operation| match operation.metadata().job { + Some(Job::CompactChunks(CompactChunks { + db_name: operation_db_name, + .. 
+ })) => operation_db_name == db_name, + _ => false, + }); + assert!(performed_compaction); + // Expect them to have been compacted into a single read buffer // with the duplicates eliminated let chunks = list_chunks(&fixture, &db_name).await; assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].row_count, num_payloads * payload_size) + assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize) } #[tokio::test] @@ -163,8 +177,6 @@ async fn test_query_chunk_after_restart() { /// Create a closed read buffer chunk and return its id async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 { - use influxdb_iox_client::management::generated_types::operation_metadata::Job; - let mut management_client = fixture.management_client(); let mut write_client = fixture.write_client(); let mut operations_client = fixture.operations_client(); diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index e8f4cb005f..cb2af37d4d 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -317,11 +317,21 @@ impl DatabaseBuilder { self } + pub fn mub_row_threshold(mut self, threshold: u64) -> Self { + self.lifecycle_rules.mub_row_threshold = threshold; + self + } + pub fn persist_age_threshold_seconds(mut self, threshold: u32) -> Self { self.lifecycle_rules.persist_age_threshold_seconds = threshold; self } + pub fn persist_row_threshold(mut self, threshold: u64) -> Self { + self.lifecycle_rules.persist_row_threshold = threshold; + self + } + pub fn late_arrive_window_seconds(mut self, late_arrive_window_seconds: u32) -> Self { self.lifecycle_rules.late_arrive_window_seconds = late_arrive_window_seconds; self diff --git a/tests/end_to_end_cases/write_buffer.rs b/tests/end_to_end_cases/write_buffer.rs index 3217a2e162..e8562db19c 100644 --- a/tests/end_to_end_cases/write_buffer.rs +++ b/tests/end_to_end_cases/write_buffer.rs @@ -15,54 +15,11 @@ use rdkafka::{ }; use std::convert::TryFrom; use test_helpers::assert_contains; - -/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the -/// caller. -/// -/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide -/// guidance for setting `KAFKA_CONNECTION`. -/// -/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. -macro_rules! maybe_skip_integration { - () => {{ - use std::env; - dotenv::dotenv().ok(); - - match ( - env::var("TEST_INTEGRATION").is_ok(), - env::var("KAFKA_CONNECT").ok(), - ) { - (true, Some(kafka_connection)) => kafka_connection, - (true, None) => { - panic!( - "TEST_INTEGRATION is set which requires running integration tests, but \ - KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ - `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ - set KAFKA_CONNECT to the host and port where Kafka is accessible. If \ - running the `docker-compose` command and the Rust tests on the host, the \ - value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ - tests in another container in the `docker-compose` network as on CI, \ - `KAFKA_CONNECT` should be `kafka:9092`." 
- ) - } - (false, Some(_)) => { - eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run"); - return; - } - (false, None) => { - eprintln!( - "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \ - run" - ); - return; - } - } - }}; -} +use write_buffer::maybe_skip_kafka_integration; #[tokio::test] async fn writes_go_to_kafka() { - let kafka_connection = maybe_skip_integration!(); + let kafka_connection = maybe_skip_kafka_integration!(); // set up a database with a write buffer pointing at kafka let server = ServerFixture::create_shared().await; @@ -135,7 +92,7 @@ async fn produce_to_kafka_directly( #[tokio::test] async fn reads_come_from_kafka() { - let kafka_connection = maybe_skip_integration!(); + let kafka_connection = maybe_skip_kafka_integration!(); // set up a database to read from Kafka let server = ServerFixture::create_shared().await; @@ -220,7 +177,7 @@ async fn reads_come_from_kafka() { #[tokio::test] async fn cant_write_to_db_reading_from_kafka() { - let kafka_connection = maybe_skip_integration!(); + let kafka_connection = maybe_skip_kafka_integration!(); // set up a database to read from Kafka let server = ServerFixture::create_shared().await; diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index ad5c96969e..c226b0408f 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -8,4 +8,11 @@ async-trait = "0.1" data_types = { path = "../data_types" } entry = { path = "../entry" } futures = "0.3" +parking_lot = "0.11.1" rdkafka = "0.26.0" +observability_deps = { path = "../observability_deps" } + +[dev-dependencies] +dotenv = "0.15.0" +tokio = { version = "1.0", features = ["macros", "fs"] } +uuid = { version = "0.8", features = ["serde", "v4"] } diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 7d0e650a12..940f577bb7 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -11,9 +11,14 @@ pub type WriteBufferError = Box; /// entries from the Write Buffer at a later time. #[async_trait] pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static { - /// Send an `Entry` to the write buffer and return information that can be used to restore - /// entries at a later time. - async fn store_entry(&self, entry: &Entry) -> Result; + /// Send an `Entry` to the write buffer using the specified sequencer ID. + /// + /// Returns information that can be used to restore entries at a later time. 
+ async fn store_entry( + &self, + entry: &Entry, + sequencer_id: u32, + ) -> Result; } /// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using @@ -26,3 +31,151 @@ pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static { 'life0: 'async_trait, Self: 'async_trait; } + +pub mod test_utils { + use async_trait::async_trait; + use entry::{test_helpers::lp_to_entry, Entry}; + use futures::{StreamExt, TryStreamExt}; + + use super::{WriteBufferReading, WriteBufferWriting}; + + #[async_trait] + pub trait TestAdapter: Send + Sync { + type Context: TestContext; + + async fn new_context(&self, n_sequencers: u32) -> Self::Context; + } + + pub trait TestContext: Send + Sync { + type Writing: WriteBufferWriting; + type Reading: WriteBufferReading; + + fn writing(&self) -> Self::Writing; + + fn reading(&self) -> Self::Reading; + } + + pub async fn perform_generic_tests(adapter: T) + where + T: TestAdapter, + { + test_single_stream_io(&adapter).await; + test_multi_stream_io(&adapter).await; + test_multi_writer_multi_reader(&adapter).await; + } + + async fn test_single_stream_io(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(1).await; + + let entry_1 = lp_to_entry("upc user=1 100"); + let entry_2 = lp_to_entry("upc user=2 200"); + let entry_3 = lp_to_entry("upc user=3 300"); + + let writer = context.writing(); + let reader = context.reading(); + + let mut stream = reader.stream(); + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + + // empty stream is pending + assert!(stream.poll_next_unpin(&mut cx).is_pending()); + + // adding content allows us to get results + writer.store_entry(&entry_1, 0).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_1); + + // stream is pending again + assert!(stream.poll_next_unpin(&mut cx).is_pending()); + + // adding more data unblocks the stream + writer.store_entry(&entry_2, 0).await.unwrap(); + writer.store_entry(&entry_3, 0).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_2); + assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_3); + + // stream is pending again + assert!(stream.poll_next_unpin(&mut cx).is_pending()); + } + + async fn test_multi_stream_io(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(1).await; + + let entry_1 = lp_to_entry("upc user=1 100"); + let entry_2 = lp_to_entry("upc user=2 200"); + let entry_3 = lp_to_entry("upc user=3 300"); + + let writer = context.writing(); + let reader = context.reading(); + + let mut stream_1 = reader.stream(); + let mut stream_2 = reader.stream(); + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + + // empty streams is pending + assert!(stream_1.poll_next_unpin(&mut cx).is_pending()); + assert!(stream_2.poll_next_unpin(&mut cx).is_pending()); + + // streams poll from same source + writer.store_entry(&entry_1, 0).await.unwrap(); + writer.store_entry(&entry_2, 0).await.unwrap(); + writer.store_entry(&entry_3, 0).await.unwrap(); + assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1); + assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2); + assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3); + + // both streams are pending again + assert!(stream_1.poll_next_unpin(&mut cx).is_pending()); + assert!(stream_2.poll_next_unpin(&mut cx).is_pending()); + } + + async fn 
test_multi_writer_multi_reader(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(2).await; + + let entry_east_1 = lp_to_entry("upc,region=east user=1 100"); + let entry_east_2 = lp_to_entry("upc,region=east user=2 200"); + let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); + + let writer_1 = context.writing(); + let writer_2 = context.writing(); + let reader_1 = context.reading(); + let reader_2 = context.reading(); + + // TODO: do not hard-code sequencer IDs here but provide a proper interface + writer_1.store_entry(&entry_east_1, 0).await.unwrap(); + writer_1.store_entry(&entry_west_1, 1).await.unwrap(); + writer_2.store_entry(&entry_east_2, 0).await.unwrap(); + + assert_reader_content(reader_1, &[&entry_east_1, &entry_east_2, &entry_west_1]).await; + assert_reader_content(reader_2, &[&entry_east_1, &entry_east_2, &entry_west_1]).await; + } + + async fn assert_reader_content(reader: R, expected: &[&Entry]) + where + R: WriteBufferReading, + { + // we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever + let mut results: Vec<_> = reader + .stream() + .take(expected.len()) + .try_collect() + .await + .unwrap(); + results.sort_by_key(|entry| { + let sequence = entry.sequence().unwrap(); + (sequence.id, sequence.number) + }); + let actual: Vec<_> = results.iter().map(|entry| entry.entry()).collect(); + assert_eq!(&actual[..], expected); + } +} diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 2f26436f66..160d0b9710 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -4,10 +4,12 @@ use async_trait::async_trait; use data_types::server_id::ServerId; use entry::{Entry, Sequence, SequencedEntry}; use futures::{stream::BoxStream, StreamExt}; +use observability_deps::tracing::debug; use rdkafka::{ consumer::{Consumer, StreamConsumer}, error::KafkaError, producer::{FutureProducer, FutureRecord}, + util::Timeout, ClientConfig, Message, }; @@ -31,24 +33,28 @@ impl std::fmt::Debug for KafkaBufferProducer { #[async_trait] impl WriteBufferWriting for KafkaBufferProducer { - /// Send an `Entry` to Kafka and return the partition ID as the sequencer ID and the offset - /// as the sequence number. - async fn store_entry(&self, entry: &Entry) -> Result { + /// Send an `Entry` to Kafka using the sequencer ID as a partition. + async fn store_entry( + &self, + entry: &Entry, + sequencer_id: u32, + ) -> Result { + let partition = i32::try_from(sequencer_id)?; // This type annotation is necessary because `FutureRecord` is generic over key type, but // key is optional and we're not setting a key. `String` is arbitrary. - let record: FutureRecord<'_, String, _> = - FutureRecord::to(&self.database_name).payload(entry.data()); + let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name) + .payload(entry.data()) + .partition(partition); - // Can't use `?` here because `send_result` returns `Err((E: Error, original_msg))` so we - // have to extract the actual error out with a `match`. - let (partition, offset) = match self.producer.send_result(record) { - // Same error structure on the result of the future, need to `match` - Ok(delivery_future) => match delivery_future.await? 
{ - Ok((partition, offset)) => (partition, offset), - Err((e, _returned_record)) => return Err(Box::new(e)), - }, - Err((e, _returned_record)) => return Err(Box::new(e)), - }; + debug!(db_name=%self.database_name, partition, size=entry.data().len(), "writing to kafka"); + + let (partition, offset) = self + .producer + .send(record, Timeout::Never) + .await + .map_err(|(e, _owned_message)| Box::new(e))?; + + debug!(db_name=%self.database_name, %offset, %partition, size=entry.data().len(), "wrote to kafka"); Ok(Sequence { id: partition.try_into()?, @@ -69,6 +75,7 @@ impl KafkaBufferProducer { cfg.set("bootstrap.servers", &conn); cfg.set("message.timeout.ms", "5000"); cfg.set("message.max.bytes", "10000000"); + cfg.set("queue.buffering.max.kbytes", "10485760"); let producer: FutureProducer = cfg.create()?; @@ -153,3 +160,138 @@ impl KafkaBufferConsumer { }) } } + +pub mod test_utils { + /// Get the testing Kafka connection string or return current scope. + /// + /// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the + /// caller. + /// + /// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide + /// guidance for setting `KAFKA_CONNECTION`. + /// + /// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. + #[macro_export] + macro_rules! maybe_skip_kafka_integration { + () => {{ + use std::env; + dotenv::dotenv().ok(); + + match ( + env::var("TEST_INTEGRATION").is_ok(), + env::var("KAFKA_CONNECT").ok(), + ) { + (true, Some(kafka_connection)) => kafka_connection, + (true, None) => { + panic!( + "TEST_INTEGRATION is set which requires running integration tests, but \ + KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ + `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ + set KAFKA_CONNECT to the host and port where Kafka is accessible. If \ + running the `docker-compose` command and the Rust tests on the host, the \ + value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ + tests in another container in the `docker-compose` network as on CI, \ + `KAFKA_CONNECT` should be `kafka:9092`." 
+                    )
+                }
+                (false, Some(_)) => {
+                    eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
+                    return;
+                }
+                (false, None) => {
+                    eprintln!(
+                        "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
+                        run"
+                    );
+                    return;
+                }
+            }
+        }};
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::{AtomicU32, Ordering};
+
+    use rdkafka::{
+        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
+        client::DefaultClientContext,
+    };
+    use uuid::Uuid;
+
+    use crate::{
+        core::test_utils::{perform_generic_tests, TestAdapter, TestContext},
+        maybe_skip_kafka_integration,
+    };
+
+    use super::*;
+
+    struct KafkaTestAdapter {
+        conn: String,
+    }
+
+    impl KafkaTestAdapter {
+        fn new(conn: String) -> Self {
+            Self { conn }
+        }
+    }
+
+    #[async_trait]
+    impl TestAdapter for KafkaTestAdapter {
+        type Context = KafkaTestContext;
+
+        async fn new_context(&self, n_sequencers: u32) -> Self::Context {
+            // Common Kafka config
+            let mut cfg = ClientConfig::new();
+            cfg.set("bootstrap.servers", &self.conn);
+            cfg.set("message.timeout.ms", "5000");
+
+            // Create a topic with `n_sequencers` partitions in Kafka
+            let database_name = format!("test_topic_{}", Uuid::new_v4());
+            let admin: AdminClient<DefaultClientContext> = cfg.clone().create().unwrap();
+            let topic = NewTopic::new(
+                &database_name,
+                n_sequencers as i32,
+                TopicReplication::Fixed(1),
+            );
+            let opts = AdminOptions::default();
+            admin.create_topics(&[topic], &opts).await.unwrap();
+
+            KafkaTestContext {
+                conn: self.conn.clone(),
+                database_name,
+                server_id_counter: AtomicU32::new(1),
+            }
+        }
+    }
+
+    struct KafkaTestContext {
+        conn: String,
+        database_name: String,
+        server_id_counter: AtomicU32,
+    }
+
+    impl TestContext for KafkaTestContext {
+        type Writing = KafkaBufferProducer;
+
+        type Reading = KafkaBufferConsumer;
+
+        fn writing(&self) -> Self::Writing {
+            KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
+        }
+
+        fn reading(&self) -> Self::Reading {
+            let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
+            let server_id = ServerId::try_from(server_id).unwrap();
+            KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name).unwrap()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_generic() {
+        let conn = maybe_skip_kafka_integration!();
+
+        perform_generic_tests(KafkaTestAdapter::new(conn)).await;
+    }
+}
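As a quick orientation for reviewers, here is a minimal sketch (editor's illustration, not part of the patch) of how the Kafka-backed producer is driven with the new `sequencer_id` argument. The test name, topic name, and line-protocol string are hypothetical; the topic is assumed to already exist with enough partitions (the adapter above creates its topics via the Kafka `AdminClient`), and the call shapes mirror `KafkaTestContext::writing` and `test_generic` above.

#[tokio::test]
async fn kafka_producer_sketch() {
    use crate::core::WriteBufferWriting; // trait providing `store_entry`
    use crate::maybe_skip_kafka_integration;
    use entry::test_helpers::lp_to_entry;

    // Skips (or panics with guidance) based on TEST_INTEGRATION / KAFKA_CONNECT,
    // exactly like `test_generic` above.
    let conn = maybe_skip_kafka_integration!();

    // Placeholder topic name; the topic must already exist with at least one partition.
    let database_name = "write_buffer_sketch_topic".to_string();
    let producer = KafkaBufferProducer::new(&conn, &database_name).unwrap();

    // The new second argument is the sequencer ID, which maps directly to the Kafka
    // partition; the returned Sequence carries the partition as `id` and the offset as `number`.
    let entry = lp_to_entry("cpu,host=a usage=1 100");
    let sequence = producer.store_entry(&entry, 0).await.unwrap();
    assert_eq!(sequence.id, 0);
}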
diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs
index 046ca2334a..bd735d93dc 100644
--- a/write_buffer/src/mock.rs
+++ b/write_buffer/src/mock.rs
@@ -1,4 +1,4 @@
-use std::sync::{Arc, Mutex};
+use std::{collections::BTreeMap, sync::Arc, task::Poll};
 
 use async_trait::async_trait;
 use entry::{Entry, Sequence, SequencedEntry};
@@ -6,25 +6,136 @@ use futures::{
     stream::{self, BoxStream},
     StreamExt,
 };
+use parking_lot::Mutex;
 
 use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
 
-#[derive(Debug, Default)]
+type EntryResVec = Vec<Result<SequencedEntry, WriteBufferError>>;
+
+/// Mocked entries for [`MockBufferForWriting`] and [`MockBufferForReading`].
+#[derive(Debug, Clone)]
+pub struct MockBufferSharedState {
+    entries: Arc<Mutex<BTreeMap<u32, EntryResVec>>>,
+}
+
+impl MockBufferSharedState {
+    /// Create new shared state w/ N sequencers.
+    pub fn empty_with_n_sequencers(n_sequencers: u32) -> Self {
+        let entries: BTreeMap<_, _> = (0..n_sequencers)
+            .map(|sequencer_id| (sequencer_id, vec![]))
+            .collect();
+
+        Self {
+            entries: Arc::new(Mutex::new(entries)),
+        }
+    }
+
+    /// Push a new entry to the specified sequencer.
+    ///
+    /// # Panics
+    /// - when given entry is not sequenced
+    /// - when specified sequencer does not exist
+    /// - when sequence number in entry is not larger than the current maximum
+    pub fn push_entry(&self, entry: SequencedEntry) {
+        let sequence = entry.sequence().expect("entry must be sequenced");
+        let mut entries = self.entries.lock();
+        let entry_vec = entries.get_mut(&sequence.id).expect("invalid sequencer ID");
+        let max_sequence_number = entry_vec
+            .iter()
+            .filter_map(|entry_res| {
+                entry_res
+                    .as_ref()
+                    .ok()
+                    .map(|entry| entry.sequence().unwrap().number)
+            })
+            .max();
+        if let Some(max_sequence_number) = max_sequence_number {
+            assert!(
+                max_sequence_number < sequence.number,
+                "sequence number {} is less/equal than current max sequencer number {}",
+                sequence.number,
+                max_sequence_number
+            );
+        }
+        entry_vec.push(Ok(entry));
+    }
+
+    /// Push error to specified sequencer.
+    ///
+    /// # Panics
+    /// - when sequencer does not exist
+    pub fn push_error(&self, error: WriteBufferError, sequencer_id: u32) {
+        let mut entries = self.entries.lock();
+        let entry_vec = entries
+            .get_mut(&sequencer_id)
+            .expect("invalid sequencer ID");
+        entry_vec.push(Err(error));
+    }
+
+    /// Get messages (entries and errors) for specified sequencer.
+    ///
+    /// # Panics
+    /// - when sequencer does not exist
+    pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<SequencedEntry, WriteBufferError>> {
+        let mut entries = self.entries.lock();
+        let entry_vec = entries
+            .get_mut(&sequencer_id)
+            .expect("invalid sequencer ID");
+
+        entry_vec
+            .iter()
+            .map(|entry_res| match entry_res {
+                Ok(entry) => Ok(entry.clone()),
+                Err(e) => Err(e.to_string().into()),
+            })
+            .collect()
+    }
+}
+
+#[derive(Debug)]
 pub struct MockBufferForWriting {
-    pub entries: Arc<Mutex<Vec<Entry>>>,
+    state: MockBufferSharedState,
+}
+
+impl MockBufferForWriting {
+    pub fn new(state: MockBufferSharedState) -> Self {
+        Self { state }
+    }
 }
 
 #[async_trait]
 impl WriteBufferWriting for MockBufferForWriting {
-    async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
-        let mut entries = self.entries.lock().unwrap();
-        let offset = entries.len() as u64;
-        entries.push(entry.clone());
+    async fn store_entry(
+        &self,
+        entry: &Entry,
+        sequencer_id: u32,
+    ) -> Result<Sequence, WriteBufferError> {
+        let mut entries = self.state.entries.lock();
+        let sequencer_entries = entries.get_mut(&sequencer_id).unwrap();
 
-        Ok(Sequence {
-            id: 0,
-            number: offset,
-        })
+        let sequence_number = sequencer_entries
+            .iter()
+            .filter_map(|entry_res| {
+                entry_res
+                    .as_ref()
+                    .ok()
+                    .map(|entry| entry.sequence().unwrap().number)
+            })
+            .max()
+            .map(|n| n + 1)
+            .unwrap_or(0);
+
+        let sequence = Sequence {
+            id: sequencer_id,
+            number: sequence_number,
+        };
+        sequencer_entries.push(Ok(SequencedEntry::new_from_sequence(
+            sequence,
+            entry.clone(),
+        )
+        .unwrap()));
+
+        Ok(sequence)
     }
 }
 
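A minimal sketch (editor's illustration, not part of the patch) of how `MockBufferSharedState` ties the mock writer to the shared per-sequencer message log: the writer assigns the next free sequence number and the result is immediately observable through `get_messages`. The test name and line-protocol string are made up; imports follow the test module further down.

#[tokio::test]
async fn mock_writer_shares_state_sketch() {
    use crate::core::WriteBufferWriting; // trait providing `store_entry`
    use entry::test_helpers::lp_to_entry;

    let state = MockBufferSharedState::empty_with_n_sequencers(2);
    let writer = MockBufferForWriting::new(state.clone());

    // The writer assigns the next free sequence number for the addressed sequencer (here: 0).
    let entry = lp_to_entry("cpu,host=a usage=1 100");
    let sequence = writer.store_entry(&entry, 0).await.unwrap();
    assert_eq!(sequence.number, 0);

    // The write is visible through the shared state; sequencer 1 stays empty.
    assert_eq!(state.get_messages(0).len(), 1);
    assert!(state.get_messages(1).is_empty());
}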
@@ -33,7 +144,11 @@ pub struct MockBufferForWritingThatAlwaysErrors;
 
 #[async_trait]
 impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
-    async fn store_entry(&self, _entry: &Entry) -> Result<Sequence, WriteBufferError> {
+    async fn store_entry(
+        &self,
+        _entry: &Entry,
+        _sequencer_id: u32,
+    ) -> Result<Sequence, WriteBufferError> {
         Err(String::from(
             "Something bad happened on the way to writing an entry in the write buffer",
         )
@@ -41,9 +156,23 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
     }
 }
 
-type MoveableEntries = Arc<Mutex<Vec<Result<SequencedEntry, WriteBufferError>>>>;
 pub struct MockBufferForReading {
-    entries: MoveableEntries,
+    state: MockBufferSharedState,
+    positions: Arc<Mutex<BTreeMap<u32, usize>>>,
+}
+
+impl MockBufferForReading {
+    pub fn new(state: MockBufferSharedState) -> Self {
+        let n_sequencers = state.entries.lock().len() as u32;
+        let positions: BTreeMap<_, _> = (0..n_sequencers)
+            .map(|sequencer_id| (sequencer_id, 0))
+            .collect();
+
+        Self {
+            state,
+            positions: Arc::new(Mutex::new(positions)),
+        }
+    }
 }
 
 impl std::fmt::Debug for MockBufferForReading {
@@ -52,14 +181,6 @@ impl std::fmt::Debug for MockBufferForReading {
     }
 }
 
-impl MockBufferForReading {
-    pub fn new(entries: Vec<Result<SequencedEntry, WriteBufferError>>) -> Self {
-        Self {
-            entries: Arc::new(Mutex::new(entries)),
-        }
-    }
-}
-
 impl WriteBufferReading for MockBufferForReading {
     fn stream<'life0, 'async_trait>(
         &'life0 self,
@@ -68,11 +189,129 @@ impl WriteBufferReading for MockBufferForReading {
         'life0: 'async_trait,
         Self: 'async_trait,
     {
-        // move the entries out of `self` to move them into the stream
-        let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect();
+        let state = self.state.clone();
+        let positions = Arc::clone(&self.positions);
 
-        stream::iter(entries.into_iter())
-            .chain(stream::pending())
-            .boxed()
+        stream::poll_fn(move |_ctx| {
+            let entries = state.entries.lock();
+            let mut positions = positions.lock();
+
+            for (sequencer_id, position) in positions.iter_mut() {
+                let entry_vec = entries.get(sequencer_id).unwrap();
+                if entry_vec.len() > *position {
+                    let entry = match &entry_vec[*position] {
+                        Ok(entry) => Ok(entry.clone()),
+                        Err(e) => Err(e.to_string().into()),
+                    };
+                    *position += 1;
+                    return Poll::Ready(Some(entry));
+                }
+            }
+
+            Poll::Pending
+        })
+        .boxed()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use entry::test_helpers::lp_to_entry;
+
+    use crate::core::test_utils::{perform_generic_tests, TestAdapter, TestContext};
+
+    use super::*;
+
+    struct MockTestAdapter {}
+
+    #[async_trait]
+    impl TestAdapter for MockTestAdapter {
+        type Context = MockTestContext;
+
+        async fn new_context(&self, n_sequencers: u32) -> Self::Context {
+            MockTestContext {
+                state: MockBufferSharedState::empty_with_n_sequencers(n_sequencers),
+            }
+        }
+    }
+
+    struct MockTestContext {
+        state: MockBufferSharedState,
+    }
+
+    impl TestContext for MockTestContext {
+        type Writing = MockBufferForWriting;
+
+        type Reading = MockBufferForReading;
+
+        fn writing(&self) -> Self::Writing {
+            MockBufferForWriting::new(self.state.clone())
+        }
+
+        fn reading(&self) -> Self::Reading {
+            MockBufferForReading::new(self.state.clone())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_generic() {
+        perform_generic_tests(MockTestAdapter {}).await;
+    }
+
+    #[test]
+    #[should_panic(expected = "entry must be sequenced")]
+    fn test_state_push_entry_panic_unsequenced() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        let entry = lp_to_entry("upc,region=east user=1 100");
+        state.push_entry(SequencedEntry::new_unsequenced(entry));
+    }
+
+    #[test]
+    #[should_panic(expected = "invalid sequencer ID")]
+    fn test_state_push_entry_panic_wrong_sequencer() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        let entry = lp_to_entry("upc,region=east user=1 100");
+        let sequence = Sequence::new(2, 0);
+        state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "sequence number 13 is less/equal than current max sequencer number 13"
+    )]
+    fn test_state_push_entry_panic_wrong_sequence_number_equal() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        let entry = lp_to_entry("upc,region=east user=1 100");
+        let sequence = Sequence::new(1, 13);
+        state.push_entry(SequencedEntry::new_from_sequence(sequence, entry.clone()).unwrap());
+        state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "sequence number 12 is less/equal than current max sequencer number 13"
+    )]
+    fn test_state_push_entry_panic_wrong_sequence_number_less() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        let entry = lp_to_entry("upc,region=east user=1 100");
+        let sequence_1 = Sequence::new(1, 13);
+        let sequence_2 = Sequence::new(1, 12);
+        state.push_entry(SequencedEntry::new_from_sequence(sequence_1, entry.clone()).unwrap());
+        state.push_entry(SequencedEntry::new_from_sequence(sequence_2, entry).unwrap());
+    }
+
+    #[test]
+    #[should_panic(expected = "invalid sequencer ID")]
+    fn test_state_push_error_panic_wrong_sequencer() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        let error = "foo".to_string().into();
+        state.push_error(error, 2);
+    }
+
+    #[test]
+    #[should_panic(expected = "invalid sequencer ID")]
+    fn test_state_get_messages_panic_wrong_sequencer() {
+        let state = MockBufferSharedState::empty_with_n_sequencers(2);
+        state.get_messages(2);
+    }
+}
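To round off the mock API, a minimal sketch (again illustrative, not part of the patch) of the reading half: the shared state is pre-populated with `push_entry`, and `MockBufferForReading::stream` replays it. Because the stream stays pending once it has caught up, it is truncated with `take` before collecting, mirroring `assert_reader_content` in the generic test utilities. The test name and line-protocol string are hypothetical.

#[tokio::test]
async fn mock_reader_stream_sketch() {
    use crate::core::WriteBufferReading; // trait providing `stream`
    use entry::test_helpers::lp_to_entry;
    use futures::{StreamExt, TryStreamExt};

    // Pre-populate sequencer 0 of a single-sequencer shared state.
    let state = MockBufferSharedState::empty_with_n_sequencers(1);
    let sequenced = SequencedEntry::new_from_sequence(
        Sequence::new(0, 0),
        lp_to_entry("cpu,host=a usage=1 100"),
    )
    .unwrap();
    state.push_entry(sequenced);

    // The stream never terminates on its own, so limit it with `take` before collecting.
    let reader = MockBufferForReading::new(state);
    let entries: Vec<_> = reader.stream().take(1).try_collect().await.unwrap();
    assert_eq!(entries[0].sequence().unwrap().number, 0);
}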