refactor: move ingest timestamp from sequence to sequended entry
parent
d940d4f6d3
commit
0fcec6b742
|
@ -1720,36 +1720,29 @@ enum InnerClockValueError {
|
|||
ValueMayNotBeZero,
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum SequencedEntryError {
|
||||
#[snafu(display("{}", source))]
|
||||
InvalidFlatbuffer {
|
||||
source: flatbuffers::InvalidFlatbuffer,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SequencedEntry {
|
||||
entry: Entry,
|
||||
/// The (optional) sequence for this entry. At the time of
|
||||
/// writing, sequences will not be present when there is no
|
||||
/// configured mechanism to define the order of all writes.
|
||||
sequence: Option<Sequence>,
|
||||
|
||||
/// The (optional) sequence for this entry including the timestamp when the producer ingested it into the write
|
||||
/// buffer.
|
||||
///
|
||||
/// At the time of writing, sequences will not be present when there is no configured mechanism to define the order
|
||||
/// of all writes.
|
||||
sequence_and_producer_ts: Option<(Sequence, DateTime<Utc>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Sequence {
|
||||
pub id: u32,
|
||||
pub number: u64,
|
||||
pub ingest_timestamp: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Sequence {
|
||||
pub fn new(sequencer_id: u32, sequence_number: u64, ingest_timestamp: DateTime<Utc>) -> Self {
|
||||
pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
|
||||
Self {
|
||||
id: sequencer_id,
|
||||
number: sequence_number,
|
||||
ingest_timestamp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1757,15 +1750,20 @@ impl Sequence {
|
|||
impl SequencedEntry {
|
||||
pub fn new_from_sequence(
|
||||
sequence: Sequence,
|
||||
producer_wallclock_timestamp: DateTime<Utc>,
|
||||
entry: Entry,
|
||||
) -> Result<Self, SequencedEntryError> {
|
||||
let sequence = Some(sequence);
|
||||
Ok(Self { entry, sequence })
|
||||
) -> Self {
|
||||
Self {
|
||||
entry,
|
||||
sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_unsequenced(entry: Entry) -> Self {
|
||||
let sequence = None;
|
||||
Self { entry, sequence }
|
||||
Self {
|
||||
entry,
|
||||
sequence_and_producer_ts: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
|
||||
|
@ -1773,7 +1771,15 @@ impl SequencedEntry {
|
|||
}
|
||||
|
||||
pub fn sequence(&self) -> Option<&Sequence> {
|
||||
self.sequence.as_ref()
|
||||
self.sequence_and_producer_ts
|
||||
.as_ref()
|
||||
.map(|(sequence, _ts)| sequence)
|
||||
}
|
||||
|
||||
pub fn producer_wallclock_timestamp(&self) -> Option<DateTime<Utc>> {
|
||||
self.sequence_and_producer_ts
|
||||
.as_ref()
|
||||
.map(|(_sequence, ts)| *ts)
|
||||
}
|
||||
|
||||
pub fn entry(&self) -> &Entry {
|
||||
|
|
|
@ -543,33 +543,21 @@ mod tests {
|
|||
let start_time = Utc::now();
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
1,
|
||||
start_time,
|
||||
Utc::now(),
|
||||
i,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 4,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 4 }),
|
||||
2,
|
||||
Utc::now(),
|
||||
Utc::now(),
|
||||
Instant::now(),
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 10,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 10 }),
|
||||
1,
|
||||
Utc::now(),
|
||||
Utc::now(),
|
||||
|
@ -577,11 +565,7 @@ mod tests {
|
|||
);
|
||||
let last_time = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 2,
|
||||
number: 23,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 2, number: 23 }),
|
||||
10,
|
||||
Utc::now(),
|
||||
last_time,
|
||||
|
@ -613,22 +597,14 @@ mod tests {
|
|||
let last_time = Utc::now();
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
1,
|
||||
start_time,
|
||||
start_time,
|
||||
created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
1,
|
||||
last_time,
|
||||
last_time,
|
||||
|
@ -639,11 +615,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let open_time = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 6,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 6 }),
|
||||
2,
|
||||
last_time,
|
||||
open_time,
|
||||
|
@ -679,11 +651,7 @@ mod tests {
|
|||
|
||||
let first_end = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start_time,
|
||||
first_end,
|
||||
|
@ -695,11 +663,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let second_end = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
3,
|
||||
first_end,
|
||||
second_end,
|
||||
|
@ -711,11 +675,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let third_end = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 4,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 4 }),
|
||||
4,
|
||||
second_end,
|
||||
third_end,
|
||||
|
@ -747,11 +707,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let fourth_end = Utc::now();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
1,
|
||||
fourth_end,
|
||||
fourth_end,
|
||||
|
@ -776,11 +732,7 @@ mod tests {
|
|||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100)
|
||||
.unwrap();
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 9,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 9 }),
|
||||
2,
|
||||
Utc::now(),
|
||||
Utc::now(),
|
||||
|
@ -821,33 +773,21 @@ mod tests {
|
|||
let third_end = third_start + chrono::Duration::seconds(1);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start_time,
|
||||
first_end,
|
||||
created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
3,
|
||||
second_start,
|
||||
second_end,
|
||||
second_created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
2,
|
||||
third_start,
|
||||
third_end,
|
||||
|
@ -912,33 +852,21 @@ mod tests {
|
|||
let third_end = third_start + chrono::Duration::seconds(1);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start_time,
|
||||
first_end,
|
||||
created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
3,
|
||||
second_start,
|
||||
second_end,
|
||||
second_created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
2,
|
||||
third_start,
|
||||
third_end,
|
||||
|
@ -1003,33 +931,21 @@ mod tests {
|
|||
let third_end = second_end + chrono::Duration::seconds(1);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start_time,
|
||||
first_end,
|
||||
created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
3,
|
||||
first_end,
|
||||
second_end,
|
||||
second_created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
2,
|
||||
third_start,
|
||||
third_end,
|
||||
|
@ -1101,33 +1017,21 @@ mod tests {
|
|||
let third_end = second_end + chrono::Duration::seconds(1);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start_time,
|
||||
first_end,
|
||||
created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
3,
|
||||
second_start,
|
||||
second_end,
|
||||
second_created_at,
|
||||
);
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
2,
|
||||
third_start,
|
||||
third_end,
|
||||
|
@ -1181,11 +1085,7 @@ mod tests {
|
|||
let start = Utc::now();
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1201,11 +1101,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 4,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 4 }),
|
||||
5,
|
||||
start,
|
||||
start + chrono::Duration::seconds(4),
|
||||
|
@ -1275,11 +1171,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 9,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 9 }),
|
||||
9,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1309,11 +1201,7 @@ mod tests {
|
|||
let start = Utc::now();
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
2,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1321,11 +1209,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 6,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 6 }),
|
||||
5,
|
||||
start,
|
||||
start + chrono::Duration::seconds(4),
|
||||
|
@ -1333,11 +1217,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 9,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 9 }),
|
||||
9,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1345,11 +1225,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 10,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 10 }),
|
||||
17,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1375,11 +1251,7 @@ mod tests {
|
|||
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 14,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 14 }),
|
||||
11,
|
||||
start,
|
||||
start + chrono::Duration::seconds(2),
|
||||
|
@ -1455,11 +1327,7 @@ mod tests {
|
|||
|
||||
// Window 1
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 1,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 1 }),
|
||||
11,
|
||||
Utc.timestamp_nanos(10),
|
||||
Utc.timestamp_nanos(11),
|
||||
|
@ -1467,11 +1335,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 2,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 2 }),
|
||||
4,
|
||||
Utc.timestamp_nanos(10),
|
||||
Utc.timestamp_nanos(340),
|
||||
|
@ -1479,11 +1343,7 @@ mod tests {
|
|||
);
|
||||
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 3,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 3 }),
|
||||
6,
|
||||
Utc.timestamp_nanos(1),
|
||||
Utc.timestamp_nanos(5),
|
||||
|
@ -1492,11 +1352,7 @@ mod tests {
|
|||
|
||||
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 4,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 4 }),
|
||||
3,
|
||||
Utc.timestamp_nanos(89),
|
||||
Utc.timestamp_nanos(90),
|
||||
|
@ -1505,11 +1361,7 @@ mod tests {
|
|||
|
||||
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3
|
||||
w.add_range(
|
||||
Some(&Sequence {
|
||||
id: 1,
|
||||
number: 5,
|
||||
ingest_timestamp: Utc::now(),
|
||||
}),
|
||||
Some(&Sequence { id: 1, number: 5 }),
|
||||
8,
|
||||
Utc.timestamp_nanos(3),
|
||||
Utc.timestamp_nanos(4),
|
||||
|
|
|
@ -114,9 +114,6 @@ pub enum Error {
|
|||
))]
|
||||
StoreSequencedEntryFailures { errors: Vec<Error> },
|
||||
|
||||
#[snafu(display("Error building sequenced entry: {}", source))]
|
||||
SequencedEntryError { source: entry::SequencedEntryError },
|
||||
|
||||
#[snafu(display("background task cancelled: {}", source))]
|
||||
TaskCancelled { source: futures::future::Aborted },
|
||||
|
||||
|
@ -883,6 +880,9 @@ impl Db {
|
|||
let sequence = sequenced_entry
|
||||
.sequence()
|
||||
.expect("entry from write buffer must be sequenced");
|
||||
let producer_wallclock_timestamp = sequenced_entry
|
||||
.producer_wallclock_timestamp()
|
||||
.expect("entry from write buffer must have a producer wallclock time");
|
||||
let entry = sequenced_entry.entry();
|
||||
metrics.bytes_read.add(entry.data().len() as u64);
|
||||
metrics
|
||||
|
@ -936,7 +936,7 @@ impl Db {
|
|||
}
|
||||
metrics
|
||||
.last_ingest_ts
|
||||
.set(sequence.ingest_timestamp.timestamp_nanos() as usize, &[]);
|
||||
.set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -976,14 +976,15 @@ impl Db {
|
|||
// buffer to return success before adding the entry to the mutable buffer.
|
||||
|
||||
// TODO: be smarter than always using sequencer 0
|
||||
let sequence = write_buffer
|
||||
let (sequence, producer_wallclock_timestamp) = write_buffer
|
||||
.store_entry(&entry, 0)
|
||||
.await
|
||||
.context(WriteBufferWritingError)?;
|
||||
let sequenced_entry = Arc::new(
|
||||
SequencedEntry::new_from_sequence(sequence, entry)
|
||||
.context(SequencedEntryError)?,
|
||||
);
|
||||
let sequenced_entry = Arc::new(SequencedEntry::new_from_sequence(
|
||||
sequence,
|
||||
producer_wallclock_timestamp,
|
||||
entry,
|
||||
));
|
||||
|
||||
self.store_sequenced_entry(sequenced_entry)
|
||||
}
|
||||
|
@ -1484,20 +1485,16 @@ mod tests {
|
|||
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
|
||||
let ingest_ts1 = Utc.timestamp_millis(42);
|
||||
let ingest_ts2 = Utc.timestamp_millis(1337);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 0, ingest_ts1),
|
||||
lp_to_entry("mem foo=1 10"),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 7, ingest_ts2),
|
||||
lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 0),
|
||||
ingest_ts1,
|
||||
lp_to_entry("mem foo=1 10"),
|
||||
));
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 7),
|
||||
ingest_ts2,
|
||||
lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
|
||||
));
|
||||
let write_buffer = MockBufferForReading::new(write_buffer_state);
|
||||
|
||||
let test_db = TestDb::builder()
|
||||
|
@ -2728,21 +2725,26 @@ mod tests {
|
|||
let partition_key = "1970-01-01T00";
|
||||
|
||||
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(Sequence::new(0, 0, Utc::now()), entry.clone())
|
||||
.unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(Sequence::new(1, 0, Utc::now()), entry.clone())
|
||||
.unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(Sequence::new(1, 2, Utc::now()), entry.clone())
|
||||
.unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(
|
||||
SequencedEntry::new_from_sequence(Sequence::new(0, 1, Utc::now()), entry).unwrap(),
|
||||
);
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 0),
|
||||
Utc::now(),
|
||||
entry.clone(),
|
||||
));
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(1, 0),
|
||||
Utc::now(),
|
||||
entry.clone(),
|
||||
));
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(1, 2),
|
||||
Utc::now(),
|
||||
entry.clone(),
|
||||
));
|
||||
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 1),
|
||||
Utc::now(),
|
||||
entry,
|
||||
));
|
||||
let write_buffer = MockBufferForReading::new(write_buffer_state);
|
||||
|
||||
let db = TestDb::builder()
|
||||
|
|
|
@ -220,10 +220,10 @@ mod tests {
|
|||
assert_eq!(entries.len(), 1);
|
||||
|
||||
SequencedEntry::new_from_sequence(
|
||||
Sequence::new(sequencer_id, sequence_number, Utc::now()),
|
||||
Sequence::new(sequencer_id, sequence_number),
|
||||
Utc::now(),
|
||||
entries.pop().unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use std::fmt::Debug;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use entry::{Entry, Sequence, SequencedEntry};
|
||||
use futures::{future::BoxFuture, stream::BoxStream};
|
||||
|
||||
|
@ -20,7 +21,7 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
|
|||
&self,
|
||||
entry: &Entry,
|
||||
sequencer_id: u32,
|
||||
) -> Result<Sequence, WriteBufferError>;
|
||||
) -> Result<(Sequence, DateTime<Utc>), WriteBufferError>;
|
||||
}
|
||||
|
||||
pub type FetchHighWatermarkFut<'a> = BoxFuture<'a, Result<u64, WriteBufferError>>;
|
||||
|
@ -313,9 +314,9 @@ pub mod test_utils {
|
|||
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
|
||||
|
||||
let writer = context.writing();
|
||||
let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().number;
|
||||
let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().number;
|
||||
let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().number;
|
||||
let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number;
|
||||
let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number;
|
||||
let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number;
|
||||
|
||||
let mut reader_1 = context.reading().await;
|
||||
let mut reader_2 = context.reading().await;
|
||||
|
@ -343,7 +344,7 @@ pub mod test_utils {
|
|||
|
||||
// seek to far end and then at data
|
||||
reader_1.seek(0, 1_000_000).await.unwrap();
|
||||
let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().number;
|
||||
let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().0.number;
|
||||
let mut streams = reader_1.streams();
|
||||
assert_eq!(streams.len(), 2);
|
||||
let (_sequencer_id, mut stream_1) = streams.pop().unwrap();
|
||||
|
@ -389,11 +390,13 @@ pub mod test_utils {
|
|||
.store_entry(&entry_east_2, sequencer_id_1)
|
||||
.await
|
||||
.unwrap()
|
||||
.0
|
||||
.number;
|
||||
let mark_2 = writer
|
||||
.store_entry(&entry_west_1, sequencer_id_2)
|
||||
.await
|
||||
.unwrap()
|
||||
.0
|
||||
.number;
|
||||
assert_eq!((stream_1.fetch_high_watermark)().await.unwrap(), mark_1 + 1);
|
||||
assert_eq!((stream_2.fetch_high_watermark)().await.unwrap(), mark_2 + 1);
|
||||
|
@ -435,8 +438,7 @@ pub mod test_utils {
|
|||
|
||||
// check that the timestamp records the ingestion time, not the read time
|
||||
let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
|
||||
let sequence = sequenced_entry.sequence().unwrap();
|
||||
let ts_entry = sequence.ingest_timestamp;
|
||||
let ts_entry = sequenced_entry.producer_wallclock_timestamp().unwrap();
|
||||
assert!(ts_entry >= ts_pre, "{} >= {}", ts_entry, ts_pre);
|
||||
assert!(ts_entry <= ts_post, "{} <= {}", ts_entry, ts_post);
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
use data_types::server_id::ServerId;
|
||||
use entry::{Entry, Sequence, SequencedEntry};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
|
@ -48,7 +48,7 @@ impl WriteBufferWriting for KafkaBufferProducer {
|
|||
&self,
|
||||
entry: &Entry,
|
||||
sequencer_id: u32,
|
||||
) -> Result<Sequence, WriteBufferError> {
|
||||
) -> Result<(Sequence, DateTime<Utc>), WriteBufferError> {
|
||||
let partition = i32::try_from(sequencer_id)?;
|
||||
let timestamp = Utc::now();
|
||||
|
||||
|
@ -69,11 +69,13 @@ impl WriteBufferWriting for KafkaBufferProducer {
|
|||
|
||||
debug!(db_name=%self.database_name, %offset, %partition, size=entry.data().len(), "wrote to kafka");
|
||||
|
||||
Ok(Sequence {
|
||||
id: partition.try_into()?,
|
||||
number: offset.try_into()?,
|
||||
ingest_timestamp: timestamp,
|
||||
})
|
||||
Ok((
|
||||
Sequence {
|
||||
id: partition.try_into()?,
|
||||
number: offset.try_into()?,
|
||||
},
|
||||
timestamp,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -153,10 +155,9 @@ impl WriteBufferReading for KafkaBufferConsumer {
|
|||
let sequence = Sequence {
|
||||
id: message.partition().try_into()?,
|
||||
number: message.offset().try_into()?,
|
||||
ingest_timestamp: timestamp,
|
||||
};
|
||||
|
||||
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
|
||||
Ok(SequencedEntry::new_from_sequence(sequence, timestamp, entry))
|
||||
})
|
||||
.boxed();
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{collections::BTreeMap, sync::Arc, task::Poll};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use chrono::{DateTime, Utc};
|
||||
use entry::{Entry, Sequence, SequencedEntry};
|
||||
use futures::{stream, FutureExt, StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
|
@ -110,7 +110,7 @@ impl WriteBufferWriting for MockBufferForWriting {
|
|||
&self,
|
||||
entry: &Entry,
|
||||
sequencer_id: u32,
|
||||
) -> Result<Sequence, WriteBufferError> {
|
||||
) -> Result<(Sequence, DateTime<Utc>), WriteBufferError> {
|
||||
let mut entries = self.state.entries.lock();
|
||||
let sequencer_entries = entries.get_mut(&sequencer_id).unwrap();
|
||||
|
||||
|
@ -129,15 +129,15 @@ impl WriteBufferWriting for MockBufferForWriting {
|
|||
let sequence = Sequence {
|
||||
id: sequencer_id,
|
||||
number: sequence_number,
|
||||
ingest_timestamp: Utc::now(),
|
||||
};
|
||||
let timestamp = Utc::now();
|
||||
sequencer_entries.push(Ok(SequencedEntry::new_from_sequence(
|
||||
sequence,
|
||||
timestamp,
|
||||
entry.clone(),
|
||||
)
|
||||
.unwrap()));
|
||||
)));
|
||||
|
||||
Ok(sequence)
|
||||
Ok((sequence, timestamp))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
|
|||
&self,
|
||||
_entry: &Entry,
|
||||
_sequencer_id: u32,
|
||||
) -> Result<Sequence, WriteBufferError> {
|
||||
) -> Result<(Sequence, DateTime<Utc>), WriteBufferError> {
|
||||
Err(String::from(
|
||||
"Something bad happened on the way to writing an entry in the write buffer",
|
||||
)
|
||||
|
@ -364,8 +364,12 @@ mod tests {
|
|||
fn test_state_push_entry_panic_wrong_sequencer() {
|
||||
let state = MockBufferSharedState::empty_with_n_sequencers(2);
|
||||
let entry = lp_to_entry("upc,region=east user=1 100");
|
||||
let sequence = Sequence::new(2, 0, Utc::now());
|
||||
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
|
||||
let sequence = Sequence::new(2, 0);
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
sequence,
|
||||
Utc::now(),
|
||||
entry,
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -375,9 +379,17 @@ mod tests {
|
|||
fn test_state_push_entry_panic_wrong_sequence_number_equal() {
|
||||
let state = MockBufferSharedState::empty_with_n_sequencers(2);
|
||||
let entry = lp_to_entry("upc,region=east user=1 100");
|
||||
let sequence = Sequence::new(1, 13, Utc::now());
|
||||
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry.clone()).unwrap());
|
||||
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
|
||||
let sequence = Sequence::new(1, 13);
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
sequence,
|
||||
Utc::now(),
|
||||
entry.clone(),
|
||||
));
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
sequence,
|
||||
Utc::now(),
|
||||
entry,
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -387,10 +399,18 @@ mod tests {
|
|||
fn test_state_push_entry_panic_wrong_sequence_number_less() {
|
||||
let state = MockBufferSharedState::empty_with_n_sequencers(2);
|
||||
let entry = lp_to_entry("upc,region=east user=1 100");
|
||||
let sequence_1 = Sequence::new(1, 13, Utc::now());
|
||||
let sequence_2 = Sequence::new(1, 12, Utc::now());
|
||||
state.push_entry(SequencedEntry::new_from_sequence(sequence_1, entry.clone()).unwrap());
|
||||
state.push_entry(SequencedEntry::new_from_sequence(sequence_2, entry).unwrap());
|
||||
let sequence_1 = Sequence::new(1, 13);
|
||||
let sequence_2 = Sequence::new(1, 12);
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
sequence_1,
|
||||
Utc::now(),
|
||||
entry.clone(),
|
||||
));
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
sequence_2,
|
||||
Utc::now(),
|
||||
entry,
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue