chore: Merge remote-tracking branch 'origin/main' into smith/remove-transactions-main

pull/24376/head
Jeffrey Smith II 2023-05-16 08:49:25 -04:00
commit 45628ba4d3
30 changed files with 1594 additions and 669 deletions

Cargo.lock (generated)

@ -6271,9 +6271,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.3.2"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2"
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
dependencies = [
"getrandom",
]
@ -6309,6 +6309,7 @@ dependencies = [
name = "wal"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"byteorder",
"bytes",
@ -6317,6 +6318,8 @@ dependencies = [
"dml",
"futures",
"generated_types",
"hashbrown 0.13.2",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
@ -6333,6 +6336,27 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "wal_inspect"
version = "0.1.0"
dependencies = [
"assert_matches",
"data_types",
"dml",
"generated_types",
"hashbrown 0.13.2",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"parquet_to_line_protocol",
"schema",
"test_helpers",
"thiserror",
"tokio",
"wal",
"workspace-hack",
]
[[package]]
name = "walkdir"
version = "2.3.3"


@ -80,6 +80,7 @@ members = [
"tracker",
"trogging",
"wal",
"wal_inspect",
"workspace-hack",
]
default-members = ["influxdb_iox"]


@ -137,10 +137,14 @@ mod tests {
use super::*;
use chrono::TimeZone;
use data_types::{
ColumnId, ColumnSet, CompactionLevel, NamespaceId, NamespaceName, ParquetFile,
ParquetFileParams, PartitionId, TableId, Timestamp,
ColumnId, ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileParams,
PartitionId, TableId, Timestamp,
};
use iox_catalog::{
interface::Catalog,
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use object_store::path::Path;
use once_cell::sync::Lazy;
use parquet_file::ParquetFilePath;
@ -155,19 +159,8 @@ mod tests {
let metric_registry = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await;
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
let partition = repos
.partitions()
.create_or_get("one".into(), table.id)


@ -345,7 +345,10 @@ mod tests {
use crate::{AggregateTSMField, AggregateTSMTag};
use assert_matches::assert_matches;
use data_types::{PartitionId, TableId};
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use std::collections::HashSet;
#[tokio::test]
@ -427,17 +430,8 @@ mod tests {
{
let mut txn = catalog.repositories().await;
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create(&NamespaceName::new("1234_5678").unwrap(), None)
.await
.expect("namespace created");
let mut table = txn
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new_empty_from(&t))
.expect("table created");
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
@ -518,17 +512,8 @@ mod tests {
{
let mut txn = catalog.repositories().await;
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create(&NamespaceName::new("1234_5678").unwrap(), None)
.await
.expect("namespace created");
let mut table = txn
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new_empty_from(&t))
.expect("table created");
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
@ -582,17 +567,8 @@ mod tests {
{
let mut txn = catalog.repositories().await;
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create(&NamespaceName::new("1234_5678").unwrap(), None)
.await
.expect("namespace created");
let mut table = txn
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new_empty_from(&t))
.expect("table created");
let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await;
let table = arbitrary_table(&mut *txn, "weather", &namespace).await;
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)


@ -47,7 +47,7 @@ tokio = { version = "1.28", features = ["macros", "parking_lot", "rt-multi-threa
tokio-util = "0.7.8"
tonic = { workspace = true }
trace = { version = "0.1.0", path = "../trace" }
uuid = "1.3.2"
uuid = "1.3.3"
wal = { version = "0.1.0", path = "../wal" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }


@ -99,6 +99,7 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table};
use super::*;
@ -114,18 +115,10 @@ mod tests {
let (namespace_id, table_id) = {
let mut repos = catalog.repositories().await;
let table_ns_name = data_types::NamespaceName::new(TABLE_NAME).unwrap();
let ns = repos
.namespaces()
.create(&table_ns_name, None)
.await
.unwrap();
let ns = arbitrary_namespace(&mut *repos, NAMESPACE_NAME).await;
let table = repos
.tables()
.create_or_get(TABLE_NAME, ns.id)
.await
.unwrap();
let table = arbitrary_table(&mut *repos, TABLE_NAME, &ns)
.await;
(ns.id, table.id)
};


@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::interface::Catalog;
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
use lazy_static::lazy_static;
use mutable_batch_lp::lines_to_batches;
use schema::Projection;
@ -298,13 +298,7 @@ pub(crate) async fn populate_catalog(
table: &str,
) -> (NamespaceId, TableId) {
let mut c = catalog.repositories().await;
let namespace_name = data_types::NamespaceName::new(namespace).unwrap();
let ns_id = c
.namespaces()
.create(&namespace_name, None)
.await
.unwrap()
.id;
let ns_id = arbitrary_namespace(&mut *c, namespace).await.id;
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
(ns_id, table_id)


@ -17,8 +17,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
use data_types::{
Namespace, NamespaceId, NamespaceName, NamespaceSchema, ParquetFile, PartitionKey,
SequenceNumber, TableId,
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
};
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
@ -29,6 +28,7 @@ use ingester::{IngesterGuard, IngesterRpcInterface};
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use iox_catalog::{
interface::{Catalog, SoftDeletedRows},
test_helpers::arbitrary_namespace,
validate_or_insert_schema,
};
use iox_time::TimeProvider;
@ -203,14 +203,8 @@ where
name: &str,
retention_period_ns: Option<i64>,
) -> Namespace {
let ns = self
.catalog
.repositories()
.await
.namespaces()
.create(&NamespaceName::new(name).unwrap(), None)
.await
.expect("failed to create test namespace");
let mut repos = self.catalog.repositories().await;
let ns = arbitrary_namespace(&mut *repos, name).await;
assert!(
self.namespaces


@ -679,7 +679,10 @@ pub async fn list_schemas(
#[cfg(test)]
pub(crate) mod test_helpers {
use crate::{validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES};
use crate::{
test_helpers::{arbitrary_namespace, arbitrary_table},
validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use super::*;
use assert_matches::assert_matches;
@ -786,12 +789,7 @@ pub(crate) mod test_helpers {
.unwrap();
assert!(not_found.is_none());
let namespace2_name = NamespaceName::new("test_namespace2").unwrap();
let namespace2 = repos
.namespaces()
.create(&namespace2_name, None)
.await
.unwrap();
let namespace2 = arbitrary_namespace(&mut *repos, "test_namespace2").await;
let mut namespaces = repos
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
@ -834,13 +832,8 @@ pub(crate) mod test_helpers {
.expect("namespace should be updateable");
assert!(modified.retention_period_ns.is_none());
// create namespace with retention period NULL
let namespace3_name = NamespaceName::new("test_namespace3").unwrap();
let namespace3 = repos
.namespaces()
.create(&namespace3_name, None)
.await
.expect("namespace with NULL retention should be created");
// create namespace with retention period NULL (the default)
let namespace3 = arbitrary_namespace(&mut *repos, "test_namespace3").await;
assert!(namespace3.retention_period_ns.is_none());
// create namespace with retention period
@ -894,16 +887,8 @@ pub(crate) mod test_helpers {
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let deleted_ns = repos
.namespaces()
.create(&"deleted-ns".try_into().unwrap(), None)
.await
.unwrap();
let active_ns = repos
.namespaces()
.create(&"active-ns".try_into().unwrap(), None)
.await
.unwrap();
let deleted_ns = arbitrary_namespace(&mut *repos, "deleted-ns").await;
let active_ns = arbitrary_namespace(&mut *repos, "active-ns").await;
// Mark "deleted-ns" as soft-deleted.
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
@ -1057,23 +1042,11 @@ pub(crate) mod test_helpers {
async fn test_table(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("namespace_table_test").unwrap(), None)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_table_test").await;
// test we can create or get a table
let t = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let tt = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let t = arbitrary_table(&mut *repos, "test_table", &namespace).await;
let tt = arbitrary_table(&mut *repos, "test_table", &namespace).await;
assert!(t.id > TableId::new(0));
assert_eq!(t, tt);
@ -1094,26 +1067,14 @@ pub(crate) mod test_helpers {
assert_eq!(vec![t.clone()], tables);
// test we can create a table of the same name in a different namespace
let namespace2 = repos
.namespaces()
.create(&NamespaceName::new("two").unwrap(), None)
.await
.unwrap();
let namespace2 = arbitrary_namespace(&mut *repos, "two").await;
assert_ne!(namespace, namespace2);
let test_table = repos
.tables()
.create_or_get("test_table", namespace2.id)
.await
.unwrap();
let test_table = arbitrary_table(&mut *repos, "test_table", &namespace2).await;
assert_ne!(tt, test_table);
assert_eq!(test_table.namespace_id, namespace2.id);
// test get by namespace and name
let foo_table = repos
.tables()
.create_or_get("foo", namespace2.id)
.await
.unwrap();
let foo_table = arbitrary_table(&mut *repos, "foo", &namespace2).await;
assert_eq!(
repos
.tables()
@ -1194,16 +1155,8 @@ pub(crate) mod test_helpers {
async fn test_column(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("namespace_column_test").unwrap(), None)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_column_test").await;
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
assert_eq!(table.namespace_id, namespace.id);
// test we can create or get a column
@ -1230,11 +1183,7 @@ pub(crate) mod test_helpers {
assert!(matches!(err, Error::ColumnTypeMismatch { .. }));
// test that we can create a column of the same name under a different table
let table2 = repos
.tables()
.create_or_get("test_table_2", namespace.id)
.await
.unwrap();
let table2 = arbitrary_table(&mut *repos, "test_table_2", &namespace).await;
let ccc = repos
.columns()
.create_or_get("column_test", table2.id, ColumnType::U64)
@ -1301,11 +1250,7 @@ pub(crate) mod test_helpers {
));
// test per-namespace column limits are NOT enforced with create_or_get_many_unchecked
let table3 = repos
.tables()
.create_or_get("test_table_3", namespace.id)
.await
.unwrap();
let table3 = arbitrary_table(&mut *repos, "test_table_3", &namespace).await;
let mut columns = HashMap::new();
columns.insert("apples", ColumnType::Tag);
columns.insert("oranges", ColumnType::Tag);
@ -1327,19 +1272,8 @@ pub(crate) mod test_helpers {
async fn test_partition(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("namespace_partition_test").unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_partition_test").await;
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
let mut created = BTreeMap::new();
for key in ["foo", "bar"] {
@ -1612,24 +1546,9 @@ pub(crate) mod test_helpers {
/// tests many interactions with the catalog and parquet files. See the individual conditions herein
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("namespace_parquet_file_test").unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repos
.tables()
.create_or_get("other", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await;
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
let other_table = arbitrary_table(&mut *repos, "other", &namespace).await;
let partition = repos
.partitions()
.create_or_get("one".into(), table.id)
@ -1800,19 +1719,8 @@ pub(crate) mod test_helpers {
assert_eq!(files.len(), 1);
// test list_by_namespace_not_to_delete
let namespace2 = repos
.namespaces()
.create(
&NamespaceName::new("namespace_parquet_file_test1").unwrap(),
None,
)
.await
.unwrap();
let table2 = repos
.tables()
.create_or_get("test_table2", namespace2.id)
.await
.unwrap();
let namespace2 = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test1").await;
let table2 = arbitrary_table(&mut *repos, "test_table2", &namespace2).await;
let partition2 = repos
.partitions()
.create_or_get("foo".into(), table2.id)
@ -2098,26 +2006,14 @@ pub(crate) mod test_helpers {
async fn test_parquet_file_delete_broken(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace_1 = repos
.namespaces()
.create(&NamespaceName::new("retention_broken_1").unwrap(), None)
.await
.unwrap();
let namespace_1 = arbitrary_namespace(&mut *repos, "retention_broken_1").await;
let namespace_2 = repos
.namespaces()
.create(&NamespaceName::new("retention_broken_2").unwrap(), Some(1))
.await
.unwrap();
let table_1 = repos
.tables()
.create_or_get("test_table", namespace_1.id)
.await
.unwrap();
let table_2 = repos
.tables()
.create_or_get("test_table", namespace_2.id)
.await
.unwrap();
let table_1 = arbitrary_table(&mut *repos, "test_table", &namespace_1).await;
let table_2 = arbitrary_table(&mut *repos, "test_table", &namespace_2).await;
let partition_1 = repos
.partitions()
.create_or_get("one".into(), table_1.id)
@ -2178,19 +2074,9 @@ pub(crate) mod test_helpers {
async fn test_partitions_new_file_between(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("test_partitions_new_file_between").unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table_for_new_file_between", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "test_partitions_new_file_between").await;
let table =
arbitrary_table(&mut *repos, "test_table_for_new_file_between", &namespace).await;
// param for the tests
let time_now = Timestamp::from(catalog.time_provider().now());
@ -2547,20 +2433,12 @@ pub(crate) mod test_helpers {
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("namespace_parquet_file_test_list_by_partiton_not_to_delete")
.unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(
&mut *repos,
"namespace_parquet_file_test_list_by_partiton_not_to_delete",
)
.await;
let table = arbitrary_table(&mut *repos, "test_table", &namespace).await;
let partition = repos
.partitions()
@ -2658,19 +2536,9 @@ pub(crate) mod test_helpers {
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(
&NamespaceName::new("namespace_update_to_compaction_level_1_test").unwrap(),
None,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("update_table", namespace.id)
.await
.unwrap();
let namespace =
arbitrary_namespace(&mut *repos, "namespace_update_to_compaction_level_1_test").await;
let table = arbitrary_table(&mut *repos, "update_table", &namespace).await;
let partition = repos
.partitions()
.create_or_get("test_update_to_compaction_level_1_one".into(), table.id)
@ -2747,19 +2615,9 @@ pub(crate) mod test_helpers {
/// effective.
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let namespace_1 = repos
.namespaces()
.create(
&NamespaceName::new("namespace_test_delete_namespace_1").unwrap(),
None,
)
.await
.unwrap();
let table_1 = repos
.tables()
.create_or_get("test_table_1", namespace_1.id)
.await
.unwrap();
let namespace_1 =
arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_1").await;
let table_1 = arbitrary_table(&mut *repos, "test_table_1", &namespace_1).await;
let _c = repos
.columns()
.create_or_get("column_test_1", table_1.id, ColumnType::Tag)
@ -2805,19 +2663,9 @@ pub(crate) mod test_helpers {
// we've now created a namespace with a table and parquet files. before we test deleting
// it, let's create another so we can ensure that doesn't get deleted.
let namespace_2 = repos
.namespaces()
.create(
&NamespaceName::new("namespace_test_delete_namespace_2").unwrap(),
None,
)
.await
.unwrap();
let table_2 = repos
.tables()
.create_or_get("test_table_2", namespace_2.id)
.await
.unwrap();
let namespace_2 =
arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_2").await;
let table_2 = arbitrary_table(&mut *repos, "test_table_2", &namespace_2).await;
let _c = repos
.columns()
.create_or_get("column_test_2", table_2.id, ColumnType::Tag)


@ -201,6 +201,56 @@ where
Ok(())
}
/// Catalog helper functions for creation of catalog objects
pub mod test_helpers {
use crate::RepoCollection;
use data_types::{Namespace, NamespaceName, Table};
/// When the details of the namespace don't matter, the test just needs *a* catalog namespace
/// with a particular name.
///
/// Use [`NamespaceRepo::create`] directly if:
///
/// - The values of the parameters to `create` need to be different than what's here
/// - The values of the parameters to `create` are relevant to the behavior under test
/// - You expect namespace creation to fail in the test
///
/// [`NamespaceRepo::create`]: crate::interface::NamespaceRepo::create
pub async fn arbitrary_namespace<R: RepoCollection + ?Sized>(
repos: &mut R,
name: &str,
) -> Namespace {
let namespace_name = NamespaceName::new(name).unwrap();
repos
.namespaces()
.create(&namespace_name, None)
.await
.unwrap()
}
/// When the details of the table don't matter, the test just needs *a* catalog table
/// with a particular name in a particular namespace.
///
/// Use [`TableRepo::create_or_get`] directly if:
///
/// - The values of the parameters to `create_or_get` need to be different than what's here
/// - The values of the parameters to `create_or_get` are relevant to the behavior under test
/// - You expect table creation to fail in the test
///
/// [`TableRepo::create_or_get`]: crate::interface::TableRepo::create_or_get
pub async fn arbitrary_table<R: RepoCollection + ?Sized>(
repos: &mut R,
name: &str,
namespace: &Namespace,
) -> Table {
repos
.tables()
.create_or_get(name, namespace.id)
.await
.unwrap()
}
}
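
A minimal sketch of how a test can lean on these new helpers (hypothetical test name and namespace/table strings, not part of this commit; the MemCatalog setup mirrors the tests elsewhere in this diff):

#[tokio::test]
async fn example_helper_usage() {
    use std::sync::Arc;
    use iox_catalog::{
        interface::Catalog,
        mem::MemCatalog,
        test_helpers::{arbitrary_namespace, arbitrary_table},
    };

    // In-memory catalog, as used by the other tests in this change.
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Default::default()));
    let mut repos = catalog.repositories().await;

    // One call each replaces the namespaces().create() and
    // tables().create_or_get() boilerplate deleted throughout this commit.
    let ns = arbitrary_namespace(&mut *repos, "example_ns").await;
    let table = arbitrary_table(&mut *repos, "example_table", &ns).await;
    assert_eq!(table.namespace_id, ns.id);
}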
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::Arc};
@ -210,7 +260,6 @@ mod tests {
interface::{get_schema_by_name, SoftDeletedRows},
mem::MemCatalog,
};
use data_types::NamespaceName;
// Generate a test that simulates multiple, sequential writes in `lp` and
// asserts the resulting schema.
@ -228,21 +277,17 @@ mod tests {
#[allow(clippy::bool_assert_comparison)]
#[tokio::test]
async fn [<test_validate_schema_ $name>]() {
use crate::interface::Catalog;
use crate::{interface::Catalog, test_helpers::arbitrary_namespace};
use std::ops::DerefMut;
use pretty_assertions::assert_eq;
const NAMESPACE_NAME: &str = "bananas";
let ns_name = NamespaceName::new(NAMESPACE_NAME).unwrap();
let metrics = Arc::new(metric::Registry::default());
let repo = MemCatalog::new(metrics);
let mut txn = repo.repositories().await;
let namespace = txn
.namespaces()
.create(&ns_name, None)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *txn, NAMESPACE_NAME)
.await;
let schema = NamespaceSchema::new_empty_from(&namespace);


@ -1636,6 +1636,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::{arbitrary_namespace, arbitrary_table};
use assert_matches::assert_matches;
use data_types::{ColumnId, ColumnSet};
use metric::{Attributes, DurationHistogram, Metric};
@ -1819,38 +1820,21 @@ mod tests {
let postgres = setup_db().await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut repos = postgres.repositories().await;
let namespace_id = postgres
.repositories()
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = postgres
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
let key = "bananas";
let a = postgres
.repositories()
.await
let a = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
.expect("should create OK");
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
let b = postgres
.repositories()
.await
let b = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
@ -1957,22 +1941,12 @@ mod tests {
let postgres = setup_db().await;
let metrics = Arc::clone(&postgres.metrics);
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut repos = postgres.repositories().await;
let namespace_id = postgres
.repositories()
let namespace = arbitrary_namespace(&mut *repos, "ns4")
.await;
let table_id = arbitrary_table(&mut *repos, "table", &namespace)
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = postgres
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
$(
@ -1981,9 +1955,7 @@ mod tests {
insert.insert($col_name, $col_type);
)+
let got = postgres
.repositories()
.await
let got = repos
.columns()
.create_or_get_many_unchecked(table_id, insert.clone())
.await;
@ -2120,29 +2092,14 @@ mod tests {
let postgres = setup_db().await;
let pool = postgres.pool.clone();
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let namespace_id = postgres
.repositories()
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = postgres
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
let mut repos = postgres.repositories().await;
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
let namespace_id = namespace.id;
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
let key = "bananas";
let partition_id = postgres
.repositories()
.await
let partition_id = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
@ -2167,9 +2124,7 @@ mod tests {
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let f1 = postgres
.repositories()
.await
let f1 = repos
.parquet_files()
.create(p1.clone())
.await
@ -2177,9 +2132,7 @@ mod tests {
// insert the same again with a different size; we should then have 3x1337 as total file size
p1.object_store_id = Uuid::new_v4();
p1.file_size_bytes *= 2;
let _f2 = postgres
.repositories()
.await
let _f2 = repos
.parquet_files()
.create(p1.clone())
.await
@ -2194,9 +2147,7 @@ mod tests {
assert_eq!(total_file_size_bytes, 1337 * 3);
// flag f1 for deletion and assert that the total file size is reduced accordingly.
postgres
.repositories()
.await
repos
.parquet_files()
.flag_for_delete(f1.id)
.await
@ -2211,9 +2162,7 @@ mod tests {
// actually deleting shouldn't change the total
let now = Timestamp::from(time_provider.now());
postgres
.repositories()
.await
repos
.parquet_files()
.delete_old_ids_only(now)
.await


@ -1515,6 +1515,7 @@ fn is_unique_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::{arbitrary_namespace, arbitrary_table};
use assert_matches::assert_matches;
use metric::{Attributes, DurationHistogram, Metric};
use std::sync::Arc;
@ -1556,40 +1557,22 @@ mod tests {
#[tokio::test]
async fn test_partition_create_or_get_idempotent() {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut repos = sqlite.repositories().await;
let namespace_id = sqlite
.repositories()
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = sqlite
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
let key = "bananas";
let a = sqlite
.repositories()
.await
let a = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
.expect("should create OK");
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
let b = sqlite
.repositories()
.await
let b = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
@ -1609,24 +1592,13 @@ mod tests {
async fn [<test_column_create_or_get_many_unchecked_ $name>]() {
let sqlite = setup_db().await;
let metrics = Arc::clone(&sqlite.metrics);
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut repos = sqlite.repositories().await;
let namespace_id = sqlite
.repositories()
let namespace = arbitrary_namespace(&mut *repos, "ns4")
.await;
let table_id = arbitrary_table(&mut *repos, "table", &namespace)
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = sqlite
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
$(
@ -1635,9 +1607,7 @@ mod tests {
insert.insert($col_name, $col_type);
)+
let got = sqlite
.repositories()
.await
let got = repos
.columns()
.create_or_get_many_unchecked(table_id, insert.clone())
.await;
@ -1771,31 +1741,16 @@ mod tests {
async fn test_billing_summary_on_parqet_file_creation() {
let sqlite = setup_db().await;
let pool = sqlite.pool.clone();
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut repos = sqlite.repositories().await;
let namespace_id = sqlite
.repositories()
.await
.namespaces()
.create(&NamespaceName::new("ns4").unwrap(), None)
.await
.expect("namespace create failed")
.id;
let table_id = sqlite
.repositories()
.await
.tables()
.create_or_get("table", namespace_id)
.await
.expect("create table failed")
.id;
let namespace = arbitrary_namespace(&mut *repos, "ns4").await;
let namespace_id = namespace.id;
let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id;
let key = "bananas";
let partition_id = sqlite
.repositories()
.await
let partition_id = repos
.partitions()
.create_or_get(key.into(), table_id)
.await
@ -1820,9 +1775,7 @@ mod tests {
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let f1 = sqlite
.repositories()
.await
let f1 = repos
.parquet_files()
.create(p1.clone())
.await
@ -1830,9 +1783,7 @@ mod tests {
// insert the same again with a different size; we should then have 3x1337 as total file size
p1.object_store_id = Uuid::new_v4();
p1.file_size_bytes *= 2;
let _f2 = sqlite
.repositories()
.await
let _f2 = repos
.parquet_files()
.create(p1.clone())
.await
@ -1847,9 +1798,7 @@ mod tests {
assert_eq!(total_file_size_bytes, 1337 * 3);
// flag f1 for deletion and assert that the total file size is reduced accordingly.
sqlite
.repositories()
.await
repos
.parquet_files()
.flag_for_delete(f1.id)
.await
@ -1864,9 +1813,7 @@ mod tests {
// actually deleting shouldn't change the total
let now = Timestamp::from(time_provider.now());
sqlite
.repositories()
.await
repos
.parquet_files()
.delete_old_ids_only(now)
.await


@ -16,6 +16,7 @@ use iox_catalog::{
get_schema_by_id, get_table_columns_by_id, Catalog, PartitionRepo, SoftDeletedRows,
},
mem::MemCatalog,
test_helpers::arbitrary_table,
};
use iox_query::{
exec::{DedicatedExecutors, Executor, ExecutorConfig},
@ -220,11 +221,7 @@ impl TestNamespace {
pub async fn create_table(self: &Arc<Self>, name: &str) -> Arc<TestTable> {
let mut repos = self.catalog.catalog.repositories().await;
let table = repos
.tables()
.create_or_get(name, self.namespace.id)
.await
.unwrap();
let table = arbitrary_table(&mut *repos, name, &self.namespace).await;
Arc::new(TestTable {
catalog: Arc::clone(&self.catalog),


@ -382,7 +382,10 @@ where
#[cfg(test)]
mod tests {
use data_types::ColumnType;
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use super::*;
@ -391,17 +394,9 @@ mod tests {
let catalog = Arc::new(MemCatalog::new(Default::default()));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("test_ns").unwrap(), None)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "test_ns").await;
let table = repos
.tables()
.create_or_get("name", namespace.id)
.await
.unwrap();
let table = arbitrary_table(&mut *repos, "name", &namespace).await;
let _column = repos
.columns()
.create_or_get("name", table.id, ColumnType::U64)


@ -10,7 +10,7 @@ use influxdb_line_protocol::{builder::FieldValue, FieldValue as LPFieldValue};
use schema::{InfluxColumnType, InfluxFieldType, Schema};
/// Converts a [`RecordBatch`] into line protocol lines.
pub(crate) fn convert_to_lines(
pub fn convert_to_lines(
measurement_name: &str,
iox_schema: &Schema,
batch: &RecordBatch,


@ -31,7 +31,7 @@ use std::{
sync::Arc,
};
mod batch;
use batch::convert_to_lines;
pub use batch::convert_to_lines;
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)]


@ -19,6 +19,7 @@ use super::{DmlHandler, Partitioned};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema, TableId};
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, StreamExt};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
@ -46,7 +47,7 @@ pub enum RpcWriteError {
/// There are no healthy ingesters to route a write to.
#[error("no healthy upstream ingesters available")]
NoUpstreams,
NoHealthyUpstreams,
/// The write request was not attempted, because not enough upstream
/// ingesters needed to satisfy the configured replication factor are
@ -202,49 +203,59 @@ where
// Obtain a snapshot of currently-healthy upstreams (and potentially
// some that need probing)
let mut snap = self
let snap = self
.endpoints
.endpoints()
.ok_or(RpcWriteError::NoUpstreams)?;
.ok_or(RpcWriteError::NoHealthyUpstreams)?;
// Validate the required number of writes is possible given the current
// number of healthy endpoints.
if snap.len() < self.n_copies {
if snap.initial_len() < self.n_copies {
return Err(RpcWriteError::NotEnoughReplicas);
}
// Write the desired number of copies of `req`.
for i in 0..self.n_copies {
// Perform the gRPC write to an ingester.
//
// This call is bounded to at most RPC_TIMEOUT duration of time.
write_loop(&mut snap, &req).await.map_err(|e| {
// In all cases, if at least one write succeeded, then this
// becomes a partial write error.
if i > 0 {
return RpcWriteError::PartialWrite {
// Concurrently write to the required number of replicas to reach the
// desired replication factor.
let mut result_stream = (0..self.n_copies)
.map(|_| {
// Acquire a request-scoped snapshot that synchronises with
// other clone instances to uphold the disjoint replica hosts
// invariant.
let mut snap = snap.clone();
let req = req.clone();
async move { write_loop(&mut snap, &req).await }
})
.collect::<FuturesUnordered<_>>()
.enumerate();
// Consume the result stream, eagerly returning if an error is observed.
//
// Because partial writes have different semantics to outright failures
// (principally that you may expect your write to turn up in queries, even though
// the overall request failed), return a PartialWrite error if at least
// one write success has been observed. This is best-effort! It's always
// possible that PartialWrite is not returned, even though a partial
// write has occurred (for example, the next result in the stream is an
// already-completed write ACK).
while let Some((i, res)) = result_stream.next().await {
match res {
Ok(_) => {}
Err(_e) if i > 0 => {
// In all cases, if at least one write succeeded, then this
// becomes a partial write error.
return Err(RpcWriteError::PartialWrite {
want_n_copies: self.n_copies,
acks: i,
};
});
}
// This error was for the first request - there have been no
// ACKs received.
match e {
Err(RpcWriteError::Client(_)) => {
// This error is an internal implementation detail - the
// meaningful error for the user is "there's no healthy
// upstreams".
RpcWriteError::Client(_) => RpcWriteError::NoUpstreams,
// The number of upstreams no longer satisfies the desired
// replication factor.
RpcWriteError::NoUpstreams => RpcWriteError::NotEnoughReplicas,
// All other errors pass through.
v => v,
return Err(RpcWriteError::NoHealthyUpstreams);
}
})?;
// Remove the upstream that was successfully wrote to from the
// candidates
snap.remove_last_unstable();
Err(e) => return Err(e),
}
}
debug!(
@ -267,8 +278,13 @@ where
///
/// If at least one upstream request has failed (returning an error), the most
/// recent error is returned.
///
/// # Panics
///
/// This function panics if `endpoints.next()` returns [`None`] (the number of
/// upstreams should be validated before starting the write loop).
async fn write_loop<T>(
endpoints: &mut UpstreamSnapshot<'_, T>,
endpoints: &mut UpstreamSnapshot<T>,
req: &WriteRequest,
) -> Result<(), RpcWriteError>
where
@ -282,18 +298,31 @@ where
// request succeeds or this async call times out.
let mut delay = Duration::from_millis(50);
loop {
match endpoints
// Because the number of candidate upstreams is validated to be
// greater-than-or-equal-to the number of desired data copies before
// starting the write loop, and because the parallelism of the write
// loop matches the number of desired data copies, it's not possible
// for any thread to observe an empty snapshot, because transitively
// the number of upstreams matches or exceeds the parallelism.
let client = endpoints
.next()
.ok_or(RpcWriteError::NoUpstreams)?
.write(req.clone())
.await
{
Ok(()) => return Ok(()),
.expect("not enough replicas in snapshot to satisfy replication factor");
match client.write(req.clone()).await {
Ok(()) => {
endpoints.remove(client);
return Ok(());
}
Err(e) => {
warn!(error=%e, "failed ingester rpc write");
last_err = Some(e);
}
};
// Drop the client so that it is returned to the UpstreamSet and may
// be retried by another thread before the sleep expires.
drop(client);
tokio::time::sleep(delay).await;
delay = delay.saturating_mul(2);
}
@ -325,7 +354,9 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionKey};
use proptest::{prelude::*, prop_compose, proptest};
use rand::seq::SliceRandom;
use tokio::runtime;
use crate::dml_handlers::rpc_write::circuit_breaking_client::mock::MockCircuitBreaker;
@ -607,7 +638,7 @@ mod tests {
)
.await;
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
assert_matches!(got, Err(RpcWriteError::NoHealthyUpstreams));
}
/// Assert the error response when the only upstream continuously returns an
@ -628,7 +659,7 @@ mod tests {
)
.await;
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
assert_matches!(got, Err(RpcWriteError::NoHealthyUpstreams));
}
/// Assert that an [`RpcWriteClientError::UpstreamNotConnected`] error is mapped
@ -649,7 +680,7 @@ mod tests {
)
.await;
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
assert_matches!(got, Err(RpcWriteError::NoHealthyUpstreams));
}
/// Assert that an error is returned without any RPC request being made when
@ -871,4 +902,149 @@ mod tests {
.chain(client_3.calls().iter())
.all(|v| *v == calls_1[0]));
}
prop_compose! {
/// Return an arbitrary result containing an [`RpcWriteClientError`] from
/// a subset of easily constructed errors, or [`Ok`].
fn arbitrary_write_result()(which in 0..3) -> Result<(), RpcWriteClientError> {
match which {
0 => Ok(()),
1 => Err(RpcWriteClientError::Upstream(tonic::Status::internal("bananas"))),
2 => Err(RpcWriteClientError::UpstreamNotConnected("bananas".to_string())),
_ => unreachable!(),
}
}
}
prop_compose! {
/// Generate an upstream that is arbitrarily healthy/unhealthy, and will
/// arbitrarily succeed or fail when a write is attempted (a bounded
/// number of times).
fn arbitrary_mock_upstream()(
healthy in any::<bool>(),
responses in proptest::collection::vec(arbitrary_write_result(), 0..5)
) -> (Arc<MockCircuitBreaker>, Arc<MockWriteClient>) {
// Generate a mock client that returns all the errors/successes in
// the arbitrarily generated set, and then always succeeds.
let client = Arc::new(MockWriteClient::default().with_ret(
responses.into_iter().chain(iter::repeat_with(|| Ok(()))))
);
// Mark the upstream as arbitrarily healthy or unhealthy.
let circuit = Arc::new(MockCircuitBreaker::default());
circuit.set_healthy(healthy);
(circuit, client)
}
}
proptest! {
/// The invariants this property test asserts are:
///
/// 1. If the number of healthy upstreams is 0, NoHealthyUpstreams is
/// returned and no requests are attempted.
///
/// 2. Given N healthy upstreams (> 0) and a replication factor of R:
/// if N < R, "not enough replicas" is returned and no requests are
/// attempted.
///
/// 3. Upstreams that return an error are retried until the entire
/// write succeeds or times out.
///
/// 4. Writes are replicated to R distinct upstreams successfully, or
/// an error is returned.
///
/// 5. Once an upstream write is ack'd as successful, it is never
/// requested again.
///
/// 6. An upstream reporting as unhealthy at the start of the write is
/// never requested (excluding probe requests).
///
#[test]
fn prop_distinct_upstreams(
upstreams in proptest::collection::vec(arbitrary_mock_upstream(), 1_usize..5),
n_copies in 1_usize..5,
) {
// Disallow invalid configurations
prop_assume!(n_copies <= upstreams.len());
// Run the request with the given upstreams and desired replication
// factor in an async context.
let res = runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on({
let upstreams = upstreams.clone();
async move {
let endpoints = upstreams.into_iter()
.map(|(circuit, client)| {
CircuitBreakingClient::new(client, "bananas")
.with_circuit_breaker(circuit)
});
make_request(endpoints, n_copies).await
}
});
// Compute the number of upstreams that were healthy at the time the
// request was made.
let healthy = upstreams.iter()
.filter(|(circuit, _client)| circuit.is_healthy())
.count();
if healthy == 0 {
// Invariant 1: no healthy upstreams yields the appropriate
// error.
assert_matches!(res, Err(RpcWriteError::NoHealthyUpstreams));
} else if healthy < n_copies {
// Invariant 2: if N < R yields a "not enough replicas" error
assert_matches!(res, Err(RpcWriteError::NotEnoughReplicas));
}
// For either 1 or 2, no requests should be sent as the unhappy case
// can be computed before performing network I/O.
if healthy < n_copies {
// Assert no upstream requests are made.
assert!(
upstreams.iter()
.all(|(_circuit, client)| client.calls().is_empty())
);
}
// Invariant 3 is validated by asserting that in the case of a write
// timing out, at least one upstream was tried more than once.
//
// This works because the number of distinct upstreams that will be
// requested is small enough that the timeout happens after having
// attempted each at least once.
if matches!(res, Err(RpcWriteError::Timeout(_) | RpcWriteError::PartialWrite {..})) {
assert!(upstreams.iter().any(|(_circuit, client)| client.calls().len() > 1));
}
// Invariant 4 is upheld by ensuring at least R upstreams returned
// success if the overall write succeeded, otherwise the result is
// an error.
let acks = upstreams.iter()
.filter(|(_circuit, client)| client.success_count() == 1)
.count();
assert_eq!(res.is_ok(), acks >= n_copies);
// Invariant 5 is validated by ensuring each mock only returned at
// most one Ok response.
//
// This property should hold regardless of the overall write
// succeeding or failing.
assert!(
upstreams.iter()
.all(|(_circuit, client)| client.success_count() <= 1)
);
// Invariant 6 is validated by ensuring all clients with unhealthy
// circuits never see a write request.
assert!(
upstreams.iter()
.filter(|(circuit, _client)| !circuit.is_healthy())
.all(|(_circuit, client)| client.calls().is_empty())
);
}
}
}
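
As a standalone illustration of the pattern above (a toy Pool type and hypothetical upstream names only, assuming just tokio, futures, and parking_lot; the router's real UpstreamSnapshot follows in the modules below), clones of one shared pool let concurrent writes land on distinct upstreams:

// Toy model: each clone of `Pool` shares one free-list, so concurrent
// writers can never claim the same upstream twice.
use std::sync::Arc;
use futures::{stream::FuturesUnordered, StreamExt};
use parking_lot::Mutex;

#[derive(Clone)]
struct Pool {
    free: Arc<Mutex<Vec<&'static str>>>,
}

impl Pool {
    // Claim a distinct upstream, if any remain unclaimed.
    fn next(&self) -> Option<&'static str> {
        self.free.lock().pop()
    }
}

#[tokio::main]
async fn main() {
    let pool = Pool {
        free: Arc::new(Mutex::new(vec!["ingester-a", "ingester-b", "ingester-c"])),
    };
    let n_copies = 2; // desired replication factor, validated <= pool size

    // One future per desired copy, mirroring the FuturesUnordered loop above.
    let mut results = (0..n_copies)
        .map(|_| {
            let pool = pool.clone();
            async move {
                let upstream = pool.next().expect("validated len >= n_copies");
                // ...the real handler performs the gRPC write here...
                upstream
            }
        })
        .collect::<FuturesUnordered<_>>();

    while let Some(acked_by) = results.next().await {
        println!("write acked by {acked_by}");
    }
}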


@ -36,7 +36,7 @@ const METRIC_EVAL_INTERVAL: Duration = Duration::from_secs(3);
/// threads) an approximately uniform distribution is achieved.
#[derive(Debug)]
pub(super) struct Balancer<T, C = CircuitBreaker> {
endpoints: Arc<[CircuitBreakingClient<T, C>]>,
endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
/// An optional metric exporter task that evaluates the state of this
/// [`Balancer`] every [`METRIC_EVAL_INTERVAL`].
@ -54,7 +54,7 @@ where
endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>>,
metrics: Option<&metric::Registry>,
) -> Self {
let endpoints = endpoints.into_iter().collect();
let endpoints = endpoints.into_iter().map(Arc::new).collect();
Self {
metric_task: metrics.map(|m| tokio::spawn(metric_task(m, Arc::clone(&endpoints)))),
endpoints,
@ -73,7 +73,7 @@ where
/// evaluated at this point and the result is returned to the caller as an
/// infinite / cycling iterator. A node that becomes unavailable after the
/// snapshot was taken will continue to be returned by the iterator.
pub(super) fn endpoints(&self) -> Option<UpstreamSnapshot<'_, CircuitBreakingClient<T, C>>> {
pub(super) fn endpoints(&self) -> Option<UpstreamSnapshot<Arc<CircuitBreakingClient<T, C>>>> {
// Grab and increment the current counter.
let counter = COUNTER.with(|cell| {
let mut cell = cell.borrow_mut();
@ -96,7 +96,7 @@ where
let mut healthy = Vec::with_capacity(self.endpoints.len());
for e in &*self.endpoints {
if e.is_healthy() {
healthy.push(e);
healthy.push(Arc::clone(e));
continue;
}
@ -104,7 +104,7 @@ where
// probe request - therefore it is added to the front of the
// iter/request queue.
if probe.is_none() && e.should_probe() {
probe = Some(e);
probe = Some(Arc::clone(e));
}
}
@ -128,7 +128,7 @@ where
/// health evaluation future that updates it.
fn metric_task<T, C>(
metrics: &metric::Registry,
endpoints: Arc<[CircuitBreakingClient<T, C>]>,
endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
) -> impl Future<Output = ()> + Send
where
T: Send + Sync + 'static,
@ -144,7 +144,7 @@ where
async fn metric_loop<T, C>(
metric: metric::Metric<U64Gauge>,
endpoints: Arc<[CircuitBreakingClient<T, C>]>,
endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
) where
T: Send + Sync + 'static,
C: CircuitBreakerState + 'static,


@ -1,5 +1,7 @@
//! Abstraction over RPC client
use std::sync::Arc;
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
@ -26,6 +28,16 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>;
}
#[async_trait]
impl<T> WriteClient for Arc<T>
where
T: WriteClient,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
(**self).write(op).await
}
}
/// An implementation of [`WriteClient`] for the tonic gRPC client.
#[async_trait]
impl WriteClient for WriteServiceClient<tonic::transport::Channel> {
@ -44,6 +56,7 @@ pub mod mock {
struct State {
calls: Vec<WriteRequest>,
ret: Box<dyn Iterator<Item = Result<(), RpcWriteClientError>> + Send + Sync>,
returned_oks: usize,
}
/// A mock implementation of the [`WriteClient`] for testing purposes.
@ -66,6 +79,7 @@ pub mod mock {
state: Mutex::new(State {
calls: Default::default(),
ret: Box::new(iter::repeat_with(|| Ok(()))),
returned_oks: 0,
}),
}
}
@ -77,6 +91,12 @@ pub mod mock {
self.state.lock().calls.clone()
}
/// Retrieve the number of times this mock returned [`Ok`] to a write
/// request.
pub fn success_count(&self) -> usize {
self.state.lock().returned_oks
}
/// Read values off of the provided iterator and return them for calls
/// to [`Self::write()`].
#[cfg(test)]
@ -95,7 +115,14 @@ pub mod mock {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
let mut guard = self.state.lock();
guard.calls.push(op);
guard.ret.next().expect("no mock response")
let ret = guard.ret.next().expect("no mock response");
if ret.is_ok() {
guard.returned_oks += 1;
}
ret
}
}
}


@ -1,17 +1,171 @@
use std::{ops::Deref, sync::Arc};
use parking_lot::Mutex;
use smallvec::SmallVec;
/// Possible states of a single client `C` in an [`UpstreamSnapshot`].
///
/// ```text
///      ┌────────────────┐
///  ┌─▶ │   Available    │
///  │   └────────────────┘
///  │           │
/// drop       next()
///  │           │
///  │           ▼
///  │   ┌────────────────┐
///  └── │    Yielded     │
///      └────────────────┘
///              │
///           remove
///              │
///              ▼
///      ┌────────────────┐
///      │      Used      │
///      └────────────────┘
/// ```
///
/// When the [`UpstreamSnapshot`] is initialised, all `C` are in the
/// [`UpstreamState::Available`] state. Once a given `C` is lent out, its slot
/// in the [`UpstreamSnapshot`] is replaced with [`UpstreamState::Yielded`],
/// indicating it is lent out, and dropping the reference to `C` (making it
/// impossible to lend out again!).
///
/// Once the caller drops the `C` they were yielded, the state returns to
/// [`UpstreamState::Available`] to be lent out again.
///
/// Once a `C` is removed from the snapshot by calling
/// [`UpstreamSnapshot::remove()`], the slot is transitioned to
/// [`UpstreamState::Used`] to indicate it cannot be reused.
///
#[derive(Debug)]
enum UpstreamState<C> {
/// The given instance of `C` has not been returned to the caller yet, or
/// has been dropped by the caller without calling
/// [`UpstreamSnapshot::remove()`] first.
Available(C),
/// The instance of `C` is currently lent to the caller.
///
/// The yielded `C` has yet not been dropped, or removed from the
/// [`UpstreamSnapshot`].
Yielded,
/// The given `C` has been "used" and removed by a call to
/// [`UpstreamSnapshot::remove()`].
///
/// It cannot be returned to the caller again.
Used,
}
impl<C> UpstreamState<C> {
fn unwrap(self) -> C {
match self {
UpstreamState::Available(v) => v,
UpstreamState::Used | UpstreamState::Yielded => {
panic!("unwrap an unavailable upstream state")
}
}
}
}
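
A condensed sketch of that lifecycle through the public API defined below (hypothetical values; it mirrors the unit tests at the end of this file):

let elements = [1, 2, 3];
let mut snap = UpstreamSnapshot::new(elements.iter(), 0).unwrap();

// next() flips a slot to Yielded and lends out a smart-pointer guard.
let first = snap.next().unwrap();
assert_eq!(**first, 1);

// Dropping a yielded guard returns its slot to Available...
drop(snap.next().unwrap());

// ...while remove() consumes the guard and marks the slot Used for good.
snap.remove(first);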
/// A smart-pointer dereferencing to a reference to `C`.
///
/// The [`UpstreamSnapshot`] ensures that only one [`Upstream`]-wrapped
/// reference to `C` is ever available at any one time. Dropping an instance of
/// [`Upstream`] returns the `C` it contains back to the [`UpstreamSnapshot`] it
/// came from, allowing it to be lent out to another caller.
///
/// To permanently remove this `C` from the [`UpstreamSnapshot`], pass it to
/// [`UpstreamSnapshot::remove()`].
#[derive(Debug)]
pub(super) struct Upstream<C> {
/// The instance of `C` lent out from the snapshot.
///
/// This option is always [`Some`] until dropped or removed from the
/// snapshot, at which point it is set to [`None`].
///
/// As an optimisation, do not attempt to acquire the set mutex to check if
/// this [`Upstream`] has been marked as [`UpstreamState::Used`] before
/// setting [`UpstreamState::Available`] if this option is [`None`] to
/// reduce lock contention.
inner: Option<C>,
/// The set of clients from which this `C` has been borrowed.
state: Arc<Mutex<SharedState<C>>>,
/// The index into `set` at which this `C` can be found.
idx: usize,
}
impl<C> Deref for Upstream<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
self.inner.as_ref().unwrap()
}
}
impl<C> Drop for Upstream<C> {
fn drop(&mut self) {
let inner = match self.inner.take() {
Some(v) => v,
None => return,
};
*self.state.lock().clients.get_mut(self.idx).unwrap() = UpstreamState::Available(inner);
}
}
/// Mutable state shared between clones of a single [`UpstreamSnapshot`].
#[derive(Debug)]
struct SharedState<C> {
/// The set of `C` for this [`UpstreamSnapshot`].
clients: SmallVec<[UpstreamState<C>; 3]>,
/// The current cursor index for this snapshot instance.
idx: usize,
}
impl<C> SharedState<C> {
#[inline(always)]
fn current_idx(&self) -> usize {
self.idx % self.clients.len()
}
}
/// An infinite cycling iterator, yielding the 0-indexed `i`-th element first
/// (modulo wrapping).
///
/// The last yielded element can be removed from the iterator by calling
/// [`UpstreamSnapshot::remove_last_unstable()`].
#[derive(Debug)]
pub(super) struct UpstreamSnapshot<'a, C> {
clients: SmallVec<[&'a C; 3]>,
idx: usize,
/// The [`UpstreamSnapshot`] contains a set of `C`, maintaining an invariant
/// that writes to `C` do not happen concurrently, by yielding each `C` wrapped
/// in an [`Upstream`] to exactly one caller at a time.
///
/// Combined with the ability to remove a `C` from the set returned by the
/// [`UpstreamSnapshot`], the caller can ensure that once a write has been
/// successfully accepted by `C`, no further write attempts are made to it.
///
/// Cloning this [`UpstreamSnapshot`] allows it to be shared across thread /
/// task boundaries, while internally referencing the same set of `C` and
/// co-ordinating the state of each across each cloned [`UpstreamSnapshot`].
///
/// This allows concurrent replication of writes to N ingesters synchronise
/// using clones of this [`UpstreamSnapshot`], causing each write to land on a
/// distinct ingester.
///
/// If all `C` are currently lent out, this iterator yields [`None`]. If a `C`
/// is then returned, then the iterator will return [`Some`] at the next poll.
#[derive(Debug, Clone)]
pub(super) struct UpstreamSnapshot<C> {
/// The mutable state shared between each cloned copy of this
/// [`UpstreamSnapshot`] instance.
state: Arc<Mutex<SharedState<C>>>,
/// The length of `state.clients` to avoid locking to read this static
/// value.
len: usize,
}
impl<'a, C> UpstreamSnapshot<'a, C> {
impl<C> UpstreamSnapshot<C> {
/// Initialise a new snapshot, yielding the 0-indexed `i`-th element of
/// `clients` next (or wrapping around if `i` is out-of-bounds).
///
@ -19,95 +173,138 @@ impl<'a, C> UpstreamSnapshot<'a, C> {
/// allocation during construction.
///
/// If `clients` is empty, this method returns [`None`].
pub(super) fn new(clients: impl Iterator<Item = &'a C>, i: usize) -> Option<Self> {
let clients: SmallVec<[&'a C; 3]> = clients.collect();
pub(super) fn new(clients: impl Iterator<Item = C>, i: usize) -> Option<Self> {
let clients: SmallVec<[UpstreamState<C>; 3]> =
clients.map(UpstreamState::Available).collect();
if clients.is_empty() {
return None;
}
Some(Self {
clients,
// So first call is the ith element even after the inc in next().
idx: i.wrapping_sub(1),
len: clients.len(),
state: Arc::new(Mutex::new(SharedState {
// So first call is the ith element even after the inc in next().
idx: i.wrapping_sub(1),
clients,
})),
})
}
/// Remove the last yielded upstream from this snapshot.
/// Consume the given `upstream` from the [`UpstreamSnapshot`], taking
/// ownership of it from the caller, and preventing it from being yielded
/// again.
///
/// # Ordering
/// # Panics
///
/// Calling this method MAY change the order of the yielded elements but
/// MUST maintain equal visit counts across all elements.
///
/// # Correctness
///
/// If called before [`UpstreamSnapshot`] has yielded any elements, this MAY
/// remove an arbitrary element from the snapshot.
#[allow(unused)]
pub(super) fn remove_last_unstable(&mut self) {
self.clients.swap_remove(self.idx());
// Try the element now in the idx position next.
self.idx = self.idx.wrapping_sub(1);
/// Panics if `upstream` was not obtained from this [`UpstreamSnapshot`].
pub(super) fn remove(&self, mut upstream: Upstream<C>) {
// Ensure the `upstream` was yielded from this set.
assert!(
Arc::ptr_eq(&self.state, &upstream.state),
"remove from disjoint sets"
);
let old = std::mem::replace(
&mut self.state.lock().clients[upstream.idx],
UpstreamState::Used,
);
// Invariant: any upstream being removed must have been yielded by the
// iterator - the type system should enforce this as the Upstream does
// not implement Clone, and this fn took ownership.
assert!(matches!(old, UpstreamState::Yielded));
// Prevent the drop impl from setting the state to "available" again.
upstream.inner = None;
drop(upstream); // explicit drop for clarity w.r.t the above
}
/// Returns the number of clients in this [`UpstreamSnapshot`].
/// Returns the number of clients in this [`UpstreamSnapshot`] when
/// constructed.
///
/// This value decreases as upstreams are removed by calls to
/// [`UpstreamSnapshot::remove_last_unstable()`].
pub(super) fn len(&self) -> usize {
self.clients.len()
}
#[inline(always)]
fn idx(&self) -> usize {
self.idx % self.clients.len()
/// If [`UpstreamSnapshot::remove()`] has been called since construction,
/// this iterator will yield fewer distinct `C` than this returned number.
pub(super) fn initial_len(&self) -> usize {
self.len
}
}
impl<'a, C> Iterator for UpstreamSnapshot<'a, C> {
type Item = &'a C;
impl<C> Iterator for UpstreamSnapshot<C> {
type Item = Upstream<C>;
fn next(&mut self) -> Option<Self::Item> {
if self.clients.is_empty() {
return None;
}
self.idx = self.idx.wrapping_add(1);
Some(self.clients[self.idx()])
}
// Obtain the client set mutex outside the loop as the overhead of
// acquiring the contended mutex is likely to outweigh the actual loop
// critical section cost - it's better to not yield control until an
// element has been found in the (fast) linear search to avoid
// contention.
let mut guard = self.state.lock();
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(self.len()))
// Remember where in the client array this first attempt was.
let start_idx = guard.current_idx();
loop {
// Move along the client array.
guard.idx = guard.idx.wrapping_add(1);
// Find the array index of this next client.
let current_idx = guard.current_idx();
// If this C is available, mark it as lent out and yield it to the
// caller.
let v = guard.clients.get_mut(current_idx).unwrap();
if matches!(v, UpstreamState::Available(_)) {
let got = std::mem::replace(v, UpstreamState::Yielded).unwrap();
return Some(Upstream {
inner: Some(got),
idx: current_idx,
state: Arc::clone(&self.state),
});
}
// Otherwise ensure the loop doesn't continue forever.
//
// Once all the elements have been visited once, the loop should
// end. If there's no available upstream now, then this request will
// never be satisfiable; a thread holding another upstream may
// return it to the pool for this thread to acquire, but that other
// thread would then fail to find a free upstream.
if current_idx == start_idx {
return None;
}
}
}
}
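A minimal sketch of the exhaustion behaviour (assuming integer upstreams): while every element is lent out, `next()` returns `None` rather than blocking, and dropping a held `Upstream` returns it to the pool.

    let mut snap = UpstreamSnapshot::new([1, 2].iter(), 0)
        .expect("non-empty element set should yield snapshot");
    let a = snap.next().expect("first upstream should be available");
    let _b = snap.next().expect("second upstream should be available");
    // Both upstreams are currently held, so the snapshot is exhausted.
    assert!(snap.next().is_none());
    // Dropping a held upstream makes it available to callers again.
    drop(a);
    assert_eq!(snap.next().as_deref(), Some(&&1));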
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
iter,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use proptest::proptest;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::mpsc;
use super::*;
#[test]
fn test_size_hint() {
fn test_len() {
let elements = [
AtomicUsize::new(0),
AtomicUsize::new(0),
AtomicUsize::new(0),
];
let mut snap = UpstreamSnapshot::new(elements.iter(), 0)
let snap = UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot");
assert_eq!(snap.len(), 3);
assert_eq!(snap.initial_len(), 3);
let (min, max) = snap.size_hint();
assert_eq!(min, 0);
assert_eq!(max, Some(3));
snap.remove_last_unstable(); // Arbitrary element removed
let (min, max) = snap.size_hint();
assert_eq!(min, 0);
assert_eq!(max, Some(2));
assert_eq!(snap.len(), 2);
assert_eq!(max, None);
}
#[test]
@ -115,21 +312,21 @@ mod tests {
let elements = [1, 2, 3];
assert_eq!(
*UpstreamSnapshot::new(elements.iter(), 0)
**UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot")
.next()
.expect("should yield value"),
1
);
assert_eq!(
*UpstreamSnapshot::new(elements.iter(), 1)
**UpstreamSnapshot::new(elements.iter(), 1)
.expect("non-empty element set should yield snapshot")
.next()
.expect("should yield value"),
2
);
assert_eq!(
*UpstreamSnapshot::new(elements.iter(), 2)
**UpstreamSnapshot::new(elements.iter(), 2)
.expect("non-empty element set should yield snapshot")
.next()
.expect("should yield value"),
@ -138,7 +335,7 @@ mod tests {
// Wraparound
assert_eq!(
*UpstreamSnapshot::new(elements.iter(), 3)
**UpstreamSnapshot::new(elements.iter(), 3)
.expect("non-empty element set should yield snapshot")
.next()
.expect("should yield value"),
@ -179,36 +376,59 @@ mod tests {
{
let mut snap = UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot");
assert_eq!(snap.next(), Some(&1));
snap.remove_last_unstable();
assert_eq!(snap.next(), Some(&3)); // Not 2 - unstable remove!
assert_eq!(snap.next(), Some(&2));
assert_eq!(snap.next(), Some(&3));
assert_eq!(snap.initial_len(), 3);
let item = snap.next().unwrap();
assert_eq!(**item, 1);
assert_eq!(item.idx, 0);
snap.remove(item);
// Removing is stable - it does not permute the item order.
assert_eq!(snap.next().as_deref(), Some(&&2));
assert_eq!(snap.next().as_deref(), Some(&&3));
assert_eq!(snap.next().as_deref(), Some(&&2));
assert_eq!(snap.next().as_deref(), Some(&&3));
assert_eq!(snap.initial_len(), 3);
}
// Second element removed
{
let mut snap = UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot");
assert_eq!(snap.next(), Some(&1));
assert_eq!(snap.next(), Some(&2));
snap.remove_last_unstable();
assert_eq!(snap.next(), Some(&3));
assert_eq!(snap.next(), Some(&1));
assert_eq!(snap.next(), Some(&3));
assert_eq!(snap.next().as_deref(), Some(&&1));
assert_eq!(snap.initial_len(), 3);
let item = snap.next().unwrap();
assert_eq!(**item, 2);
assert_eq!(item.idx, 1);
snap.remove(item);
assert_eq!(snap.next().as_deref(), Some(&&3));
assert_eq!(snap.next().as_deref(), Some(&&1));
assert_eq!(snap.next().as_deref(), Some(&&3));
assert_eq!(snap.initial_len(), 3);
}
// Last element removed
{
let mut snap = UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot");
assert_eq!(snap.next(), Some(&1));
assert_eq!(snap.next(), Some(&2));
assert_eq!(snap.next(), Some(&3));
snap.remove_last_unstable();
assert_eq!(snap.next(), Some(&1));
assert_eq!(snap.next(), Some(&2));
assert_eq!(snap.next(), Some(&1));
assert_eq!(snap.next().as_deref(), Some(&&1));
assert_eq!(snap.next().as_deref(), Some(&&2));
assert_eq!(snap.initial_len(), 3);
let item = snap.next().unwrap();
assert_eq!(**item, 3);
assert_eq!(item.idx, 2);
snap.remove(item);
assert_eq!(snap.next().as_deref(), Some(&&1));
assert_eq!(snap.next().as_deref(), Some(&&2));
assert_eq!(snap.next().as_deref(), Some(&&1));
assert_eq!(snap.initial_len(), 3);
}
}
@ -218,20 +438,195 @@ mod tests {
let mut snap = UpstreamSnapshot::new(elements.iter(), 0)
.expect("non-empty element set should yield snapshot");
assert_eq!(snap.len(), 1);
assert_eq!(snap.initial_len(), 1);
assert_eq!(snap.next(), Some(&42));
assert_eq!(snap.next(), Some(&42));
snap.remove_last_unstable();
assert_eq!(snap.next(), None);
assert_eq!(snap.next(), None);
let item = snap.next().unwrap();
snap.remove(item);
assert!(snap.next().is_none());
assert!(snap.next().is_none());
assert_eq!(snap.len(), 0);
assert_eq!(snap.initial_len(), 1);
}
#[test]
fn test_empty_snap() {
assert!(UpstreamSnapshot::<usize>::new([].iter(), 0).is_none());
assert!(UpstreamSnapshot::<usize>::new([].iter(), 1).is_none());
assert!(UpstreamSnapshot::<usize>::new(iter::empty(), 0).is_none());
assert!(UpstreamSnapshot::<usize>::new(iter::empty(), 1).is_none());
}
#[test]
#[should_panic(expected = "remove from disjoint sets")]
fn test_upstream_from_disjoint_sets() {
let mut set_a = UpstreamSnapshot::new([1].iter(), 0).unwrap();
let set_b = UpstreamSnapshot::new([1].iter(), 0).unwrap();
let item = set_a.next().unwrap();
set_b.remove(item); // Oops - removing a from b!
}
proptest! {
/// Assert the set always cycles indefinitely, visiting all elements
/// equally often (when the number of visits is a multiple of the set
/// size).
///
/// Ensure the starting offset does not affect this property.
#[test]
fn prop_upstream_set_cycles(
complete_iters in (1_usize..5),
set_size in (1_usize..5),
offset in (1_usize..10),
) {
let elements = (0..set_size).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
// Create a snapshot and iterate over it the specified number of
// times.
{
let mut snap = UpstreamSnapshot::new(elements.iter(), offset)
.expect("non-empty element set should yield snapshot");
for _ in 0..(elements.len() * complete_iters) {
snap.next()
.expect("should cycle forever")
.fetch_add(1, Ordering::Relaxed);
}
}
// Assert all elements were visited exactly complete_iters number of
// times.
elements
.into_iter()
.for_each(|v| assert_eq!(v.load(Ordering::Relaxed), complete_iters));
}
/// Assert the set yields any item exactly once at any one time.
#[test]
fn prop_upstream_yield_exactly_once(
complete_iters in (2_usize..5),
set_size in (1_usize..5),
offset in (1_usize..10),
hold_idx in (0_usize..100),
) {
let elements = (0..set_size).map(|_| Arc::new(AtomicUsize::new(0))).collect::<Vec<_>>();
// Create a snapshot and iterate over it the specified number of
// times.
{
let mut snap = UpstreamSnapshot::new(elements.iter(), offset)
.expect("non-empty element set should yield snapshot");
// Take the specified index out of the set and hold onto it.
let hold = snap.clone().nth(hold_idx).unwrap();
// Now iterate over the snapshot and increment the counter of
// each yielded upstream.
//
// Iterate exactly N times over M-1 elements remaining in the
// set.
let count = (elements.len() - 1) * complete_iters;
for _ in 0..count {
snap.next()
.expect("should cycle forever")
.fetch_add(1, Ordering::Relaxed);
}
// Nothing incremented the element we were holding onto.
assert_eq!(hold.load(Ordering::Relaxed), 0);
// Store the expected count so there's a simple check below that
// all the non-0 elements have the same expected count value.
hold.store(complete_iters, Ordering::Relaxed);
}
// Assert all elements were visited exactly complete_iters number of
// times.
elements
.into_iter()
.for_each(|v| assert_eq!(v.load(Ordering::Relaxed), complete_iters));
}
}
/// Ensure two concurrent callers obtain two different elements.
#[tokio::test]
async fn test_concurrent_callers_disjoint_elements() {
let elements = (0..2).collect::<Vec<_>>();
// Create a snapshot and iterate over it the specified number of
// times.
let snap = UpstreamSnapshot::<_>::new(elements.clone().into_iter(), 0)
.expect("non-empty element set should yield snapshot");
let (tx, mut rx) = mpsc::channel(2);
tokio::spawn({
let mut snap = snap.clone();
let tx = tx.clone();
async move {
let got = snap.next().unwrap();
tx.send(*got).await.unwrap();
}
});
tokio::spawn({
let mut snap = snap.clone();
let tx = tx.clone();
async move {
let got = snap.next().unwrap();
tx.send(*got).await.unwrap();
}
});
let a = rx.recv().await.unwrap();
let b = rx.recv().await.unwrap();
assert!((a == 0) ^ (b == 0));
assert!((a == 1) ^ (b == 1));
}
/// When N concurrent callers attempt to obtain one of N-1 elements, exactly
/// one thread must observe [`None`].
#[tokio::test]
async fn test_all_yielded() {
const N: usize = 3;
let elements = (0..(N - 1)).collect::<Vec<_>>();
// Create a snapshot and iterate over it the specified number of
// times.
let snap = UpstreamSnapshot::<_>::new(elements.clone().into_iter(), 0)
.expect("non-empty element set should yield snapshot");
let (tx, mut rx) = mpsc::channel(N);
// One more thread than elements
for _ in 0..N {
tokio::spawn({
let mut snap = snap.clone();
let tx = tx.clone();
async move {
let got = snap.next();
tx.send(got.as_ref().map(|v| **v)).await.unwrap();
// Do not "drop" the Upstream wrapper, in effect holding
// onto the item forever, ensuring this thread doesn't
// return the item to the snapshot for another thread to
// grab.
std::mem::forget(got);
}
});
}
let mut saw_nones = 0;
for _ in 0..N {
let v = rx
.recv()
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("exactly N channel writes should occur");
if v.is_none() {
saw_nones += 1;
}
}
assert_eq!(saw_nones, 1);
}
}


@ -158,7 +158,7 @@ impl From<&DmlError> for StatusCode {
)) => StatusCode::SERVICE_UNAVAILABLE,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
DmlError::RpcWrite(
RpcWriteError::NoUpstreams
RpcWriteError::NoHealthyUpstreams
| RpcWriteError::NotEnoughReplicas
| RpcWriteError::PartialWrite { .. },
) => StatusCode::SERVICE_UNAVAILABLE,


@ -5,7 +5,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch};
use hashbrown::HashMap;
use hyper::{Body, Request, StatusCode};
use iox_catalog::interface::SoftDeletedRows;
use iox_catalog::{interface::SoftDeletedRows, test_helpers::arbitrary_namespace};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
@ -265,17 +265,7 @@ async fn test_write_propagate_ids() {
.await;
// Create the namespace and a set of tables.
let ns = ctx
.catalog()
.repositories()
.await
.namespaces()
.create(
&data_types::NamespaceName::new("bananas_test").unwrap(),
None,
)
.await
.expect("failed to update table limit");
let ns = arbitrary_namespace(&mut *ctx.catalog().repositories().await, "bananas_test").await;
let catalog = ctx.catalog();
let ids = ["another", "test", "table", "platanos"]


@ -197,11 +197,12 @@ fn to_partition(p: data_types::Partition) -> Partition {
#[cfg(test)]
mod tests {
use super::*;
use data_types::{
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
};
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService;
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use uuid::Uuid;
#[tokio::test]
@ -214,16 +215,8 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("schema_test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
let partition = repos
.partitions()
.create_or_get("foo".into(), table.id)
@ -277,16 +270,8 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("schema_test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
partition1 = repos
.partitions()
.create_or_get("foo".into(), table.id)


@ -183,6 +183,7 @@ impl IoxGetRequest {
/// This represents the JSON fields
#[derive(Deserialize, Debug)]
struct ReadInfoJson {
#[serde(alias = "database", alias = "bucket", alias = "bucket-name")]
namespace_name: String,
sql_query: String,
// If query type is not supplied, defaults to SQL
@ -350,6 +351,41 @@ mod tests {
"my_otherdb",
"SHOW DATABASES;",
),
TestCase::new_influxql(
r#"{"database": "my_otherdb", "sql_query": "SHOW DATABASES;", "query_type": "influxql"}"#,
"my_otherdb",
"SHOW DATABASES;",
),
// influxql bucket metadata
TestCase::new_influxql(
r#"{"bucket": "my_otherdb", "sql_query": "SHOW DATABASES;", "query_type": "influxql"}"#,
"my_otherdb",
"SHOW DATABASES;",
),
// influxql bucket-name metadata
TestCase::new_influxql(
r#"{"bucket-name": "my_otherdb", "sql_query": "SHOW DATABASES;", "query_type": "influxql"}"#,
"my_otherdb",
"SHOW DATABASES;",
),
// sql database metadata
TestCase::new_sql(
r#"{"database": "my_db", "sql_query": "SELECT 1;", "query_type": "sql"}"#,
"my_db",
"SELECT 1;",
),
// sql bucket metadata
TestCase::new_sql(
r#"{"bucket": "my_db", "sql_query": "SELECT 1;", "query_type": "sql"}"#,
"my_db",
"SELECT 1;",
),
// sql bucket-name metadata
TestCase::new_sql(
r#"{"bucket-name": "my_db", "sql_query": "SELECT 1;", "query_type": "sql"}"#,
"my_db",
"SELECT 1;",
),
];
for TestCase { json, expected } in cases {


@ -96,11 +96,12 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService {
mod tests {
use super::*;
use bytes::Bytes;
use data_types::{
ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp,
};
use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use object_store::{memory::InMemory, ObjectStore};
use uuid::Uuid;
@ -112,16 +113,8 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("catalog_partition_test").unwrap(), None)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("schema_test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await;
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
let partition = repos
.partitions()
.create_or_get("foo".into(), table.id)


@ -81,9 +81,12 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
#[cfg(test)]
mod tests {
use super::*;
use data_types::{ColumnType, NamespaceName};
use data_types::ColumnType;
use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService;
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
mem::MemCatalog,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use std::sync::Arc;
#[tokio::test]
@ -93,16 +96,8 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let namespace = repos
.namespaces()
.create(&NamespaceName::new("namespace_schema_test").unwrap(), None)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("schema_test_table", namespace.id)
.await
.unwrap();
let namespace = arbitrary_namespace(&mut *repos, "namespace_schema_test").await;
let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await;
repos
.columns()
.create_or_get("schema_test_column", table.id, ColumnType::Tag)


@ -13,6 +13,9 @@ crc32fast = "1.2.0"
data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown.workspace = true
mutable_batch = { version = "0.1.0", path = "../mutable_batch" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.17", features = ["parking_lot"] }
parking_lot = "0.12"
@ -26,7 +29,7 @@ tokio-util = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
dml = { path = "../dml" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
test_helpers = { path = "../test_helpers" }


@ -10,20 +10,6 @@
//! # WAL
//!
//! This crate provides a local-disk WAL for the IOx ingestion pipeline.
use crate::blocking::{
ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter,
};
use data_types::sequence_number_set::SequenceNumberSet;
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
sequenced_wal_op::Op as WalOp, SequencedWalOp as ProtoSequencedWalOp,
},
};
use observability_deps::tracing::info;
use parking_lot::Mutex;
use snafu::prelude::*;
use std::{
collections::BTreeMap,
fs::File,
@ -32,9 +18,27 @@ use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId};
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
sequenced_wal_op::Op as WalOp, SequencedWalOp as ProtoSequencedWalOp,
},
};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::decode::decode_database_batch;
use observability_deps::tracing::info;
use parking_lot::Mutex;
use snafu::prelude::*;
use tokio::{sync::watch, task::JoinHandle};
use writer_thread::WriterIoThreadHandle;
use crate::blocking::{
ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter,
};
pub mod blocking;
mod writer_thread;
@ -132,6 +136,19 @@ pub enum Error {
},
}
/// Errors that occur when decoding internal types from a WAL file.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum DecodeError {
    /// A WAL entry payload could not be decoded into [`MutableBatch`]es.
    UnableToCreateMutableBatch {
        source: mutable_batch_pb::decode::Error,
    },
    /// Reading the next entry batch from the underlying segment file failed.
    FailedToReadWal {
        source: Error,
    },
}
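A minimal sketch of discriminating the two decode failure classes (assuming a caller that only wants to report them); both variants carry the underlying `source` error:

fn describe(err: &DecodeError) -> String {
    match err {
        // The entry was read, but its payload could not be decoded.
        DecodeError::UnableToCreateMutableBatch { source } => {
            format!("bad write payload: {source}")
        }
        // The underlying segment read itself failed.
        DecodeError::FailedToReadWal { source } => {
            format!("unreadable WAL entry: {source}")
        }
    }
}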
/// A specialized `Result` for WAL-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -554,6 +571,62 @@ impl std::fmt::Debug for ClosedSegmentFileReader {
}
}
/// An in-memory representation of a WAL write operation entry.
#[derive(Debug)]
pub struct WriteOpEntry {
    /// The ID of the namespace the write operation was applied to.
    pub namespace: NamespaceId,
    /// The write payloads of the operation, keyed by table ID.
    pub table_batches: HashMap<i64, MutableBatch>,
}
/// A decoder that reads from a closed segment file and parses write
/// operations from their on-disk format to an internal format.
#[derive(Debug)]
pub struct WriteOpEntryDecoder {
reader: ClosedSegmentFileReader,
}
impl From<ClosedSegmentFileReader> for WriteOpEntryDecoder {
/// Creates a decoder which will use the closed segment file of `reader` to
/// decode write ops from their on-disk format.
fn from(reader: ClosedSegmentFileReader) -> Self {
Self { reader }
}
}
impl Iterator for WriteOpEntryDecoder {
type Item = Result<Vec<WriteOpEntry>, DecodeError>;
    /// Reads the collection of write op entries in the next WAL entry batch
    /// from the underlying closed segment. A returned [`None`] indicates that
    /// there are no more entries to be decoded from the underlying segment. A
    /// zero-length vector may be returned if a WAL entry batch contains no
    /// write operations, but it does not indicate that the decoder is consumed.
fn next(&mut self) -> Option<Self::Item> {
self.reader
.next_batch()
.context(FailedToReadWalSnafu)
.transpose()?
.map(|batch| {
batch
.into_iter()
.filter_map(|sequenced_op| match sequenced_op.op {
WalOp::Write(w) => Some(w),
WalOp::Delete(..) => None,
WalOp::Persist(..) => None,
})
.map(|w| {
Ok(WriteOpEntry {
namespace: NamespaceId::new(w.database_id),
table_batches: decode_database_batch(&w)
.context(UnableToCreateMutableBatchSnafu)?,
})
})
.collect::<Self::Item>()
})
.ok()
}
}
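A minimal sketch of driving the decoder (assuming `wal` is a `Wal` with at least one closed segment; the tests below contain a complete, runnable setup):

    let (closed, _) = wal.rotate().expect("failed to rotate WAL");
    let decoder = WriteOpEntryDecoder::from(
        wal.reader_for_segment(closed.id)
            .expect("failed to open reader for closed WAL segment"),
    );
    for batch in decoder {
        // Each batch is a (possibly empty) set of decoded write op entries.
        for entry in batch.expect("entry batch should decode") {
            println!(
                "namespace {:?}: {} table batches",
                entry.namespace,
                entry.table_batches.len()
            );
        }
    }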
/// Metadata for a WAL segment that is no longer accepting writes, but can be read for replay
/// purposes.
#[derive(Debug, Clone)]
@ -575,7 +648,9 @@ impl ClosedSegment {
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeSet;
use assert_matches::assert_matches;
use data_types::{NamespaceId, SequenceNumber, TableId};
use dml::DmlWrite;
use generated_types::influxdata::{
@ -584,6 +659,10 @@ mod tests {
};
use mutable_batch_lp::lines_to_batches;
use super::*;
const TEST_NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn wal_write_and_read_ops() {
let dir = test_helpers::tmp_dir().unwrap();
@ -681,6 +760,89 @@ mod tests {
);
}
#[tokio::test]
async fn decode_write_op_entries() {
let dir = test_helpers::tmp_dir().unwrap();
let wal = Wal::new(&dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m2,u=foo w=2i 2");
let w3 = test_data("m1,t=foo v=3i 3");
let op1 = SequencedWalOp {
sequence_number: 0,
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
sequence_number: 1,
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
sequence_number: 2,
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
sequence_number: 2,
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
sequence_number: 3,
op: WalOp::Write(w3.to_owned()),
};
wal.write_op(op1.clone());
wal.write_op(op2.clone());
wal.write_op(op3.clone()).changed().await.unwrap();
wal.write_op(op4.clone());
wal.write_op(op5.clone()).changed().await.unwrap();
let (closed, _) = wal.rotate().unwrap();
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id)
.expect("failed to open reader for closed WAL segment"),
);
let wal_entries = decoder
.into_iter()
.map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
// The decoder should find 2 entries, with a total of 3 write ops
assert_eq!(wal_entries.len(), 2);
let write_op_entries = wal_entries.into_iter().flatten().collect::<Vec<_>>();
assert_eq!(write_op_entries.len(), 3);
assert_matches!(write_op_entries.get(0), Some(got_op1) => {
assert_op_shape(got_op1, &w1);
});
assert_matches!(write_op_entries.get(1), Some(got_op2) => {
assert_op_shape(got_op2, &w2);
});
assert_matches!(write_op_entries.get(2), Some(got_op3) => {
assert_op_shape(got_op3, &w3);
});
}
fn assert_op_shape(left: &WriteOpEntry, right: &DatabaseBatch) {
assert_eq!(left.namespace, NamespaceId::new(right.database_id));
assert_eq!(left.table_batches.len(), right.table_batches.len());
for right_tb in &right.table_batches {
let right_key = right_tb.table_id;
let left_mb = left
.table_batches
.get(&right_key)
.unwrap_or_else(|| panic!("left value missing table batch for key {right_key}"));
assert_eq!(
left_mb.column_names(),
right_tb
.columns
.iter()
.map(|c| c.column_name.as_str())
.collect::<BTreeSet<_>>()
)
}
}
fn test_data(lp: &str) -> DatabaseBatch {
let batches = lines_to_batches(lp, 0).unwrap();
let batches = batches
@ -690,7 +852,7 @@ mod tests {
.collect();
let write = DmlWrite::new(
NamespaceId::new(42),
TEST_NAMESPACE_ID,
batches,
"bananas".into(),
Default::default(),
@ -701,7 +863,7 @@ mod tests {
fn test_delete() -> DeletePayload {
DeletePayload {
database_id: 42,
database_id: TEST_NAMESPACE_ID.get(),
predicate: None,
table_name: "bananas".into(),
}
@ -709,7 +871,7 @@ mod tests {
fn test_persist() -> PersistOp {
PersistOp {
namespace_id: 42,
namespace_id: TEST_NAMESPACE_ID.get(),
parquet_file_uuid: "b4N4N4Z".into(),
partition_id: 43,
table_id: 44,

wal_inspect/Cargo.toml (new file)

@ -0,0 +1,25 @@
[package]
name = "wal_inspect"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
data_types = { version = "0.1.0", path = "../data_types" }
dml = { version = "0.1.0", path = "../dml" }
generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true
mutable_batch = { version = "0.1.0", path = "../mutable_batch" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
parquet_to_line_protocol = { version = "0.1.0", path = "../parquet_to_line_protocol" }
schema = { version = "0.1.0", path = "../schema" }
thiserror = "1.0.40"
wal = { version = "0.1.0", path = "../wal" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.27", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }

wal_inspect/src/lib.rs (new file)

@ -0,0 +1,382 @@
//! # WAL Inspect
//!
//! This crate builds on top of the WAL implementation to provide tools for
//! inspecting individual segment files and translating them to human readable
//! formats.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
missing_docs
)]
use std::io::Write;
use data_types::{NamespaceId, TableId};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use parquet_to_line_protocol::convert_to_lines;
use thiserror::Error;
/// Errors emitted by a [`LineProtoWriter`] during operation.
#[derive(Debug, Error)]
pub enum WriteError {
/// The mutable batch is in a state that prevents obtaining
/// the data needed to write line protocol
#[error("failed to get required data from mutable batch: {0}")]
BadMutableBatch(#[from] mutable_batch::Error),
/// The record batch could not be mapped to line protocol
#[error("failed to map record batch to line protocol: {0}")]
ConvertToLineProtocolFailed(String),
/// A write failure caused by an IO error
#[error("failed to write translation: {0}")]
IoError(#[from] std::io::Error),
}
/// Provides namespaced write functionality from table-based mutable batches
/// to namespaced line protocol output.
#[derive(Debug)]
pub struct LineProtoWriter<W, F>
where
W: Write,
{
    /// The per-namespace write sinks, created on first use by `new_write_sink`.
    namespaced_output: HashMap<NamespaceId, W>,
    /// Constructs the write sink for a namespace the first time it is seen.
    new_write_sink: F,
    /// Maps table IDs to table names in order to recover measurement names.
    table_name_index: HashMap<TableId, String>,
}
impl<W, F> LineProtoWriter<W, F>
where
W: Write,
{
    /// Performs a best-effort flush of all write destinations opened by the [`LineProtoWriter`].
pub fn flush(&mut self) -> Result<(), Vec<WriteError>> {
let mut errs = Vec::<WriteError>::new();
for w in self.namespaced_output.values_mut() {
if let Err(e) = w.flush() {
errs.push(WriteError::IoError(e));
}
}
if !errs.is_empty() {
return Err(errs);
}
Ok(())
}
}
impl<W, F> Drop for LineProtoWriter<W, F>
where
W: Write,
{
fn drop(&mut self) {
_ = self.flush()
}
}
impl<W, F> LineProtoWriter<W, F>
where
W: Write,
F: Fn(NamespaceId) -> Result<W, WriteError>,
{
/// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to
/// get the destination for each line protocol write by its namespace ID.
    /// The `table_name_index` must map every table ID to its table name so
    /// that the measurement name can be recovered, as WAL write entries do
    /// not contain this information.
pub fn new(new_write_sink: F, table_name_index: HashMap<TableId, String>) -> Self {
Self {
namespaced_output: HashMap::new(),
new_write_sink,
table_name_index,
}
}
/// Writes the provided set of table batches as line protocol write entries
/// to the destination for the provided namespace ID.
pub fn write_namespaced_table_batches(
&mut self,
ns: NamespaceId,
table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError> {
let sink = self
.namespaced_output
.entry(ns)
.or_insert((self.new_write_sink)(ns)?);
write_batches_as_line_proto(sink, &self.table_name_index, table_batches)
}
}
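A minimal sketch of feeding decoded WAL entries into a [`LineProtoWriter`] (assuming `decoder` is a `wal::WriteOpEntryDecoder` over a closed segment and `table_name_index` maps every table ID to its name, both as in the tests below):

    let mut writer = LineProtoWriter::new(|_ns| Ok(Vec::<u8>::new()), table_name_index);
    for batch in decoder {
        for entry in batch.expect("entry batch should decode") {
            writer
                .write_namespaced_table_batches(entry.namespace, entry.table_batches)
                .expect("batch write should not fail");
        }
    }
    // The line protocol for each namespace is now buffered in the
    // writer's per-namespace sinks.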
fn write_batches_as_line_proto<W>(
sink: &mut W,
table_name_index: &HashMap<TableId, String>,
table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError>
where
W: Write,
{
for (table_id, mb) in table_batches {
let schema = mb.schema(schema::Projection::All)?;
let record_batch = mb.to_arrow(schema::Projection::All)?;
let measurement_name = table_name_index.get(&TableId::new(table_id)).ok_or(
WriteError::ConvertToLineProtocolFailed(format!(
"missing table name for id {}",
&table_id
)),
)?;
sink.write_all(
convert_to_lines(measurement_name, &schema, &record_batch)
.map_err(WriteError::ConvertToLineProtocolFailed)?
.as_slice(),
)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::fs::{read_dir, OpenOptions};
use assert_matches::assert_matches;
use data_types::TableId;
use dml::DmlWrite;
use generated_types::influxdata::{
iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch,
};
use mutable_batch_lp::lines_to_batches;
use wal::{SequencedWalOp, WriteOpEntry, WriteOpEntryDecoder};
use super::*;
#[tokio::test]
async fn translate_good_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
// Assign table IDs to the measurements and place some writes in the WAL
let (table_id_index, table_name_index) =
build_indexes([("m1", TableId::new(1)), ("m2", TableId::new(2))]);
let line1 = "m1,t=foo v=1i 1";
let line2 = r#"m2,t=bar v="arán" 1"#;
let line3 = "m1,t=foo v=2i 2";
// Generate a single entry
wal.write_op(SequencedWalOp {
sequence_number: 0,
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
op: Op::Write(encode_line(NamespaceId::new(2), &table_id_index, line2)),
});
wal.write_op(SequencedWalOp {
sequence_number: 2,
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line3)),
})
.changed()
.await
.expect("WAL should have changed");
// Rotate the WAL and create the translator.
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
let decoded_entries = decoder
.into_iter()
.map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 1);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 3);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
// The WAL has been given a single entry containing three write ops
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
// NamespaceId 1:
//
// m1,t=foo v=1i 1
// m1,t=foo v=2i 2
//
// NamespaceId 2:
//
// m2,t=bar v="arán" 1
//
assert_eq!(results.len(), 2);
assert_matches!(results.get(&NamespaceId::new(1)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line3));
});
assert_matches!(results.get(&NamespaceId::new(2)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n", line2));
});
}
#[tokio::test]
async fn partial_translate_bad_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
let (table_id_index, table_name_index) =
build_indexes([("m3", TableId::new(3)), ("m4", TableId::new(4))]);
let line1 = "m3,s=baz v=3i 1";
let line2 = "m3,s=baz v=2i 2";
let line3 = "m4,s=qux v=2i 3";
let line4 = "m4,s=qux v=5i 4";
// Generate some WAL entries
wal.write_op(SequencedWalOp {
sequence_number: 0,
op: Op::Write(encode_line(NamespaceId::new(3), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
op: Op::Write(encode_line(NamespaceId::new(3), &table_id_index, line2)),
})
.changed()
.await
.expect("WAL should have changed");
wal.write_op(SequencedWalOp {
sequence_number: 2,
op: Op::Write(encode_line(NamespaceId::new(4), &table_id_index, line3)),
});
wal.write_op(SequencedWalOp {
sequence_number: 3,
op: Op::Write(encode_line(NamespaceId::new(4), &table_id_index, line4)),
})
.changed()
.await
.expect("WAL should have changed");
// Get the path of the only segment file, then rotate it and add some
// garbage to the end.
let mut reader = read_dir(test_dir.path()).unwrap();
let closed_path = reader
.next()
.expect("no segment file found in WAL dir")
.unwrap()
.path();
assert_matches!(reader.next(), None); // Only 1 file should be in the WAL dir prior to rotation
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
{
let mut file = OpenOptions::new()
.append(true)
.open(closed_path)
.expect("unable to open closed WAL segment for writing");
file.write_all(b"bananananananananas").unwrap();
}
// Create the translator and read as much as possible out of the bad segment file
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
// The translator should be able to read all 2 good entries containing 4 write ops
let decoded_entries = decoder
.into_iter()
.map_while(|r| r.ok())
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 2);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 4);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
// NamespaceId 3:
//
// m3,s=baz v=3i 1
// m3,s=baz v=2i 2
//
// NamespaceId 4:
//
// m4,s=qux v=2i 3
// m4,s=qux v=5i 4
//
assert_eq!(results.len(), 2);
assert_matches!(results.get(&NamespaceId::new(3)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line2));
});
assert_matches!(results.get(&NamespaceId::new(4)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line3, line4));
});
}
fn build_indexes<'a>(
iter: impl IntoIterator<Item = (&'a str, TableId)>,
) -> (HashMap<String, TableId>, HashMap<TableId, String>) {
let table_id_index: HashMap<String, TableId> =
HashMap::from_iter(iter.into_iter().map(|(s, id)| (s.to_string(), id)));
let table_name_index: HashMap<TableId, String> = table_id_index
.clone()
.into_iter()
.map(|(name, id)| (id, name))
.collect();
(table_id_index, table_name_index)
}
fn encode_line(
ns: NamespaceId,
table_id_index: &HashMap<String, TableId>,
lp: &str,
) -> DatabaseBatch {
let batches = lines_to_batches(lp, 0).unwrap();
let batches = batches
.into_iter()
.map(|(table_name, batch)| {
(
table_id_index
.get(&table_name)
.expect("table name not present in table id index")
.to_owned(),
batch,
)
})
.collect();
let write = DmlWrite::new(ns, batches, "bananas".into(), Default::default());
mutable_batch_pb::encode::encode_write(ns.get(), &write)
}
}