Merge pull request #1668 from influxdata/crepererum/issue1381

feat: catalog checkpointing infrastructure
pull/24376/head
kodiakhq[bot] 2021-06-10 14:05:46 +00:00 committed by GitHub
commit ab92c0c321
9 changed files with 967 additions and 111 deletions

Cargo.lock (generated)

@@ -3746,6 +3746,7 @@ dependencies = [
"influxdb_line_protocol",
"influxdb_tsm",
"mutable_buffer",
"object_store",
"packers",
"query",
"query_tests",


@@ -64,7 +64,7 @@ message Transaction {
// Revision counter, must be "previous revision" + 1 or 0 for the first transaction.
uint64 revision_counter = 3;
// UUID unique to this transaction. Used to detect concurrent transactions. For the first transaction this field is
// empty.
string uuid = 4;
@@ -76,4 +76,21 @@ message Transaction {
//
// Timestamp of the start of the transaction.
google.protobuf.Timestamp start_timestamp = 6;
-}
+
+  // Definition of how this transaction relates to the previous transaction and how it should be processed.
+  enum Encoding {
+    // Unspecified encoding, will lead to a controlled error. Every transaction object MUST decide if the encoding
+    // is "delta" or "full".
+    ENCODING_UNSPECIFIED = 0;
+
+    // The actions in this message only encode changes to the previous transactions, which must be processed
+    // beforehand. This is the default for "ordinary" transactions.
+    ENCODING_DELTA = 1;
+
+    // The actions in this message contain the full state of the catalog at this point in time. This is used for checkpoints.
+    ENCODING_FULL = 2;
+  }
+
+  // Definition of how this transaction is encoded.
+  Encoding encoding = 7;
+}
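The new encoding field lets a reader of the catalog treat a checkpoint as a fresh starting point: when replaying transactions in order, an ENCODING_FULL transaction supersedes whatever state has been accumulated, while ENCODING_DELTA transactions apply on top of it. Below is a minimal sketch of that replay rule using hypothetical stand-in types, not the real parquet_file::catalog types; removals are omitted for brevity.

use std::collections::HashMap;

#[derive(Clone, Copy)]
enum Encoding {
    Delta, // actions only describe changes relative to the previous transactions
    Full,  // actions describe the complete catalog state (checkpoint)
}

struct Transaction {
    encoding: Encoding,
    // stand-in for the real AddParquet actions: path -> serialized parquet metadata
    files: HashMap<String, Vec<u8>>,
}

// Replay transactions oldest-to-newest into a single path -> metadata map.
fn replay(transactions: &[Transaction]) -> HashMap<String, Vec<u8>> {
    let mut state = HashMap::new();
    for t in transactions {
        if matches!(t.encoding, Encoding::Full) {
            // checkpoint: the accumulated state is superseded entirely
            state.clear();
        }
        state.extend(t.files.iter().map(|(k, v)| (k.clone(), v.clone())));
    }
    state
}

fn main() {
    let txns = vec![
        Transaction { encoding: Encoding::Delta, files: HashMap::from([("a.parquet".to_string(), vec![])]) },
        Transaction { encoding: Encoding::Full, files: HashMap::from([("a.parquet".to_string(), vec![]), ("b.parquet".to_string(), vec![])]) },
        Transaction { encoding: Encoding::Delta, files: HashMap::from([("c.parquet".to_string(), vec![])]) },
    ];
    // after the full transaction plus one delta, three files are known
    assert_eq!(replay(&txns).len(), 3);
}

In practice this means a reader only has to fetch transactions back to the most recent full one, which is what the throttled-object-store benchmark further below measures.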

File diff suppressed because it is too large.


@@ -156,6 +156,13 @@ impl CatalogState for TracerCatalogState {
// Do NOT remove the file since we still need it for time travel
Ok(())
}
fn files(
&self,
) -> std::collections::HashMap<DirsAndFileName, Arc<parquet::file::metadata::ParquetMetaData>>
{
unimplemented!("File tracking not implemented for TracerCatalogState")
}
}
#[cfg(test)]
@@ -236,7 +243,7 @@ mod tests {
let (path, _md) = make_metadata(&object_store, "foo", 3).await;
paths_delete.push(path.display());
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
}
// run clean-up
@@ -278,7 +285,7 @@ mod tests {
let (path, md) = make_metadata(&object_store, "foo", i).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap();
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
path.display()
},


@@ -130,12 +130,12 @@ where
.add_parquet(&path, metadata)
.context(FileRecordFailure)?;
}
-transaction.commit().await.context(CommitFailure)?;
+transaction.commit(false).await.context(CommitFailure)?;
} else {
// we do not have any files for this transaction (there might have been other actions though or it was
// an empty transaction) => create new empty transaction
let transaction = catalog.open_transaction().await;
-transaction.commit().await.context(CommitFailure)?;
+transaction.commit(false).await.context(CommitFailure)?;
}
}
}
@@ -299,12 +299,12 @@ mod tests {
.await;
transaction.add_parquet(&path, &md).unwrap();
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
}
{
// empty transaction
let transaction = catalog.open_transaction().await;
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
}
{
let mut transaction = catalog.open_transaction().await;
@@ -320,7 +320,7 @@
.await;
transaction.add_parquet(&path, &md).unwrap();
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
}
// store catalog state
@@ -486,7 +486,7 @@
)
.await;
-transaction.commit().await.unwrap();
+transaction.commit(false).await.unwrap();
}
// wipe catalog


@@ -736,7 +736,7 @@ impl Db {
transaction
.add_parquet(&path.into(), &parquet_metadata)
.context(TransactionError)?;
-transaction.commit().await.context(TransactionError)?;
+transaction.commit(false).await.context(TransactionError)?;
}
// We know this chunk is ParquetFile type
@@ -1290,6 +1290,15 @@ impl CatalogState for Catalog {
fn remove(&self, _path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
unimplemented!("parquet files cannot be removed from the catalog for now")
}
fn files(
&self,
) -> std::collections::HashMap<
DirsAndFileName,
Arc<datafusion::parquet::file::metadata::ParquetMetaData>,
> {
todo!("wire up catalog file tracking")
}
}
pub mod test_helpers {
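
Two pieces of this PR meet in the checkpoint path that is not wired up yet: every CatalogState implementation now exposes a files() snapshot of all tracked parquet files, and Transaction::commit takes an extra boolean (always false in the call sites shown here), presumably controlling whether a checkpoint is produced at commit time. A rough sketch of how a checkpoint writer could use that snapshot follows; everything except the files() method is an assumption made for illustration, not code from this PR.

use std::collections::HashMap;

type DirsAndFileName = String; // stand-in for the real path type
type ParquetMetaData = Vec<u8>; // stand-in for the real parquet metadata

trait CatalogState {
    // snapshot of all parquet files currently tracked by the catalog
    fn files(&self) -> HashMap<DirsAndFileName, ParquetMetaData>;
}

struct Checkpoint {
    encoding_full: bool, // would map to ENCODING_FULL in the protobuf above
    add_parquet: Vec<(DirsAndFileName, ParquetMetaData)>,
}

// Hypothetical checkpoint writer: dump the full catalog state into one transaction.
fn write_checkpoint<S: CatalogState>(state: &S) -> Checkpoint {
    Checkpoint {
        encoding_full: true,
        add_parquet: state.files().into_iter().collect(),
    }
}

fn main() {
    struct Dummy;
    impl CatalogState for Dummy {
        fn files(&self) -> HashMap<DirsAndFileName, ParquetMetaData> {
            HashMap::from([("db/table/0/chunk.parquet".to_string(), vec![0u8; 4])])
        }
    }
    let cp = write_checkpoint(&Dummy);
    assert!(cp.encoding_full);
    assert_eq!(cp.add_parquet.len(), 1);
}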


@@ -19,6 +19,7 @@ flate2 = "1.0.20"
influxdb_tsm = { path = "../influxdb_tsm" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
mutable_buffer = { path = "../mutable_buffer" }
+object_store = { path = "../object_store" }
packers = { path = "../packers" }
query = { path = "../query" }
query_tests = { path = "../query_tests" }
@@ -33,7 +34,6 @@ tokio = { version = "1.0", features = ["macros", "time"] }
name = "influxrpc"
harness = false
[[bench]]
name = "snapshot"
harness = false
@@ -42,7 +42,6 @@ harness = false
name = "write"
harness = false
[[bench]]
name = "encoders"
harness = false
@@ -58,3 +57,7 @@ harness = false
[[bench]]
name = "packers"
harness = false
[[bench]]
name = "catalog_persistence"
harness = false


@@ -0,0 +1,146 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode};
use object_store::{memory::InMemory, throttle::ThrottledStore, ObjectStore};
use server::{db::test_helpers::write_lp, utils::TestDb};
use std::{sync::Arc, time::Duration};
use tokio::{
runtime::{Handle, Runtime},
sync::Mutex,
task::block_in_place,
};
/// Number of chunks simulated for persistence.
const N_CHUNKS: u32 = 100;
/// Number of tags for the test table.
const N_TAGS: usize = 10;
/// Number of fields for the test table.
const N_FIELDS: usize = 10;
/// Run all benchmarks that test catalog persistence.
fn benchmark_catalog_persistence(c: &mut Criterion) {
let object_store = create_throttled_store();
let setup_done = Mutex::new(false);
// benchmark reading the catalog
let mut group = c.benchmark_group("catalog");
group.measurement_time(Duration::from_secs(30));
group.sample_size(10);
group.sampling_mode(SamplingMode::Flat);
group.warm_up_time(Duration::from_secs(10));
group.bench_function("catalog_restore", |b| {
b.to_async(Runtime::new().unwrap()).iter_batched(
|| {
// Threaded runtime is already running.
block_in_place(|| {
Handle::current().block_on(setup(Arc::clone(&object_store), &setup_done));
});
},
|_| async {
let db = create_persisted_db(Arc::clone(&object_store)).await.db;
// test that data is actually loaded
let partition_key = "1970-01-01T00";
let table_name = "cpu";
let chunk_id = 0;
assert!(db
.table_summary(partition_key, table_name, chunk_id)
.is_some());
},
BatchSize::SmallInput,
);
});
group.finish();
}
/// Persist a database to the given object store with [`N_CHUNKS`] chunks.
async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
let mut guard = done.lock().await;
if *guard {
return;
}
let db = create_persisted_db(object_store).await.db;
let lp = create_lp(N_TAGS, N_FIELDS);
let partition_key = "1970-01-01T00";
for chunk_id in 0..N_CHUNKS {
let table_names = write_lp(&db, &lp);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, &table_name, chunk_id, &Default::default())
.await
.unwrap();
db.write_chunk_to_object_store(
partition_key,
&table_name,
chunk_id,
&Default::default(),
)
.await
.unwrap();
db.unload_read_buffer(partition_key, &table_name, chunk_id)
.await
.unwrap();
}
}
*guard = true;
}
/// Create a persisted database and load its catalog.
#[inline(never)]
async fn create_persisted_db(object_store: Arc<ObjectStore>) -> TestDb {
TestDb::builder().object_store(object_store).build().await
}
/// Create line protocol for a single entry with `n_tags` tags and `n_fields` fields.
///
/// The table is `"cpu"` and the timestamp is `0`.
fn create_lp(n_tags: usize, n_fields: usize) -> String {
let mut lp = "cpu".to_string();
for i in 0..n_tags {
lp.push_str(&format!(",tag_{}=x", i));
}
lp.push(' ');
for i in 0..n_fields {
if i > 0 {
lp.push(',')
}
lp.push_str(&format!("field_{}=1", i));
}
lp.push_str(" 0");
lp
}
/// Create object store with somewhat realistic operation latencies.
fn create_throttled_store() -> Arc<ObjectStore> {
let mut throttled_store = ThrottledStore::<InMemory>::new(InMemory::new());
// for every call: assume a 100ms latency
throttled_store.wait_delete_per_call = Duration::from_millis(100);
throttled_store.wait_get_per_call = Duration::from_millis(100);
throttled_store.wait_list_per_call = Duration::from_millis(100);
throttled_store.wait_list_with_delimiter_per_call = Duration::from_millis(100);
throttled_store.wait_put_per_call = Duration::from_millis(100);
// for list operations: assume we need 1 call per 1k entries at 100ms
throttled_store.wait_list_per_entry = Duration::from_millis(100) / 1_000;
throttled_store.wait_list_with_delimiter_per_entry = Duration::from_millis(100) / 1_000;
// for upload/download: assume 1GByte/s
throttled_store.wait_get_per_byte = Duration::from_secs(1) / 1_000_000_000;
throttled_store.wait_put_per_byte = Duration::from_secs(1) / 1_000_000_000;
Arc::new(ObjectStore::new_in_memory_throttled(throttled_store))
}
criterion_group!(benches, benchmark_catalog_persistence);
criterion_main!(benches);
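
The throttled store makes the motivation concrete: every object-store call costs about 100 ms and transfers run at roughly 1 GByte/s, so a restore that has to fetch one transaction file per chunk pays the per-call latency a hundred times over. The sketch below is a back-of-the-envelope check of those numbers only; the one-GET-per-chunk assumption and the 10 kB transaction-file size are illustrative guesses, not measurements. The benchmark itself is registered in server/Cargo.toml above and can be run with cargo bench --bench catalog_persistence from the server crate.

use std::time::Duration;

fn main() {
    let per_call = Duration::from_millis(100); // wait_get_per_call above
    let per_byte = Duration::from_secs(1) / 1_000_000_000; // ~1 GByte/s
    let n_chunks: u32 = 100; // N_CHUNKS above
    let bytes_per_file: u32 = 10_000; // assumed size of one transaction file

    let restore = per_call * n_chunks + per_byte * (n_chunks * bytes_per_file);
    // the per-call latency dominates: roughly 10 s of simulated waiting for 100
    // sequential GETs, which is the kind of cost a single full-encoded
    // checkpoint transaction is meant to cut down
    println!("simulated sequential restore latency: {:?}", restore);
}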


@@ -8,7 +8,6 @@ use read_filter::benchmark_read_filter;
use read_group::benchmark_read_group;
use tag_values::benchmark_tag_values;
-// criterion_group!(benches, benchmark_tag_values, benchmark_read_filter);
criterion_group!(
benches,
benchmark_tag_values,