Merge branch 'main' into dom/gossip-subset

pull/24376/head
Dom 2023-08-30 12:56:59 +01:00 committed by GitHub
commit c983d1a096
11 changed files with 526 additions and 126 deletions

Cargo.lock (generated)

@@ -3439,9 +3439,9 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.6.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76fc44e2588d5b436dbc3c6cf62aef290f90dab6235744a93dfe1cc18f451e2c"
checksum = "f478948fd84d9f8e86967bf432640e46adfb5a4bd4f14ef7e864ab38220534ae"
[[package]]
name = "memmap2"


@@ -15,6 +15,7 @@
unused_crate_dependencies
)]
use thiserror::Error;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
@@ -639,6 +640,79 @@ impl ParquetFile {
}
}
impl From<ParquetFile> for generated_types::influxdata::iox::catalog::v1::ParquetFile {
fn from(v: ParquetFile) -> Self {
Self {
id: v.id.get(),
namespace_id: v.namespace_id.get(),
table_id: v.table_id.get(),
partition_identifier: Some(v.partition_id.into()),
object_store_id: v.object_store_id.to_string(),
min_time: v.min_time.get(),
max_time: v.max_time.get(),
to_delete: v.to_delete.map(|v| v.get()),
file_size_bytes: v.file_size_bytes,
row_count: v.row_count,
compaction_level: v.compaction_level as i32,
created_at: v.created_at.get(),
column_set: v.column_set.iter().map(|v| v.get()).collect(),
max_l0_created_at: v.max_l0_created_at.get(),
}
}
}
/// Errors deserialising a protobuf serialised [`ParquetFile`].
#[derive(Debug, Error)]
pub enum ParquetFileProtoError {
/// The proto type does not contain a partition ID.
#[error("no partition id specified for parquet file")]
NoPartitionId,
/// The specified partition ID is invalid.
#[error(transparent)]
InvalidPartitionId(#[from] PartitionIdProtoError),
/// The specified object store UUID is invalid.
#[error("invalid object store ID: {0}")]
InvalidObjectStoreId(uuid::Error),
/// The specified compaction level value is invalid.
#[error("invalid compaction level: {0}")]
InvalidCompactionLevel(Box<dyn std::error::Error + Send + Sync + 'static>),
}
impl TryFrom<generated_types::influxdata::iox::catalog::v1::ParquetFile> for ParquetFile {
type Error = ParquetFileProtoError;
fn try_from(
v: generated_types::influxdata::iox::catalog::v1::ParquetFile,
) -> Result<Self, Self::Error> {
Ok(Self {
id: ParquetFileId::new(v.id),
namespace_id: NamespaceId::new(v.namespace_id),
table_id: TableId::new(v.table_id),
partition_id: TransitionPartitionId::try_from(
v.partition_identifier
.ok_or(ParquetFileProtoError::NoPartitionId)?,
)?,
object_store_id: v
.object_store_id
.parse()
.map_err(ParquetFileProtoError::InvalidObjectStoreId)?,
min_time: Timestamp::new(v.min_time),
max_time: Timestamp::new(v.max_time),
to_delete: v.to_delete.map(Timestamp::new),
file_size_bytes: v.file_size_bytes,
row_count: v.row_count,
compaction_level: CompactionLevel::try_from(v.compaction_level)
.map_err(ParquetFileProtoError::InvalidCompactionLevel)?,
created_at: Timestamp::new(v.created_at),
column_set: ColumnSet::new(v.column_set.into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(v.max_l0_created_at),
})
}
}
/// Data for a parquet file to be inserted into the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParquetFileParams {
@@ -1624,10 +1698,12 @@ pub struct FileRange {
#[cfg(test)]
mod tests {
use super::*;
use std::borrow::Cow;
use super::*;
use ordered_float::OrderedFloat;
use proptest::{prelude::*, proptest};
#[test]
fn test_chunk_id_new() {
@@ -2635,4 +2711,77 @@ mod tests {
assert_eq!(tr.start(), 1);
assert_eq!(tr.end(), 1);
}
use crate::partition::tests::arbitrary_partition_id;
prop_compose! {
/// Return an arbitrary [`Timestamp`].
pub fn arbitrary_timestamp()(value in any::<i64>()) -> Timestamp {
Timestamp::new(value)
}
}
fn arbitrary_compaction_level() -> impl prop::strategy::Strategy<Value = CompactionLevel> {
prop_oneof![
Just(CompactionLevel::Initial),
Just(CompactionLevel::FileNonOverlapped),
Just(CompactionLevel::Final),
]
}
prop_compose! {
/// Return an arbitrary [`ParquetFile`] with randomised values.
fn arbitrary_parquet_file()(
partition_id in arbitrary_partition_id(),
parquet_file_id in any::<i64>(),
namespace_id in any::<i64>(),
table_id in any::<i64>(),
min_time in arbitrary_timestamp(),
max_time in arbitrary_timestamp(),
to_delete in prop::option::of(arbitrary_timestamp()),
file_size_bytes in any::<i64>(),
row_count in any::<i64>(),
compaction_level in arbitrary_compaction_level(),
created_at in arbitrary_timestamp(),
column_set in prop::collection::vec(any::<i64>(), 0..10),
max_l0_created_at in arbitrary_timestamp(),
) -> ParquetFile {
let column_set = ColumnSet::new(column_set.into_iter().map(ColumnId::new));
ParquetFile {
id: ParquetFileId::new(parquet_file_id),
namespace_id: NamespaceId::new(namespace_id),
table_id: TableId::new(table_id),
partition_id,
object_store_id: Uuid::new_v4(),
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
}
}
}
proptest! {
/// Assert a [`ParquetFile`] is round-trippable through proto
/// serialisation.
#[test]
fn prop_parquet_file_proto_round_trip(file in arbitrary_parquet_file()) {
use generated_types::influxdata::iox::catalog::v1 as proto;
// Encoding is infallible
let encoded = proto::ParquetFile::from(file.clone());
// Decoding a valid proto ParquetFile is infallible.
let decoded = ParquetFile::try_from(encoded).unwrap();
// The deserialised value must match the input (round trippable)
assert_eq!(decoded, file);
}
}
}
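A minimal sketch (not part of the diff) of the failure path introduced above - assuming, as in the diff, that `ParquetFile` and `ParquetFileProtoError` live at the `data_types` crate root and that prost derives `Default` for the proto message - could look like this inside a test:

use generated_types::influxdata::iox::catalog::v1 as proto;

// All-default proto message: partition_identifier is None.
let missing_partition = proto::ParquetFile::default();

// The conversion rejects it before validating any other field.
assert!(matches!(
    data_types::ParquetFile::try_from(missing_partition),
    Err(data_types::ParquetFileProtoError::NoPartitionId)
));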


@@ -530,7 +530,7 @@ impl Partition {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use assert_matches::assert_matches;
@@ -556,7 +556,7 @@ mod tests {
prop_compose! {
/// Return an arbitrary [`TransitionPartitionId`] with a randomised ID
/// value.
fn arbitrary_partition_id()(
pub fn arbitrary_partition_id()(
use_hash in any::<bool>(),
row_id in any::<i64>(),
hash_id in any::<[u8; PARTITION_HASH_ID_SIZE_BYTES]>()


@@ -32,7 +32,7 @@ message ParquetFile {
// the max timestamp of data in this file
int64 max_time = 10;
// the optional timestamp of when this file was marked for deletion
int64 to_delete = 11;
optional int64 to_delete = 11;
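// (proto3 `optional` makes prost generate `Option<i64>` for this field on the Rust side.)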
// the file size in bytes
int64 file_size_bytes = 12;
// the number of rows in this file


@@ -199,7 +199,7 @@ mod tests {
object_store_id: "bananas".to_string(),
min_time: 1,
max_time: 100,
to_delete: 0,
to_delete: Some(0),
file_size_bytes: 424242,
row_count: 4242111,
compaction_level: 4200,


@@ -10,26 +10,31 @@ use data_types::{
};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use once_cell::sync::Lazy;
use router::namespace_cache::{
MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache,
use router::{
gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree},
namespace_cache::{MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache},
};
static ARBITRARY_NAMESPACE: Lazy<NamespaceName<'static>> =
Lazy::new(|| "bananas".try_into().unwrap());
fn init_ns_cache(
rt: &tokio::runtime::Runtime,
initial_schema: impl IntoIterator<Item = (NamespaceName<'static>, NamespaceSchema)>,
) -> impl NamespaceCache {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
let cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
Arc::clone(&catalog),
));
let (actor, handle) = AntiEntropyActor::new(Arc::clone(&cache));
rt.spawn(actor.run());
let cache = MerkleTree::new(cache, handle);
let cache = Arc::new(ReadThroughCache::new(cache, Arc::clone(&catalog)));
for (name, schema) in initial_schema {
cache.put_schema(name, schema);
}
@@ -66,12 +71,17 @@ fn bench_add_new_tables_with_columns(
let initial_schema = generate_namespace_schema(INITIAL_TABLE_COUNT, columns_per_table);
let schema_update = generate_namespace_schema(INITIAL_TABLE_COUNT + tables, columns_per_table);
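// A Tokio runtime is needed here so init_ns_cache() can spawn the anti-entropy actor onto it.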
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
group.throughput(Throughput::Elements((tables * columns_per_table) as _));
group.bench_function(format!("{tables}x{columns_per_table}"), |b| {
b.iter_batched(
|| {
(
init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
ARBITRARY_NAMESPACE.clone(),
schema_update.clone(),
)
@@ -92,12 +102,14 @@ fn bench_add_columns_to_existing_table(
let initial_schema = generate_namespace_schema(1, initial_column_count);
let schema_update = generate_namespace_schema(1, initial_column_count + add_new_columns);
let rt = tokio::runtime::Runtime::new().unwrap();
group.throughput(Throughput::Elements(add_new_columns as _));
group.bench_function(format!("{initial_column_count}+{add_new_columns}"), |b| {
b.iter_batched(
|| {
(
init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
ARBITRARY_NAMESPACE.clone(),
schema_update.clone(),
)


@@ -0,0 +1,165 @@
//! An actor task maintaining [`MerkleSearchTree`] state.
use std::sync::Arc;
use data_types::{NamespaceName, NamespaceSchema};
use merkle_search_tree::{digest::RootHash, MerkleSearchTree};
use observability_deps::tracing::{debug, info, trace};
use tokio::sync::{mpsc, oneshot};
use crate::namespace_cache::{CacheMissErr, NamespaceCache};
use super::handle::AntiEntropyHandle;
/// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`].
#[derive(Debug)]
pub(super) enum Op {
/// Request the content / merkle tree root hash.
ContentHash(oneshot::Sender<RootHash>),
}
/// A [`NamespaceCache`] anti-entropy state tracking primitive.
///
/// This actor maintains a [`MerkleSearchTree`] covering the content of
/// [`NamespaceSchema`] provided to it.
///
/// [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
#[derive(Debug)]
pub struct AntiEntropyActor<T> {
cache: T,
schema_rx: mpsc::Receiver<NamespaceName<'static>>,
op_rx: mpsc::Receiver<Op>,
mst: MerkleSearchTree<NamespaceName<'static>, Arc<NamespaceSchema>>,
}
impl<T> AntiEntropyActor<T>
where
// A cache lookup in the underlying impl MUST be infallible - it's asking
// for existing records, and the cache MUST always return them.
T: NamespaceCache<ReadError = CacheMissErr>,
{
/// Initialise a new [`AntiEntropyActor`], and return the
/// [`AntiEntropyHandle`] used to interact with it.
///
/// The provided cache is used to resolve the most up-to-date
/// [`NamespaceSchema`] for a given [`NamespaceName`], which SHOULD be as
/// fast as possible and MUST be infallible.
pub fn new(cache: T) -> (Self, AntiEntropyHandle) {
// Initialise the queue used to push schema updates.
//
// Filling this queue causes updates to the MST to be dropped, which in
// turn causes spurious convergence rounds - these are expensive
// (network trips, CPU for hashing, gossip traffic) relative to a bit
// of worst-case RAM.
//
// Each queue entry has a 64 byte upper limit (excluding queue & pointer
// overhead) because the namespace name has a 64 byte upper bound. This
// means there's an upper (worst case) RAM utilisation bound of 1 MiB
// for these queue values.
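// (16,000 queue entries * 64 bytes per entry = 1,024,000 bytes, i.e. just under 1 MiB.)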
let (schema_tx, schema_rx) = mpsc::channel(16_000);
// A "command" channel for non-schema-update messages.
let (op_tx, op_rx) = mpsc::channel(50);
(
Self {
cache,
schema_rx,
op_rx,
mst: MerkleSearchTree::default(),
},
AntiEntropyHandle::new(op_tx, schema_tx),
)
}
/// Block and run the anti-entropy actor until all [`AntiEntropyHandle`]
/// instances for it have been dropped.
pub async fn run(mut self) {
info!("starting anti-entropy state actor");
loop {
tokio::select! {
// Bias towards processing schema updates.
//
// This presents a risk of starvation / high latency for
// "operation" commands (sent over op_rx) under heavy update
// load (which is unlikely), with the benefit of ensuring all
// operations occur against an up-to-date MST.
//
// This helps avoid spurious convergence rounds by ensuring all
// updates are always applied as soon as possible.
biased;
// Immediately apply the available MST update.
Some(name) = self.schema_rx.recv() => self.handle_upsert(name).await,
// Else process an "operation" command from the actor handle.
Some(op) = self.op_rx.recv() => self.handle_op(op),
// And stop if both channels have closed.
else => {
info!("stopping anti-entropy state actor");
return
},
}
}
}
async fn handle_upsert(&mut self, name: NamespaceName<'static>) {
let schema = match self.cache.get_schema(&name).await {
Ok(v) => v,
Err(CacheMissErr { .. }) => {
// This is far from ideal as it causes the MST to skip applying
// an update, effectively causing it to diverge from the actual
// cache state.
//
// This will cause spurious convergence runs between peers that
// have applied this update to their MST, in turn causing it to
// converge locally.
//
// If no node applied this update, then this value will not be
// converged between any peers until a subsequent update causes
// a successful MST update on a peer.
//
// Instead, the trait bounds require the only allowable error to
// be a cache miss (not an I/O error or other problem) - and a miss
// can never happen here, because state updates are only enqueued
// for existing schemas, so there MUST always be an entry returned.
panic!("cache miss for namespace schema {}", name);
}
};
trace!(%name, ?schema, "applying schema");
self.mst.upsert(name, &schema);
}
fn handle_op(&mut self, op: Op) {
match op {
Op::ContentHash(tx) => {
let root_hash = self.mst.root_hash().clone();
debug!(%root_hash, "generated content hash");
// The caller may have stopped listening, so ignore any errors.
let _ = tx.send(root_hash);
}
}
}
}
#[cfg(test)]
mod tests {
use crate::namespace_cache::MemoryNamespaceCache;
use super::*;
#[tokio::test]
async fn test_empty_content_hash_fixture() {
let (actor, handle) = AntiEntropyActor::new(Arc::new(MemoryNamespaceCache::default()));
tokio::spawn(actor.run());
let got = handle.content_hash().await;
assert_eq!(got.to_string(), "UEnXR4Cj4H1CAqtH1M7y9A==");
}
}


@@ -0,0 +1,114 @@
//! A handle to interact with a [`AntiEntropyActor`].
//!
//! [`AntiEntropyActor`]: super::actor::AntiEntropyActor
use data_types::NamespaceName;
use merkle_search_tree::digest::RootHash;
use observability_deps::tracing::error;
use tokio::sync::{mpsc, oneshot};
use super::actor::Op;
/// A handle to an [`AntiEntropyActor`].
///
/// [`AntiEntropyActor`]: super::actor::AntiEntropyActor
#[derive(Debug, Clone)]
pub struct AntiEntropyHandle {
// Non-schema actor requests.
op_tx: mpsc::Sender<Op>,
// Schema update notifications (prioritised by actor)
schema_tx: mpsc::Sender<NamespaceName<'static>>,
}
impl AntiEntropyHandle {
pub(super) fn new(
op_tx: mpsc::Sender<Op>,
schema_tx: mpsc::Sender<NamespaceName<'static>>,
) -> AntiEntropyHandle {
Self { op_tx, schema_tx }
}
/// Request the [`MerkleSearchTree`] observe a new update to `name`.
///
/// This call is cheap - it places `name` into a queue to be processed
/// asynchronously by a background worker. If the queue is saturated, an
/// error is logged and the update is dropped.
///
/// # Ordering
///
/// Calls to this method MUST only be made when a subsequent cache lookup
/// would yield a schema for `name`.
///
/// # Starvation
///
/// The [`AntiEntropyActor`] prioritises processing upsert requests over all
/// other operations - an extreme rate of calls to this method may adversely
/// affect the latency of other [`AntiEntropyHandle`] methods.
///
/// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree
/// [`AntiEntropyActor`]: super::actor::AntiEntropyActor
pub(crate) fn observe_update(&self, name: NamespaceName<'static>) {
// NOTE: this doesn't send the actual schema and it likely never should.
//
// If this method sent `(name, schema)` tuples, it would require the
// stream of calls to contain monotonic schemas - that means that
// `schema` must always "go forwards", with subsequent calls containing
// a (non-strict) superset of the content of prior calls.
//
// This invariant can't be preserved in a multi-threaded system where
// updates can be applied to the same cached entry concurrently:
//
// - T1: cache.upsert(name, schema(table_a)) -> schema(table_a)
// - T2: cache.upsert(name, schema(table_b)) -> schema(table_a, table_b)
// - T2: handle.upsert(name, schema(table_a, table_b))
// - T1: handle.upsert(name, schema(table_a))
//
// The last call violates the monotonicity requirement - T1 sets the
// anti-entropy state to the pre-merged value containing only table_a,
// overwriting the correct state that reflected both tables.
//
// The monotonic property is provided by the underlying cache
// implementation; namespace schemas are always merged. By providing the
// actor the name of the updated schema, it can read the merged and most
// up to date schema directly from the cache itself, ensuring
// monotonicity of the schemas regardless of call order.
if self.schema_tx.try_send(name.clone()).is_err() {
// If enqueuing this schema update fails, the MST will become
// out-of-sync w.r.t. the content of the cache, making it appear as
// if the schema update applied to the cache (if any) has not been
// applied locally.
//
// If this happens, peers may start conflict resolution rounds to
// converge this difference, eventually causing the local node to
// perform a no-op update to the local key, and in turn causing
// another requeue attempt here.
//
// This is bad for efficiency because it causes spurious syncs, but
// does not affect correctness due to the monotonicity of updates.
//
// If every peer hits this same edge case, none of the MSTs will
// contain the updated schema, and no convergence will be attempted
// for this update until a subsequent enqueue for "name" succeeds.
// This does affect correctness, but is exceedingly unlikely, and
// logged for debugging purposes.
error!(%name, "error enqueuing schema update for anti-entropy");
}
}
/// Return the current content hash ([`RootHash`]) describing the set of
/// [`NamespaceSchema`] observed so far.
///
/// [`NamespaceSchema`]: data_types::NamespaceSchema
pub(crate) async fn content_hash(&self) -> RootHash {
let (tx, rx) = oneshot::channel();
self.op_tx
.send(Op::ContentHash(tx))
.await
.expect("anti-entropy actor has stopped");
rx.await.expect("anti-entropy actor has stopped")
}
}


@@ -7,17 +7,17 @@ use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use merkle_search_tree::MerkleSearchTree;
use parking_lot::Mutex;
use crate::namespace_cache::{ChangeStats, NamespaceCache};
use super::handle::AntiEntropyHandle;
/// A [`NamespaceCache`] decorator that maintains a content hash / consistency
/// proof.
///
/// This [`MerkleTree`] tracks the content of the underlying [`NamespaceCache`]
/// delegate, maintaining a compact, serialisable representation in a
/// [MerkleSearchTree] that can be used to perform efficient differential
/// [`MerkleSearchTree`] that can be used to perform efficient differential
/// convergence (anti-entropy) of peers to provide eventual consistency.
///
/// # Merge Correctness
@@ -35,29 +35,31 @@ use crate::namespace_cache::{ChangeStats, NamespaceCache};
///
/// If two nodes are producing differing hashes for the same underlying content,
/// they will appear to never converge.
///
/// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree
#[derive(Debug)]
pub struct MerkleTree<T> {
inner: T,
mst: Mutex<MerkleSearchTree<NamespaceName<'static>, NamespaceSchema>>,
handle: AntiEntropyHandle,
}
impl<T> MerkleTree<T> {
impl<T> MerkleTree<T>
where
T: Send + Sync,
{
/// Initialise a new [`MerkleTree`] that generates a content hash covering
/// `inner`.
pub fn new(inner: T) -> Self {
Self {
inner,
mst: Mutex::new(MerkleSearchTree::default()),
}
pub fn new(inner: T, handle: AntiEntropyHandle) -> Self {
Self { inner, handle }
}
/// Return a 128-bit hash describing the content of the inner `T`.
///
/// This hash only covers a subset of schema fields (see
/// [`NamespaceContentHash`]).
pub fn content_hash(&self) -> merkle_search_tree::digest::RootHash {
self.mst.lock().root_hash().clone()
pub async fn content_hash(&self) -> merkle_search_tree::digest::RootHash {
self.handle.content_hash().await
}
}
@@ -84,9 +86,8 @@ where
// return value (the new content of the cache).
let (schema, diff) = self.inner.put_schema(namespace.clone(), schema);
// Intercept the the resulting cache entry state and merge it into the
// merkle tree.
self.mst.lock().upsert(namespace, &schema);
// Attempt to asynchronously update the MST.
self.handle.observe_update(namespace.clone());
// And pass through the return value to the caller.
(schema, diff)


@@ -1,5 +1,7 @@
//! Anti-entropy primitives providing eventual consistency over gossip.
pub mod actor;
pub mod handle;
pub mod merkle;
#[cfg(test)]
@@ -7,7 +9,7 @@ mod tests {
use std::{collections::BTreeMap, sync::Arc};
use crate::{
gossip::anti_entropy::merkle::MerkleTree,
gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree},
namespace_cache::{MemoryNamespaceCache, NamespaceCache},
};
@@ -112,11 +114,22 @@ mod tests {
// An arbitrary namespace with an ID that lies outside of `updates`.
last_update in arbitrary_namespace_schema(42_i64..100),
) {
let ns_a = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
let ns_b = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
tokio::runtime::Runtime::new().unwrap().block_on(async move {
let cache_a = Arc::new(MemoryNamespaceCache::default());
let cache_b = Arc::new(MemoryNamespaceCache::default());
let (actor_a, handle_a) = AntiEntropyActor::new(Arc::clone(&cache_a));
let (actor_b, handle_b) = AntiEntropyActor::new(Arc::clone(&cache_b));
// Start the MST actors
tokio::spawn(actor_a.run());
tokio::spawn(actor_b.run());
let ns_a = MerkleTree::new(cache_a, handle_a.clone());
let ns_b = MerkleTree::new(cache_b, handle_b.clone());
// Invariant: two empty namespace caches have the same content hash.
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
for update in updates {
// Generate a unique, deterministic name for this namespace.
@@ -128,7 +141,7 @@ mod tests {
// Invariant: after applying the same update, the content hashes
// MUST match (even if this update was a no-op / not an update)
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
}
// At this point all updates have been applied to both caches.
@@ -140,12 +153,13 @@ mod tests {
// Invariant: last_update definitely added new cache content,
// therefore the cache content hashes MUST diverge.
assert_ne!(ns_a.content_hash(), ns_b.content_hash());
assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await);
// Invariant: applying the update to the other cache converges their
// content hashes.
ns_b.put_schema(name, last_update);
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
});
}
}
}


@@ -18,7 +18,7 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use data_types::{PartitionHashId, PartitionId, TableId, TransitionPartitionId};
use data_types::{TableId, TransitionPartitionId};
use generated_types::influxdata::iox::catalog::v1::*;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
@@ -47,7 +47,11 @@ impl catalog_service_server::CatalogService for CatalogService {
) -> Result<Response<GetParquetFilesByPartitionIdResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let partition_id = to_partition_id(req.partition_identifier)?;
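// Convert the optional proto partition identifier into a TransitionPartitionId,
// rejecting requests with a missing or invalid identifier.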
let partition_id = req
.partition_identifier
.map(TransitionPartitionId::try_from)
.ok_or_else(|| Status::invalid_argument("no partition id specified"))?
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let parquet_files = repos
.parquet_files()
@@ -58,7 +62,7 @@ impl catalog_service_server::CatalogService for CatalogService {
Status::not_found(e.to_string())
})?;
let parquet_files: Vec<_> = parquet_files.into_iter().map(to_parquet_file).collect();
let parquet_files: Vec<_> = parquet_files.into_iter().map(Into::into).collect();
let response = GetParquetFilesByPartitionIdResponse { parquet_files };
@@ -125,7 +129,7 @@ impl catalog_service_server::CatalogService for CatalogService {
Status::not_found(e.to_string())
})?;
let parquet_files: Vec<_> = parquet_files.into_iter().map(to_parquet_file).collect();
let parquet_files: Vec<_> = parquet_files.into_iter().map(Into::into).collect();
let response = GetParquetFilesByNamespaceTableResponse { parquet_files };
@@ -161,7 +165,7 @@ impl catalog_service_server::CatalogService for CatalogService {
Status::not_found(e.to_string())
})?;
let parquet_files: Vec<_> = parquet_files.into_iter().map(to_parquet_file).collect();
let parquet_files: Vec<_> = parquet_files.into_iter().map(Into::into).collect();
let response = GetParquetFilesByNamespaceResponse { parquet_files };
@@ -169,68 +173,9 @@ impl catalog_service_server::CatalogService for CatalogService {
}
}
fn to_partition_identifier(partition_id: &TransitionPartitionId) -> PartitionIdentifier {
match partition_id {
TransitionPartitionId::Deterministic(hash_id) => PartitionIdentifier {
id: Some(partition_identifier::Id::HashId(
hash_id.as_bytes().to_owned(),
)),
},
TransitionPartitionId::Deprecated(id) => PartitionIdentifier {
id: Some(partition_identifier::Id::CatalogId(id.get())),
},
}
}
fn to_partition_id(
partition_identifier: Option<PartitionIdentifier>,
) -> Result<TransitionPartitionId, Status> {
let partition_id =
match partition_identifier
.and_then(|pi| pi.id)
.ok_or(Status::invalid_argument(
"No partition identifier specified",
))? {
partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic(
PartitionHashId::try_from(&bytes[..]).map_err(|e| {
Status::invalid_argument(format!(
"Could not parse bytes as a `PartitionHashId`: {e}"
))
})?,
),
partition_identifier::Id::CatalogId(id) => {
TransitionPartitionId::Deprecated(PartitionId::new(id))
}
};
Ok(partition_id)
}
// converts the catalog ParquetFile to protobuf
fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {
let partition_identifier = to_partition_identifier(&p.partition_id);
ParquetFile {
id: p.id.get(),
namespace_id: p.namespace_id.get(),
table_id: p.table_id.get(),
partition_identifier: Some(partition_identifier),
object_store_id: p.object_store_id.to_string(),
min_time: p.min_time.get(),
max_time: p.max_time.get(),
to_delete: p.to_delete.map(|t| t.get()).unwrap_or(0),
file_size_bytes: p.file_size_bytes,
row_count: p.row_count,
compaction_level: p.compaction_level as i32,
created_at: p.created_at.get(),
column_set: p.column_set.iter().map(|id| id.get()).collect(),
max_l0_created_at: p.max_l0_created_at.get(),
}
}
// converts the catalog Partition to protobuf
fn to_partition(p: data_types::Partition) -> Partition {
let identifier = to_partition_identifier(&p.transition_partition_id());
let identifier = PartitionIdentifier::from(p.transition_partition_id());
let array_sort_key_ids = p
.sort_key_ids
@@ -306,7 +251,7 @@ mod tests {
Arc::clone(&catalog)
};
let partition_identifier = to_partition_identifier(&partition_id);
let partition_identifier = PartitionIdentifier::from(partition_id);
let grpc = super::CatalogService::new(catalog);
let request = GetParquetFilesByPartitionIdRequest {
@@ -318,7 +263,7 @@ mod tests {
.await
.expect("rpc request should succeed");
let response = tonic_response.into_inner();
let expect: Vec<_> = [p1, p2].into_iter().map(to_parquet_file).collect();
let expect: Vec<ParquetFile> = [p1, p2].into_iter().map(Into::into).collect();
assert_eq!(expect, response.parquet_files,);
}