Merge pull request #2093 from influxdata/crepererum/naive_replay2

feat: implement naive replay
pull/24376/head
kodiakhq[bot] 2021-07-23 08:23:57 +00:00 committed by GitHub
commit 50b436a8a3
8 changed files with 738 additions and 54 deletions

View File

@ -564,7 +564,7 @@ impl Default for ReplayPlanner {
}
/// Plan that contains all necessary information to orchestrate a replay.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ReplayPlan {
/// Replay range (inclusive minimum sequence number, inclusive maximum sequence number) for every sequencer.
///
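As a rough sketch of the bookkeeping this field carries (using a simplified stand-in type; the real one is OptionalMinMaxSequence from persistence_windows, whose exact API is not shown here):

use std::collections::BTreeMap;

// Simplified stand-in for the per-sequencer replay range (illustration only).
#[derive(Debug, Clone, Copy)]
struct ReplayRange {
    // Lowest sequence number that still needs replay; `None` means everything
    // up to `max` is already persisted.
    min: Option<u64>,
    // Highest sequence number known when the checkpoint was taken.
    max: u64,
}

fn example_ranges() -> BTreeMap<u32, ReplayRange> {
    let mut ranges = BTreeMap::new();
    ranges.insert(0, ReplayRange { min: Some(3), max: 7 }); // replay 3..=7
    ranges.insert(1, ReplayRange { min: None, max: 9 });    // nothing to replay
    ranges
}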

View File

@ -57,6 +57,7 @@ async fn chunk_pruning_sql() {
let TestDb {
db,
metric_registry,
..
} = setup().await;
let expected = vec![
@ -112,6 +113,7 @@ async fn chunk_pruning_influxrpc() {
let TestDb {
db,
metric_registry,
..
} = setup().await;
let predicate = PredicateBuilder::new()

View File

@ -10,6 +10,7 @@ use data_types::{
use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan;
use query::exec::Executor;
use write_buffer::config::WriteBufferConfig;
@ -393,7 +394,10 @@ enum DatabaseState {
RulesLoaded { rules: Arc<DatabaseRules> },
/// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
Replay { db: Arc<Db> },
Replay {
db: Arc<Db>,
replay_plan: ReplayPlan,
},
/// Fully initialized database.
Initialized {
@ -456,6 +460,15 @@ impl DatabaseState {
DatabaseState::Initialized { db, .. } => Some(db.rules()),
}
}
fn replay_plan(&self) -> Option<ReplayPlan> {
match self {
DatabaseState::Known { .. } => None,
DatabaseState::RulesLoaded { .. } => None,
DatabaseState::Replay { replay_plan, .. } => Some(replay_plan.clone()),
DatabaseState::Initialized { .. } => None,
}
}
}
impl Drop for DatabaseState {
@ -538,6 +551,11 @@ impl<'a> DatabaseHandle<'a> {
self.state().db_any_state()
}
/// Get replay plan, if any.
pub fn replay_plan(&self) -> Option<ReplayPlan> {
self.state().replay_plan()
}
/// Commit modification done to this handle to config.
///
/// After committing, a new handle for the same database can be created.
@ -585,6 +603,7 @@ impl<'a> DatabaseHandle<'a> {
&mut self,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
replay_plan: ReplayPlan,
write_buffer: Option<WriteBufferConfig>,
) -> Result<()> {
match self.state().as_ref() {
@ -600,7 +619,7 @@ impl<'a> DatabaseHandle<'a> {
};
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));
self.state = Some(Arc::new(DatabaseState::Replay { db }));
self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan }));
Ok(())
}
@ -615,7 +634,7 @@ impl<'a> DatabaseHandle<'a> {
/// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
pub fn advance_init(&mut self) -> Result<()> {
match self.state().as_ref() {
DatabaseState::Replay { db } => {
DatabaseState::Replay { db, .. } => {
if self.config.shutdown.is_cancelled() {
error!("server is shutting down");
return ServerShuttingDown.fail();
@ -750,7 +769,7 @@ mod test {
db_reservation.advance_rules_loaded(rules).unwrap();
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
&name,
config.object_store(),
config.server_id(),
@ -760,7 +779,7 @@ mod test {
.await
.unwrap();
db_reservation
.advance_replay(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, replay_plan, None)
.unwrap();
db_reservation.advance_init().unwrap();
@ -835,14 +854,16 @@ mod test {
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_none());
assert!(db_reservation.replay_plan().is_none());
db_reservation.advance_rules_loaded(rules).unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::RulesLoaded);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
assert!(db_reservation.replay_plan().is_none());
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
&name,
config.object_store(),
config.server_id(),
@ -852,18 +873,20 @@ mod test {
.await
.unwrap();
db_reservation
.advance_replay(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, replay_plan, None)
.unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Replay);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
assert!(db_reservation.replay_plan().is_some());
db_reservation.advance_init().unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Initialized);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
assert!(db_reservation.replay_plan().is_none());
db_reservation.commit();
}
@ -922,7 +945,7 @@ mod test {
let name = DatabaseName::new("foo").unwrap();
let config = make_config(None);
let rules = DatabaseRules::new(name.clone());
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
&name,
config.object_store(),
config.server_id(),
@ -936,7 +959,7 @@ mod test {
let mut db_reservation = config.create_db(name.clone()).unwrap();
db_reservation.advance_rules_loaded(rules).unwrap();
db_reservation
.advance_replay(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, replay_plan, None)
.unwrap();
db_reservation.advance_init().unwrap();
db_reservation.commit();

View File

@ -24,6 +24,7 @@ use data_types::{
};
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, SequencedEntry};
use futures::TryStreamExt;
use futures::{stream::BoxStream, StreamExt};
use internal_types::schema::Schema;
use metrics::KeyValue;
@ -36,10 +37,12 @@ use parquet_file::{
catalog::{CheckpointData, PreservedCatalog},
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
};
use persistence_windows::checkpoint::ReplayPlan;
use persistence_windows::persistence_windows::PersistenceWindows;
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::{BTreeMap, BTreeSet};
use std::{
any::Any,
collections::HashMap,
@ -140,6 +143,59 @@ pub enum Error {
table_name: String,
partition_key: String,
},
#[snafu(display("Cannot create replay plan: {}", source))]
ReplayPlanError {
source: persistence_windows::checkpoint::Error,
},
#[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))]
ReplaySeekError {
sequencer_id: u32,
source: write_buffer::core::WriteBufferError,
},
#[snafu(display(
"Cannot get entry from sequencer {} during replay: {}",
sequencer_id,
source
))]
ReplayEntryError {
sequencer_id: u32,
source: write_buffer::core::WriteBufferError,
},
#[snafu(display(
"For sequencer {} expected to find sequence {} but replay jumped to {}",
sequencer_id,
expected_sequence_number,
actual_sequence_number,
))]
ReplayEntryLostError {
sequencer_id: u32,
actual_sequence_number: u64,
expected_sequence_number: u64,
},
#[snafu(display(
"Cannot store from sequencer {} during replay: {}",
sequencer_id,
source
))]
ReplayStoreError {
sequencer_id: u32,
source: Box<Error>,
},
#[snafu(display(
"Replay plan references unknown sequencer {}, known sequencers are {:?}",
sequencer_id,
sequencer_ids,
))]
ReplayUnknownSequencer {
sequencer_id: u32,
sequencer_ids: Vec<u32>,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -715,8 +771,107 @@ impl Db {
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(&self) {
// TODO: implement replay
pub async fn perform_replay(&self, replay_plan: &ReplayPlan) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
let db_name = self.rules.read().db_name().to_string();
info!(%db_name, "starting replay");
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
// check if write buffer and replay plan agree on the set of sequencer ids
let sequencer_ids: BTreeSet<_> = write_buffer
.streams()
.into_iter()
.map(|(sequencer_id, _stream)| sequencer_id)
.collect();
for sequencer_id in replay_plan.sequencer_ids() {
if !sequencer_ids.contains(&sequencer_id) {
return Err(Error::ReplayUnknownSequencer {
sequencer_id,
sequencer_ids: sequencer_ids.iter().copied().collect(),
});
}
}
// determine replay ranges based on the plan
let replay_ranges: BTreeMap<_, _> = sequencer_ids
.into_iter()
.filter_map(|sequencer_id| {
replay_plan
.replay_range(sequencer_id)
.map(|min_max| (sequencer_id, min_max))
})
.collect();
// seek write buffer according to the plan
for (sequencer_id, min_max) in &replay_ranges {
if let Some(min) = min_max.min() {
info!(%db_name, sequencer_id, sequence_number=min, "seek sequencer in preparation for replay");
write_buffer
.seek(*sequencer_id, min)
.await
.context(ReplaySeekError {
sequencer_id: *sequencer_id,
})?;
} else {
let sequence_number = min_max.max() + 1;
info!(%db_name, sequencer_id, sequence_number, "seek sequencer that did not require replay");
write_buffer
.seek(*sequencer_id, sequence_number)
.await
.context(ReplaySeekError {
sequencer_id: *sequencer_id,
})?;
}
}
// replay ranges
for (sequencer_id, mut stream) in write_buffer.streams() {
if let Some(min_max) = replay_ranges.get(&sequencer_id) {
if min_max.min().is_none() {
// no replay required
continue;
}
info!(
%db_name,
sequencer_id,
sequence_number_min=min_max.min().expect("checked above"),
sequence_number_max=min_max.max(),
"replay sequencer",
);
while let Some(entry) = stream
.stream
.try_next()
.await
.context(ReplayEntryError { sequencer_id })?
{
let sequence = *entry.sequence().expect("entry must be sequenced");
if sequence.number > min_max.max() {
return Err(Error::ReplayEntryLostError {
sequencer_id,
actual_sequence_number: sequence.number,
expected_sequence_number: min_max.max(),
});
}
let entry = Arc::new(entry);
self.store_sequenced_entry(entry)
.map_err(Box::new)
.context(ReplayStoreError { sequencer_id })?;
// done replaying?
if sequence.number == min_max.max() {
break;
}
}
}
}
}
Ok(())
}
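The seek loop above boils down to one rule per sequencer: start at the inclusive minimum when anything is left to replay, otherwise skip just past the persisted maximum so only genuinely new writes are consumed afterwards. A minimal sketch of that rule (plain Option<u64>/u64 arguments instead of the real OptionalMinMaxSequence):

// Where to seek a sequencer before reading its stream again. Mirrors the
// branch in `perform_replay` above.
fn seek_target(min: Option<u64>, max: u64) -> u64 {
    match min {
        Some(min) => min, // start of the unpersisted range (inclusive)
        None => max + 1,  // fully persisted; resume with new data only
    }
}

#[test]
fn seek_target_examples() {
    assert_eq!(seek_target(Some(3), 7), 3); // replay 3..=7, then keep reading
    assert_eq!(seek_target(None, 9), 10);   // skip the fully persisted range
}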
/// Background worker function
@ -1284,10 +1439,13 @@ mod tests {
use chrono::DateTime;
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
database_rules::{LifecycleRules, PartitionTemplate, Partitioner, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
};
use entry::{test_helpers::lp_to_entry, Sequence};
use entry::{
test_helpers::{lp_to_entries, lp_to_entry},
Sequence,
};
use futures::{stream, StreamExt, TryStreamExt};
use internal_types::{schema::Schema, selection::Selection};
use object_store::{
@ -1299,7 +1457,10 @@ mod tests {
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
use persistence_windows::min_max_sequence::MinMaxSequence;
use persistence_windows::{
checkpoint::{PartitionCheckpoint, PersistCheckpointBuilder, ReplayPlanner},
min_max_sequence::{MinMaxSequence, OptionalMinMaxSequence},
};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
use std::{
collections::{BTreeMap, HashSet},
@ -3870,6 +4031,440 @@ mod tests {
);
}
struct TestSequencedEntry {
sequencer_id: u32,
sequence_number: u64,
lp: &'static str,
}
impl TestSequencedEntry {
fn build(self, partitioner: &impl Partitioner) -> SequencedEntry {
let Self {
sequencer_id,
sequence_number,
lp,
} = self;
let mut entries = lp_to_entries(lp, partitioner);
assert_eq!(entries.len(), 1);
SequencedEntry::new_from_sequence(
Sequence::new(sequencer_id, sequence_number),
entries.pop().unwrap(),
)
.unwrap()
}
}
struct ReplayTest {
/// Number of sequencers in this test setup.
n_sequencers: u32,
/// Entries for all sequencers that will be present during first DB start.
///
/// **NOTE: You may mix entries for different sequencers here but the entries for a single sequencer must be in
/// order (e.g. sequence number 1 must come after sequence number 0).**
sequencers_entries_start: Vec<TestSequencedEntry>,
/// The partitions (by table name and partition key) that are present after
/// [`sequencers_entries_start`](Self::sequencers_entries_start) are processed.
///
/// This data must be correct, otherwise the test will time out.
partitions_before_restart: Vec<(&'static str, &'static str)>,
/// The partitions (by table name and partition key) that will be persisted before the DB will be restarted.
partitions_persisted: Vec<(&'static str, &'static str)>,
/// Additional entries for all sequencers (on top of
/// [`sequencers_entries_start`](Self::sequencers_entries_start)) that will be added while the DB is stopped and
/// before it will be started again.
///
/// **NOTE: You may mix entries for different sequencers here but the entries for a single sequencer must be in
/// order (e.g. sequence number 1 must come after sequence number 0).**
additional_sequencers_entries_restart: Vec<TestSequencedEntry>,
/// The partitions (by table name and partition key) that are present after
/// replay and restart (from [`sequencers_entries_start`](Self::sequencers_entries_start) and
/// [`additional_sequencers_entries_restart`](Self::additional_sequencers_entries_restart)).
///
/// This data must be correct, otherwise the test will time out.
partitions_after_restart: Vec<(&'static str, &'static str)>,
}
impl ReplayTest {
async fn run(self) {
// Test that data is replayed from write buffers
// ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let db_name = "replay_test";
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
// ==================== do: fill write buffer state ====================
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
for se in self.sequencers_entries_start {
write_buffer_state.push_entry(se.build(&partition_template));
}
// ==================== do: create DB ====================
// Create a DB given a server id, an object store, write buffer, and a db name
let write_buffer = MockBufferForReading::new(write_buffer_state.clone());
let db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.partition_template(partition_template.clone())
.db_name(db_name)
.build()
.await
.db;
// ==================== do: start background worker ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== do: wait for partitions to reach expected state ====================
let t_0 = Instant::now();
loop {
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
if partitions[..] == self.partitions_before_restart {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background worker ====================
shutdown.cancel();
join_handle.await.unwrap();
// ==================== do: persist some partitions ====================
for (table_name, partition_key) in &self.partitions_persisted {
// Mark the MB chunk close
let chunk_id = {
let mb_chunk = db
.rollover_partition(table_name, partition_key)
.await
.unwrap()
.unwrap();
mb_chunk.id()
};
// Move that MB chunk to RB chunk and drop it from MB
db.move_chunk_to_read_buffer(table_name, partition_key, chunk_id)
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
}
// ==================== do: re-create DB ====================
// Create a DB given a server id, an object store, write buffer, and a db name
drop(db);
let write_buffer = MockBufferForReading::new(write_buffer_state.clone());
let test_db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.partition_template(partition_template.clone())
.db_name(db_name)
.build()
.await;
let db = test_db.db;
let replay_plan = test_db.replay_plan;
// ==================== check: only preserved partitions are loaded ====================
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
assert_eq!(partitions[..], self.partitions_persisted);
// ==================== do: fill write buffer state with more data ====================
// that data must not be loaded during replay
for se in self.additional_sequencers_entries_restart {
write_buffer_state.push_entry(se.build(&partition_template));
}
// ==================== do: replay ====================
db.perform_replay(&replay_plan).await.unwrap();
// ==================== check: all previously known partitions are loaded ====================
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
assert_eq!(partitions[..], self.partitions_before_restart);
// ==================== do: start background worker ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== do: wait for partitions to reach expected state ====================
let t_0 = Instant::now();
loop {
let partitions = Self::get_partitions(&db);
let partitions: Vec<(&str, &str)> = partitions
.iter()
.map(|(s1, s2)| (s1.as_ref(), s2.as_ref()))
.collect();
if partitions[..] == self.partitions_after_restart {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background worker ====================
shutdown.cancel();
join_handle.await.unwrap();
}
fn get_partitions(db: &Db) -> Vec<(String, String)> {
let mut partitions: Vec<_> = db
.catalog
.partitions()
.into_iter()
.map(|partition| {
let partition = partition.read();
(
partition.table_name().to_string(),
partition.key().to_string(),
)
})
.collect();
partitions.sort();
partitions
}
}
#[tokio::test]
async fn replay_ok_two_partitions_persist_second() {
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_2", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 3,
lp: "table_2,tag_partition_by=b bar=1 0",
},
],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
("table_2", "tag_partition_by_b"),
],
}
.run()
.await;
}
#[tokio::test]
async fn replay_ok_two_partitions_persist_first() {
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_1", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=1 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 3,
lp: "table_2,tag_partition_by=b bar=1 0",
},
],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
("table_2", "tag_partition_by_b"),
],
}
.run()
.await;
}
#[tokio::test]
async fn replay_ok_nothing_to_replay() {
ReplayTest {
n_sequencers: 1,
sequencers_entries_start: vec![],
partitions_before_restart: vec![],
partitions_persisted: vec![],
additional_sequencers_entries_restart: vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=a bar=1 0",
}],
partitions_after_restart: vec![("table_1", "tag_partition_by_a")],
}
.run()
.await;
}
#[tokio::test]
async fn replay_ok_different_sequencer_situations() {
// three sequencers:
// 0: no data at the start, receives new data while the DB is down
// 1: only contributes to the persisted partition, nothing left to replay
// 2: replay required (unpersisted data), no new data
//
// three partitions:
// table 1, partition a: comes from sequencers 1 and 2, gets persisted
// table 1, partition b: created from the new data on sequencer 0
// table 2, partition a: from sequencer 2, not persisted but recovered during replay
ReplayTest {
n_sequencers: 3,
sequencers_entries_start: vec![
TestSequencedEntry {
sequencer_id: 1,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 2,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=1 0",
},
TestSequencedEntry {
sequencer_id: 2,
sequence_number: 1,
lp: "table_2,tag_partition_by=a bar=1 0",
},
],
partitions_before_restart: vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
],
partitions_persisted: vec![("table_1", "tag_partition_by_a")],
additional_sequencers_entries_restart: vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=b bar=1 0",
}],
partitions_after_restart: vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
("table_2", "tag_partition_by_a"),
],
}
.run()
.await;
}
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2);
let write_buffer = MockBufferForReading::new(write_buffer_state);
// create DB
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await
.db;
// construct replay plan for sequencers 0 and 2
let mut sequencer_numbers = BTreeMap::new();
sequencer_numbers.insert(0, OptionalMinMaxSequence::new(Some(0), 1));
sequencer_numbers.insert(2, OptionalMinMaxSequence::new(Some(0), 1));
let partition_checkpoint = PartitionCheckpoint::new(
Arc::from("table"),
Arc::from("partition"),
sequencer_numbers,
Utc::now(),
);
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
let (partition_checkpoint, database_checkpoint) = builder.build();
let mut replay_planner = ReplayPlanner::new();
replay_planner.register_checkpoints(&partition_checkpoint, &database_checkpoint);
let replay_plan = replay_planner.build().unwrap();
// replay fails
let res = db.perform_replay(&replay_plan).await;
assert_contains!(
res.unwrap_err().to_string(),
"Replay plan references unknown sequencer"
);
}
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";

View File

@ -11,12 +11,37 @@ use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
};
use snafu::ResultExt;
use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
use snafu::{ResultExt, Snafu};
use crate::db::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle};
use super::catalog::Catalog;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot build replay plan: {}", source))]
CannotBuildReplayPlan {
source: persistence_windows::checkpoint::Error,
},
#[snafu(display("Cannot create new empty preserved catalog: {}", source))]
CannotCreateCatalog {
source: parquet_file::catalog::Error,
},
#[snafu(display("Cannot load preserved catalog: {}", source))]
CannotLoadCatalog {
source: parquet_file::catalog::Error,
},
#[snafu(display("Cannot wipe preserved catalog: {}", source))]
CannotWipeCatalog {
source: parquet_file::catalog::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Load preserved catalog state from store.
///
/// If no catalog exists yet, a new one will be created.
@ -29,20 +54,22 @@ pub async fn load_or_create_preserved_catalog(
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool,
) -> std::result::Result<(PreservedCatalog, Catalog), parquet_file::catalog::Error> {
) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
// first try to load existing catalogs
match PreservedCatalog::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
)
.await
{
Ok(Some(catalog)) => {
Ok(Some((preserved_catalog, loader))) => {
// successful load
info!("Found existing catalog for DB {}", db_name);
Ok(catalog)
let Loader { catalog, planner } = loader;
let plan = planner.build().context(CannotBuildReplayPlan)?;
Ok((preserved_catalog, catalog, plan))
}
Ok(None) => {
// no catalog yet => create one
@ -65,7 +92,9 @@ pub async fn load_or_create_preserved_catalog(
// broken => wipe for now (at least during early iterations)
error!("cannot load catalog, so wipe it: {}", e);
PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.context(CannotWipeCatalog)?;
create_preserved_catalog(
db_name,
@ -75,7 +104,7 @@ pub async fn load_or_create_preserved_catalog(
)
.await
} else {
Err(e)
Err(Error::CannotLoadCatalog { source: e })
}
}
}
@ -89,25 +118,30 @@ pub async fn create_preserved_catalog(
object_store: Arc<ObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
) -> std::result::Result<(PreservedCatalog, Catalog), parquet_file::catalog::Error> {
PreservedCatalog::new_empty(
) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
let (preserved_catalog, loader) = PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
)
.await
.context(CannotCreateCatalog)?;
let Loader { catalog, planner } = loader;
let plan = planner.build().context(CannotBuildReplayPlan)?;
Ok((preserved_catalog, catalog, plan))
}
/// All input required to create an empty [`Catalog`](crate::db::catalog::Catalog).
/// All input required to create an empty [`Loader`]
#[derive(Debug)]
pub struct CatalogEmptyInput {
pub struct LoaderEmptyInput {
domain: ::metrics::Domain,
metrics_registry: Arc<::metrics::MetricRegistry>,
metric_labels: Vec<KeyValue>,
}
impl CatalogEmptyInput {
impl LoaderEmptyInput {
fn new(db_name: &str, server_id: ServerId, metrics_registry: Arc<MetricRegistry>) -> Self {
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
@ -123,16 +157,26 @@ impl CatalogEmptyInput {
}
}
impl CatalogState for Catalog {
type EmptyInput = CatalogEmptyInput;
/// Helper to track data during catalog loading.
#[derive(Debug)]
struct Loader {
catalog: Catalog,
planner: ReplayPlanner,
}
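Conceptually, the loader folds the partition/database checkpoints found in each parquet file's IOx metadata into a single plan, widening the per-sequencer range as it goes. A toy sketch of that fold, assuming a simplified range type and merge rule rather than the real ReplayPlanner semantics:

use std::collections::BTreeMap;

// Toy range: lowest unpersisted sequence number (if any) and highest seen.
#[derive(Debug, Clone, Copy)]
struct ToyRange {
    min: Option<u64>,
    max: u64,
}

#[derive(Debug, Default)]
struct ToyPlanner {
    ranges: BTreeMap<u32, ToyRange>,
}

impl ToyPlanner {
    // Fold one checkpoint's range for `sequencer_id` into the accumulated plan.
    fn register(&mut self, sequencer_id: u32, range: ToyRange) {
        self.ranges
            .entry(sequencer_id)
            .and_modify(|acc| {
                // widen: earliest unpersisted min, latest seen max
                acc.min = match (acc.min, range.min) {
                    (Some(a), Some(b)) => Some(a.min(b)),
                    (a, b) => a.or(b),
                };
                acc.max = acc.max.max(range.max);
            })
            .or_insert(range);
    }
}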
impl CatalogState for Loader {
type EmptyInput = LoaderEmptyInput;
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self {
Self::new(
Arc::from(db_name),
data.domain,
data.metrics_registry,
data.metric_labels,
)
Self {
catalog: Catalog::new(
Arc::from(db_name),
data.domain,
data.metrics_registry,
data.metric_labels,
),
planner: ReplayPlanner::new(),
}
}
fn add(
@ -150,10 +194,15 @@ impl CatalogState for Catalog {
path: info.path.clone(),
})?;
// remember file for replay
self.planner
.register_checkpoints(&iox_md.partition_checkpoint, &iox_md.database_checkpoint);
// Create a parquet chunk for this chunk
let metrics = self
.catalog
.metrics_registry
.register_domain_with_labels("parquet", self.metric_labels.clone());
.register_domain_with_labels("parquet", self.catalog.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics);
let parquet_chunk = ParquetChunk::new(
@ -170,8 +219,9 @@ impl CatalogState for Catalog {
// Get partition from the catalog
// Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
let (partition, table_schema) =
self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let (partition, table_schema) = self
.catalog
.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let mut partition = partition.write();
if partition.chunk(iox_md.chunk_id).is_some() {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
@ -188,7 +238,7 @@ impl CatalogState for Catalog {
fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
let mut removed_any = false;
for partition in self.partitions() {
for partition in self.catalog.partitions() {
let mut partition = partition.write();
let mut to_remove = vec![];
@ -222,8 +272,9 @@ impl CatalogState for Catalog {
mod tests {
use std::convert::TryFrom;
use parquet_file::catalog::test_helpers::{
assert_catalog_state_implementation, TestCatalogState,
use parquet_file::catalog::{
test_helpers::{assert_catalog_state_implementation, TestCatalogState},
CheckpointData,
};
use crate::db::checkpoint_data_from_catalog;
@ -253,18 +304,19 @@ mod tests {
.unwrap();
}
fn checkpoint_data_from_loader(loader: &Loader) -> CheckpointData {
checkpoint_data_from_catalog(&loader.catalog)
}
#[tokio::test]
async fn test_catalog_state() {
let metrics_registry = Arc::new(::metrics::MetricRegistry::new());
let empty_input = CatalogEmptyInput {
let empty_input = LoaderEmptyInput {
domain: metrics_registry.register_domain("catalog"),
metrics_registry,
metric_labels: vec![],
};
assert_catalog_state_implementation::<Catalog, _>(
empty_input,
checkpoint_data_from_catalog,
)
.await;
assert_catalog_state_implementation::<Loader, _>(empty_input, checkpoint_data_from_loader)
.await;
}
}

View File

@ -80,6 +80,9 @@ pub enum Error {
db_name
))]
DbPartiallyInitialized { db_name: String },
#[snafu(display("Cannot replay: {}", source))]
ReplayError { source: Box<crate::db::Error> },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -286,7 +289,7 @@ async fn try_advance_database_init_process(
}
DatabaseStateCode::RulesLoaded => {
// rules already loaded => continue with loading preserved catalog
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
&handle.db_name(),
handle.object_store(),
handle.server_id(),
@ -308,7 +311,7 @@ async fn try_advance_database_init_process(
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
handle
.advance_replay(preserved_catalog, catalog, write_buffer)
.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)
.map_err(Box::new)
.context(InitDbError)?;
@ -319,7 +322,13 @@ async fn try_advance_database_init_process(
let db = handle
.db_any_state()
.expect("DB should be available in this state");
db.perform_replay().await;
let replay_plan = handle
.replay_plan()
.expect("replay plan should exist in this state");
db.perform_replay(&replay_plan)
.await
.map_err(Box::new)
.context(ReplayError)?;
handle
.advance_init()

View File

@ -635,7 +635,7 @@ where
db_reservation.advance_rules_loaded(rules.clone())?;
// load preserved catalog
let (preserved_catalog, catalog) = create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = create_preserved_catalog(
rules.db_name(),
Arc::clone(&self.store),
config.server_id(),
@ -652,7 +652,7 @@ where
source: e,
})?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
db_reservation.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)?;
// no actual replay required
db_reservation.advance_init()?;

View File

@ -7,6 +7,7 @@ use data_types::{
DatabaseName,
};
use object_store::ObjectStore;
use persistence_windows::checkpoint::ReplayPlan;
use query::{exec::Executor, QueryDatabase};
use write_buffer::config::WriteBufferConfig;
@ -22,6 +23,7 @@ use data_types::database_rules::LifecycleRules;
pub struct TestDb {
pub db: Arc<Db>,
pub metric_registry: metrics::TestMetricRegistry,
pub replay_plan: ReplayPlan,
}
impl TestDb {
@ -60,7 +62,7 @@ impl TestDbBuilder {
let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&object_store),
server_id,
@ -106,6 +108,7 @@ impl TestDbBuilder {
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))),
replay_plan,
}
}