feat: implement naive replay

pull/24376/head
Marco Neumann 2021-07-22 16:25:05 +02:00
parent 6ef3680554
commit db0f501b02
2 changed files with 522 additions and 5 deletions


@@ -24,6 +24,7 @@ use data_types::{
};
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, SequencedEntry};
use futures::TryStreamExt;
use futures::{stream::BoxStream, StreamExt};
use internal_types::schema::Schema;
use metrics::KeyValue;
@@ -36,10 +37,12 @@ use parquet_file::{
catalog::{CheckpointData, PreservedCatalog},
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
};
use persistence_windows::checkpoint::ReplayPlan;
use persistence_windows::persistence_windows::PersistenceWindows;
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::BTreeMap;
use std::{
any::Any,
collections::HashMap,
@@ -140,6 +143,49 @@ pub enum Error {
table_name: String,
partition_key: String,
},
#[snafu(display("Cannot create replay plan: {}", source))]
ReplayPlanError {
source: persistence_windows::checkpoint::Error,
},
#[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))]
ReplaySeekError {
sequencer_id: u32,
source: write_buffer::core::WriteBufferError,
},
#[snafu(display(
"Cannot get entry from sequencer {} during replay: {}",
sequencer_id,
source
))]
ReplayEntryError {
sequencer_id: u32,
source: write_buffer::core::WriteBufferError,
},
#[snafu(display(
"For sequencer {} expected to find sequence {} but replay jumped to {}",
sequencer_id,
expected_sequence_number,
actual_sequence_number,
))]
ReplayEntryLostError {
sequencer_id: u32,
actual_sequence_number: u64,
expected_sequence_number: u64,
},
#[snafu(display(
"Cannot store from sequencer {} during replay: {}",
sequencer_id,
source
))]
ReplayStoreError {
sequencer_id: u32,
source: Box<Error>,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
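
The new error variants above are built through snafu's generated context selectors, the same pattern used later in `perform_replay` (e.g. `.context(ReplaySeekError { sequencer_id })`). A minimal, standalone sketch of that pattern, assuming a snafu 0.6-style derive (as used by the codebase at the time) and with `std::io::Error` standing in for the real `write_buffer::core::WriteBufferError`:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))]
    ReplaySeekError {
        sequencer_id: u32,
        source: std::io::Error,
    },
}

// The derive generates a `ReplaySeekError` context selector; `.context(...)`
// attaches the extra fields and wraps the underlying error as `source`.
fn seek(sequencer_id: u32) -> Result<(), Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "seek failed"))
        .context(ReplaySeekError { sequencer_id })
}

fn main() {
    let err = seek(7).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Cannot seek sequencer 7 during replay: seek failed"
    );
}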
@@ -715,8 +761,90 @@ impl Db {
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(&self) {
// TODO: implement replay
pub async fn perform_replay(&self, replay_plan: &ReplayPlan) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
// determine replay ranges based on the plan
let sequencer_ids: Vec<_> = write_buffer
.streams()
.into_iter()
.map(|(sequencer_id, _stream)| sequencer_id)
.collect();
let replay_ranges: BTreeMap<_, _> = sequencer_ids
.into_iter()
.filter_map(|sequencer_id| {
replay_plan
.replay_range(sequencer_id)
.map(|min_max| (sequencer_id, min_max))
})
.collect();
// seek write buffer according to the plan
for (sequencer_id, min_max) in &replay_ranges {
if let Some(min) = min_max.min() {
write_buffer
.seek(*sequencer_id, min)
.await
.context(ReplaySeekError {
sequencer_id: *sequencer_id,
})?;
}
}
// replay ranges
for (sequencer_id, mut stream) in write_buffer.streams() {
if let Some(min_max) = replay_ranges.get(&sequencer_id) {
if min_max.min().is_none() {
// no replay required
continue;
}
while let Some(entry) = stream
.stream
.try_next()
.await
.context(ReplayEntryError { sequencer_id })?
{
let sequence = *entry.sequence().expect("entry must be sequenced");
if sequence.number > min_max.max() {
return Err(Error::ReplayEntryLostError {
sequencer_id,
actual_sequence_number: sequence.number,
expected_sequence_number: min_max.max(),
});
}
let entry = Arc::new(entry);
self.store_sequenced_entry(entry)
.map_err(Box::new)
.context(ReplayStoreError { sequencer_id })?;
// done replaying?
if sequence.number == min_max.max() {
break;
}
}
}
}
// seek write buffer according to the plan
for (sequencer_id, min_max) in &replay_ranges {
// only seek when this was not already done during replay
if min_max.min().is_none() {
write_buffer
.seek(*sequencer_id, min_max.max() + 1)
.await
.context(ReplaySeekError {
sequencer_id: *sequencer_id,
})?;
}
}
}
Ok(())
}
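
To summarize the control flow above: for each sequencer the replay plan yields an optional minimum (first sequence number that still has to be replayed) and a maximum (last sequence number covered by the checkpoint). If a minimum exists, the stream is seeked to it and entries are replayed until the maximum is reached (an entry jumping past the maximum is reported as `ReplayEntryLostError`); if there is no minimum, nothing is replayed and the stream is simply seeked to `max + 1`. A hypothetical, simplified sketch of that per-sequencer decision (the `ReplayAction` and `decide` names are illustrative and not part of the codebase):

#[derive(Debug, PartialEq)]
enum ReplayAction {
    /// Seek to `min`, then replay entries until `max` is reached.
    ReplayRange { min: u64, max: u64 },
    /// Nothing to replay; just position the stream after the last
    /// sequence number known to the checkpoint.
    SeekTo { next: u64 },
}

/// `min` is the first sequence number that still needs replaying (if any),
/// `max` the highest sequence number covered by the checkpoint.
fn decide(min: Option<u64>, max: u64) -> ReplayAction {
    match min {
        Some(min) => ReplayAction::ReplayRange { min, max },
        None => ReplayAction::SeekTo { next: max + 1 },
    }
}

fn main() {
    // Sequencer with unpersisted data: replay sequence numbers 3..=7.
    assert_eq!(decide(Some(3), 7), ReplayAction::ReplayRange { min: 3, max: 7 });
    // Fully persisted sequencer: skip replay and continue at 8.
    assert_eq!(decide(None, 7), ReplayAction::SeekTo { next: 8 });
}

Note that after a successful range replay the stream is already positioned past `max`, which is why the final seek loop in `perform_replay` only touches sequencers whose range had no minimum.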
/// Background worker function
@@ -1284,10 +1412,13 @@ mod tests {
use chrono::DateTime;
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
database_rules::{LifecycleRules, PartitionTemplate, Partitioner, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
};
use entry::{test_helpers::lp_to_entry, Sequence};
use entry::{
test_helpers::{lp_to_entries, lp_to_entry},
Sequence,
};
use futures::{stream, StreamExt, TryStreamExt};
use internal_types::{schema::Schema, selection::Selection};
use object_store::{
@@ -3870,6 +4001,383 @@ mod tests {
);
}
struct TestSequencedEntry {
sequencer_id: u32,
sequence_number: u64,
lp: &'static str,
}
impl TestSequencedEntry {
fn build(self, partitioner: &impl Partitioner) -> SequencedEntry {
let Self {
sequencer_id,
sequence_number,
lp,
} = self;
let mut entries = lp_to_entries(lp, partitioner);
assert_eq!(entries.len(), 1);
SequencedEntry::new_from_sequence(
Sequence::new(sequencer_id, sequence_number),
entries.pop().unwrap(),
)
.unwrap()
}
}
struct ReplayTest {
/// Number of sequencers in this test setup.
n_sequencers: u32,
/// Entries for all sequencers that will be present during first DB start.
///
/// **NOTE: You may mix entries for different sequencers here but the entries for a single sequencer must be in
/// order (e.g. sequence number 1 must come after sequence number 0).**
sequencers_entries_start: Vec<TestSequencedEntry>,
/// The partitions (by table name and partition key) that are present after
/// [`sequencers_entries_start`](Self::sequencers_entries_start) are processed.
///
/// This data must be correct, otherwise the test will time out.
partitions_before_restart: Vec<(&'static str, &'static str)>,
/// The partitions (by table name and partition key) that will be persisted before the DB is restarted.
partitions_persisted: Vec<(&'static str, &'static str)>,
/// Additional entries for all sequencers (on top of
/// [`sequencers_entries_start`](Self::sequencers_entries_start)) that will be added while the DB is stopped,
/// before it is started again.
///
/// **NOTE: You may mix entries for different sequencers here but the entries for a single sequencer must be in
/// order (e.g. sequence number 1 must come after sequence number 0).**
additional_sequencers_entries_restart: Vec<TestSequencedEntry>,
/// The partitions (by table name and partition key) that are present after
/// replay and restart (from [`sequencers_entries_start`](Self::sequencers_entries_start) and
/// [`additional_sequencers_entries_restart`](Self::additional_sequencers_entries_restart)).
///
/// This data must be correct, otherwise the test will time out.
partitions_after_restart: Vec<(&'static str, &'static str)>,
}
impl ReplayTest {
async fn run(self) {
// Test that data is replayed from write buffers
// ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let db_name = "replay_test";
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
// ==================== do: fill write buffer state ====================
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
for se in self.sequencers_entries_start {
write_buffer_state.push_entry(se.build(&partition_template));
}
// ==================== do: create DB ====================
// Create a DB given a server id, an object store, write buffer, and a db name
let write_buffer = MockBufferForReading::new(write_buffer_state.clone());
let db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.partition_template(partition_template.clone())
.db_name(db_name)
.build()
.await
.db;
// ==================== do: start background worker ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== do: wait for partitions to reach expected state ====================
let t_0 = Instant::now();
loop {
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
if partitions[..] == self.partitions_before_restart {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background worker ====================
shutdown.cancel();
join_handle.await.unwrap();
// ==================== do: persist some partitions ====================
for (table_name, partition_key) in &self.partitions_persisted {
// Mark the MB chunk close
let chunk_id = {
let mb_chunk = db
.rollover_partition(table_name, partition_key)
.await
.unwrap()
.unwrap();
mb_chunk.id()
};
// Move that MB chunk to RB chunk and drop it from MB
db.move_chunk_to_read_buffer(table_name, partition_key, chunk_id)
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
}
// ==================== do: re-create DB ====================
// Create a DB given a server id, an object store, write buffer, and a db name
drop(db);
let write_buffer = MockBufferForReading::new(write_buffer_state.clone());
let test_db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.partition_template(partition_template.clone())
.db_name(db_name)
.build()
.await;
let db = test_db.db;
let replay_plan = test_db.replay_plan;
// ==================== check: only preserved partitions are loaded ====================
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
assert_eq!(partitions[..], self.partitions_persisted);
// ==================== do: fill write buffer state with more data ====================
// that data must not be loaded during replay
for se in self.additional_sequencers_entries_restart {
write_buffer_state.push_entry(se.build(&partition_template));
}
// ==================== do: replay ====================
db.perform_replay(&replay_plan).await.unwrap();
// ==================== check: all previously known partitions are loaded ====================
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
assert_eq!(partitions[..], self.partitions_before_restart);
// ==================== do: start background worker ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== do: wait for partitions to reach expected state ====================
let t_0 = Instant::now();
loop {
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
if partitions[..] == self.partitions_after_restart {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background worker ====================
shutdown.cancel();
join_handle.await.unwrap();
}
fn get_partitions(db: &Db) -> Vec<(String, String)> {
let mut partitions: Vec<_> = db
.catalog
.partitions()
.into_iter()
.map(|partition| {
let partition = partition.read();
(
partition.table_name().to_string(),
partition.key().to_string(),
)
})
.collect();
partitions.sort();
partitions
}
}
#[tokio::test]
async fn replay() {
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_2", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 3,
lp: "table_2,tag_partition_by=b bar=1 0",
},
],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
("table_2", "tag_partition_by_b"),
],
}
.run()
.await;
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_1", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 3,
lp: "table_2,tag_partition_by=b bar=1 0",
},
],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
("table_2", "tag_partition_by_b"),
],
}
.run()
.await;
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![],
partitions_before_restart: vec![],
partitions_persisted: vec![],
additional_sequencers_entries_restart: vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=a bar=1 0",
}],
partitions_after_restart: vec![("table_1", "tag_partition_by_a")],
}
.run()
.await;
ReplayTest {
n_sequencers: 3,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 1,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 2,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 2,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_1", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=b bar=1 0",
}],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
],
}
.run()
.await;
}
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";


@@ -80,6 +80,9 @@ pub enum Error {
db_name
))]
DbPartiallyInitialized { db_name: String },
#[snafu(display("Cannot replay: {}", source))]
ReplayError { source: Box<crate::db::Error> },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -319,7 +322,13 @@ async fn try_advance_database_init_process(
let db = handle
.db_any_state()
.expect("DB should be available in this state");
db.perform_replay().await;
let replay_plan = handle
.replay_plan()
.expect("replay plan should exist in this state");
db.perform_replay(&replay_plan)
.await
.map_err(Box::new)
.context(ReplayError)?;
handle
.advance_init()