feat: persist part+db checkpoint in parquets and catalog

This will be required for replay on server startup.
pull/24376/head
Marco Neumann 2021-07-01 18:18:55 +02:00
parent a35b334ee5
commit cdab1bed05
11 changed files with 251 additions and 15 deletions

Cargo.lock generated

@ -2630,6 +2630,7 @@ dependencies = [
"generated_types",
"internal_types",
"metrics",
"mutable_buffer",
"object_store",
"observability_deps",
"parking_lot",


@ -19,4 +19,38 @@ message IoxMetadata {
// Chunk ID.
uint32 chunk_id = 5;
// Partition checkpoint with pre-split data for the persisted data in this file.
PartitionCheckpoint partition_checkpoint = 6;
// Database checkpoint created at the time of the write.
DatabaseCheckpoint database_checkpoint = 7;
}
// Partition checkpoint.
//
// Note that a partition checkpoint belongs to a single partition (via table name and partition key). Since this
// checkpoint is usually serialized as part of `IoxMetadata`, the partition information is NOT repeated as part of this
// message.
message PartitionCheckpoint {
// Maps sequencer_id to the minimum and maximum sequence numbers seen.
map<uint32, MinMaxSequence> sequencer_numbers = 1;
// Minimum unpersisted timestamp.
google.protobuf.Timestamp min_unpersisted_timestamp = 2;
}
// Record of the playback state for the whole database.
//
// This effectively contains the minimum sequence numbers over the whole database that are the starting point for
// replay.
message DatabaseCheckpoint {
// Maps `sequencer_id` to the minimum sequence numbers seen.
map<uint32, uint64> min_sequencer_numbers = 1;
}
// The minimum and maximum sequence numbers seen for a given sequencer.
message MinMaxSequence {
uint64 min = 1;
uint64 max = 2;
}
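
These messages mirror the in-memory checkpoint types one-to-one. As a rough illustration (not part of this commit), the prost-generated structs under `generated_types::influxdata::iox::catalog::v1` could be populated like this; the values and the helper function are made up, and whether the map fields come out as `HashMap` or `BTreeMap` depends on the prost build configuration, so the sketch stays collection-agnostic via `collect()`:

use generated_types::influxdata::iox::catalog::v1 as proto;

fn example_checkpoints() -> (proto::PartitionCheckpoint, proto::DatabaseCheckpoint) {
    // Per-sequencer min/max sequence numbers seen by one partition.
    let partition_checkpoint = proto::PartitionCheckpoint {
        sequencer_numbers: vec![(1u32, proto::MinMaxSequence { min: 15, max: 18 })]
            .into_iter()
            .collect(),
        // chrono's `DateTime<Utc>` converts into `google.protobuf.Timestamp`.
        min_unpersisted_timestamp: Some(chrono::Utc::now().into()),
    };

    // Database-wide minimum sequence numbers, i.e. the replay starting points.
    let database_checkpoint = proto::DatabaseCheckpoint {
        min_sequencer_numbers: vec![(1u32, 15u64)].into_iter().collect(),
    };

    (partition_checkpoint, database_checkpoint)
}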


@ -215,7 +215,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Immutable record of the playback state for a single partition.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionCheckpoint {
/// Table of the partition.
table_name: String,
@ -268,18 +268,40 @@ impl PartitionCheckpoint {
pub fn sequencer_ids(&self) -> Vec<u32> {
self.sequencer_numbers.keys().copied().collect()
}
/// Iterate over sequencer numbers.
pub fn sequencer_numbers_iter(&self) -> impl Iterator<Item = (u32, MinMaxSequence)> + '_ {
self.sequencer_numbers
.iter()
.map(|(sequencer_id, min_max)| (*sequencer_id, *min_max))
}
/// Minimum unpersisted timestamp.
pub fn min_unpersisted_timestamp(&self) -> DateTime<Utc> {
self.min_unpersisted_timestamp
}
}
/// Immutable record of the playback state for the whole database.
///
/// This effectively contains the minimum sequence numbers over the whole database that are the starting point for replay.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseCheckpoint {
/// Maps `sequencer_id` to the minimum sequence numbers seen.
min_sequencer_numbers: BTreeMap<u32, u64>,
}
impl DatabaseCheckpoint {
/// Create new database checkpoint.
///
/// **This should only rarely be used directly. Consider using [`PersistCheckpointBuilder`] to collect
/// database-wide checkpoints!**
pub fn new(min_sequencer_numbers: BTreeMap<u32, u64>) -> Self {
Self {
min_sequencer_numbers,
}
}
/// Get minimum sequence number that should be used during replay of the given sequencer.
///
/// This will return `None` for an unknown sequencer. This might have multiple reasons, e.g. in the case of Apache Kafka it
@ -295,6 +317,13 @@ impl DatabaseCheckpoint {
pub fn sequencer_ids(&self) -> Vec<u32> {
self.min_sequencer_numbers.keys().copied().collect()
}
/// Iterate over minimum sequencer numbers.
pub fn min_sequencer_number_iter(&self) -> impl Iterator<Item = (u32, u64)> + '_ {
self.min_sequencer_numbers
.iter()
.map(|(sequencer_id, min)| (*sequencer_id, *min))
}
}
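
The iterator accessor is what replay code will lean on: on server startup the recovered `DatabaseCheckpoint` can be walked to position each sequencer at its recorded minimum before re-applying newer writes. A minimal sketch of that shape, assuming a hypothetical `seek_sequencer` hook (the actual write-buffer consumer API is not part of this commit):

use mutable_buffer::checkpoint::DatabaseCheckpoint;

fn start_replay(checkpoint: &DatabaseCheckpoint) {
    for (sequencer_id, min_sequence_number) in checkpoint.min_sequencer_number_iter() {
        // Placeholder for whatever the write-buffer consumer will expose; purely illustrative.
        seek_sequencer(sequencer_id, min_sequence_number);
    }
}

fn seek_sequencer(sequencer_id: u32, sequence_number: u64) {
    println!("replay sequencer {} starting at sequence {}", sequencer_id, sequence_number);
}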
/// Builder that helps with recording checkpoints from persistence windows during the persistence phase.


@ -229,7 +229,7 @@ struct Window {
}
/// The minimum and maximum sequence numbers seen for a given sequencer
#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct MinMaxSequence {
min: u64,
max: u64,


@ -16,6 +16,7 @@ futures = "0.3.7"
generated_types = { path = "../generated_types" }
internal_types = {path = "../internal_types"}
metrics = { path = "../metrics" }
mutable_buffer = { path = "../mutable_buffer" }
object_store = {path = "../object_store"}
observability_deps = { path = "../observability_deps" }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time


@ -31,7 +31,7 @@ use uuid::Uuid;
/// Current version for serialized transactions.
///
/// For breaking changes, this will change.
pub const TRANSACTION_VERSION: u32 = 6;
pub const TRANSACTION_VERSION: u32 = 7;
/// File suffix for transaction files in object store.
pub const TRANSACTION_FILE_SUFFIX: &str = "txn";


@ -86,7 +86,7 @@
//! [Apache Parquet]: https://parquet.apache.org/
//! [Apache Thrift]: https://thrift.apache.org/
//! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
use std::{convert::TryInto, sync::Arc};
use std::{collections::BTreeMap, convert::TryInto, sync::Arc};
use chrono::{DateTime, Utc};
use data_types::partition_metadata::{
@ -94,6 +94,10 @@ use data_types::partition_metadata::{
};
use generated_types::influxdata::iox::catalog::v1 as proto;
use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use mutable_buffer::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint},
persistence_windows::MinMaxSequence,
};
use parquet::{
arrow::parquet_to_arrow_schema,
file::{
@ -117,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
///
/// **Important: When changing this structure, consider bumping the
/// [catalog transaction version](crate::catalog::TRANSACTION_VERSION)!**
pub const METADATA_VERSION: u32 = 1;
pub const METADATA_VERSION: u32 = 2;
/// File-level metadata key to store the IOx-specific data.
///
@ -213,6 +217,9 @@ pub enum Error {
#[snafu(display("Field missing while parsing IOx metadata: {}", field))]
IoxMetadataFieldMissing { field: String },
#[snafu(display("Min-max relation wrong while parsing IOx metadata"))]
IoxMetadataMinMax,
#[snafu(display("Cannot parse IOx metadata from Protobuf: {}", source))]
IoxMetadataBroken {
source: Box<dyn std::error::Error + Send + Sync>,
@ -235,7 +242,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// [metadata version](METADATA_VERSION)!**
///
/// [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
#[allow(missing_copy_implementations)] // we want to extend this type in the future
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct IoxMetadata {
/// Timestamp when this file was created.
@ -249,11 +255,18 @@ pub struct IoxMetadata {
/// Chunk ID.
pub chunk_id: u32,
/// Partition checkpoint with pre-split data for the persisted data in this file.
pub partition_checkpoint: PartitionCheckpoint,
/// Database checkpoint created at the time of the write.
pub database_checkpoint: DatabaseCheckpoint,
}
impl IoxMetadata {
/// Read from protobuf message
fn from_protobuf(data: &[u8]) -> Result<Self> {
// extract protobuf message from bytes
let proto_msg = proto::IoxMetadata::decode(data)
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBroken)?;
@ -266,6 +279,7 @@ impl IoxMetadata {
});
}
// extract creation timestamp
let creation_timestamp: DateTime<Utc> = proto_msg
.creation_timestamp
.ok_or_else(|| Error::IoxMetadataFieldMissing {
@ -275,22 +289,98 @@ impl IoxMetadata {
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBroken)?;
// extract partition checkpoint
let proto_partition_checkpoint =
proto_msg
.partition_checkpoint
.ok_or_else(|| Error::IoxMetadataFieldMissing {
field: "partition_checkpoint".to_string(),
})?;
let sequencer_numbers = proto_partition_checkpoint
.sequencer_numbers
.into_iter()
.map(|(sequencer_id, min_max)| {
if min_max.min <= min_max.max {
Ok((sequencer_id, MinMaxSequence::new(min_max.min, min_max.max)))
} else {
Err(Error::IoxMetadataMinMax)
}
})
.collect::<Result<BTreeMap<u32, MinMaxSequence>>>()?;
let min_unpersisted_timestamp = proto_partition_checkpoint
.min_unpersisted_timestamp
.ok_or_else(|| Error::IoxMetadataFieldMissing {
field: "partition_checkpoint.min_unpersisted_timestamp".to_string(),
})?
.try_into()
.map_err(|err| Box::new(err) as _)
.context(IoxMetadataBroken)?;
let partition_checkpoint = PartitionCheckpoint::new(
proto_msg.table_name.clone(),
proto_msg.partition_key.clone(),
sequencer_numbers,
min_unpersisted_timestamp,
);
// extract database checkpoint
let proto_database_checkpoint =
proto_msg
.database_checkpoint
.ok_or_else(|| Error::IoxMetadataFieldMissing {
field: "database_checkpoint".to_string(),
})?;
let min_sequencer_numbers = proto_database_checkpoint
.min_sequencer_numbers
.into_iter()
.collect();
let database_checkpoint = DatabaseCheckpoint::new(min_sequencer_numbers);
Ok(Self {
creation_timestamp,
table_name: proto_msg.table_name,
partition_key: proto_msg.partition_key,
chunk_id: proto_msg.chunk_id,
partition_checkpoint,
database_checkpoint,
})
}
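
`from_protobuf` and `to_protobuf` are meant to round-trip, which is essentially what the tests below exercise. A crate-internal sketch of that round trip (`from_protobuf` is private; the values are illustrative and reuse the test helper introduced in this commit):

let (partition_checkpoint, database_checkpoint) =
    create_partition_and_database_checkpoint("table1", "part1");
let metadata = IoxMetadata {
    creation_timestamp: Utc::now(),
    table_name: "table1".to_string(),
    partition_key: "part1".to_string(),
    chunk_id: 1337,
    partition_checkpoint,
    database_checkpoint,
};

let proto_bytes = metadata.to_protobuf().expect("encoding succeeds");
let decoded = IoxMetadata::from_protobuf(&proto_bytes).expect("decoding succeeds");
assert_eq!(decoded, metadata);
// A message whose `MinMaxSequence` has `min > max` would instead fail with
// `Error::IoxMetadataMinMax`.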
/// Convert to protobuf v3 message.
pub(crate) fn to_protobuf(&self) -> std::result::Result<Vec<u8>, prost::EncodeError> {
let proto_partition_checkpoint = proto::PartitionCheckpoint {
sequencer_numbers: self
.partition_checkpoint
.sequencer_numbers_iter()
.map(|(sequencer_id, min_max)| {
(
sequencer_id,
proto::MinMaxSequence {
min: min_max.min(),
max: min_max.max(),
},
)
})
.collect(),
min_unpersisted_timestamp: Some(
self.partition_checkpoint.min_unpersisted_timestamp().into(),
),
};
let proto_database_checkpoint = proto::DatabaseCheckpoint {
min_sequencer_numbers: self
.database_checkpoint
.min_sequencer_number_iter()
.collect(),
};
let proto_msg = proto::IoxMetadata {
version: METADATA_VERSION,
creation_timestamp: Some(self.creation_timestamp.into()),
table_name: self.table_name.clone(),
partition_key: self.partition_key.clone(),
chunk_id: self.chunk_id,
partition_checkpoint: Some(proto_partition_checkpoint),
database_checkpoint: Some(proto_database_checkpoint),
};
let mut buf = Vec::new();
@ -647,7 +737,8 @@ mod tests {
use internal_types::schema::TIME_COLUMN_NAME;
use crate::test_utils::{
chunk_addr, load_parquet_from_store, make_chunk, make_chunk_no_row_group, make_object_store,
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,
make_chunk_no_row_group, make_object_store,
};
#[tokio::test]
@ -814,11 +905,17 @@ mod tests {
#[test]
fn test_iox_metadata_from_protobuf_checks_version() {
let table_name = "table1";
let partition_key = "part1";
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: "table1".to_string(),
partition_key: "part1".to_string(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
chunk_id: 1337,
partition_checkpoint,
database_checkpoint,
};
let proto_bytes = metadata.to_protobuf().unwrap();


@ -191,6 +191,7 @@ mod tests {
use std::num::NonZeroU32;
use crate::metadata::IoxMetadata;
use crate::test_utils::create_partition_and_database_checkpoint;
use crate::{catalog::test_helpers::TestCatalogState, storage::MemWriter};
use crate::{
catalog::PreservedCatalog,
@ -466,11 +467,15 @@ mod tests {
let (record_batches, _schema, _column_summaries, _num_rows) = make_record_batch("foo");
let storage = Storage::new(Arc::clone(object_store), server_id);
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
chunk_id,
partition_checkpoint,
database_checkpoint,
};
let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
let (path, parquet_md) = storage


@ -442,7 +442,9 @@ mod tests {
use std::convert::TryFrom;
use super::*;
use crate::test_utils::{make_object_store, make_record_batch};
use crate::test_utils::{
create_partition_and_database_checkpoint, make_object_store, make_record_batch,
};
use arrow::array::{ArrayRef, StringArray};
use arrow_util::assert_batches_eq;
use chrono::Utc;
@ -451,11 +453,17 @@ mod tests {
#[tokio::test]
async fn test_parquet_contains_key_value_metadata() {
let table_name = "table1";
let partition_key = "part1";
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: "table1".to_string(),
partition_key: "part1".to_string(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
chunk_id: 1337,
partition_checkpoint,
database_checkpoint,
};
// create parquet file
@ -517,11 +525,15 @@ mod tests {
batch.schema(),
vec![Arc::new(batch)],
));
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(table_name, partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
chunk_id,
partition_checkpoint,
database_checkpoint,
};
let (path, _) = storage


@ -1,4 +1,4 @@
use std::{num::NonZeroU32, sync::Arc};
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use arrow::{
array::{
@ -21,6 +21,10 @@ use internal_types::{
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use mutable_buffer::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
persistence_windows::MinMaxSequence,
};
use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi};
use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
@ -146,11 +150,15 @@ pub async fn make_chunk_given_record_batch(
} else {
Box::pin(MemoryStream::new(record_batches))
};
let (partition_checkpoint, database_checkpoint) =
create_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc.timestamp(10, 20),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
chunk_id: addr.chunk_id,
partition_checkpoint,
database_checkpoint,
};
let (path, parquet_metadata) = storage
.write_to_object_store(addr.clone(), stream, metadata)
@ -739,3 +747,25 @@ pub async fn make_metadata(
IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(),
)
}
/// Create [`PartitionCheckpoint`] and [`DatabaseCheckpoint`] for testing.
pub fn create_partition_and_database_checkpoint(
table_name: &str,
partition_key: &str,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create partition checkpoint
let mut sequencer_numbers = BTreeMap::new();
sequencer_numbers.insert(1, MinMaxSequence::new(15, 18));
sequencer_numbers.insert(2, MinMaxSequence::new(25, 28));
let min_unpersisted_timestamp = Utc.timestamp(10, 20);
let partition_checkpoint = PartitionCheckpoint::new(
table_name.to_string(),
partition_key.to_string(),
sequencer_numbers,
min_unpersisted_timestamp,
);
// build database checkpoint
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
builder.build()
}
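
For the fixed values above (sequencers 1 and 2 with minimums 15 and 25), a caller would presumably observe the following; the assertion on the database checkpoint assumes `PersistCheckpointBuilder` simply carries over the partition's minimums when no other partitions are registered:

let (partition_checkpoint, database_checkpoint) =
    create_partition_and_database_checkpoint("table1", "part1");

// `sequencer_ids` comes from a `BTreeMap`, so the order is deterministic.
assert_eq!(partition_checkpoint.sequencer_ids(), vec![1, 2]);

// Database-wide replay starting points derived from the single registered partition.
let mins: Vec<(u32, u64)> = database_checkpoint.min_sequencer_number_iter().collect();
assert_eq!(mins, vec![(1, 15), (2, 25)]);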


@ -13,6 +13,9 @@ use chrono::Utc;
use data_types::job::Job;
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::selection::Selection;
use mutable_buffer::checkpoint::{
DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder,
};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, warn};
use parquet_file::{
@ -21,7 +24,7 @@ use parquet_file::{
storage::Storage,
};
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
use std::{collections::BTreeMap, future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::error::{
@ -107,11 +110,15 @@ pub fn write_chunk_to_object_store(
//
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let (partition_checkpoint, database_checkpoint) =
fake_partition_and_database_checkpoint(&addr.table_name, &addr.partition_key);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
chunk_id: addr.chunk_id,
partition_checkpoint,
database_checkpoint,
};
let (path, parquet_metadata) = storage
.write_to_object_store(addr, stream, metadata)
@ -184,3 +191,23 @@ pub fn write_chunk_to_object_store(
Ok((tracker, fut.track(registration)))
}
/// Fake until we have the split implementation in place.
fn fake_partition_and_database_checkpoint(
table_name: &str,
partition_key: &str,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create partition checkpoint
let sequencer_numbers = BTreeMap::new();
let min_unpersisted_timestamp = Utc::now();
let partition_checkpoint = PartitionCheckpoint::new(
table_name.to_string(),
partition_key.to_string(),
sequencer_numbers,
min_unpersisted_timestamp,
);
// build database checkpoint
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
builder.build()
}