feat(ingester): ingester2 init

Adds an ingester2 crate to hold the MVP of the Kafkaless project.

This was necessary due to the tight coupling of the ingester internals
with tests in external crates, and eases the parallel development of two
version of the ingester.

This commit contains various changes from the "ingester" crate, mostly
removing the concept/references to a "shard" or "ShardId" where
possible.

This commit does not copy over all of the "ingester" crate - only those
components that are definitely needed. I will drag across more as
functionality is implemented.
pull/24376/head
Dom Dwyer 2022-11-24 15:26:34 +01:00
parent 92719f5b2b
commit a66fc0b645
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
40 changed files with 5474 additions and 0 deletions

41
Cargo.lock generated
View File

@ -2446,6 +2446,47 @@ dependencies = [
"write_summary",
]
[[package]]
name = "ingester2"
version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"assert_matches",
"async-trait",
"backoff",
"data_types",
"datafusion",
"datafusion_util",
"dml",
"futures",
"generated_types",
"hashbrown 0.13.1",
"iox_catalog",
"iox_query",
"iox_time",
"lazy_static",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"object_store",
"observability_deps",
"once_cell",
"parking_lot 0.12.1",
"paste",
"predicate",
"rand",
"schema",
"service_grpc_catalog",
"test_helpers",
"thiserror",
"tokio",
"tonic",
"trace",
"uuid",
]
[[package]]
name = "insta"
version = "1.21.1"

View File

@ -25,6 +25,7 @@ members = [
"influxdb2_client",
"influxrpc_parser",
"ingester",
"ingester2",
"iox_catalog",
"iox_data_generator",
"garbage_collector",

45
ingester2/Cargo.toml Normal file
View File

@ -0,0 +1,45 @@
[package]
name = "ingester2"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow_util = { version = "0.1.0", path = "../arrow_util" }
async-trait = "0.1.58"
backoff = { version = "0.1.0", path = "../backoff" }
data_types = { version = "0.1.0", path = "../data_types" }
datafusion.workspace = true
dml = { version = "0.1.0", path = "../dml" }
futures = "0.3.25"
generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true
iox_catalog = { version = "0.1.0", path = "../iox_catalog" }
iox_query = { version = "0.1.0", path = "../iox_query" }
iox_time = { path = "../iox_time" }
metric = { version = "0.1.0", path = "../metric" }
mutable_batch = { version = "0.1.0", path = "../mutable_batch" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
object_store = "0.5.1"
observability_deps = { version = "0.1.0", path = "../observability_deps" }
once_cell = "1.16.0"
parking_lot = "0.12.1"
predicate = { version = "0.1.0", path = "../predicate" }
rand = "0.8.5"
schema = { version = "0.1.0", path = "../schema" }
service_grpc_catalog = { version = "0.1.0", path = "../service_grpc_catalog" }
thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tonic = "0.8.2"
trace = { version = "0.1.0", path = "../trace" }
uuid = "1.2.2"
[dev-dependencies]
assert_matches = "1.5.0"
datafusion_util = { path = "../datafusion_util" }
lazy_static = "1.4.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
paste = "1.0.9"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }

383
ingester2/src/arcmap.rs Normal file
View File

@ -0,0 +1,383 @@
//! A map key-value map where values are always wrapped in an [`Arc`], with
//! helper methods for exactly-once initialisation.
#![allow(dead_code)]
use std::{
borrow::Borrow,
hash::{BuildHasher, Hash, Hasher},
sync::Arc,
};
use hashbrown::{
hash_map::{DefaultHashBuilder, RawEntryMut},
HashMap,
};
use parking_lot::RwLock;
/// A key-value map where all values are wrapped in [`Arc`]'s and shared across
/// all readers of a given key.
///
/// Each key in an [`ArcMap`] is initialised exactly once, with subsequent
/// lookups being handed an [`Arc`] handle to the same instance.
#[derive(Debug)]
pub(crate) struct ArcMap<K, V, S = DefaultHashBuilder> {
map: RwLock<HashMap<K, Arc<V>, S>>,
hasher: S,
}
impl<K, V, S> std::ops::Deref for ArcMap<K, V, S> {
type Target = RwLock<HashMap<K, Arc<V>, S>>;
fn deref(&self) -> &Self::Target {
&self.map
}
}
impl<K, V> Default for ArcMap<K, V> {
fn default() -> Self {
// The same hasher should be used by everything that hashes for a
// consistent result.
//
// See https://github.com/influxdata/influxdb_iox/pull/6086.
let map: HashMap<K, Arc<V>> = Default::default();
let hasher = map.hasher().clone();
Self {
map: RwLock::new(map),
hasher,
}
}
}
impl<K, V, S> ArcMap<K, V, S>
where
K: Hash + Eq,
S: BuildHasher,
{
/// Fetch an [`Arc`]-wrapped `V` for `key`, or if this is the first lookup
/// for `key`, initialise the value with the provided `init` closure.
///
/// # Concurrency
///
/// This call is thread-safe - if two calls race, a value will be
/// initialised exactly once (one arbitrary caller's `init` closure will be
/// executed) and both callers will obtain a handle to the same instance of
/// `V`. Both threads will eagerly initialise V and race to "win" storing V
/// in the map.
///
/// # Performance
///
/// This method is biased towards read-heavy workloads, with many readers
/// progressing in parallel. If the value for `key` must be initialised, all
/// readers are blocked while `init` executes and the resulting `V` is
/// memorised.
pub(crate) fn get_or_insert_with<Q, F>(&self, key: &Q, init: F) -> Arc<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ToOwned<Owned = K> + ?Sized,
F: FnOnce() -> Arc<V>,
{
// Memorise the hash outside of the lock.
//
// This allows the hash to be re-used (and not recomputed) if the value
// has to be inserted into the map after the existence check. It also
// obviously keeps the hashing outside of the lock.
let hash = self.compute_hash(key);
// First check if the entry exists already.
//
// This does NOT use an upgradable read lock, as readers waiting for an
// upgradeable read lock block other readers wanting an upgradeable read
// lock. If all readers do that, it's effectively an exclusive lock.
if let Some((_, v)) = self
.map
.read()
.raw_entry()
.from_hash(hash, Self::key_equal(key))
{
return Arc::clone(v);
}
// Otherwise acquire a write lock and insert the value if necessary (it
// is possible another thread initialised the value after the read check
// above, but before this write lock was granted).
let mut guard = self.map.write();
match guard.raw_entry_mut().from_hash(hash, Self::key_equal(key)) {
RawEntryMut::Occupied(v) => Arc::clone(v.get()),
RawEntryMut::Vacant(v) => {
Arc::clone(v.insert_hashed_nocheck(hash, key.to_owned(), init()).1)
}
}
}
/// A convenience method over [`Self::get_or_insert_with()`] that
/// initialises `V` to the default value when `key` has no entry.
pub(crate) fn get_or_default<Q>(&self, key: &Q) -> Arc<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ToOwned<Owned = K> + ?Sized,
V: Default,
{
self.get_or_insert_with(key, Default::default)
}
/// A getter for `key` that returns an [`Arc`]-wrapped `V`, or [`None`] if
/// `key` has not yet been initialised.
///
/// # Concurrency
///
/// This method is cheap, and multiple callers progress in parallel. Callers
/// are blocked by a call to [`Self::get_or_insert_with()`] only when a `V`
/// needs to be initialised.
pub(crate) fn get<Q>(&self, key: &Q) -> Option<Arc<V>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = self.compute_hash(key);
self.map
.read()
.raw_entry()
.from_hash(hash, Self::key_equal(key))
.map(|(_k, v)| Arc::clone(v))
}
/// Insert `value` indexed by `key`.
///
/// # Panics
///
/// This method panics if a value already exists for `key`.
pub(crate) fn insert(&self, key: K, value: Arc<V>) {
let hash = self.compute_hash(key.borrow());
match self
.map
.write()
.raw_entry_mut()
.from_hash(hash, Self::key_equal(&key))
{
RawEntryMut::Occupied(_) => panic!("inserting existing key into ArcMap"),
RawEntryMut::Vacant(view) => {
view.insert_hashed_nocheck(hash, key, value);
}
}
}
/// Return a state snapshot of all the values in this [`ArcMap`] in
/// arbitrary order.
///
/// # Concurrency
///
/// The snapshot generation is serialised w.r.t concurrent calls to mutate
/// `self` (that is, a new entry may appear immediately after the snapshot
/// is generated). Calls to [`Self::values`] and other "read" methods
/// proceed in parallel.
pub(crate) fn values(&self) -> Vec<Arc<V>> {
self.map.read().values().map(Arc::clone).collect()
}
#[inline]
fn compute_hash<Q: Hash + ?Sized>(&self, key: &Q) -> u64 {
let mut state = self.hasher.build_hasher();
key.hash(&mut state);
state.finish()
}
#[inline]
fn key_equal<Q>(q: &Q) -> impl FnMut(&'_ K) -> bool + '_
where
K: Borrow<Q>,
Q: ?Sized + Eq,
{
move |k| q.eq(k.borrow())
}
}
#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Barrier,
};
use super::*;
#[test]
fn test_get() {
let map = ArcMap::<String, usize>::default();
let key: &str = "bananas";
assert!(map.get(key).is_none());
// Assert the value is initialised from the closure
let got: Arc<usize> = map.get_or_insert_with(key, || Arc::new(42));
assert_eq!(*got, 42);
// Assert the same Arc is returned later.
let other = map.get(key).expect("should have been initialised");
assert!(Arc::ptr_eq(&got, &other));
}
#[test]
fn test_init_once() {
let map = ArcMap::<String, usize>::default();
let key: &str = "bananas";
// Assert the value is initialised from the closure
let got = map.get_or_insert_with(key, || Arc::new(42));
assert_eq!(*got, 42);
// And subsequent calls observe the same value, regardless of the init
// closure
let got = map.get_or_insert_with(key, || Arc::new(13));
assert_eq!(*got, 42);
let got = map.get_or_default(key);
assert_eq!(*got, 42);
}
#[test]
fn test_insert() {
let map = ArcMap::<String, usize>::default();
let key: &str = "bananas";
assert!(map.get(key).is_none());
// Assert the value is initialised from the closure
map.insert(key.to_owned(), Arc::new(42));
let got = map.get(key).unwrap();
assert_eq!(*got, 42);
// Assert the same Arc is returned later.
let other = map.get(key).expect("should have been initialised");
assert_eq!(*other, 42);
assert!(Arc::ptr_eq(&got, &other));
// And subsequent calls observe the same value, regardless of the init
// closure
let got = map.get_or_insert_with(key, || Arc::new(13));
assert_eq!(*got, 42);
assert!(Arc::ptr_eq(&got, &other));
}
#[test]
fn test_values() {
let map = ArcMap::<usize, String>::default();
map.insert(1, Arc::new("bananas".to_string()));
map.insert(2, Arc::new("platanos".to_string()));
let mut got = map
.values()
.into_iter()
.map(|v| String::clone(&*v))
.collect::<Vec<_>>();
got.sort_unstable();
assert_eq!(got, &["bananas", "platanos"]);
}
#[test]
#[should_panic = "inserting existing key"]
fn test_insert_existing() {
let map = ArcMap::<String, usize>::default();
let key: &str = "bananas";
map.insert(key.to_owned(), Arc::new(42));
map.insert(key.to_owned(), Arc::new(42));
}
#[test]
#[allow(clippy::needless_collect)] // Only needless if you like deadlocks.
fn test_init_once_parallel() {
let map = Arc::new(ArcMap::<String, usize>::default());
const NUM_THREADS: usize = 10;
let barrier = Arc::new(Barrier::new(NUM_THREADS));
let init_count = Arc::new(AtomicUsize::new(0));
let key: &str = "bananas";
// Spawn NUM_THREADS and have all of them wait until all the threads
// have initialised before racing to initialise a V for key.
//
// Each thread tries to initialise V to a unique per-thread value, and
// this test asserts only one thread successfully initialises V to it's
// unique value.
let handles = (0..NUM_THREADS)
.map(|i| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
let init_count = Arc::clone(&init_count);
std::thread::spawn(move || {
// Rendezvous with all threads before continuing to maximise
// the racy-ness.
barrier.wait();
let got = map.get_or_insert_with(key, || {
init_count.fetch_add(1, Ordering::SeqCst);
Arc::new(i)
});
*got == i
})
})
.collect::<Vec<_>>();
let winners = handles
.into_iter()
.fold(0, |acc, h| if h.join().unwrap() { acc + 1 } else { acc });
assert_eq!(winners, 1); // Number of threads that observed their unique value
assert_eq!(init_count.load(Ordering::SeqCst), 1); // Number of init() calls
}
#[test]
fn test_cross_thread_visibility() {
let refs = Arc::new(ArcMap::default());
const N_THREADS: i64 = 10;
let handles = (0..N_THREADS)
.map(|i| {
let refs = Arc::clone(&refs);
std::thread::spawn(move || {
refs.insert(i, Arc::new(i));
})
})
.collect::<Vec<_>>();
for h in handles {
h.join().unwrap();
}
for i in 0..N_THREADS {
let v = refs.get(&i).unwrap();
assert_eq!(i, *v);
}
}
// Assert values can be "moved" due to FnOnce being used, vs. Fn.
//
// This is a compile-time assertion more than a runtime test.
#[test]
fn test_fn_once() {
let map = ArcMap::<String, String>::default();
// A non-copy value that is moved into the FnOnce
let v = "bananas".to_owned();
let v = map.get_or_insert_with("platanos", move || Arc::new(v));
assert_eq!(*v, "bananas")
}
#[test]
fn test_key_equal() {
let k = 42;
assert!(ArcMap::<_, ()>::key_equal(&k)(&k));
assert!(!ArcMap::<_, ()>::key_equal(&24)(&k));
}
}

View File

@ -0,0 +1,8 @@
pub(crate) mod namespace;
pub(crate) mod partition;
pub(crate) mod table;
/// The root node of a [`BufferTree`].
mod root;
#[allow(unused_imports)]
pub(crate) use root::*;

View File

@ -0,0 +1,262 @@
//! Namespace level data buffer structures.
pub(crate) mod name_resolver;
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use dml::DmlOperation;
use metric::U64Counter;
use observability_deps::tracing::warn;
use super::{
partition::resolver::PartitionProvider,
table::{name_resolver::TableNameProvider, TableData},
};
use crate::{arcmap::ArcMap, deferred_load::DeferredLoad, dml_sink::DmlSink};
/// The string name / identifier of a Namespace.
///
/// A reference-counted, cheap clone-able string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct NamespaceName(Arc<str>);
impl<T> From<T> for NamespaceName
where
T: AsRef<str>,
{
fn from(v: T) -> Self {
Self(Arc::from(v.as_ref()))
}
}
impl std::ops::Deref for NamespaceName {
type Target = Arc<str>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::fmt::Display for NamespaceName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Data of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct NamespaceData {
namespace_id: NamespaceId,
namespace_name: DeferredLoad<NamespaceName>,
/// A set of tables this [`NamespaceData`] instance has processed
/// [`DmlOperation`]'s for.
///
/// The [`TableNameProvider`] acts as a [`DeferredLoad`] constructor to
/// resolve the [`TableName`] for new [`TableData`] out of the hot path.
///
/// [`TableName`]: crate::buffer_tree::table::TableName
tables: ArcMap<TableId, TableData>,
table_name_resolver: Arc<dyn TableNameProvider>,
/// The count of tables initialised in this Ingester so far, across all
/// namespaces.
table_count: U64Counter,
/// The resolver of `(table_id, partition_key)` to [`PartitionData`].
///
/// [`PartitionData`]: super::partition::PartitionData
partition_provider: Arc<dyn PartitionProvider>,
}
impl NamespaceData {
/// Initialize new tables with default partition template of daily
pub(super) fn new(
namespace_id: NamespaceId,
namespace_name: DeferredLoad<NamespaceName>,
table_name_resolver: Arc<dyn TableNameProvider>,
partition_provider: Arc<dyn PartitionProvider>,
metrics: &metric::Registry,
) -> Self {
let table_count = metrics
.register_metric::<U64Counter>(
"ingester_tables",
"Number of tables known to the ingester",
)
.recorder(&[]);
Self {
namespace_id,
namespace_name,
tables: Default::default(),
table_name_resolver,
table_count,
partition_provider,
}
}
/// Return the table data by ID.
pub(crate) fn table(&self, table_id: TableId) -> Option<Arc<TableData>> {
self.tables.get(&table_id)
}
/// Return the [`NamespaceId`] this [`NamespaceData`] belongs to.
pub(crate) fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
#[cfg(test)]
pub(super) fn table_count(&self) -> &U64Counter {
&self.table_count
}
/// Returns the [`NamespaceName`] for this namespace.
pub(crate) fn namespace_name(&self) -> &DeferredLoad<NamespaceName> {
&self.namespace_name
}
}
#[async_trait]
impl DmlSink for NamespaceData {
type Error = mutable_batch::Error;
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error> {
let sequence_number = op
.meta()
.sequence()
.expect("applying unsequenced op")
.sequence_number;
match op {
DmlOperation::Write(write) => {
// Extract the partition key derived by the router.
let partition_key = write.partition_key().clone();
for (table_id, b) in write.into_tables() {
// Grab a reference to the table data, or insert a new
// TableData for it.
let table_data = self.tables.get_or_insert_with(&table_id, || {
self.table_count.inc(1);
Arc::new(TableData::new(
table_id,
self.table_name_resolver.for_table(table_id),
self.namespace_id,
Arc::clone(&self.partition_provider),
))
});
table_data
.buffer_table_write(sequence_number, b, partition_key.clone())
.await?;
}
}
DmlOperation::Delete(delete) => {
// Deprecated delete support:
// https://github.com/influxdata/influxdb_iox/issues/5825
warn!(
namespace_name=%self.namespace_name,
namespace_id=%self.namespace_id,
table_name=?delete.table_name(),
sequence_number=?delete.meta().sequence(),
"discarding unsupported delete op"
);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use data_types::{PartitionId, PartitionKey, ShardIndex};
use metric::{Attributes, Metric};
use super::*;
use crate::{
buffer_tree::{
namespace::NamespaceData,
partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState},
table::{name_resolver::mock::MockTableNameProvider, TableName},
},
deferred_load::{self, DeferredLoad},
test_util::make_write_op,
};
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const TABLE_ID: TableId = TableId::new(44);
const NAMESPACE_NAME: &str = "platanos";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn test_namespace_init_table() {
let metrics = Arc::new(metric::Registry::default());
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PartitionId::new(0),
PartitionKey::from("banana-split"),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));
let ns = NamespaceData::new(
NAMESPACE_ID,
DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
partition_provider,
&metrics,
);
// Assert the namespace name was stored
let name = ns.namespace_name().to_string();
assert!(
(name == NAMESPACE_NAME) || (name == deferred_load::UNRESOLVED_DISPLAY_STRING),
"unexpected namespace name: {name}"
);
// Assert the namespace does not contain the test data
assert!(ns.table(TABLE_ID).is_none());
// Write some test data
ns.apply(DmlOperation::Write(make_write_op(
&PartitionKey::from("banana-split"),
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
0,
r#"bananas,city=Medford day="sun",temp=55 22"#,
)))
.await
.expect("buffer op should succeed");
// Referencing the table should succeed
assert!(ns.table(TABLE_ID).is_some());
// And the table counter metric should increase
let tables = metrics
.get_instrument::<Metric<U64Counter>>("ingester_tables")
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(tables, 1);
// Ensure the deferred namespace name is loaded.
let name = ns.namespace_name().get().await;
assert_eq!(&**name, NAMESPACE_NAME);
assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
}
}

View File

@ -0,0 +1,138 @@
use std::{sync::Arc, time::Duration};
use backoff::{Backoff, BackoffConfig};
use data_types::NamespaceId;
use iox_catalog::interface::Catalog;
use super::NamespaceName;
use crate::deferred_load::DeferredLoad;
/// An abstract provider of a [`DeferredLoad`] configured to fetch the
/// [`NamespaceName`] of the specified [`NamespaceId`].
pub(crate) trait NamespaceNameProvider: Send + Sync + std::fmt::Debug {
fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName>;
}
#[derive(Debug)]
pub(crate) struct NamespaceNameResolver {
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
}
impl NamespaceNameResolver {
pub(crate) fn new(
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> Self {
Self {
max_smear,
catalog,
backoff_config,
}
}
/// Fetch the [`NamespaceName`] from the [`Catalog`] for specified
/// `namespace_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch(
namespace_id: NamespaceId,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> NamespaceName {
Backoff::new(&backoff_config)
.retry_all_errors("fetch namespace name", || async {
let s = catalog
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
.await?
.expect("resolving namespace name for non-existent namespace id")
.name
.into();
Result::<_, iox_catalog::interface::Error>::Ok(s)
})
.await
.expect("retry forever")
}
}
impl NamespaceNameProvider for NamespaceNameResolver {
fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName> {
DeferredLoad::new(
self.max_smear,
Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
)
}
}
#[cfg(test)]
pub(crate) mod mock {
use super::*;
#[derive(Debug)]
pub(crate) struct MockNamespaceNameProvider {
name: NamespaceName,
}
impl MockNamespaceNameProvider {
pub(crate) fn new(name: impl Into<NamespaceName>) -> Self {
Self { name: name.into() }
}
}
impl Default for MockNamespaceNameProvider {
fn default() -> Self {
Self::new("bananas")
}
}
impl NamespaceNameProvider for MockNamespaceNameProvider {
fn for_namespace(&self, _id: NamespaceId) -> DeferredLoad<NamespaceName> {
let name = self.name.clone();
DeferredLoad::new(Duration::from_secs(1), async { name })
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::ShardIndex;
use test_helpers::timeout::FutureTimeout;
use super::*;
use crate::test_util::populate_catalog;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
#[tokio::test]
async fn test_fetch() {
let metrics = Arc::new(metric::Registry::default());
let backoff_config = BackoffConfig::default();
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the shard / namespace / table
let (_shard_id, ns_id, _table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
let fetcher = Arc::new(NamespaceNameResolver::new(
Duration::from_secs(10),
Arc::clone(&catalog),
backoff_config.clone(),
));
let got = fetcher
.for_namespace(ns_id)
.get()
.with_timeout_panic(Duration::from_secs(5))
.await;
assert_eq!(&**got, NAMESPACE_NAME);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,111 @@
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::SequenceNumber;
use mutable_batch::MutableBatch;
mod always_some;
mod mutable_buffer;
mod state_machine;
pub(crate) mod traits;
pub(crate) use state_machine::*;
use self::{always_some::AlwaysSome, traits::Queryable};
use crate::sequence_range::SequenceNumberRange;
/// The current state of the [`BufferState`] state machine.
///
/// NOTE that this does NOT contain the [`Persisting`] state, as this is a
/// immutable, terminal state that does not accept further writes and is
/// directly queryable.
#[derive(Debug)]
#[must_use = "FSM should not be dropped unused"]
enum FsmState {
/// The data buffer contains no data snapshots, and is accepting writes.
Buffering(BufferState<Buffering>),
}
impl Default for FsmState {
fn default() -> Self {
Self::Buffering(BufferState::new())
}
}
impl FsmState {
/// Return the current range of writes in the [`BufferState`] state machine,
/// if any.
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
match self {
Self::Buffering(v) => v.sequence_number_range(),
}
}
}
/// A helper wrapper over the [`BufferState`] FSM to abstract the caller from
/// state transitions during reads and writes from the underlying buffer.
#[derive(Debug, Default)]
#[must_use = "DataBuffer should not be dropped unused"]
pub(crate) struct DataBuffer(AlwaysSome<FsmState>);
impl DataBuffer {
/// Return the range of [`SequenceNumber`] currently queryable by calling
/// [`Self::get_query_data()`].
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
self.0.sequence_number_range()
}
/// Buffer the given [`MutableBatch`] in memory, ordered by the specified
/// [`SequenceNumber`].
///
/// # Panics
///
/// This method panics if `sequence_number` is not strictly greater than
/// previous calls.
pub(crate) fn buffer_write(
&mut self,
mb: MutableBatch,
sequence_number: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
// Take ownership of the FSM and apply the write.
self.0.mutate(|fsm| match fsm {
// Mutable stats simply have the write applied.
FsmState::Buffering(mut b) => {
let ret = b.write(mb, sequence_number);
(FsmState::Buffering(b), ret)
}
})
}
/// Return all data for this buffer, ordered by the [`SequenceNumber`] from
/// which it was buffered with.
pub(crate) fn get_query_data(&mut self) -> Vec<Arc<RecordBatch>> {
// Take ownership of the FSM and return the data within it.
self.0.mutate(|fsm| match fsm {
// The buffering state can return data.
FsmState::Buffering(b) => {
let ret = b.get_query_data();
(FsmState::Buffering(b), ret)
}
})
}
// Deconstruct the [`DataBuffer`] into the underlying FSM in a
// [`Persisting`] state, if the buffer contains any data.
pub(crate) fn into_persisting(self) -> Option<BufferState<Persisting>> {
let p = match self.0.into_inner() {
FsmState::Buffering(b) => {
// Attempt to snapshot the buffer to an immutable state.
match b.snapshot() {
Transition::Ok(b) => b.into_persisting(),
Transition::Unchanged(_) => {
// The buffer contains no data.
return None;
}
}
}
};
Some(p)
}
}

View File

@ -0,0 +1,70 @@
//! A helper type that ensures an `Option` is always `Some` once the guard is
//! dropped.
/// A helper type that aims to ease calling methods on a type that takes `self`,
/// that must always be restored at the end of the method call.
#[derive(Debug)]
pub(super) struct AlwaysSome<T>(Option<T>);
impl<T> Default for AlwaysSome<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> std::ops::Deref for AlwaysSome<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
impl<T> AlwaysSome<T> {
/// Wrap `value` in an [`AlwaysSome`].
pub(super) fn new(value: T) -> Self {
Self(Some(value))
}
pub(super) fn mutate<F, R>(&mut self, f: F) -> R
where
F: FnOnce(T) -> (T, R),
{
let value = std::mem::take(&mut self.0);
let (value, ret) = f(value.expect("AlwaysSome value is None!"));
self.0 = Some(value);
ret
}
/// Deconstruct `self`, returning the inner value.
pub(crate) fn into_inner(self) -> T {
self.0.unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_always_some() {
let mut a = AlwaysSome::<usize>::default();
let ret = a.mutate(|value| {
assert_eq!(value, 0);
(42, true)
});
assert!(ret);
let ret = a.mutate(|value| {
assert_eq!(value, 42);
(13, "bananas")
});
assert_eq!(ret, "bananas");
assert_eq!(a.into_inner(), 13);
}
}

View File

@ -0,0 +1,57 @@
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
use schema::Projection;
/// A [`Buffer`] is an internal mutable buffer wrapper over a [`MutableBatch`]
/// for the [`BufferState`] FSM.
///
/// A [`Buffer`] can contain no writes.
///
/// [`BufferState`]: super::super::BufferState
#[derive(Debug, Default)]
pub(super) struct Buffer {
buffer: Option<MutableBatch>,
}
impl Buffer {
/// Apply `batch` to the in-memory buffer.
///
/// # Data Loss
///
/// If this method returns an error, the data in `batch` is problematic and
/// has been discarded.
pub(super) fn buffer_write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
match self.buffer {
Some(ref mut b) => b.extend_from(&batch)?,
None => self.buffer = Some(batch),
};
Ok(())
}
/// Generates a [`RecordBatch`] from the data in this [`Buffer`].
///
/// If this [`Buffer`] is empty when this method is called, the call is a
/// NOP and [`None`] is returned.
///
/// # Panics
///
/// If generating the snapshot fails, this method panics.
pub(super) fn snapshot(self) -> Option<Arc<RecordBatch>> {
Some(Arc::new(
self.buffer?
.to_arrow(Projection::All)
.expect("failed to snapshot buffer data"),
))
}
pub(super) fn is_empty(&self) -> bool {
self.buffer.is_none()
}
pub(super) fn buffer(&self) -> Option<&MutableBatch> {
self.buffer.as_ref()
}
}

View File

@ -0,0 +1,273 @@
#![allow(dead_code)]
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::SequenceNumber;
use mutable_batch::MutableBatch;
mod buffering;
mod persisting;
mod snapshot;
pub(in crate::buffer_tree::partition::buffer) use buffering::*;
pub(crate) use persisting::*;
use super::traits::{Queryable, Writeable};
use crate::sequence_range::SequenceNumberRange;
/// A result type for fallible transitions.
///
/// The type system ensures the state machine is always returned to the caller,
/// regardless of the transition outcome.
#[derive(Debug)]
pub(crate) enum Transition<A, B> {
/// The transition succeeded, and the new state is contained within.
Ok(BufferState<A>),
/// The state machine failed to transition due to an invariant not being
/// upheld, and the original state is contained within.
Unchanged(BufferState<B>),
}
impl<A, B> Transition<A, B> {
/// A helper function to construct [`Self::Ok`] variants.
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Self {
Self::Ok(BufferState {
state: v,
sequence_range,
})
}
/// A helper function to construct [`Self::Unchanged`] variants.
pub(super) fn unchanged(v: BufferState<B>) -> Self {
Self::Unchanged(v)
}
}
/// A finite state machine for buffering writes, and converting them into a
/// queryable data format on-demand.
///
/// This FSM is used to provide explicit states for each stage of the data
/// lifecycle within a partition buffer:
///
/// ```text
/// ┌──────────────┐
/// │ Buffering │
/// └───────┬──────┘
/// │
/// ▼
/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─
/// Snapshot ├─────▶ Persisting │
/// └ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─
/// ```
///
/// Boxes with dashed lines indicate immutable, queryable states that contain
/// data in an efficient data format for query execution ([`RecordBatch`]).
///
/// Boxes with solid lines indicate a mutable state to which further writes can
/// be applied.
///
/// A [`BufferState`] tracks the bounding [`SequenceNumber`] values it has
/// observed, and enforces monotonic writes (w.r.t their [`SequenceNumber`]).
#[derive(Debug)]
pub(crate) struct BufferState<T> {
state: T,
sequence_range: SequenceNumberRange,
}
impl BufferState<Buffering> {
/// Initialise a new buffer state machine.
pub(super) fn new() -> Self {
Self {
state: Buffering::default(),
sequence_range: SequenceNumberRange::default(),
}
}
}
impl<T> BufferState<T> {
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
&self.sequence_range
}
}
/// A [`BufferState`] in a mutable state can accept writes and record their
/// [`SequenceNumber`].
impl<T> BufferState<T>
where
T: Writeable,
{
/// The provided [`SequenceNumber`] MUST be for the given [`MutableBatch`].
///
/// # Panics
///
/// This method panics if it is called non-monotonic writes/sequence
/// numbers.
pub(crate) fn write(
&mut self,
batch: MutableBatch,
n: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
self.state.write(batch)?;
self.sequence_range.observe(n);
Ok(())
}
}
/// A [`BufferState`] in a queryable state delegates the read to the current
/// state machine state.
impl<T> Queryable for BufferState<T>
where
T: Queryable,
{
/// Returns the current buffer data.
///
/// This is always a cheap method call.
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.state.get_query_data()
}
}
#[cfg(test)]
mod tests {
use std::ops::Deref;
use arrow_util::assert_batches_eq;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use snapshot::*;
use super::*;
#[test]
fn test_buffer_lifecycle() {
// Initialise a buffer in the base state.
let mut buffer: BufferState<Buffering> = BufferState::new();
// Validate the sequence number ranges are not populated.
assert!(buffer.sequence_number_range().inclusive_min().is_none());
assert!(buffer.sequence_number_range().inclusive_max().is_none());
// Write some data to a buffer.
buffer
.write(
lp_to_mutable_batch(
r#"bananas,tag=platanos great=true,how_much=42 668563242000000042"#,
)
.1,
SequenceNumber::new(0),
)
.expect("write to empty buffer should succeed");
// Extract the queryable data from the buffer and validate it.
//
// Keep the data to validate they are ref-counted copies after further
// writes below. Note this construct allows the caller to decide when/if
// to allocate.
let w1_data = buffer.get_query_data();
let expected = vec![
"+-------+----------+----------+--------------------------------+",
"| great | how_much | tag | time |",
"+-------+----------+----------+--------------------------------+",
"| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |",
"+-------+----------+----------+--------------------------------+",
];
assert_batches_eq!(&expected, &[w1_data[0].deref().clone()]);
// Apply another write.
buffer
.write(
lp_to_mutable_batch(
r#"bananas,tag=platanos great=true,how_much=1000 668563242000000043"#,
)
.1,
SequenceNumber::new(1),
)
.expect("write to empty buffer should succeed");
// Snapshot the buffer into an immutable, queryable data format.
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
Transition::Ok(v) => v,
Transition::Unchanged(_) => panic!("did not transition to snapshot state"),
};
// Verify the writes are still queryable.
let w2_data = buffer.get_query_data();
let expected = vec![
"+-------+----------+----------+--------------------------------+",
"| great | how_much | tag | time |",
"+-------+----------+----------+--------------------------------+",
"| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |",
"| true | 1000 | platanos | 1991-03-10T00:00:42.000000043Z |",
"+-------+----------+----------+--------------------------------+",
];
assert_eq!(w2_data.len(), 1);
assert_batches_eq!(&expected, &[w2_data[0].deref().clone()]);
// Ensure the same data is returned for a second read.
{
let second_read = buffer.get_query_data();
assert_eq!(w2_data, second_read);
// And that no data was actually copied.
let same_arcs = w2_data
.iter()
.zip(second_read.iter())
.all(|(a, b)| Arc::ptr_eq(a, b));
assert!(same_arcs);
}
// Finally transition into the terminal persisting state.
let buffer: BufferState<Persisting> = buffer.into_persisting();
// Validate the sequence number ranges were updated as writes occurred.
assert_eq!(
buffer.sequence_number_range().inclusive_min(),
Some(SequenceNumber::new(0))
);
assert_eq!(
buffer.sequence_number_range().inclusive_max(),
Some(SequenceNumber::new(1))
);
// Extract the final buffered result
let final_data = buffer.into_data();
// And once again verify no data was changed, copied or re-ordered.
assert_eq!(w2_data, final_data);
let same_arcs = w2_data
.into_iter()
.zip(final_data.into_iter())
.all(|(a, b)| Arc::ptr_eq(&a, &b));
assert!(same_arcs);
}
#[test]
fn test_snapshot_buffer_different_but_compatible_schemas() {
let mut buffer = BufferState::new();
// Missing tag `t1`
let (_, mut mb1) = lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
buffer.state.write(mb1.clone()).unwrap();
// Missing field `iv`
let (_, mb2) = lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#);
buffer.state.write(mb2.clone()).unwrap();
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
Transition::Ok(v) => v,
Transition::Unchanged(_) => panic!("failed to transition"),
};
assert_eq!(buffer.get_query_data().len(), 1);
let snapshot = &buffer.get_query_data()[0];
// Generate the combined buffer from the original inputs to compare
// against.
mb1.extend_from(&mb2).unwrap();
let want = mb1.to_arrow(Projection::All).unwrap();
assert_eq!(&**snapshot, &want);
}
}

View File

@ -0,0 +1,90 @@
//! A write buffer.
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
use schema::Projection;
use super::{snapshot::Snapshot, BufferState, Transition};
use crate::buffer_tree::partition::buffer::{
mutable_buffer::Buffer,
traits::{Queryable, Writeable},
};
/// The FSM starting ingest state - a mutable buffer collecting writes.
#[derive(Debug, Default)]
pub(crate) struct Buffering {
/// The buffer for incoming writes.
///
/// This buffer MAY be empty when no writes have occured since transitioning
/// to this state.
buffer: Buffer,
}
/// Implement on-demand querying of the buffered contents without storing the
/// generated snapshot.
///
/// In the future this [`Queryable`] should NOT be implemented for
/// [`Buffering`], and instead snapshots should be incrementally generated and
/// compacted. See <https://github.com/influxdata/influxdb_iox/issues/5805> for
/// context.
impl Queryable for Buffering {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
let data = self.buffer.buffer().map(|v| {
Arc::new(
v.to_arrow(Projection::All)
.expect("failed to snapshot buffer data"),
)
});
match data {
Some(v) => vec![v],
None => vec![],
}
}
}
impl Writeable for Buffering {
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
self.buffer.buffer_write(batch)
}
}
impl BufferState<Buffering> {
/// Attempt to generate a snapshot from the data in this buffer.
///
/// This returns [`Transition::Unchanged`] if this buffer contains no data.
pub(crate) fn snapshot(self) -> Transition<Snapshot, Buffering> {
if self.state.buffer.is_empty() {
// It is a logical error to snapshot an empty buffer.
return Transition::unchanged(self);
}
// Generate a snapshot from the buffer.
let snap = self
.state
.buffer
.snapshot()
.expect("snapshot of non-empty buffer should succeed");
// And transition to the WithSnapshot state.
Transition::ok(Snapshot::new(vec![snap]), self.sequence_range)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_buffer_does_not_snapshot() {
let b = BufferState::new();
match b.snapshot() {
Transition::Ok(_) => panic!("empty buffer should not transition to snapshot state"),
Transition::Unchanged(_) => {
// OK!
}
}
}
}

View File

@ -0,0 +1,36 @@
//! A writfield1 buffer, with one or more snapshots.
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use super::BufferState;
use crate::buffer_tree::partition::buffer::traits::Queryable;
/// An immutable set of [`RecordBatch`] in the process of being persisted.
#[derive(Debug)]
pub(crate) struct Persisting {
/// Snapshots generated from previous buffer contents to be persisted.
///
/// INVARIANT: this array is always non-empty.
snapshots: Vec<Arc<RecordBatch>>,
}
impl Persisting {
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
Self { snapshots }
}
}
impl Queryable for Persisting {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.snapshots.clone()
}
}
impl BufferState<Persisting> {
/// Consume `self`, returning the data it holds as a set of [`RecordBatch`].
pub(super) fn into_data(self) -> Vec<Arc<RecordBatch>> {
self.state.snapshots
}
}

View File

@ -0,0 +1,42 @@
//! A writfield1 buffer, with one or more snapshots.
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use super::BufferState;
use crate::buffer_tree::partition::buffer::{
state_machine::persisting::Persisting, traits::Queryable,
};
/// An immutable, queryable FSM state containing at least one buffer snapshot.
#[derive(Debug)]
pub(crate) struct Snapshot {
/// Snapshots generated from previous buffer contents.
///
/// INVARIANT: this array is always non-empty.
snapshots: Vec<Arc<RecordBatch>>,
}
impl Snapshot {
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
assert!(!snapshots.is_empty());
Self { snapshots }
}
}
impl Queryable for Snapshot {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.snapshots.clone()
}
}
impl BufferState<Snapshot> {
pub(crate) fn into_persisting(self) -> BufferState<Persisting> {
assert!(!self.state.snapshots.is_empty());
BufferState {
state: Persisting::new(self.state.snapshots),
sequence_range: self.sequence_range,
}
}
}

View File

@ -0,0 +1,17 @@
//! Private traits for state machine states.
use std::{fmt::Debug, sync::Arc};
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
/// A state that can accept writes.
pub(crate) trait Writeable: Debug {
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error>;
}
/// A state that can return the contents of the buffer as one or more
/// [`RecordBatch`] instances.
pub(crate) trait Queryable: Debug {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>>;
}

View File

@ -0,0 +1,408 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{
partition::{resolver::SortKeyResolver, PartitionData, SortKeyState},
table::TableName,
},
deferred_load::DeferredLoad,
};
/// The data-carrying value of a `(table_id, partition_key)` lookup.
#[derive(Debug)]
struct Entry {
partition_id: PartitionId,
max_sequence_number: Option<SequenceNumber>,
}
/// A read-through cache mapping `(table_id, partition_key)` tuples to
/// `(partition_id, max_sequence_number)`.
///
/// This data is safe to cache as only one ingester is ever responsible for a
/// given table partition, and this amortises partition persist marker discovery
/// queries during startup, eliminating them from the ingest hot path in the
/// common startup case (an ingester restart with no new partitions to add).
///
/// # Memory Overhead
///
/// Excluding map overhead, and assuming partition keys in the form
/// "YYYY-MM-DD", each entry takes:
///
/// - Partition key: String (8 len + 8 cap + 8 ptr + data len) = 34 bytes
/// - TableId: 8 bytes
/// - PartitionId: 8 bytes
/// - Optional sequence number: 16 bytes
///
/// For a total of 66 bytes per entry - approx 15,887 entries can be held in 1MB
/// of memory.
///
/// Each cache hit _removes_ the entry from the cache - this eliminates the
/// memory overhead for items that were hit. This is the expected (only valid!)
/// usage pattern.
///
/// # Deferred Sort Key Loading
///
/// This cache does NOT cache the [`SortKey`] for each [`PartitionData`], as the
/// sort key can be large and is likely unique per table, and thus not
/// share-able across instances / prohibitively expensive to cache.
///
/// Instead cached instances are returned with a deferred sort key resolver
/// which attempts to fetch the sort key in the background some time after
/// construction.
///
/// [`SortKey`]: schema::sort::SortKey
#[derive(Debug)]
pub(crate) struct PartitionCache<T> {
// The inner delegate called for a cache miss.
inner: T,
/// Cached entries.
///
/// First lookup level is the shared partition keys - this eliminates
/// needing to share the string key per table_id in memory which would be
/// the case if inverted. This is cheaper (memory) than using Arc refs to
/// share the keys.
///
/// It's also likely a smaller N (more tables than partition keys) making it
/// a faster search for cache misses.
#[allow(clippy::type_complexity)]
entries: Mutex<HashMap<PartitionKey, HashMap<TableId, Entry>>>,
/// Data needed to construct the [`SortKeyResolver`] for cached entries.
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
/// The maximum amount of time a [`SortKeyResolver`] may wait until
/// pre-fetching the sort key in the background.
max_smear: Duration,
}
impl<T> PartitionCache<T> {
/// Initialise a [`PartitionCache`] containing the specified partitions.
///
/// Any cache miss is passed through to `inner`.
///
/// Any cache hit returns a [`PartitionData`] configured with a
/// [`SortKeyState::Deferred`] for deferred key loading in the background.
/// The [`SortKeyResolver`] is initialised with the given `catalog`,
/// `backoff_config`, and `max_smear` maximal load wait duration.
pub(crate) fn new<P>(
inner: T,
partitions: P,
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> Self
where
P: IntoIterator<Item = Partition>,
{
let mut entries = HashMap::<PartitionKey, HashMap<TableId, Entry>>::new();
for p in partitions.into_iter() {
entries.entry(p.partition_key).or_default().insert(
p.table_id,
Entry {
partition_id: p.id,
max_sequence_number: p.persisted_sequence_number,
},
);
}
// Minimise the overhead of the maps.
for tables in entries.values_mut() {
tables.shrink_to_fit();
}
entries.shrink_to_fit();
Self {
entries: Mutex::new(entries),
inner,
catalog,
backoff_config,
max_smear,
}
}
/// Search for an cached entry matching the `(partition_key, table_id)`
/// tuple.
fn find(
&self,
table_id: TableId,
partition_key: &PartitionKey,
) -> Option<(PartitionKey, Entry)> {
let mut entries = self.entries.lock();
// Look up the partition key provided by the caller.
//
// If the partition key is a hit, clone the key from the map and return
// it instead of using the caller-provided partition key - this allows
// effective reuse of the same partition key str across all hits for it
// and is more memory efficient than using the caller-provided partition
// key in the PartitionData.
let key = entries.get_key_value(partition_key)?.0.clone();
let partition = entries.get_mut(partition_key).unwrap();
let e = partition.remove(&table_id)?;
// As a entry was removed, check if it is now empty.
if partition.is_empty() {
entries.remove(partition_key);
entries.shrink_to_fit();
} else {
partition.shrink_to_fit();
}
Some((key, e))
}
}
#[async_trait]
impl<T> PartitionProvider for PartitionCache<T>
where
T: PartitionProvider,
{
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
// Use the cached PartitionKey instead of the caller's partition_key,
// instead preferring to reuse the already-shared Arc<str> in the cache.
if let Some((key, cached)) = self.find(table_id, &partition_key) {
debug!(%table_id, %partition_key, "partition cache hit");
// Initialise a deferred resolver for the sort key.
let sort_key_resolver = DeferredLoad::new(
self.max_smear,
SortKeyResolver::new(
cached.partition_id,
Arc::clone(&__self.catalog),
self.backoff_config.clone(),
)
.fetch(),
);
// Use the returned partition key instead of the callers - this
// allows the backing str memory to be reused across all partitions
// using the same key!
return PartitionData::new(
cached.partition_id,
key,
namespace_id,
table_id,
table_name,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
cached.max_sequence_number,
);
}
debug!(%table_id, %partition_key, "partition cache miss");
// Otherwise delegate to the catalog / inner impl.
self.inner
.get_partition(partition_key, namespace_id, table_id, table_name)
.await
}
}
#[cfg(test)]
mod tests {
use iox_catalog::mem::MemCatalog;
use super::*;
use crate::{
buffer_tree::partition::resolver::mock::MockPartitionProvider, TRANSITION_SHARD_ID,
};
const PARTITION_KEY: &str = "bananas";
const PARTITION_ID: PartitionId = PartitionId::new(42);
const NAMESPACE_ID: NamespaceId = NamespaceId::new(2);
const TABLE_ID: TableId = TableId::new(3);
const TABLE_NAME: &str = "platanos";
fn new_cache<P>(
inner: MockPartitionProvider,
partitions: P,
) -> PartitionCache<MockPartitionProvider>
where
P: IntoIterator<Item = Partition>,
{
PartitionCache::new(
inner,
partitions,
Duration::from_secs(10_000_000),
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))),
BackoffConfig::default(),
)
}
#[tokio::test]
async fn test_miss() {
let data = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.into(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
);
let inner = MockPartitionProvider::default().with_partition(data);
let cache = new_cache(inner, []);
let got = cache
.get_partition(
PARTITION_KEY.into(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert!(cache.inner.is_empty());
}
#[tokio::test]
async fn test_hit() {
let inner = MockPartitionProvider::default();
let stored_partition_key = PartitionKey::from(PARTITION_KEY);
let partition = Partition {
id: PARTITION_ID,
shard_id: TRANSITION_SHARD_ID,
table_id: TABLE_ID,
partition_key: stored_partition_key.clone(),
sort_key: vec!["dos".to_string(), "bananas".to_string()],
persisted_sequence_number: Default::default(),
};
let cache = new_cache(inner, [partition]);
let callers_partition_key = PartitionKey::from(PARTITION_KEY);
let got = cache
.get_partition(
callers_partition_key.clone(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(*got.partition_key(), PartitionKey::from(PARTITION_KEY));
// The cache should have been cleaned up as it was consumed.
assert!(cache.entries.lock().is_empty());
// Assert the partition key from the cache was used for the lifetime of
// the partition, so that it is shared with the cache + other partitions
// that share the same partition key across all tables.
assert!(got.partition_key().ptr_eq(&stored_partition_key));
// It does not use the short-lived caller's partition key (derived from
// the DML op it is processing).
assert!(!got.partition_key().ptr_eq(&callers_partition_key));
}
#[tokio::test]
async fn test_miss_partition_key() {
let other_key = PartitionKey::from("test");
let other_key_id = PartitionId::new(99);
let inner = MockPartitionProvider::default().with_partition(PartitionData::new(
other_key_id,
other_key.clone(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
let partition = Partition {
id: PARTITION_ID,
shard_id: TRANSITION_SHARD_ID,
table_id: TABLE_ID,
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
};
let cache = new_cache(inner, [partition]);
let got = cache
.get_partition(
other_key.clone(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), other_key_id);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
}
#[tokio::test]
async fn test_miss_table_id() {
let other_table = TableId::new(1234);
let inner = MockPartitionProvider::default().with_partition(PartitionData::new(
PARTITION_ID,
PARTITION_KEY.into(),
NAMESPACE_ID,
other_table,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
let partition = Partition {
id: PARTITION_ID,
shard_id: TRANSITION_SHARD_ID,
table_id: TABLE_ID,
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
};
let cache = new_cache(inner, [partition]);
let got = cache
.get_partition(
PARTITION_KEY.into(),
NAMESPACE_ID,
other_table,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.table_id(), other_table);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
}
}

View File

@ -0,0 +1,171 @@
//! A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
//! the partition id and persist offset.
use std::sync::Arc;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, Partition, PartitionKey, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{
partition::{PartitionData, SortKeyState},
table::TableName,
},
deferred_load::DeferredLoad,
TRANSITION_SHARD_ID,
};
/// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
/// the partition id and persist offset, returning an initialised
/// [`PartitionData`].
#[derive(Debug)]
pub(crate) struct CatalogPartitionResolver {
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
}
impl CatalogPartitionResolver {
/// Construct a [`CatalogPartitionResolver`] that looks up partitions in
/// `catalog`.
pub(crate) fn new(catalog: Arc<dyn Catalog>) -> Self {
Self {
catalog,
backoff_config: Default::default(),
}
}
async fn get(
&self,
partition_key: PartitionKey,
table_id: TableId,
) -> Result<Partition, iox_catalog::interface::Error> {
self.catalog
.repositories()
.await
.partitions()
.create_or_get(partition_key, TRANSITION_SHARD_ID, table_id)
.await
}
}
#[async_trait]
impl PartitionProvider for CatalogPartitionResolver {
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
debug!(
%partition_key,
%table_id,
%table_name,
"upserting partition in catalog"
);
let p = Backoff::new(&self.backoff_config)
.retry_all_errors("resolve partition", || {
self.get(partition_key.clone(), table_id)
})
.await
.expect("retry forever");
PartitionData::new(
p.id,
// Use the caller's partition key instance, as it MAY be shared with
// other instance, but the instance returned from the catalog
// definitely has no other refs.
partition_key,
namespace_id,
table_id,
table_name,
SortKeyState::Provided(p.sort_key()),
p.persisted_sequence_number,
)
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::ShardIndex;
use super::*;
use crate::TRANSITION_SHARD_ID;
const TABLE_NAME: &str = "bananas";
const PARTITION_KEY: &str = "platanos";
#[tokio::test]
async fn test_resolver() {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let (_shard_id, namespace_id, table_id) = {
let mut repos = catalog.repositories().await;
let t = repos.topics().create_or_get("platanos").await.unwrap();
let q = repos.query_pools().create_or_get("platanos").await.unwrap();
let ns = repos
.namespaces()
.create(TABLE_NAME, None, t.id, q.id)
.await
.unwrap();
let shard = repos
.shards()
.create_or_get(&t, ShardIndex::new(0))
.await
.unwrap();
assert_eq!(shard.id, TRANSITION_SHARD_ID);
let table = repos
.tables()
.create_or_get(TABLE_NAME, ns.id)
.await
.unwrap();
(shard.id, ns.id, table.id)
};
let callers_partition_key = PartitionKey::from(PARTITION_KEY);
let table_name = TableName::from(TABLE_NAME);
let resolver = CatalogPartitionResolver::new(Arc::clone(&catalog));
let got = resolver
.get_partition(
callers_partition_key.clone(),
namespace_id,
table_id,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
// Ensure the table name is available.
let _ = got.table_name().get().await;
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.table_name().to_string(), table_name.to_string());
assert_matches!(got.sort_key(), SortKeyState::Provided(None));
assert_eq!(got.max_persisted_sequence_number(), None);
assert!(got.partition_key.ptr_eq(&callers_partition_key));
let got = catalog
.repositories()
.await
.partitions()
.get_by_id(got.partition_id)
.await
.unwrap()
.expect("partition not created");
assert_eq!(got.table_id, table_id);
assert_eq!(got.partition_key, PartitionKey::from(PARTITION_KEY));
}
}

View File

@ -0,0 +1,69 @@
//! A mock [`PartitionProvider`] to inject [`PartitionData`] for tests.
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, TableId};
use parking_lot::Mutex;
use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{partition::PartitionData, table::TableName},
deferred_load::DeferredLoad,
};
/// A mock [`PartitionProvider`] for testing that returns pre-initialised
/// [`PartitionData`] for configured `(key, table)` tuples.
#[derive(Debug, Default)]
pub(crate) struct MockPartitionProvider {
partitions: Mutex<HashMap<(PartitionKey, TableId), PartitionData>>,
}
impl MockPartitionProvider {
/// A builder helper for [`Self::insert()`].
#[must_use]
pub(crate) fn with_partition(mut self, data: PartitionData) -> Self {
self.insert(data);
self
}
/// Add `data` to the mock state, returning it when asked for the specified
/// `(key, table)` tuple.
pub(crate) fn insert(&mut self, data: PartitionData) {
assert!(
self.partitions
.lock()
.insert((data.partition_key().clone(), data.table_id()), data)
.is_none(),
"overwriting an existing mock PartitionData"
);
}
/// Returns true if all mock values have been consumed.
pub(crate) fn is_empty(&self) -> bool {
self.partitions.lock().is_empty()
}
}
#[async_trait]
impl PartitionProvider for MockPartitionProvider {
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
let p = self
.partitions
.lock()
.remove(&(partition_key.clone(), table_id))
.unwrap_or_else(|| {
panic!("no partition data for mock ({partition_key:?}, {table_id:?})")
});
assert_eq!(p.namespace_id(), namespace_id);
assert_eq!(p.table_name().to_string(), table_name.to_string());
p
}
}

View File

@ -0,0 +1,20 @@
//! An abstract resolver of [`PartitionData`] for a given table.
//!
//! [`PartitionData`]: crate::buffer_tree::partition::PartitionData
#![allow(unused_imports)] // Transition time only.
mod cache;
pub(crate) use cache::*;
mod r#trait;
pub(crate) use r#trait::*;
mod catalog;
pub(crate) use catalog::*;
mod sort_key;
pub(crate) use sort_key::*;
#[cfg(test)]
pub(crate) mod mock;

View File

@ -0,0 +1,102 @@
//! A optimised resolver of a partition [`SortKey`].
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use schema::sort::SortKey;
/// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`].
#[derive(Debug)]
pub(crate) struct SortKeyResolver {
partition_id: PartitionId,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl SortKeyResolver {
pub(crate) fn new(
partition_id: PartitionId,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> Self {
Self {
partition_id,
backoff_config,
catalog,
}
}
/// Fetch the [`SortKey`] from the [`Catalog`] for `partition_id`, retrying
/// endlessly when errors occur.
pub(crate) async fn fetch(self) -> Option<SortKey> {
Backoff::new(&self.backoff_config)
.retry_all_errors("fetch partition sort key", || async {
let s = self
.catalog
.repositories()
.await
.partitions()
.get_by_id(self.partition_id)
.await?
.expect("resolving sort key for non-existent partition")
.sort_key();
Result::<_, iox_catalog::interface::Error>::Ok(s)
})
.await
.expect("retry forever")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::ShardIndex;
use super::*;
use crate::test_util::populate_catalog;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
const PARTITION_KEY: &str = "platanos";
#[tokio::test]
async fn test_fetch() {
let metrics = Arc::new(metric::Registry::default());
let backoff_config = BackoffConfig::default();
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the shard / namespace / table
let (shard_id, _ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
let partition_id = catalog
.repositories()
.await
.partitions()
.create_or_get(PARTITION_KEY.into(), shard_id, table_id)
.await
.expect("should create")
.id;
let fetcher =
SortKeyResolver::new(partition_id, Arc::clone(&catalog), backoff_config.clone());
// Set the sort key
let catalog_state = catalog
.repositories()
.await
.partitions()
.update_sort_key(partition_id, &["uno", "dos", "bananas"])
.await
.expect("should update existing partition key");
let fetched = fetcher.fetch().await;
assert_eq!(fetched, catalog_state.sort_key());
}
}

View File

@ -0,0 +1,84 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, TableId};
use crate::{
buffer_tree::{partition::PartitionData, table::TableName},
deferred_load::DeferredLoad,
};
/// An infallible resolver of [`PartitionData`] for the specified table and
/// partition key, returning an initialised [`PartitionData`] buffer for it.
#[async_trait]
pub(crate) trait PartitionProvider: Send + Sync + Debug {
/// Return an initialised [`PartitionData`] for a given `(partition_key,
/// table_id)` tuple.
///
/// NOTE: the constructor for [`PartitionData`] is NOT `pub` and SHOULD NOT
/// be `pub` so this trait is effectively sealed.
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData;
}
#[async_trait]
impl<T> PartitionProvider for Arc<T>
where
T: PartitionProvider,
{
async fn get_partition(
&self,
partition_key: PartitionKey,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
(**self)
.get_partition(partition_key, namespace_id, table_id, table_name)
.await
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use data_types::PartitionId;
use super::*;
use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState};
#[tokio::test]
async fn test_arc_impl() {
let key = PartitionKey::from("bananas");
let namespace_id = NamespaceId::new(1234);
let table_id = TableId::new(24);
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
"bananas".into(),
namespace_id,
table_id,
Arc::clone(&table_name),
SortKeyState::Provided(None),
None,
);
let mock = Arc::new(MockPartitionProvider::default().with_partition(data));
let got = mock
.get_partition(key, namespace_id, table_id, Arc::clone(&table_name))
.await;
assert_eq!(got.partition_id(), partition);
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.table_name().to_string(), table_name.to_string());
}
}

View File

@ -0,0 +1,97 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::NamespaceId;
use dml::DmlOperation;
use metric::U64Counter;
use super::{
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
partition::resolver::PartitionProvider,
table::name_resolver::TableNameProvider,
};
use crate::{arcmap::ArcMap, dml_sink::DmlSink};
#[derive(Debug)]
pub(crate) struct BufferTree {
/// The resolver of `(table_id, partition_key)` to [`PartitionData`].
///
/// [`PartitionData`]: super::partition::PartitionData
partition_provider: Arc<dyn PartitionProvider>,
/// A set of namespaces this [`BufferTree`] instance has processed
/// [`DmlOperation`]'s for.
///
/// The [`NamespaceNameProvider`] acts as a [`DeferredLoad`] constructor to
/// resolve the [`NamespaceName`] for new [`NamespaceData`] out of the hot
/// path.
///
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
/// [`NamespaceName`]: data_types::NamespaceName
namespaces: ArcMap<NamespaceId, NamespaceData>,
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
/// The [`TableName`] provider used by [`NamespaceData`] to initialise a
/// [`TableData`].
///
/// [`TableName`]: crate::buffer_tree::table::TableName
/// [`TableData`]: crate::buffer_tree::table::TableData
table_name_resolver: Arc<dyn TableNameProvider>,
metrics: Arc<metric::Registry>,
namespace_count: U64Counter,
}
impl BufferTree {
/// Initialise a new [`BufferTree`] that emits metrics to `metrics`.
pub(crate) fn new(
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
table_name_resolver: Arc<dyn TableNameProvider>,
partition_provider: Arc<dyn PartitionProvider>,
metrics: Arc<metric::Registry>,
) -> Self {
let namespace_count = metrics
.register_metric::<U64Counter>(
"ingester_namespaces",
"Number of namespaces known to the ingester",
)
.recorder(&[]);
Self {
namespaces: Default::default(),
namespace_name_resolver,
table_name_resolver,
metrics,
partition_provider,
namespace_count,
}
}
/// Gets the namespace data out of the map
pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
self.namespaces.get(&namespace_id)
}
}
#[async_trait]
impl DmlSink for BufferTree {
type Error = mutable_batch::Error;
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error> {
let namespace_id = op.namespace_id();
let namespace_data = self.namespaces.get_or_insert_with(&namespace_id, || {
// Increase the metric that records the number of namespaces
// buffered in this ingester instance.
self.namespace_count.inc(1);
Arc::new(NamespaceData::new(
namespace_id,
self.namespace_name_resolver.for_namespace(namespace_id),
Arc::clone(&self.table_name_resolver),
Arc::clone(&self.partition_provider),
&self.metrics,
))
});
namespace_data.apply(op).await
}
}

View File

@ -0,0 +1,278 @@
//! Table level data buffer structures.
pub(crate) mod name_resolver;
use std::sync::Arc;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
use mutable_batch::MutableBatch;
use parking_lot::{Mutex, RwLock};
use super::partition::{resolver::PartitionProvider, PartitionData};
use crate::{arcmap::ArcMap, deferred_load::DeferredLoad};
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
#[derive(Debug, Default)]
struct DoubleRef {
by_key: ArcMap<PartitionKey, Mutex<PartitionData>>,
by_id: ArcMap<PartitionId, Mutex<PartitionData>>,
}
impl DoubleRef {
/// Try to insert the provided [`PartitionData`].
///
/// Note that the partition MAY have been inserted concurrently, and the
/// returned [`PartitionData`] MAY be a different instance for the same
/// underlying partition.
fn try_insert(&mut self, ns: PartitionData) -> Arc<Mutex<PartitionData>> {
let id = ns.partition_id();
let key = ns.partition_key().clone();
let ns = Arc::new(Mutex::new(ns));
self.by_key.get_or_insert_with(&key, || Arc::clone(&ns));
self.by_id.get_or_insert_with(&id, || ns)
}
fn by_key(&self, key: &PartitionKey) -> Option<Arc<Mutex<PartitionData>>> {
self.by_key.get(key)
}
fn by_id(&self, id: PartitionId) -> Option<Arc<Mutex<PartitionData>>> {
self.by_id.get(&id)
}
}
/// The string name / identifier of a Table.
///
/// A reference-counted, cheap clone-able string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct TableName(Arc<str>);
impl<T> From<T> for TableName
where
T: AsRef<str>,
{
fn from(v: T) -> Self {
Self(Arc::from(v.as_ref()))
}
}
impl From<TableName> for Arc<str> {
fn from(v: TableName) -> Self {
v.0
}
}
impl std::fmt::Display for TableName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::ops::Deref for TableName {
type Target = Arc<str>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl PartialEq<str> for TableName {
fn eq(&self, other: &str) -> bool {
&*self.0 == other
}
}
/// Data of a Table in a given Namesapce that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
/// The catalog ID of the namespace this table is being populated from.
namespace_id: NamespaceId,
/// An abstract constructor of [`PartitionData`] instances for a given
/// `(key, table)` tuple.
partition_provider: Arc<dyn PartitionProvider>,
// Map of partition key to its data
partition_data: RwLock<DoubleRef>,
}
impl TableData {
/// Initialize new table buffer identified by [`TableId`] in the catalog.
///
/// Optionally the given tombstone max [`SequenceNumber`] identifies the
/// inclusive upper bound of tombstones associated with this table. Any data
/// greater than this value is guaranteed to not (yet) have a delete
/// tombstone that must be resolved.
///
/// The partition provider is used to instantiate a [`PartitionData`]
/// instance when this [`TableData`] instance observes an op for a partition
/// for the first time.
pub(super) fn new(
table_id: TableId,
table_name: DeferredLoad<TableName>,
namespace_id: NamespaceId,
partition_provider: Arc<dyn PartitionProvider>,
) -> Self {
Self {
table_id,
table_name: Arc::new(table_name),
namespace_id,
partition_data: Default::default(),
partition_provider,
}
}
// buffers the table write and returns true if the lifecycle manager indicates that
// ingest should be paused.
pub(super) async fn buffer_table_write(
&self,
sequence_number: SequenceNumber,
batch: MutableBatch,
partition_key: PartitionKey,
) -> Result<(), mutable_batch::Error> {
let p = self.partition_data.read().by_key(&partition_key);
let partition_data = match p {
Some(p) => p,
None => {
let p = self
.partition_provider
.get_partition(
partition_key.clone(),
self.namespace_id,
self.table_id,
Arc::clone(&self.table_name),
)
.await;
// Add the double-referenced partition to the map.
//
// This MAY return a different instance than `p` if another
// thread has already initialised the partition.
self.partition_data.write().try_insert(p)
}
};
partition_data.lock().buffer_write(batch, sequence_number)?;
Ok(())
}
/// Return a mutable reference to all partitions buffered for this table.
///
/// # Ordering
///
/// The order of [`PartitionData`] in the iterator is arbitrary and should
/// not be relied upon.
pub(crate) fn partitions(&self) -> Vec<Arc<Mutex<PartitionData>>> {
self.partition_data.read().by_key.values()
}
/// Return the [`PartitionData`] for the specified ID.
pub(crate) fn get_partition(
&self,
partition_id: PartitionId,
) -> Option<Arc<Mutex<PartitionData>>> {
self.partition_data.read().by_id(partition_id)
}
/// Return the [`PartitionData`] for the specified partition key.
pub(crate) fn get_partition_by_key(
&self,
partition_key: &PartitionKey,
) -> Option<Arc<Mutex<PartitionData>>> {
self.partition_data.read().by_key(partition_key)
}
/// Returns the table ID for this partition.
pub(crate) fn table_id(&self) -> TableId {
self.table_id
}
/// Returns the name of this table.
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
&self.table_name
}
/// Return the [`NamespaceId`] this table is a part of.
pub(crate) fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use data_types::PartitionId;
use mutable_batch_lp::lines_to_batches;
use super::*;
use crate::buffer_tree::partition::{
resolver::mock::MockPartitionProvider, PartitionData, SortKeyState,
};
const TABLE_NAME: &str = "bananas";
const TABLE_ID: TableId = TableId::new(44);
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const PARTITION_KEY: &str = "platanos";
const PARTITION_ID: PartitionId = PartitionId::new(0);
#[tokio::test]
async fn test_partition_double_ref() {
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PARTITION_ID,
PARTITION_KEY.into(),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));
let table = TableData::new(
TABLE_ID,
DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
}),
NAMESPACE_ID,
partition_provider,
);
let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)
.unwrap()
.remove(TABLE_NAME)
.unwrap();
// Assert the table does not contain the test partition
assert!(table
.partition_data
.read()
.by_key(&PARTITION_KEY.into())
.is_none());
assert!(table.partition_data.read().by_id(PARTITION_ID).is_none());
// Write some test data
table
.buffer_table_write(SequenceNumber::new(42), batch, PARTITION_KEY.into())
.await
.expect("buffer op should succeed");
// Referencing the partition should succeed
assert!(table
.partition_data
.read()
.by_key(&PARTITION_KEY.into())
.is_some());
assert!(table.partition_data.read().by_id(PARTITION_ID).is_some());
}
}

View File

@ -0,0 +1,138 @@
use std::{sync::Arc, time::Duration};
use backoff::{Backoff, BackoffConfig};
use data_types::TableId;
use iox_catalog::interface::Catalog;
use super::TableName;
use crate::deferred_load::DeferredLoad;
/// An abstract provider of a [`DeferredLoad`] configured to fetch the
/// [`TableName`] of the specified [`TableId`].
pub(crate) trait TableNameProvider: Send + Sync + std::fmt::Debug {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName>;
}
#[derive(Debug)]
pub(crate) struct TableNameResolver {
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
}
impl TableNameResolver {
pub(crate) fn new(
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> Self {
Self {
max_smear,
catalog,
backoff_config,
}
}
/// Fetch the [`TableName`] from the [`Catalog`] for specified
/// `table_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch(
table_id: TableId,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> TableName {
Backoff::new(&backoff_config)
.retry_all_errors("fetch table name", || async {
let s = catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await?
.expect("resolving table name for non-existent table id")
.name
.into();
Result::<_, iox_catalog::interface::Error>::Ok(s)
})
.await
.expect("retry forever")
}
}
impl TableNameProvider for TableNameResolver {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName> {
DeferredLoad::new(
self.max_smear,
Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
)
}
}
#[cfg(test)]
pub(crate) mod mock {
use super::*;
#[derive(Debug)]
pub(crate) struct MockTableNameProvider {
name: TableName,
}
impl MockTableNameProvider {
pub(crate) fn new(name: impl Into<TableName>) -> Self {
Self { name: name.into() }
}
}
impl Default for MockTableNameProvider {
fn default() -> Self {
Self::new("bananas")
}
}
impl TableNameProvider for MockTableNameProvider {
fn for_table(&self, _id: TableId) -> DeferredLoad<TableName> {
let name = self.name.clone();
DeferredLoad::new(Duration::from_secs(1), async { name })
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::ShardIndex;
use test_helpers::timeout::FutureTimeout;
use super::*;
use crate::test_util::populate_catalog;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
#[tokio::test]
async fn test_fetch() {
let metrics = Arc::new(metric::Registry::default());
let backoff_config = BackoffConfig::default();
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the shard / namespace / table
let (_shard_id, _ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
let fetcher = Arc::new(TableNameResolver::new(
Duration::from_secs(10),
Arc::clone(&catalog),
backoff_config.clone(),
));
let got = fetcher
.for_table(table_id)
.get()
.with_timeout_panic(Duration::from_secs(5))
.await;
assert_eq!(&**got, TABLE_NAME);
}
}

View File

@ -0,0 +1,512 @@
//! Generic deferred execution of arbitrary [`Future`]'s.
use std::{fmt::Display, sync::Arc, time::Duration};
use futures::Future;
use observability_deps::tracing::*;
use parking_lot::Mutex;
use rand::Rng;
use tokio::{
sync::{
oneshot::{self, Sender},
Notify,
},
task::JoinHandle,
};
/// [`UNRESOLVED_DISPLAY_STRING`] defines the string shown when invoking the
/// [`Display`] implementation on a [`DeferredLoad`] that has not yet resolved
/// the deferred value.
pub(crate) const UNRESOLVED_DISPLAY_STRING: &str = "<unresolved>";
/// The states of a [`DeferredLoad`] instance.
#[derive(Debug)]
enum State<T> {
/// The value has not yet been fetched by the background task.
///
/// Sending a value will wake the background task.
Unresolved(Sender<()>),
/// The value is being actively resolved by the background task.
///
/// Callers can subscribe to a completion event by waiting on the
/// [`Notify`].
Loading(Arc<Notify>),
/// The value was fetched by the background task and is read to be consumed.
///
/// Only the background task ever sets this state.
Resolved(T),
}
/// A deferred resolver of `T` in the background, or on demand.
///
/// This implementation combines lazy / deferred loading of `T`, and a
/// background timer that pre-fetches `T` after some random duration of time.
/// Combined, these behaviours provide random jitter for the execution of the
/// resolve [`Future`] across the allowable time range.
///
/// If the [`DeferredLoad`] is dropped and the background task is still
/// incomplete (sleeping / actively fetching `T`) it is aborted immediately. The
/// background task exists once it has successfully resolved `T`.
///
/// # Stale Cached Values
///
/// This is effectively a cache that is pre-fetched in the background - this
/// necessitates that the caller can tolerate, or detect, stale values.
pub(crate) struct DeferredLoad<T> {
/// The inner state of the [`DeferredLoad`].
///
/// The [`Option`] facilitates taking ownership of the state to transition,
/// and MUST always be [`Some`] once the mutex is released.
value: Arc<Mutex<Option<State<T>>>>,
handle: JoinHandle<()>,
}
impl<T> std::fmt::Debug for DeferredLoad<T>
where
T: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeferredLoad")
.field("value", &self.value)
.field("handle", &self.handle)
.finish()
}
}
impl<T> Display for DeferredLoad<T>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.value.lock().as_ref().unwrap() {
State::Unresolved(_) | State::Loading(_) => f.write_str(UNRESOLVED_DISPLAY_STRING),
State::Resolved(v) => v.fmt(f),
}
}
}
impl<T> DeferredLoad<T> {
/// Provide a hint to the [`DeferredLoad`] that the value will be used soon.
///
/// This allows the value to be materialised in the background, in parallel
/// while the caller is executing code that will eventually call
/// [`Self::get()`].
pub(crate) fn prefetch_now(&self) {
let mut state = self.value.lock();
// If the value has already resolved, this call is a NOP.
if let Some(State::Resolved(_)) = &*state {
return;
}
// Potentially transition the state, discarding the waker.
let (_waker, new_state) = self.get_load_waker(state.take().unwrap());
*state = Some(new_state);
}
/// Potentially transition `state`, returning the [`Notify`] that will be
/// signalled when loading the value completes, and the (potentially
/// changed) state.
///
/// # Panics
///
/// This method panics if `state` is [`State::Resolved`].
fn get_load_waker(&self, state: State<T>) -> (Arc<Notify>, State<T>) {
let waker = match state {
// This caller is the first to demand the value - wake the
// background task, initialise the notification mechanism and
// wait for the task to complete.
State::Unresolved(task_waker) => {
// Wake the running background task, ignoring any send error
// as the background task may have concurrently woken up due
// to the sleep timer and stopped listening on the waker
// channel.
let _ = task_waker.send(());
// Replace the state with a notification for this thread
// (and others that call get()) to wait on for the
// concurrent fetch to complete.
Arc::new(Notify::default())
}
// If the value is already being fetched, wait for the fetch to
// complete.
State::Loading(waker) => waker,
// This was checked above before take()-ing the state.
State::Resolved(_) => unreachable!(),
};
// Ensure any subsequent callers can subscribe to the completion
// event by transitioning to the loading state.
let state = State::Loading(Arc::clone(&waker));
// Whenever there is a waker for the caller, the background task
// MUST be running.
//
// This check happens before the state lock is released, ensuring
// the background task doesn't concurrently finish (it would be
// blocked waiting to update the state).
assert!(!self.handle.is_finished());
(waker, state)
}
}
impl<T> DeferredLoad<T>
where
T: Send + Sync + 'static,
{
/// Construct a [`DeferredLoad`] instance that fetches `T` after at most
/// `max_wait` duration of time, by executing `F`.
///
/// The background task will wait a uniformly random duration of time
/// between `[0, max_wait)` before attempting to pre-fetch `T` by executing
/// the provided future.
pub(crate) fn new<F>(max_wait: Duration, loader: F) -> Self
where
F: Future<Output = T> + Send + 'static,
{
// Init the value container the background thread populates, and
// populate the starting state with a handle to immediately wake the
// background task.
let (tx, rx) = oneshot::channel();
let value = Arc::new(Mutex::new(Some(State::Unresolved(tx))));
// Select random duration from a uniform distribution, up to the
// configured maximum.
let wait_for = rand::thread_rng().gen_range(Duration::ZERO..max_wait);
// Spawn the background task, sleeping for the random duration of time
// before fetching the sort key.
let handle = tokio::spawn({
let value = Arc::clone(&value);
async move {
// Sleep for the random duration, or until a demand call is
// made.
tokio::select! {
_ = tokio::time::sleep(wait_for) => {
trace!("timeout woke loader task");
}
_ = rx => {
trace!("demand call woke loader task");
}
}
// Execute the user-provided future to resolve the actual value.
let v = loader.await;
// And attempt to update the value container, if it hasn't
// already resolved.
//
// This will panic if the value has already been resolved, but
// that should be impossible because this task is the one that
// resolves it.
let callers = {
let mut guard = value.lock();
match guard.take().unwrap() {
State::Unresolved(_) => {
// The background task woke and completed before any
// caller demanded the value.
*guard = Some(State::Resolved(v));
None
}
State::Loading(callers) => {
// At least one caller is demanding the value, and
// must be woken after the lock is released.
*guard = Some(State::Resolved(v));
Some(callers)
}
State::Resolved(_) => unreachable!(),
}
};
// Wake the waiters, if any, outside of the lock to avoid
// unnecessary contention. If there are >1 threads waiting for
// the value, they make contend for the value lock however.
if let Some(callers) = callers {
callers.notify_waiters();
}
}
});
Self { value, handle }
}
}
impl<T> DeferredLoad<T>
where
T: Clone + Send + Sync,
{
/// Read `T`.
///
/// If `T` was pre-fetched in the background, it is returned immediately. If
/// `T` has not yet been resolved, this call blocks while the [`Future`]
/// provided at construction is executed.
///
/// # Concurrency
///
/// If this method requires resolving `T`, all callers to this method will
/// wait for completion while the single background task resolves `T`.
///
/// # Cancellation
///
/// This method is cancellation safe.
pub(crate) async fn get(&self) -> T {
let waker = {
let mut state = self.value.lock();
// The happy path - the value has been resolved already.
if let Some(State::Resolved(v)) = &*state {
return v.clone();
}
// If execution reaches here, this call will have to wait for the
// value to be resolved, and potentially must wake the background
// task to do so.
let (waker, new_state) = self.get_load_waker(state.take().unwrap());
*state = Some(new_state);
waker
};
// Wait for the background task to complete resolving the value.
waker.notified().await;
match self.value.lock().as_ref().unwrap() {
State::Unresolved(_) | State::Loading(_) => unreachable!(),
State::Resolved(v) => v.clone(),
}
}
}
impl<T> Drop for DeferredLoad<T> {
fn drop(&mut self) {
// Attempt to abort the background task, regardless of it having
// completed or not.
self.handle.abort()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use futures::{executor::block_on, future, pin_mut};
use test_helpers::timeout::FutureTimeout;
use super::*;
const TIMEOUT: Duration = Duration::from_secs(5);
/// Tests that want to exercise the demand loading configure the
/// DeferredLoad delay to this value, encouraging the background load to
/// happen a long time in the future.
///
/// Because the delay before the background task runs is selected from a
/// uniform distribution, it is certainly possible the background task runs
/// before the demand call, invalidating the test - it's just statistically
/// very unlikely. Over multiple test runs, the odds of the background task
/// running before the demand call is so low, that if you see it happen
/// multiple times, I'd suggest buying a lottery ticket.
const LONG_LONG_TIME: Duration = Duration::from_secs(100_000_000);
#[tokio::test]
async fn test_demand() {
let d = DeferredLoad::new(LONG_LONG_TIME, async { 42 });
assert_eq!(d.get().with_timeout_panic(TIMEOUT).await, 42);
// Subsequent calls also succeed
assert_eq!(d.get().with_timeout_panic(TIMEOUT).await, 42);
}
#[tokio::test]
async fn test_concurrent_demand() {
let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async {
// Add a little delay to induce some contention
tokio::time::sleep(Duration::from_millis(50)).await;
42
}));
// Synchronise the attempts of the threads.
let barrier = Arc::new(std::sync::Barrier::new(2));
// In a different thread (because the barrier is blocking) try to
// resolve the value in parallel.
let h = std::thread::spawn({
let d = Arc::clone(&d);
let barrier = Arc::clone(&barrier);
move || {
let got = block_on(async {
barrier.wait();
d.get().await
});
assert_eq!(got, 42);
}
});
barrier.wait();
assert_eq!(d.get().with_timeout_panic(TIMEOUT).await, 42);
// Assert the thread didn't panic.
h.join().expect("second thread panicked resolving value")
}
#[tokio::test]
async fn test_background_load() {
// Configure the background load to fire (practically) immediately.
let d = Arc::new(DeferredLoad::new(Duration::from_millis(1), async { 42 }));
// Spin and wait for the state to change to resolved WITHOUT a demand
// call being made.
let peek = {
let d = Arc::clone(&d);
async {
loop {
match d.value.lock().as_ref().unwrap() {
State::Unresolved(_) | State::Loading(_) => {}
State::Resolved(v) => return *v,
}
tokio::task::yield_now().await
}
}
.with_timeout_panic(TIMEOUT)
.await
};
let got = d.get().with_timeout_panic(TIMEOUT).await;
assert_eq!(got, peek);
assert_eq!(got, 42);
}
#[tokio::test]
async fn test_background_load_concurrent_demand() {
// This channel is used to signal the background load has begun.
let (signal_start, started) = oneshot::channel();
// This channel is used to block the background task from completing
// after the above channel has signalled it has begun.
let (allow_complete, can_complete) = oneshot::channel();
// Configure the background load to fire (practically) immediately but
// block waiting for rx to be unblocked.
//
// This allows the current thread time to issue a demand and wait on the
// result before the background load completes.
let d = Arc::new(DeferredLoad::new(Duration::from_millis(1), async {
// Signal the background task has begun.
signal_start.send(()).expect("test task died");
// Wait for the test thread to issue the demand call and unblock
// this fn.
can_complete.await.expect("sender died");
42
}));
// Wait for the background task to begin.
started
.with_timeout_panic(TIMEOUT)
.await
.expect("background task died");
// Issue a demand call
let fut = future::maybe_done(d.get());
pin_mut!(fut);
assert_eq!(fut.as_mut().take_output(), None);
// Unblock the background task.
allow_complete.send(()).expect("background task died");
// And await the demand call
fut.as_mut().with_timeout_panic(TIMEOUT).await;
assert_eq!(fut.as_mut().take_output(), Some(42));
}
#[tokio::test]
async fn test_display() {
// This channel is used to block the background task from completing
// after the above channel has signalled it has begun.
let (allow_complete, can_complete) = oneshot::channel();
// Configure the background load to fire (practically) immediately but
// block waiting for rx to be unblocked.
let d = Arc::new(DeferredLoad::new(Duration::from_millis(1), async {
// Wait for the test thread to issue the demand call and unblock
// this fn.
can_complete.await.expect("sender died");
42
}));
assert_eq!("<unresolved>", d.to_string());
// Issue a demand call
let fut = future::maybe_done(d.get());
pin_mut!(fut);
assert_eq!(fut.as_mut().take_output(), None);
assert_eq!("<unresolved>", d.to_string());
// Unblock the background task.
allow_complete.send(()).expect("background task died");
// And await the demand call
fut.as_mut().await;
assert_eq!(fut.as_mut().take_output(), Some(42));
// And assert Display is delegated to the resolved value
assert_eq!("42", d.to_string());
}
#[tokio::test]
async fn test_prefetch_concurrent_demand() {
// This channel is used to signal the background load has begun.
let (signal_start, started) = oneshot::channel();
// This channel is used to block the background task from completing
// after the above channel has signalled it has begun.
let (allow_complete, can_complete) = oneshot::channel();
// Configure the background load to fire (practically) immediately but
// block waiting for rx to be unblocked.
//
// This allows the current thread time to issue a demand and wait on the
// result before the background load completes.
let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async {
// Signal the background task has begun.
signal_start.send(()).expect("test task died");
// Wait for the test thread to issue the demand call and unblock
// this fn.
can_complete.await.expect("sender died");
42
}));
d.prefetch_now();
// Wait for the background task to begin.
started
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("background task died");
// Issue a demand call
let fut = future::maybe_done(d.get());
pin_mut!(fut);
assert_eq!(fut.as_mut().take_output(), None);
// Unblock the background task.
allow_complete.send(()).expect("background task died");
// And await the demand call
fut.as_mut().await;
assert_eq!(fut.as_mut().take_output(), Some(42));
}
#[tokio::test]
async fn test_prefetch_already_loaded() {
let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async { 42 }));
let _ = d.get().with_timeout_panic(TIMEOUT).await;
d.prefetch_now();
}
}

View File

@ -0,0 +1,39 @@
use std::collections::VecDeque;
use async_trait::async_trait;
use dml::DmlOperation;
use parking_lot::Mutex;
use super::{DmlError, DmlSink};
#[derive(Debug, Default)]
struct MockDmlSinkState {
calls: Vec<DmlOperation>,
ret: VecDeque<Result<(), DmlError>>,
}
#[derive(Debug, Default)]
pub(crate) struct MockDmlSink {
state: Mutex<MockDmlSinkState>,
}
impl MockDmlSink {
pub(crate) fn with_apply_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
self.state.lock().ret = ret.into();
self
}
pub(crate) fn get_calls(&self) -> Vec<DmlOperation> {
self.state.lock().calls.clone()
}
}
#[async_trait]
impl DmlSink for MockDmlSink {
type Error = DmlError;
async fn apply(&self, op: DmlOperation) -> Result<(), DmlError> {
let mut state = self.state.lock();
state.calls.push(op);
state.ret.pop_front().expect("no mock sink value to return")
}
}

View File

@ -0,0 +1,5 @@
mod r#trait;
pub(crate) use r#trait::*;
#[cfg(test)]
pub(crate) mod mock_sink;

View File

@ -0,0 +1,34 @@
use std::{error::Error, fmt::Debug, ops::Deref, sync::Arc};
use async_trait::async_trait;
use dml::DmlOperation;
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum DmlError {
/// An error applying a [`DmlOperation`] to a [`BufferTree`].
///
/// [`BufferTree`]: crate::buffer_tree::BufferTree
#[error("failed to buffer op: {0}")]
Buffer(#[from] mutable_batch::Error),
}
/// A [`DmlSink`] handles [`DmlOperation`] instances in some abstract way.
#[async_trait]
pub(crate) trait DmlSink: Debug + Send + Sync {
type Error: Error + Into<DmlError> + Send;
/// Apply `op` to the implementer's state.
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error>;
}
#[async_trait]
impl<T> DmlSink for Arc<T>
where
T: DmlSink,
{
type Error = T::Error;
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error> {
self.deref().apply(op).await
}
}

46
ingester2/src/lib.rs Normal file
View File

@ -0,0 +1,46 @@
//! IOx Ingester V2 implementation.
#![allow(dead_code)] // Until ingester2 is used.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
unreachable_pub
)]
use data_types::ShardId;
/// During the testing of ingester2, the catalog will require a ShardId for
/// various operations. This is a const value for these occasions.
const TRANSITION_SHARD_ID: ShardId = ShardId::new(1);
//
// !!! PLEASE DO NOT EXPORT !!!
//
// Please be judicious when deciding if something should be visible outside this
// crate. Overzealous use of `pub` exposes the internals of an Ingester and
// causes tight coupling with other components. I know it's tempting, but it
// causes problems in the long run.
//
// Ideally anything that NEEDS to be shared with other components happens via a
// shared common crate. Any external components should interact with an Ingester
// through its public API only, and not by poking around at the internals.
//
mod arcmap;
mod buffer_tree;
mod deferred_load;
mod dml_sink;
mod query;
mod query_adaptor;
mod sequence_range;
#[cfg(test)]
mod test_util;

View File

@ -0,0 +1,38 @@
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use observability_deps::tracing::*;
use trace::span::{Span, SpanRecorder};
use super::{QueryError, QueryExec};
use crate::query::response::Response;
#[derive(Debug)]
pub(crate) struct QueryRunner;
impl QueryRunner {
pub(crate) fn new() -> Self {
Self {}
}
}
#[async_trait]
impl QueryExec for QueryRunner {
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
let mut _span_recorder = SpanRecorder::new(span);
info!(
namespace_id=%namespace_id,
table_id=%table_id,
columns=?columns,
"executing query"
);
unimplemented!();
}
}

View File

@ -0,0 +1,150 @@
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use trace::span::Span;
use super::{response::Response, QueryExec};
use crate::query::QueryError;
/// An instrumentation decorator over a [`QueryExec`] implementation.
///
/// This wrapper captures the latency distribution of the decorated
/// [`QueryExec::query_exec()`] call, faceted by success/error result.
#[derive(Debug)]
pub(crate) struct QueryExecInstrumentation<T, P = SystemProvider> {
inner: T,
time_provider: P,
/// Query execution duration distribution for successes.
query_duration_success: DurationHistogram,
/// Query execution duration distribution for "not found" errors
query_duration_error_not_found: DurationHistogram,
}
impl<T> QueryExecInstrumentation<T> {
pub(crate) fn new(inner: T, metrics: &metric::Registry) -> Self {
// Record query duration metrics, broken down by query execution result
let query_duration: Metric<DurationHistogram> = metrics.register_metric(
"ingester_flight_query_duration",
"flight request query execution duration",
);
let query_duration_success = query_duration.recorder(&[("result", "success")]);
let query_duration_error_not_found =
query_duration.recorder(&[("result", "error"), ("reason", "not_found")]);
Self {
inner,
time_provider: Default::default(),
query_duration_success,
query_duration_error_not_found,
}
}
}
#[async_trait]
impl<T, P> QueryExec for QueryExecInstrumentation<T, P>
where
T: QueryExec,
P: TimeProvider,
{
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
let t = self.time_provider.now();
let res = self
.inner
.query_exec(namespace_id, table_id, columns, span)
.await;
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self.query_duration_success.record(delta),
Err(QueryError::TableNotFound { .. } | QueryError::NamespaceNotFound { .. }) => {
self.query_duration_error_not_found.record(delta)
}
};
}
res
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use metric::Attributes;
use super::*;
use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream};
macro_rules! test_metric {
(
$name:ident,
inner = $inner:expr,
want_metric_attr = $want_metric_attr:expr,
want_ret = $($want_ret:tt)+
) => {
paste::paste! {
#[tokio::test]
async fn [<test_metric_ $name>]() {
let metrics = metric::Registry::default();
let decorator = QueryExecInstrumentation::new($inner, &metrics);
// Call the decorator and assert the return value
let got = decorator
.query_exec(NamespaceId::new(42), TableId::new(24), vec![], None)
.await;
assert_matches!(got, $($want_ret)+);
// Validate the histogram with the specified attributes saw
// an observation
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("ingester_flight_query_duration")
.expect("failed to find metric")
.get_observer(&Attributes::from(&$want_metric_attr))
.expect("failed to find attributes")
.fetch();
assert_eq!(histogram.sample_count(), 1);
}
}
};
}
test_metric!(
ok,
inner = {
let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([])));
MockQueryExec::default().with_result(Ok(Response::new(stream)))
},
want_metric_attr = [("result", "success")],
want_ret = Ok(_)
);
test_metric!(
namespace_not_found,
inner = MockQueryExec::default()
.with_result(Err(QueryError::NamespaceNotFound(NamespaceId::new(42)))),
want_metric_attr = [("result", "error"), ("reason", "not_found")],
want_ret = Err(QueryError::NamespaceNotFound(ns)) => {
assert_eq!(ns, NamespaceId::new(42));
}
);
test_metric!(
table_not_found,
inner = MockQueryExec::default()
.with_result(Err(QueryError::TableNotFound(NamespaceId::new(42), TableId::new(24)))),
want_metric_attr = [("result", "error"), ("reason", "not_found")],
want_ret = Err(QueryError::TableNotFound(ns, t)) => {
assert_eq!(ns, NamespaceId::new(42));
assert_eq!(t, TableId::new(24));
}
);
}

View File

@ -0,0 +1,34 @@
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use parking_lot::Mutex;
use trace::span::Span;
use super::{response::Response, QueryError, QueryExec};
#[derive(Debug, Default)]
pub(crate) struct MockQueryExec {
response: Mutex<Option<Result<Response, QueryError>>>,
}
impl MockQueryExec {
pub(crate) fn with_result(self, r: Result<Response, QueryError>) -> Self {
*self.response.lock() = Some(r);
self
}
}
#[async_trait]
impl QueryExec for MockQueryExec {
async fn query_exec(
&self,
_namespace_id: NamespaceId,
_table_id: TableId,
_columns: Vec<String>,
_span: Option<Span>,
) -> Result<Response, QueryError> {
self.response
.lock()
.take()
.unwrap_or(Err(QueryError::NamespaceNotFound(NamespaceId::new(42))))
}
}

View File

@ -0,0 +1,14 @@
//! Query execution abstraction & types.
mod r#trait;
pub(crate) use r#trait::*;
// Response types
pub(crate) mod partition_response;
pub(crate) mod response;
pub(crate) mod exec;
pub(crate) mod instrumentation;
#[cfg(test)]
pub(crate) mod mock_query_exec;

View File

@ -0,0 +1,64 @@
//! The per-partition data nested in a query [`Response`].
//!
//! [`Response`]: super::response::Response
use std::pin::Pin;
use arrow::error::ArrowError;
use data_types::{PartitionId, SequenceNumber};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::Stream;
/// Stream of [`RecordBatch`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub(crate) type RecordBatchStream =
Pin<Box<dyn Stream<Item = Result<SendableRecordBatchStream, ArrowError>> + Send>>;
/// Response data for a single partition.
pub(crate) struct PartitionResponse {
/// Stream of snapshots.
batches: RecordBatchStream,
/// Partition ID.
id: PartitionId,
/// Max sequence number persisted
max_persisted_sequence_number: Option<SequenceNumber>,
}
impl std::fmt::Debug for PartitionResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PartitionResponse")
.field("batches", &"<SNAPSHOT STREAM>")
.field("partition_id", &self.id)
.field("max_persisted", &self.max_persisted_sequence_number)
.finish()
}
}
impl PartitionResponse {
pub(crate) fn new(
batches: RecordBatchStream,
id: PartitionId,
max_persisted_sequence_number: Option<SequenceNumber>,
) -> Self {
Self {
batches,
id,
max_persisted_sequence_number,
}
}
pub(crate) fn id(&self) -> PartitionId {
self.id
}
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
pub(crate) fn into_record_batch_stream(self) -> RecordBatchStream {
self.batches
}
}

View File

@ -0,0 +1,62 @@
//! The response type returned from a query [`QueryExec::query_exec()`] call.
//!
//! [`QueryExec::query_exec()`]: super::QueryExec::query_exec()
use std::{pin::Pin, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use futures::{Stream, StreamExt, TryStreamExt};
use super::partition_response::PartitionResponse;
/// Stream of partitions in this response.
pub(crate) type PartitionStream =
Pin<Box<dyn Stream<Item = Result<PartitionResponse, ArrowError>> + Send>>;
/// A response stream wrapper for ingester query requests.
///
/// The data structure is constructed to allow lazy/streaming/pull-based data
/// sourcing..
pub(crate) struct Response {
/// Stream of partitions.
partitions: PartitionStream,
}
impl std::fmt::Debug for Response {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Response")
.field("partitions", &"<PARTITION STREAM>")
.finish()
}
}
impl Response {
/// Make a response
pub(crate) fn new(partitions: PartitionStream) -> Self {
Self { partitions }
}
/// Return the stream of [`PartitionResponse`].
pub(crate) fn into_partition_stream(self) -> PartitionStream {
self.partitions
}
/// Reduce the [`Response`] to a set of [`RecordBatch`].
pub(crate) async fn into_record_batches(self) -> Result<Vec<RecordBatch>, ArrowError> {
self.into_partition_stream()
.map_ok(|partition| {
partition
.into_record_batch_stream()
.map_ok(|snapshot| {
let schema = Arc::new(optimize_schema(&snapshot.schema()));
snapshot
.map(move |batch| optimize_record_batch(&batch?, Arc::clone(&schema)))
})
.try_flatten()
})
.try_flatten()
.try_collect()
.await
}
}

View File

@ -0,0 +1,47 @@
use std::{fmt::Debug, ops::Deref, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use thiserror::Error;
use trace::span::Span;
use super::response::Response;
#[derive(Debug, Error)]
#[allow(missing_copy_implementations)]
pub(crate) enum QueryError {
#[error("namespace id {0} not found")]
NamespaceNotFound(NamespaceId),
#[error("table id {1} not found in namespace id {0}")]
TableNotFound(NamespaceId, TableId),
}
#[async_trait]
pub(crate) trait QueryExec: Send + Sync + Debug {
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError>;
}
#[async_trait]
impl<T> QueryExec for Arc<T>
where
T: QueryExec,
{
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
self.deref()
.query_exec(namespace_id, table_id, columns, span)
.await
}
}

View File

@ -0,0 +1,212 @@
//! An adaptor over a set of [`RecordBatch`] allowing them to be used as an IOx
//! [`QueryChunk`].
use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::error::DataFusionError;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::{compute_timenanosecond_min_max, create_basic_summary},
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use once_cell::sync::OnceCell;
use predicate::Predicate;
use schema::{merge::merge_record_batch_schemas, sort::SortKey, Projection, Schema};
/// A queryable wrapper over a set of ordered [`RecordBatch`] snapshot from a
/// single [`PartitionData`].
///
/// It is an invariant that a [`QueryAdaptor`] MUST always contain at least one
/// row. This frees the caller of having to reason about empty [`QueryAdaptor`]
/// instances yielding empty [`RecordBatch`].
///
/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct QueryAdaptor {
/// The snapshot data from a partition.
///
/// This MUST be non-pub(crate) / closed for modification / immutable to support
/// interning the merged schema in [`Self::schema()`].
data: Vec<Arc<RecordBatch>>,
/// The catalog ID of the partition the this data is part of.
partition_id: PartitionId,
/// Chunk ID.
id: ChunkId,
/// An interned schema for all [`RecordBatch`] in data.
schema: OnceCell<Arc<Schema>>,
/// An interned table summary.
summary: OnceCell<Arc<TableSummary>>,
}
impl QueryAdaptor {
/// Construct a [`QueryAdaptor`].
///
/// # Panics
///
/// This constructor panics if `data` contains no [`RecordBatch`], or all
/// [`RecordBatch`] are empty.
pub(crate) fn new(partition_id: PartitionId, data: Vec<Arc<RecordBatch>>) -> Self {
// There must always be at least one record batch and one row.
//
// This upholds an invariant that simplifies dealing with empty
// partitions - if there is a QueryAdaptor, it contains data.
assert!(data.iter().map(|b| b.num_rows()).sum::<usize>() > 0);
Self {
data,
partition_id,
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
// use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process.
id: ChunkId::new(),
schema: OnceCell::default(),
summary: OnceCell::default(),
}
}
pub(crate) fn project_selection(&self, selection: Projection<'_>) -> Vec<RecordBatch> {
// Project the column selection across all RecordBatch
self.data
.iter()
.map(|data| {
let batch = data.as_ref();
let schema = batch.schema();
// Apply selection to in-memory batch
match selection {
Projection::All => batch.clone(),
Projection::Some(columns) => {
let projection = columns
.iter()
.flat_map(|&column_name| {
// ignore non-existing columns
schema.index_of(column_name).ok()
})
.collect::<Vec<_>>();
batch.project(&projection).expect("bug in projection")
}
}
})
.collect()
}
/// Returns the [`RecordBatch`] instances in this [`QueryAdaptor`].
pub(crate) fn record_batches(&self) -> &[Arc<RecordBatch>] {
self.data.as_ref()
}
/// Returns the partition ID from which the data this [`QueryAdaptor`] was
/// sourced from.
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}
}
impl QueryChunkMeta for QueryAdaptor {
fn summary(&self) -> Arc<TableSummary> {
Arc::clone(self.summary.get_or_init(|| {
let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
.expect("Should have time range");
Arc::new(create_basic_summary(
self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64,
&self.schema(),
ts_min_max,
))
}))
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(
self.schema
.get_or_init(|| merge_record_batch_schemas(&self.data)),
)
}
fn partition_sort_key(&self) -> Option<&SortKey> {
None // Ingester data has not persisted yet and should not be attached to any partition
}
fn partition_id(&self) -> PartitionId {
self.partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
None // Ingester data is not sorted
}
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
&[]
}
}
impl QueryChunk for QueryAdaptor {
fn id(&self) -> ChunkId {
self.id
}
/// Returns true if the chunk may contain a duplicate "primary key" within
/// itself
fn may_contain_pk_duplicates(&self) -> bool {
// always true because the rows across record batches have not been
// de-duplicated.
true
}
/// Returns a set of Strings with column names from the specified
/// table that have at least one row that matches `predicate`, if
/// the predicate can be evaluated entirely on the metadata of
/// this Chunk. Returns `None` otherwise
fn column_names(
&self,
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Projection<'_>,
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
/// Return a set of Strings containing the distinct values in the
/// specified columns. If the predicate can be evaluated entirely
/// on the metadata of this Chunk. Returns `None` otherwise
///
/// The requested columns must all have String type.
fn column_values(
&self,
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
fn data(&self) -> QueryChunkData {
let schema = self.schema().as_arrow();
QueryChunkData::RecordBatches(
self.data
.iter()
.map(|b| ensure_schema(&schema, b).expect("schema handling broken"))
.collect(),
)
}
/// Returns chunk type
fn chunk_type(&self) -> &str {
"QueryAdaptor"
}
fn order(&self) -> ChunkOrder {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@ -0,0 +1,141 @@
use data_types::SequenceNumber;
/// A range of sequence numbers, both inclusive [min, max].
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub(crate) struct SequenceNumberRange {
range: Option<(SequenceNumber, SequenceNumber)>,
}
impl SequenceNumberRange {
pub(crate) fn observe(&mut self, n: SequenceNumber) {
self.range = Some(match self.range {
Some((min, max)) => {
assert!(n > max, "monotonicity violation");
(min, n)
}
None => (n, n),
});
}
/// Returns the inclusive lower bound on [`SequenceNumber`] values observed.
pub(crate) fn inclusive_min(&self) -> Option<SequenceNumber> {
self.range.map(|v| v.0)
}
/// Returns the inclusive upper bound on [`SequenceNumber`] values observed.
pub(crate) fn inclusive_max(&self) -> Option<SequenceNumber> {
self.range.map(|v| v.1)
}
/// Merge two [`SequenceNumberRange`] instances, returning a new, merged
/// instance.
///
/// The merge result contains the minimum of [`Self::inclusive_min()`] from
/// each instance, and the maximum of [`Self::inclusive_max()`].
///
/// If both `self` and `other` contain no [`SequenceNumber`] observations,
/// the returned instance contains no observations.
pub(crate) fn merge(&self, other: &Self) -> Self {
let merged_range = self
.range
.into_iter()
.chain(other.range)
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)));
Self {
range: merged_range,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ranges() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(0));
r.observe(SequenceNumber::new(2));
r.observe(SequenceNumber::new(3));
assert_eq!(r.inclusive_min(), Some(SequenceNumber::new(0)));
assert_eq!(r.inclusive_max(), Some(SequenceNumber::new(3)));
}
#[test]
#[should_panic = "monotonicity violation"]
fn test_monotonicity() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(1));
r.observe(SequenceNumber::new(3));
r.observe(SequenceNumber::new(2));
}
#[test]
#[should_panic = "monotonicity violation"]
fn test_exactly_once() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(1));
r.observe(SequenceNumber::new(1));
}
#[test]
fn test_merge() {
let mut a = SequenceNumberRange::default();
let mut b = SequenceNumberRange::default();
a.observe(SequenceNumber::new(4));
b.observe(SequenceNumber::new(2));
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(2)));
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(2)));
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
assert_eq!(a_b, b_a);
}
#[test]
fn test_merge_half_empty() {
let mut a = SequenceNumberRange::default();
let b = SequenceNumberRange::default();
a.observe(SequenceNumber::new(4));
// B observes nothing
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(4)));
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(4)));
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
assert_eq!(a_b, b_a);
}
#[test]
fn test_merge_both_empty() {
let a = SequenceNumberRange::default();
let b = SequenceNumberRange::default();
// Neither observe anything
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), None);
assert_eq!(a_b.inclusive_max(), None);
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), None);
assert_eq!(b_a.inclusive_max(), None);
assert_eq!(a_b, b_a);
}
}

View File

@ -0,0 +1,70 @@
use data_types::{
NamespaceId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId,
};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::interface::Catalog;
use mutable_batch_lp::lines_to_batches;
/// Construct a [`DmlWrite`] with the specified parameters, for LP that contains
/// a single table identified by `table_id`.
///
/// # Panics
///
/// This method panics if `lines` contains data for more than one table.
pub(crate) fn make_write_op(
partition_key: &PartitionKey,
namespace_id: NamespaceId,
table_name: &str,
table_id: TableId,
sequence_number: i64,
lines: &str,
) -> DmlWrite {
let mut tables_by_name = lines_to_batches(lines, 0).unwrap();
assert_eq!(tables_by_name.len(), 1);
let tables_by_id = [(table_id, tables_by_name.remove(table_name).unwrap())]
.into_iter()
.collect();
DmlWrite::new(
namespace_id,
tables_by_id,
partition_key.clone(),
DmlMeta::sequenced(
Sequence {
shard_index: ShardIndex::new(i32::MAX),
sequence_number: SequenceNumber::new(sequence_number),
},
iox_time::Time::MIN,
None,
42,
),
)
}
pub(crate) async fn populate_catalog(
catalog: &dyn Catalog,
shard_index: ShardIndex,
namespace: &str,
table: &str,
) -> (ShardId, NamespaceId, TableId) {
let mut c = catalog.repositories().await;
let topic = c.topics().create_or_get("kafka-topic").await.unwrap();
let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap();
let ns_id = c
.namespaces()
.create(namespace, None, topic.id, query_pool.id)
.await
.unwrap()
.id;
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
let shard_id = c
.shards()
.create_or_get(&topic, shard_index)
.await
.unwrap()
.id;
assert_eq!(shard_id, crate::TRANSITION_SHARD_ID);
(shard_id, ns_id, table_id)
}