diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 7e806594a2..124c00169f 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -1720,36 +1720,29 @@ enum InnerClockValueError { ValueMayNotBeZero, } -#[derive(Debug, Snafu)] -pub enum SequencedEntryError { - #[snafu(display("{}", source))] - InvalidFlatbuffer { - source: flatbuffers::InvalidFlatbuffer, - }, -} - #[derive(Debug, Clone)] pub struct SequencedEntry { entry: Entry, - /// The (optional) sequence for this entry. At the time of - /// writing, sequences will not be present when there is no - /// configured mechanism to define the order of all writes. - sequence: Option, + + /// The (optional) sequence for this entry including the timestamp when the producer ingested it into the write + /// buffer. + /// + /// At the time of writing, sequences will not be present when there is no configured mechanism to define the order + /// of all writes. + sequence_and_producer_ts: Option<(Sequence, DateTime)>, } #[derive(Debug, Copy, Clone)] pub struct Sequence { pub id: u32, pub number: u64, - pub ingest_timestamp: DateTime, } impl Sequence { - pub fn new(sequencer_id: u32, sequence_number: u64, ingest_timestamp: DateTime) -> Self { + pub fn new(sequencer_id: u32, sequence_number: u64) -> Self { Self { id: sequencer_id, number: sequence_number, - ingest_timestamp, } } } @@ -1757,15 +1750,20 @@ impl Sequence { impl SequencedEntry { pub fn new_from_sequence( sequence: Sequence, + producer_wallclock_timestamp: DateTime, entry: Entry, - ) -> Result { - let sequence = Some(sequence); - Ok(Self { entry, sequence }) + ) -> Self { + Self { + entry, + sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)), + } } pub fn new_unsequenced(entry: Entry) -> Self { - let sequence = None; - Self { entry, sequence } + Self { + entry, + sequence_and_producer_ts: None, + } } pub fn partition_writes(&self) -> Option>> { @@ -1773,7 +1771,15 @@ impl SequencedEntry { } pub fn sequence(&self) -> Option<&Sequence> { - self.sequence.as_ref() + self.sequence_and_producer_ts + .as_ref() + .map(|(sequence, _ts)| sequence) + } + + pub fn producer_wallclock_timestamp(&self) -> Option> { + self.sequence_and_producer_ts + .as_ref() + .map(|(_sequence, ts)| *ts) } pub fn entry(&self) -> &Entry { diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index b0cb326b19..7ba95dbafd 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -543,33 +543,21 @@ mod tests { let start_time = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 1, start_time, Utc::now(), i, ); w.add_range( - Some(&Sequence { - id: 1, - number: 4, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 4 }), 2, Utc::now(), Utc::now(), Instant::now(), ); w.add_range( - Some(&Sequence { - id: 1, - number: 10, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 10 }), 1, Utc::now(), Utc::now(), @@ -577,11 +565,7 @@ mod tests { ); let last_time = Utc::now(); w.add_range( - Some(&Sequence { - id: 2, - number: 23, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 2, number: 23 }), 10, Utc::now(), last_time, @@ -613,22 +597,14 @@ mod tests { let last_time = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 1, start_time, start_time, created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 1, last_time, last_time, @@ -639,11 +615,7 @@ mod tests { .unwrap(); let open_time = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 6, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 6 }), 2, last_time, open_time, @@ -679,11 +651,7 @@ mod tests { let first_end = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start_time, first_end, @@ -695,11 +663,7 @@ mod tests { .unwrap(); let second_end = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 3, first_end, second_end, @@ -711,11 +675,7 @@ mod tests { .unwrap(); let third_end = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 4, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 4 }), 4, second_end, third_end, @@ -747,11 +707,7 @@ mod tests { .unwrap(); let fourth_end = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 1, fourth_end, fourth_end, @@ -776,11 +732,7 @@ mod tests { .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100) .unwrap(); w.add_range( - Some(&Sequence { - id: 1, - number: 9, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 9 }), 2, Utc::now(), Utc::now(), @@ -821,33 +773,21 @@ mod tests { let third_end = third_start + chrono::Duration::seconds(1); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start_time, first_end, created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 3, second_start, second_end, second_created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 2, third_start, third_end, @@ -912,33 +852,21 @@ mod tests { let third_end = third_start + chrono::Duration::seconds(1); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start_time, first_end, created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 3, second_start, second_end, second_created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 2, third_start, third_end, @@ -1003,33 +931,21 @@ mod tests { let third_end = second_end + chrono::Duration::seconds(1); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start_time, first_end, created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 3, first_end, second_end, second_created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 2, third_start, third_end, @@ -1101,33 +1017,21 @@ mod tests { let third_end = second_end + chrono::Duration::seconds(1); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start_time, first_end, created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 3, second_start, second_end, second_created_at, ); w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 2, third_start, third_end, @@ -1181,11 +1085,7 @@ mod tests { let start = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start, start + chrono::Duration::seconds(2), @@ -1201,11 +1101,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 4, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 4 }), 5, start, start + chrono::Duration::seconds(4), @@ -1275,11 +1171,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 9, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 9 }), 9, start, start + chrono::Duration::seconds(2), @@ -1309,11 +1201,7 @@ mod tests { let start = Utc::now(); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 2, start, start + chrono::Duration::seconds(2), @@ -1321,11 +1209,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 6, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 6 }), 5, start, start + chrono::Duration::seconds(4), @@ -1333,11 +1217,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 9, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 9 }), 9, start, start + chrono::Duration::seconds(2), @@ -1345,11 +1225,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 10, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 10 }), 17, start, start + chrono::Duration::seconds(2), @@ -1375,11 +1251,7 @@ mod tests { assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); w.add_range( - Some(&Sequence { - id: 1, - number: 14, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 14 }), 11, start, start + chrono::Duration::seconds(2), @@ -1455,11 +1327,7 @@ mod tests { // Window 1 w.add_range( - Some(&Sequence { - id: 1, - number: 1, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 1 }), 11, Utc.timestamp_nanos(10), Utc.timestamp_nanos(11), @@ -1467,11 +1335,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 2, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 2 }), 4, Utc.timestamp_nanos(10), Utc.timestamp_nanos(340), @@ -1479,11 +1343,7 @@ mod tests { ); w.add_range( - Some(&Sequence { - id: 1, - number: 3, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 3 }), 6, Utc.timestamp_nanos(1), Utc.timestamp_nanos(5), @@ -1492,11 +1352,7 @@ mod tests { // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2 w.add_range( - Some(&Sequence { - id: 1, - number: 4, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 4 }), 3, Utc.timestamp_nanos(89), Utc.timestamp_nanos(90), @@ -1505,11 +1361,7 @@ mod tests { // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3 w.add_range( - Some(&Sequence { - id: 1, - number: 5, - ingest_timestamp: Utc::now(), - }), + Some(&Sequence { id: 1, number: 5 }), 8, Utc.timestamp_nanos(3), Utc.timestamp_nanos(4), diff --git a/server/src/db.rs b/server/src/db.rs index 1b0581a86f..c2311996c1 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -114,9 +114,6 @@ pub enum Error { ))] StoreSequencedEntryFailures { errors: Vec }, - #[snafu(display("Error building sequenced entry: {}", source))] - SequencedEntryError { source: entry::SequencedEntryError }, - #[snafu(display("background task cancelled: {}", source))] TaskCancelled { source: futures::future::Aborted }, @@ -883,6 +880,9 @@ impl Db { let sequence = sequenced_entry .sequence() .expect("entry from write buffer must be sequenced"); + let producer_wallclock_timestamp = sequenced_entry + .producer_wallclock_timestamp() + .expect("entry from write buffer must have a producer wallclock time"); let entry = sequenced_entry.entry(); metrics.bytes_read.add(entry.data().len() as u64); metrics @@ -936,7 +936,7 @@ impl Db { } metrics .last_ingest_ts - .set(sequence.ingest_timestamp.timestamp_nanos() as usize, &[]); + .set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]); } } @@ -976,14 +976,15 @@ impl Db { // buffer to return success before adding the entry to the mutable buffer. // TODO: be smarter than always using sequencer 0 - let sequence = write_buffer + let (sequence, producer_wallclock_timestamp) = write_buffer .store_entry(&entry, 0) .await .context(WriteBufferWritingError)?; - let sequenced_entry = Arc::new( - SequencedEntry::new_from_sequence(sequence, entry) - .context(SequencedEntryError)?, - ); + let sequenced_entry = Arc::new(SequencedEntry::new_from_sequence( + sequence, + producer_wallclock_timestamp, + entry, + )); self.store_sequenced_entry(sequenced_entry) } @@ -1484,20 +1485,16 @@ mod tests { let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); let ingest_ts1 = Utc.timestamp_millis(42); let ingest_ts2 = Utc.timestamp_millis(1337); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence( - Sequence::new(0, 0, ingest_ts1), - lp_to_entry("mem foo=1 10"), - ) - .unwrap(), - ); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence( - Sequence::new(0, 7, ingest_ts2), - lp_to_entry("cpu bar=2 20\ncpu bar=3 30"), - ) - .unwrap(), - ); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 0), + ingest_ts1, + lp_to_entry("mem foo=1 10"), + )); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 7), + ingest_ts2, + lp_to_entry("cpu bar=2 20\ncpu bar=3 30"), + )); let write_buffer = MockBufferForReading::new(write_buffer_state); let test_db = TestDb::builder() @@ -2728,21 +2725,26 @@ mod tests { let partition_key = "1970-01-01T00"; let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence(Sequence::new(0, 0, Utc::now()), entry.clone()) - .unwrap(), - ); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence(Sequence::new(1, 0, Utc::now()), entry.clone()) - .unwrap(), - ); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence(Sequence::new(1, 2, Utc::now()), entry.clone()) - .unwrap(), - ); - write_buffer_state.push_entry( - SequencedEntry::new_from_sequence(Sequence::new(0, 1, Utc::now()), entry).unwrap(), - ); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 0), + Utc::now(), + entry.clone(), + )); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(1, 0), + Utc::now(), + entry.clone(), + )); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(1, 2), + Utc::now(), + entry.clone(), + )); + write_buffer_state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 1), + Utc::now(), + entry, + )); let write_buffer = MockBufferForReading::new(write_buffer_state); let db = TestDb::builder() diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index ed84be385b..ba0acce2e4 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -220,10 +220,10 @@ mod tests { assert_eq!(entries.len(), 1); SequencedEntry::new_from_sequence( - Sequence::new(sequencer_id, sequence_number, Utc::now()), + Sequence::new(sequencer_id, sequence_number), + Utc::now(), entries.pop().unwrap(), ) - .unwrap() } } diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index fbb30caadc..3963918b32 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use entry::{Entry, Sequence, SequencedEntry}; use futures::{future::BoxFuture, stream::BoxStream}; @@ -20,7 +21,7 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static { &self, entry: &Entry, sequencer_id: u32, - ) -> Result; + ) -> Result<(Sequence, DateTime), WriteBufferError>; } pub type FetchHighWatermarkFut<'a> = BoxFuture<'a, Result>; @@ -313,9 +314,9 @@ pub mod test_utils { let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); let writer = context.writing(); - let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().number; - let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().number; - let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().number; + let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number; + let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number; + let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number; let mut reader_1 = context.reading().await; let mut reader_2 = context.reading().await; @@ -343,7 +344,7 @@ pub mod test_utils { // seek to far end and then at data reader_1.seek(0, 1_000_000).await.unwrap(); - let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().number; + let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().0.number; let mut streams = reader_1.streams(); assert_eq!(streams.len(), 2); let (_sequencer_id, mut stream_1) = streams.pop().unwrap(); @@ -389,11 +390,13 @@ pub mod test_utils { .store_entry(&entry_east_2, sequencer_id_1) .await .unwrap() + .0 .number; let mark_2 = writer .store_entry(&entry_west_1, sequencer_id_2) .await .unwrap() + .0 .number; assert_eq!((stream_1.fetch_high_watermark)().await.unwrap(), mark_1 + 1); assert_eq!((stream_2.fetch_high_watermark)().await.unwrap(), mark_2 + 1); @@ -435,8 +438,7 @@ pub mod test_utils { // check that the timestamp records the ingestion time, not the read time let sequenced_entry = stream.stream.next().await.unwrap().unwrap(); - let sequence = sequenced_entry.sequence().unwrap(); - let ts_entry = sequence.ingest_timestamp; + let ts_entry = sequenced_entry.producer_wallclock_timestamp().unwrap(); assert!(ts_entry >= ts_pre, "{} >= {}", ts_entry, ts_pre); assert!(ts_entry <= ts_post, "{} <= {}", ts_entry, ts_post); } diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 85dee7cc41..a1dbd647ce 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -6,7 +6,7 @@ use std::{ }; use async_trait::async_trait; -use chrono::{TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use data_types::server_id::ServerId; use entry::{Entry, Sequence, SequencedEntry}; use futures::{FutureExt, StreamExt}; @@ -48,7 +48,7 @@ impl WriteBufferWriting for KafkaBufferProducer { &self, entry: &Entry, sequencer_id: u32, - ) -> Result { + ) -> Result<(Sequence, DateTime), WriteBufferError> { let partition = i32::try_from(sequencer_id)?; let timestamp = Utc::now(); @@ -69,11 +69,13 @@ impl WriteBufferWriting for KafkaBufferProducer { debug!(db_name=%self.database_name, %offset, %partition, size=entry.data().len(), "wrote to kafka"); - Ok(Sequence { - id: partition.try_into()?, - number: offset.try_into()?, - ingest_timestamp: timestamp, - }) + Ok(( + Sequence { + id: partition.try_into()?, + number: offset.try_into()?, + }, + timestamp, + )) } } @@ -153,10 +155,9 @@ impl WriteBufferReading for KafkaBufferConsumer { let sequence = Sequence { id: message.partition().try_into()?, number: message.offset().try_into()?, - ingest_timestamp: timestamp, }; - Ok(SequencedEntry::new_from_sequence(sequence, entry)?) + Ok(SequencedEntry::new_from_sequence(sequence, timestamp, entry)) }) .boxed(); diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index 331ab82b64..e741bcb35c 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, sync::Arc, task::Poll}; use async_trait::async_trait; -use chrono::Utc; +use chrono::{DateTime, Utc}; use entry::{Entry, Sequence, SequencedEntry}; use futures::{stream, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -110,7 +110,7 @@ impl WriteBufferWriting for MockBufferForWriting { &self, entry: &Entry, sequencer_id: u32, - ) -> Result { + ) -> Result<(Sequence, DateTime), WriteBufferError> { let mut entries = self.state.entries.lock(); let sequencer_entries = entries.get_mut(&sequencer_id).unwrap(); @@ -129,15 +129,15 @@ impl WriteBufferWriting for MockBufferForWriting { let sequence = Sequence { id: sequencer_id, number: sequence_number, - ingest_timestamp: Utc::now(), }; + let timestamp = Utc::now(); sequencer_entries.push(Ok(SequencedEntry::new_from_sequence( sequence, + timestamp, entry.clone(), - ) - .unwrap())); + ))); - Ok(sequence) + Ok((sequence, timestamp)) } } @@ -150,7 +150,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { &self, _entry: &Entry, _sequencer_id: u32, - ) -> Result { + ) -> Result<(Sequence, DateTime), WriteBufferError> { Err(String::from( "Something bad happened on the way to writing an entry in the write buffer", ) @@ -364,8 +364,12 @@ mod tests { fn test_state_push_entry_panic_wrong_sequencer() { let state = MockBufferSharedState::empty_with_n_sequencers(2); let entry = lp_to_entry("upc,region=east user=1 100"); - let sequence = Sequence::new(2, 0, Utc::now()); - state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap()); + let sequence = Sequence::new(2, 0); + state.push_entry(SequencedEntry::new_from_sequence( + sequence, + Utc::now(), + entry, + )); } #[test] @@ -375,9 +379,17 @@ mod tests { fn test_state_push_entry_panic_wrong_sequence_number_equal() { let state = MockBufferSharedState::empty_with_n_sequencers(2); let entry = lp_to_entry("upc,region=east user=1 100"); - let sequence = Sequence::new(1, 13, Utc::now()); - state.push_entry(SequencedEntry::new_from_sequence(sequence, entry.clone()).unwrap()); - state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap()); + let sequence = Sequence::new(1, 13); + state.push_entry(SequencedEntry::new_from_sequence( + sequence, + Utc::now(), + entry.clone(), + )); + state.push_entry(SequencedEntry::new_from_sequence( + sequence, + Utc::now(), + entry, + )); } #[test] @@ -387,10 +399,18 @@ mod tests { fn test_state_push_entry_panic_wrong_sequence_number_less() { let state = MockBufferSharedState::empty_with_n_sequencers(2); let entry = lp_to_entry("upc,region=east user=1 100"); - let sequence_1 = Sequence::new(1, 13, Utc::now()); - let sequence_2 = Sequence::new(1, 12, Utc::now()); - state.push_entry(SequencedEntry::new_from_sequence(sequence_1, entry.clone()).unwrap()); - state.push_entry(SequencedEntry::new_from_sequence(sequence_2, entry).unwrap()); + let sequence_1 = Sequence::new(1, 13); + let sequence_2 = Sequence::new(1, 12); + state.push_entry(SequencedEntry::new_from_sequence( + sequence_1, + Utc::now(), + entry.clone(), + )); + state.push_entry(SequencedEntry::new_from_sequence( + sequence_2, + Utc::now(), + entry, + )); } #[test]