Merge branch 'main' into genscript

pull/24376/head
kodiakhq[bot] 2021-07-13 14:11:37 +00:00 committed by GitHub
commit c5f97211ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 89 additions and 2 deletions

View File

@ -1,6 +1,6 @@
//! In memory structures for tracking data ingest and when persistence can or should occur. //! In memory structures for tracking data ingest and when persistence can or should occur.
use std::{ use std::{
collections::{BTreeMap, VecDeque}, collections::{btree_map::Entry, BTreeMap, VecDeque},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -27,6 +27,12 @@ pub struct PersistenceWindows {
open: Option<Window>, open: Option<Window>,
late_arrival_period: Duration, late_arrival_period: Duration,
closed_window_period: Duration, closed_window_period: Duration,
/// The last last instant passed to PersistenceWindows::add_range
last_instant: Instant,
/// maps sequencer_id to the maximum sequence passed to PersistenceWindows::add_range
sequencer_numbers: BTreeMap<u32, u64>,
} }
/// A handle for flushing data from the `PersistenceWindows` /// A handle for flushing data from the `PersistenceWindows`
@ -56,6 +62,8 @@ impl PersistenceWindows {
open: None, open: None,
late_arrival_period, late_arrival_period,
closed_window_period, closed_window_period,
last_instant: Instant::now(),
sequencer_numbers: Default::default(),
} }
} }
@ -79,6 +87,30 @@ impl PersistenceWindows {
max_time: DateTime<Utc>, max_time: DateTime<Utc>,
received_at: Instant, received_at: Instant,
) { ) {
assert!(
received_at >= self.last_instant,
"PersistenceWindows::add_range called out of order"
);
self.last_instant = received_at;
if let Some(sequence) = sequence {
match self.sequencer_numbers.entry(sequence.id) {
Entry::Occupied(mut occupied) => {
assert!(
*occupied.get() < sequence.number,
"sequence number {} for sequencer {} was not greater than previous {}",
sequence.number,
sequence.id,
*occupied.get()
);
*occupied.get_mut() = sequence.number;
}
Entry::Vacant(vacant) => {
vacant.insert(sequence.number);
}
}
}
self.rotate(received_at); self.rotate(received_at);
match self.open.as_mut() { match self.open.as_mut() {
@ -983,7 +1015,7 @@ mod tests {
); );
w.add_range( w.add_range(
Some(&Sequence { id: 1, number: 9 }), Some(&Sequence { id: 1, number: 10 }),
17, 17,
start, start,
start + chrono::Duration::seconds(2), start + chrono::Duration::seconds(2),

View File

@ -118,3 +118,56 @@ pub(crate) fn compact_chunks(
Ok((tracker, fut.track(registration))) Ok((tracker, fut.track(registration)))
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::db::test_helpers::write_lp;
use crate::utils::make_db;
use data_types::chunk_metadata::ChunkStorage;
use lifecycle::{LockableChunk, LockablePartition};
use query::QueryDatabase;
#[tokio::test]
async fn test_compact_freeze() {
let test_db = make_db().await;
let db = test_db.db;
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await;
let partition_keys = db.partition_keys().unwrap();
assert_eq!(partition_keys.len(), 1);
let db_partition = db.partition("cpu", &partition_keys[0]).unwrap();
let partition = LockableCatalogPartition::new(Arc::clone(&db), Arc::clone(&db_partition));
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);
assert_eq!(chunks.len(), 1);
let chunk = chunks[0].1.read();
let (_, fut) = compact_chunks(partition.upgrade(), vec![chunk.upgrade()]).unwrap();
// NB: perform the write before spawning the background task that performs the compaction
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 40").await;
tokio::spawn(fut).await.unwrap().unwrap().unwrap();
let summaries: Vec<_> = db_partition
.read()
.chunk_summaries()
.map(|summary| (summary.storage, summary.row_count))
.collect();
assert_eq!(
summaries,
vec![
(ChunkStorage::OpenMutableBuffer, 1),
(ChunkStorage::ReadBuffer, 5)
]
)
}
}

View File

@ -535,6 +535,7 @@ impl InitStatus {
config: rules.write_buffer_connection.clone(), config: rules.write_buffer_connection.clone(),
}, },
)?; )?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
handle handle
.advance_replay(preserved_catalog, catalog, write_buffer) .advance_replay(preserved_catalog, catalog, write_buffer)

View File

@ -562,6 +562,7 @@ where
source: e, source: e,
} }
})?; })?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?; db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
// no actual replay required // no actual replay required