Merge pull request #8085 from influxdata/cn/use-the-test-constants-luke

refactor: Use test constants in more places

commit f1eb7bb964
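The diff below swaps per-file magic IDs and hand-built `PartitionKey`s for the shared `ARBITRARY_*` constants in the ingester's `test_util` module. A minimal sketch of that pattern, assuming illustrative values (the real definitions live in `test_util` and are not shown in this diff, so the concrete numbers and key string here are assumptions):

```rust
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
use lazy_static::lazy_static;

// The ID newtypes expose a `const fn new`, so plain `const`s work for them,
// just as the diff's own PARTITION2_ID / TABLE2_ID definitions show.
// Values are illustrative assumptions, not the real test_util values.
pub(crate) const ARBITRARY_NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
pub(crate) const ARBITRARY_TABLE_ID: TableId = TableId::new(1);
pub(crate) const ARBITRARY_PARTITION_ID: PartitionId = PartitionId::new(1);

lazy_static! {
    // `PartitionKey::from` allocates, so the key is initialised lazily,
    // mirroring the PARTITION2_KEY / PARTITION3_KEY statics added below.
    pub(crate) static ref ARBITRARY_PARTITION_KEY: PartitionKey =
        PartitionKey::from("arbitrary");
}
```

Tests then reference one shared value (`&*ARBITRARY_PARTITION_KEY`, `ARBITRARY_TABLE_ID.get()`), so changing the arbitrary value happens in one place instead of dozens.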
@@ -271,7 +271,6 @@ mod tests {
     };

     use assert_matches::assert_matches;
-    use data_types::PartitionId;
     use futures::Future;
     use futures::{stream::FuturesUnordered, StreamExt};
     use lazy_static::lazy_static;
@@ -227,14 +227,6 @@ where

 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::{PartitionId, PartitionKey};
-    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-    use futures::StreamExt;
-    use metric::{Attributes, Metric};
-
     use super::*;
     use crate::{
         buffer_tree::{
@@ -247,10 +239,30 @@ mod tests {
             query::partition_response::PartitionResponse,
         test_util::{
             defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
-            ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
-            ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER,
+            ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
+            ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER,
         },
     };
+    use assert_matches::assert_matches;
+    use data_types::{PartitionId, PartitionKey};
+    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
+    use futures::StreamExt;
+    use lazy_static::lazy_static;
+    use metric::{Attributes, Metric};
+    use std::{sync::Arc, time::Duration};
+
+    const PARTITION2_ID: PartitionId = PartitionId::new(2);
+    const PARTITION3_ID: PartitionId = PartitionId::new(3);
+
+    const TABLE2_ID: TableId = TableId::new(1234321);
+    const TABLE2_NAME: &str = "another_table";
+
+    const NAMESPACE2_ID: NamespaceId = NamespaceId::new(4321);
+
+    lazy_static! {
+        static ref PARTITION2_KEY: PartitionKey = PartitionKey::from("p2");
+        static ref PARTITION3_KEY: PartitionKey = PartitionKey::from("p3");
+    }

     #[tokio::test]
     async fn test_namespace_init_table() {
@@ -380,11 +392,11 @@ mod tests {
     test_write_query!(
         read_writes,
         partitions = [PartitionDataBuilder::new()
-            .with_partition_id(PartitionId::new(0))
-            .with_partition_key(PartitionKey::from("p1"))
+            .with_partition_id(ARBITRARY_PARTITION_ID)
+            .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
             .build()],
         writes = [make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -410,17 +422,17 @@ mod tests {
         multiple_partitions,
         partitions = [
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(0))
-                .with_partition_key(PartitionKey::from("p1"))
+                .with_partition_id(ARBITRARY_PARTITION_ID)
+                .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                 .build(),
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(1))
-                .with_partition_key(PartitionKey::from("p2"))
+                .with_partition_id(PARTITION2_ID)
+                .with_partition_key(PARTITION2_KEY.clone())
                 .build()
         ],
         writes = [
             make_write_op(
-                &PartitionKey::from("p1"),
+                &ARBITRARY_PARTITION_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -432,7 +444,7 @@ mod tests {
                 None,
             ),
             make_write_op(
-                &PartitionKey::from("p2"),
+                &PARTITION2_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -460,19 +472,19 @@ mod tests {
         filter_multiple_namespaces,
         partitions = [
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(0))
-                .with_partition_key(PartitionKey::from("p1"))
+                .with_partition_id(ARBITRARY_PARTITION_ID)
+                .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                 .build(),
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(1))
-                .with_partition_key(PartitionKey::from("p2"))
-                .with_namespace_id(NamespaceId::new(4321)) // A different namespace ID.
-                .with_table_id(TableId::new(1234)) // A different table ID.
+                .with_partition_id(PARTITION2_ID)
+                .with_partition_key(PARTITION2_KEY.clone())
+                .with_namespace_id(NAMESPACE2_ID) // A different namespace ID.
+                .with_table_id(TABLE2_ID) // A different table ID.
                 .build()
         ],
         writes = [
             make_write_op(
-                &PartitionKey::from("p1"),
+                &ARBITRARY_PARTITION_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -484,10 +496,10 @@ mod tests {
                 None,
             ),
             make_write_op(
-                &PartitionKey::from("p2"),
-                NamespaceId::new(4321), // A different namespace ID.
+                &PARTITION2_KEY,
+                NAMESPACE2_ID, // A different namespace ID.
                 &ARBITRARY_TABLE_NAME,
-                TableId::new(1234), // A different table ID
+                TABLE2_ID, // A different table ID
                 0,
                 &format!(
                     r#"{},region=Asturias temp=35 4242424242"#,
@@ -511,18 +523,18 @@ mod tests {
         filter_multiple_tabls,
         partitions = [
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(0))
-                .with_partition_key(PartitionKey::from("p1"))
+                .with_partition_id(ARBITRARY_PARTITION_ID)
+                .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                 .build(),
             PartitionDataBuilder::new()
-                .with_partition_id(PartitionId::new(1))
-                .with_partition_key(PartitionKey::from("p2"))
-                .with_table_id(TableId::new(1234)) // A different table ID.
+                .with_partition_id(PARTITION2_ID)
+                .with_partition_key(PARTITION2_KEY.clone())
+                .with_table_id(TABLE2_ID) // A different table ID.
                 .build()
         ],
         writes = [
             make_write_op(
-                &PartitionKey::from("p1"),
+                &ARBITRARY_PARTITION_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -534,10 +546,10 @@ mod tests {
                 None,
            ),
            make_write_op(
-                &PartitionKey::from("p2"),
+                &PARTITION2_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
-                TableId::new(1234), // A different table ID
+                TABLE2_ID, // A different table ID
                 0,
                 &format!(
                     r#"{},region=Asturias temp=35 4242424242"#,
@@ -562,12 +574,12 @@ mod tests {
     test_write_query!(
         duplicate_writes,
         partitions = [PartitionDataBuilder::new()
-            .with_partition_id(PartitionId::new(0))
-            .with_partition_key(PartitionKey::from("p1"))
+            .with_partition_id(ARBITRARY_PARTITION_ID)
+            .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
             .build()],
         writes = [
             make_write_op(
-                &PartitionKey::from("p1"),
+                &ARBITRARY_PARTITION_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -579,7 +591,7 @@ mod tests {
                 None,
             ),
             make_write_op(
-                &PartitionKey::from("p1"),
+                &ARBITRARY_PARTITION_KEY,
                 ARBITRARY_NAMESPACE_ID,
                 &ARBITRARY_TABLE_NAME,
                 ARBITRARY_TABLE_ID,
@@ -605,20 +617,18 @@ mod tests {
     /// single namespace being created, and matching metrics.
     #[tokio::test]
     async fn test_metrics() {
-        // Configure the mock partition provider to return a single partition, named
-        // p1.
         let partition_provider = Arc::new(
             MockPartitionProvider::default()
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(0))
-                        .with_partition_key(PartitionKey::from("p1"))
+                        .with_partition_id(ARBITRARY_PARTITION_ID)
+                        .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                         .build(),
                 )
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(0))
-                        .with_partition_key(PartitionKey::from("p2"))
+                        .with_partition_id(ARBITRARY_PARTITION_ID)
+                        .with_partition_key(PARTITION2_KEY.clone())
                         .build(),
                 ),
         );
@@ -634,9 +644,9 @@ mod tests {
             Arc::clone(&metrics),
         );

-        // Write data to partition p1, in the arbitrary table
+        // Write data to the arbitrary partition, in the arbitrary table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -653,7 +663,7 @@ mod tests {
         // Write a duplicate record with the same series key & timestamp, but a
         // different temp value.
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p2"),
+            &PARTITION2_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -691,29 +701,26 @@ mod tests {
     /// single namespace being created, and matching metrics.
     #[tokio::test]
     async fn test_partition_iter() {
-        const TABLE2_ID: TableId = TableId::new(1234321);
-        const TABLE2_NAME: &str = "another_table";
-
         // Configure the mock partition provider to return a single partition, named
         // p1.
         let partition_provider = Arc::new(
             MockPartitionProvider::default()
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(0))
-                        .with_partition_key(PartitionKey::from("p1"))
+                        .with_partition_id(ARBITRARY_PARTITION_ID)
+                        .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                         .build(),
                 )
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(1))
-                        .with_partition_key(PartitionKey::from("p2"))
+                        .with_partition_id(PARTITION2_ID)
+                        .with_partition_key(PARTITION2_KEY.clone())
                         .build(),
                 )
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(2))
-                        .with_partition_key(PartitionKey::from("p3"))
+                        .with_partition_id(PARTITION3_ID)
+                        .with_partition_key(PARTITION3_KEY.clone())
                         .with_table_id(TABLE2_ID)
                         .with_table_name_loader(Arc::new(DeferredLoad::new(
                             Duration::from_secs(1),
@@ -735,9 +742,9 @@ mod tests {

         assert_eq!(buf.partitions().count(), 0);

-        // Write data to partition p1, in the arbitrary table
+        // Write data to the arbitrary partition, in the arbitrary table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -753,9 +760,9 @@ mod tests {

         assert_eq!(buf.partitions().count(), 1);

-        // Write data to partition p2, in the arbitrary table
+        // Write data to partition2, in the arbitrary table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p2"),
+            &PARTITION2_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -771,9 +778,9 @@ mod tests {

         assert_eq!(buf.partitions().count(), 2);

-        // Write data to partition p3, in the second table
+        // Write data to partition3, in the second table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p3"),
+            &PARTITION3_KEY,
             ARBITRARY_NAMESPACE_ID,
             TABLE2_NAME,
             TABLE2_ID,
@@ -787,12 +794,14 @@ mod tests {
         // Iterate over the partitions and ensure they were all visible.
         let mut ids = buf
             .partitions()
-            .map(|p| p.lock().partition_id().get())
+            .map(|p| p.lock().partition_id())
             .collect::<Vec<_>>();

         ids.sort_unstable();

-        assert_matches!(*ids, [0, 1, 2]);
+        let mut expected = [ARBITRARY_PARTITION_ID, PARTITION2_ID, PARTITION3_ID];
+        expected.sort_unstable();
+
+        assert_eq!(ids, expected);
     }

     /// Assert the correct "not found" errors are generated for missing
@@ -803,8 +812,8 @@ mod tests {
         let partition_provider = Arc::new(
             MockPartitionProvider::default().with_partition(
                 PartitionDataBuilder::new()
-                    .with_partition_id(PartitionId::new(0))
-                    .with_partition_key(PartitionKey::from("p1"))
+                    .with_partition_id(ARBITRARY_PARTITION_ID)
+                    .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                     .build(),
             ),
         );
@@ -827,9 +836,9 @@ mod tests {
             assert_eq!(ns, ARBITRARY_NAMESPACE_ID);
         });

-        // Write data to partition p1, in the arbitrary table
+        // Write data to the arbitrary partition, in the arbitrary table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -845,12 +854,12 @@ mod tests {

         // Ensure an unknown table errors
         let err = buf
-            .query_exec(ARBITRARY_NAMESPACE_ID, TableId::new(1234), vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None)
             .await
             .expect_err("query should fail");
         assert_matches!(err, QueryError::TableNotFound(ns, t) => {
             assert_eq!(ns, ARBITRARY_NAMESPACE_ID);
-            assert_eq!(t, TableId::new(1234));
+            assert_eq!(t, TABLE2_ID);
         });

         // Ensure a valid namespace / table does not error
@@ -877,20 +886,19 @@ mod tests {
     /// ordering of writes.
     #[tokio::test]
     async fn test_read_consistency() {
-        // Configure the mock partition provider to return two partitions, named
-        // p1 and p2.
+        // Configure the mock partition provider to return two partitions.
         let partition_provider = Arc::new(
             MockPartitionProvider::default()
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(0))
-                        .with_partition_key(PartitionKey::from("p1"))
+                        .with_partition_id(ARBITRARY_PARTITION_ID)
+                        .with_partition_key(ARBITRARY_PARTITION_KEY.clone())
                         .build(),
                 )
                 .with_partition(
                     PartitionDataBuilder::new()
-                        .with_partition_id(PartitionId::new(1))
-                        .with_partition_key(PartitionKey::from("p2"))
+                        .with_partition_id(PARTITION2_ID)
+                        .with_partition_key(PARTITION2_KEY.clone())
                         .build(),
                 ),
         );
@@ -904,9 +912,9 @@ mod tests {
             Arc::new(metric::Registry::default()),
         );

-        // Write data to partition p1, in the arbitrary table
+        // Write data to the arbitrary partition, in the arbitrary table
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -929,9 +937,9 @@ mod tests {
             .into_partition_stream();

         // Perform a write concurrent to the consumption of the query stream
-        // that creates a new partition (p2) in the same table.
+        // that creates a new partition2 in the same table.
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p2"),
+            &PARTITION2_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -945,10 +953,10 @@ mod tests {
             .await
             .expect("failed to perform concurrent write to new partition");

-        // Perform another write that hits the partition within the query
-        // results snapshot (p1) before the partition is read.
+        // Perform another write that hits the arbitrary partition within the query
+        // results snapshot before the partition is read.
         buf.apply(IngestOp::Write(make_write_op(
-            &PartitionKey::from("p1"),
+            &ARBITRARY_PARTITION_KEY,
             ARBITRARY_NAMESPACE_ID,
             &ARBITRARY_TABLE_NAME,
             ARBITRARY_TABLE_ID,
@@ -965,8 +973,8 @@ mod tests {
         // Consume the set of partitions within the query stream.
         //
         // Under the specified query consistency guarantees, both the first and
-        // third writes (both to p1) should be visible. The second write to p2
-        // should not be visible.
+        // third writes (both to the arbitrary partition) should be visible. The second write to
+        // partition2 should not be visible.
         let mut partitions: Vec<PartitionResponse> = stream.collect().await;
         assert_eq!(partitions.len(), 1); // only p1, not p2
         let partition = partitions.pop().unwrap();
@@ -974,7 +982,7 @@ mod tests {
         // Perform the partition read
         let batches = partition.into_record_batches();

-        // Assert the contents of p1 contains both the initial write, and the
+        // Assert the contents of the arbitrary partition contains both the initial write, and the
         // 3rd write in a single RecordBatch.
         assert_batches_eq!(
             [

@@ -70,41 +70,16 @@ where

 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
-    use lazy_static::lazy_static;
-    use metric::Attributes;
-
     use super::*;
     use crate::{
-        buffer_tree::{namespace::NamespaceName, table::TableName},
-        deferred_load::DeferredLoad,
         dml_sink::{mock_sink::MockDmlSink, DmlError},
-        test_util::make_write_op,
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
     };
-
-    const PARTITION_ID: PartitionId = PartitionId::new(42);
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(24);
-    const TABLE_ID: TableId = TableId::new(2442);
-    const TABLE_NAME: &str = "banana-report";
-    const NAMESPACE_NAME: &str = "platanos";
-
-    lazy_static! {
-        static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas");
-        static ref NAMESPACE_NAME_LOADER: Arc<DeferredLoad<NamespaceName>> =
-            Arc::new(DeferredLoad::new(
-                Duration::from_secs(1),
-                async { NamespaceName::from(NAMESPACE_NAME) },
-                &metric::Registry::default(),
-            ));
-        static ref TABLE_NAME_LOADER: Arc<DeferredLoad<TableName>> = Arc::new(DeferredLoad::new(
-            Duration::from_secs(1),
-            async { TableName::from(TABLE_NAME) },
-            &metric::Registry::default(),
-        ));
-    }
+    use assert_matches::assert_matches;
+    use metric::Attributes;

     const LAYER_NAME: &str = "test-bananas";
@@ -123,12 +98,12 @@ mod tests {
         let decorator = DmlSinkInstrumentation::new(LAYER_NAME, mock, &metrics);

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             None,
         ));
@@ -141,7 +116,9 @@ mod tests {
         // Validate the histogram with the specified attributes saw
         // an observation
         let histogram = metrics
-            .get_instrument::<Metric<DurationHistogram>>("ingester_dml_sink_apply_duration")
+            .get_instrument::<Metric<DurationHistogram>>(
+                "ingester_dml_sink_apply_duration"
+            )
             .expect("failed to find metric")
             .get_observer(&Attributes::from(&$want_metric_attr))
             .expect("failed to find attributes")

@@ -55,42 +55,17 @@ where

 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
-    use lazy_static::lazy_static;
-    use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
-
-    use crate::{
-        buffer_tree::{namespace::NamespaceName, table::TableName},
-        deferred_load::DeferredLoad,
-        dml_sink::{mock_sink::MockDmlSink, DmlError},
-        test_util::make_write_op,
-    };
-
     use super::*;
-
-    const PARTITION_ID: PartitionId = PartitionId::new(42);
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(24);
-    const TABLE_ID: TableId = TableId::new(2442);
-    const TABLE_NAME: &str = "banana-report";
-    const NAMESPACE_NAME: &str = "platanos";
-
-    lazy_static! {
-        static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas");
-        static ref NAMESPACE_NAME_LOADER: Arc<DeferredLoad<NamespaceName>> =
-            Arc::new(DeferredLoad::new(
-                Duration::from_secs(1),
-                async { NamespaceName::from(NAMESPACE_NAME) },
-                &metric::Registry::default(),
-            ));
-        static ref TABLE_NAME_LOADER: Arc<DeferredLoad<TableName>> = Arc::new(DeferredLoad::new(
-            Duration::from_secs(1),
-            async { TableName::from(TABLE_NAME) },
-            &metric::Registry::default(),
-        ));
-    }
+    use crate::{
+        dml_sink::{mock_sink::MockDmlSink, DmlError},
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
+    };
+    use assert_matches::assert_matches;
+    use std::sync::Arc;
+    use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};

     #[track_caller]
     fn assert_trace(name: impl Into<String>, status: SpanStatus, traces: &dyn TraceCollector) {
@@ -120,12 +95,12 @@ mod tests {
         let span = SpanContext::new(Arc::clone(&traces));

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             Some(span),
         ));
@@ -148,12 +123,12 @@ mod tests {
         let span = SpanContext::new(Arc::clone(&traces));

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             Some(span),
         ));

@@ -107,12 +107,12 @@ pub(super) async fn compact_persisting_batch(
 mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
-    use data_types::PartitionId;
     use iox_query::test::{raw_data, TestChunk};
     use mutable_batch_lp::lines_to_batches;
     use schema::Projection;

     use super::*;
+    use crate::test_util::ARBITRARY_PARTITION_ID;

     // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
     // where if sending in a single row it would compact into an output of two batches, one of
@@ -127,7 +127,7 @@ mod tests {
             .to_arrow(Projection::All)
             .unwrap();

-        let batch = QueryAdaptor::new(PartitionId::new(1), vec![Arc::new(batch)]);
+        let batch = QueryAdaptor::new(ARBITRARY_PARTITION_ID, vec![Arc::new(batch)]);

         // verify PK
         let schema = batch.schema();
@@ -162,7 +162,7 @@ mod tests {
     async fn test_compact_batch_on_one_record_batch_no_dupilcates() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_record_batch_with_influxtype_no_duplicates().await,
         );

@@ -211,7 +211,7 @@ mod tests {
     async fn test_compact_batch_no_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );

@@ -265,7 +265,7 @@ mod tests {
     async fn test_compact_batch_with_specified_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );

@@ -324,7 +324,7 @@ mod tests {
     async fn test_compact_batch_new_column_for_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );

@@ -387,7 +387,7 @@ mod tests {
     async fn test_compact_batch_missing_column_for_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );

@@ -449,7 +449,7 @@ mod tests {

         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_row_record_batch_with_influxtype().await,
         );

@@ -490,7 +490,7 @@ mod tests {
     async fn test_compact_one_batch_with_duplicates() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_record_batch_with_influxtype_duplicates().await,
         );

@@ -538,7 +538,10 @@ mod tests {
     #[tokio::test]
     async fn test_compact_many_batches_same_columns_with_duplicates() {
         // create many-batches input data
-        let batch = QueryAdaptor::new(PartitionId::new(1), create_batches_with_influxtype().await);
+        let batch = QueryAdaptor::new(
+            ARBITRARY_PARTITION_ID,
+            create_batches_with_influxtype().await,
+        );

         // verify PK
         let schema = batch.schema();
@@ -583,7 +586,7 @@ mod tests {
     async fn test_compact_many_batches_different_columns_with_duplicates() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_columns().await,
         );

@@ -634,7 +637,7 @@ mod tests {
     async fn test_compact_many_batches_different_columns_different_order_with_duplicates() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_columns_different_order().await,
         );

@@ -688,7 +691,7 @@ mod tests {
     async fn test_compact_many_batches_same_columns_different_types() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_same_columns_different_type().await,
         );

@@ -156,19 +156,15 @@ pub(crate) mod mock {

 #[cfg(test)]
 mod tests {
-    use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};
-
     use super::*;
-
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
-    const TABLE_ID: TableId = TableId::new(1);
-    const PARTITION_ID: PartitionId = PartitionId::new(1);
+    use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
+    use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};

     fn arbitrary_file_meta() -> ParquetFileParams {
         ParquetFileParams {
-            namespace_id: NAMESPACE_ID,
-            table_id: TABLE_ID,
-            partition_id: PARTITION_ID,
+            namespace_id: ARBITRARY_NAMESPACE_ID,
+            table_id: ARBITRARY_TABLE_ID,
+            partition_id: ARBITRARY_PARTITION_ID,
             partition_hash_id: None,
             object_store_id: Default::default(),
             min_time: Timestamp::new(42),

@@ -148,21 +148,16 @@ where

 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
+    use super::*;
+    use crate::{
+        persist::completion_observer::mock::MockCompletionObserver,
+        test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID},
+    };
     use data_types::{
-        sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, NamespaceId,
-        ParquetFileParams, PartitionId, TableId, Timestamp,
+        sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp,
     };
     use metric::assert_histogram;
-
-    use crate::persist::completion_observer::mock::MockCompletionObserver;
-
-    use super::*;
-
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
-    const TABLE_ID: TableId = TableId::new(1);
-    const PARTITION_ID: PartitionId = PartitionId::new(1);
+    use std::sync::Arc;

     #[tokio::test]
     async fn test_persisted_file_metrics() {
@@ -172,9 +167,9 @@ mod tests {
         let decorator = ParquetFileInstrumentation::new(Arc::clone(&inner), &metrics);

         let meta = ParquetFileParams {
-            namespace_id: NAMESPACE_ID,
-            table_id: TABLE_ID,
-            partition_id: PARTITION_ID,
+            namespace_id: ARBITRARY_NAMESPACE_ID,
+            table_id: ARBITRARY_TABLE_ID,
+            partition_id: ARBITRARY_PARTITION_ID,
             partition_hash_id: None,
             object_store_id: Default::default(),
             min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _),

@@ -426,20 +426,22 @@ struct MetricState {

 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use crate::{make_batch, make_partition_stream, query::mock_query_exec::MockQueryExec};
-
     use super::*;
-
+    use crate::{
+        make_batch, make_partition_stream,
+        query::mock_query_exec::MockQueryExec,
+        test_util::{
+            ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
+            ARBITRARY_TABLE_ID,
+        },
+    };
     use arrow::array::{Float32Array, Int64Array};
-    use data_types::{PartitionHashId, PartitionId, PartitionKey};
+    use data_types::PartitionHashId;
     use futures::{stream, StreamExt};
     use iox_time::MockProvider;
     use metric::{assert_histogram, Attributes};
+    use std::{sync::Arc, time::Duration};

-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
-    const TABLE_ID: TableId = TableId::new(42);
     const TIME_STEP: Duration = Duration::from_secs(42);

     /// A query against a table that has been persisted / no longer contains any
@@ -451,10 +453,10 @@ mod tests {
         // Construct a stream with no batches.
         let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
             vec![],
-            PartitionId::new(42),
+            ARBITRARY_PARTITION_ID,
             Some(PartitionHashId::new(
-                TABLE_ID,
-                &PartitionKey::from("arbitrary"),
+                ARBITRARY_TABLE_ID,
+                &ARBITRARY_PARTITION_KEY,
             )),
             42,
         )]));
@@ -465,7 +467,7 @@ mod tests {
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");

@@ -546,7 +548,7 @@ mod tests {
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");

@@ -626,7 +628,7 @@ mod tests {
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");

@@ -706,7 +708,7 @@ mod tests {
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");

@@ -375,22 +375,20 @@ fn encode_response(

 #[cfg(test)]
 mod tests {
-    use arrow::array::{Float64Array, Int32Array};
-    use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
-    use assert_matches::assert_matches;
-    use bytes::Bytes;
-    use data_types::PartitionKey;
-    use tonic::Code;
-
-    use super::*;
-
     use crate::{
         make_batch,
         query::{
             mock_query_exec::MockQueryExec, partition_response::PartitionResponse,
             response::PartitionStream,
         },
+        test_util::ARBITRARY_PARTITION_KEY,
     };

+    use super::*;
+    use arrow::array::{Float64Array, Int32Array};
+    use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
+    use assert_matches::assert_matches;
+    use bytes::Bytes;
+    use tonic::Code;

     #[tokio::test]
     async fn limits_concurrent_queries() {
@@ -430,7 +428,7 @@ mod tests {
         let ingester_id = IngesterId::new();
         let partition_hash_id = Some(PartitionHashId::new(
             TableId::new(3),
-            &PartitionKey::from("arbitrary"),
+            &ARBITRARY_PARTITION_KEY,
         ));
         let (batch1, schema1) = make_batch!(
             Float64Array("float" => vec![1.1, 2.2, 3.3]),

@@ -220,21 +220,21 @@ where

 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use assert_matches::assert_matches;
     use data_types::SequenceNumber;
     use generated_types::influxdata::pbdata::v1::{
         column::{SemanticType, Values},
         Column, DatabaseBatch, TableBatch,
     };
+    use std::sync::Arc;

     use super::*;
-    use crate::dml_payload::IngestOp;
-    use crate::dml_sink::mock_sink::MockDmlSink;
+    use crate::{
+        dml_payload::IngestOp,
+        dml_sink::mock_sink::MockDmlSink,
+        test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID},
+    };

-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
-    const PARTITION_KEY: &str = "bananas";
     const PERSIST_QUEUE_DEPTH: usize = 42;

     macro_rules! test_rpc_write {
@@ -261,7 +261,13 @@ mod tests {
                 .write(Request::new($request))
                 .await;

-            assert_eq!(ret.is_err(), $want_err, "wanted handler error {} got {:?}", $want_err, ret);
+            assert_eq!(
+                ret.is_err(),
+                $want_err,
+                "wanted handler error {} got {:?}",
+                $want_err,
+                ret
+            );
             assert_matches!(mock.get_calls().as_slice(), $($want_calls)+);
         }
     }
@@ -272,10 +278,10 @@ mod tests {
         apply_ok,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -299,10 +305,13 @@ mod tests {
         want_err = false,
         want_calls = [IngestOp::Write(w)] => {
             // Assert the various IngestOp properties match the expected values
-            assert_eq!(w.namespace(), NAMESPACE_ID);
+            assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID);
             assert_eq!(w.tables().count(), 1);
-            assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
-            assert_eq!(w.tables().next().unwrap().1.partitioned_data().sequence_number(), SequenceNumber::new(1));
+            assert_eq!(w.partition_key(), &*ARBITRARY_PARTITION_KEY);
+            assert_eq!(
+                w.tables().next().unwrap().1.partitioned_data().sequence_number(),
+                SequenceNumber::new(1)
+            );
         }
     );

@@ -318,8 +327,8 @@ mod tests {
         no_tables,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![],
             }),
         },
@@ -332,10 +341,10 @@ mod tests {
         batch_error,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -373,10 +382,10 @@ mod tests {

         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -430,10 +439,10 @@ mod tests {

         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -486,10 +495,10 @@ mod tests {

         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),

@@ -229,7 +229,10 @@ macro_rules! make_partition_stream {
             $(
                 let (batch, this_schema) = $batch;
                 batches.push(batch);
-                schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas");
+                schema = Schema::try_merge([
+                    schema,
+                    (*this_schema).clone()
+                ]).expect("incompatible batch schemas");
             )+
             drop(schema);

@@ -241,11 +244,11 @@ macro_rules! make_partition_stream {
                 // batches are in a different partition, not what the actual identifier
                 // values are. This will go away when the ingester no longer sends
                 // PartitionIds.
-                PartitionId::new($id),
+                data_types::PartitionId::new($id),
                 Some(
                     PartitionHashId::new(
                         TableId::new($id),
-                        &PartitionKey::from("arbitrary")
+                        &*ARBITRARY_PARTITION_KEY
                     )
                 ),
                 42,

@@ -204,22 +204,18 @@ impl WalReferenceHandle {

 #[cfg(test)]
 mod tests {
-
-    use std::{pin::Pin, task::Poll, time::Duration};
-
+    use super::*;
+    use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
     use assert_matches::assert_matches;
     use async_trait::async_trait;
-    use data_types::{
-        ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp,
-    };
+    use data_types::{ColumnId, ColumnSet, ParquetFileParams, Timestamp};
     use futures::{task::Context, Future, FutureExt};
     use metric::{assert_counter, U64Gauge};
     use parking_lot::Mutex;
+    use std::{pin::Pin, task::Poll, time::Duration};
     use test_helpers::timeout::FutureTimeout;
     use tokio::sync::Notify;

-    use super::*;
-
     /// A mock file deleter that records the IDs it was asked to delete.
     #[derive(Debug, Default)]
     struct MockWalDeleter {
@@ -265,9 +261,9 @@ mod tests {
     {
         Arc::new(CompletedPersist::new(
             ParquetFileParams {
-                namespace_id: NamespaceId::new(1),
-                table_id: TableId::new(2),
-                partition_id: PartitionId::new(3),
+                namespace_id: ARBITRARY_NAMESPACE_ID,
+                table_id: ARBITRARY_TABLE_ID,
+                partition_id: ARBITRARY_PARTITION_ID,
                 partition_hash_id: None,
                 object_store_id: Default::default(),
                 min_time: Timestamp::new(42),

@@ -131,26 +131,21 @@ impl WalAppender for Arc<wal::Wal> {

 #[cfg(test)]
 mod tests {
-    use core::{future::Future, marker::Send, pin::Pin};
-    use std::{future, sync::Arc};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
-    use mutable_batch_lp::lines_to_batches;
-    use wal::Wal;
-
+    use super::*;
     use crate::{
         dml_payload::write::{PartitionedData, TableData, WriteOperation},
         dml_sink::mock_sink::MockDmlSink,
-        test_util::make_write_op,
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
     };
-
-    use super::*;
-
-    const TABLE_ID: TableId = TableId::new(44);
-    const TABLE_NAME: &str = "bananas";
-    const NAMESPACE_NAME: &str = "platanos";
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
+    use assert_matches::assert_matches;
+    use core::{future::Future, marker::Send, pin::Pin};
+    use data_types::{SequenceNumber, TableId};
+    use mutable_batch_lp::lines_to_batches;
+    use std::{future, sync::Arc};
+    use wal::Wal;

     #[tokio::test]
     async fn test_append() {
@@ -161,22 +156,25 @@ mod tests {
         // Generate a test op containing writes for multiple tables that will
         // be appended and read back
         let mut tables_by_name = lines_to_batches(
-            "bananas,region=Madrid temp=35 4242424242\n\
+            &format!(
+                "{},region=Madrid temp=35 4242424242\n\
                  banani,region=Iceland temp=25 7676767676",
+                &*ARBITRARY_TABLE_NAME
+            ),
             0,
         )
         .expect("invalid line proto");
         let op = WriteOperation::new(
-            NAMESPACE_ID,
+            ARBITRARY_NAMESPACE_ID,
             [
                 (
-                    TABLE_ID,
+                    ARBITRARY_TABLE_ID,
                     TableData::new(
-                        TABLE_ID,
+                        ARBITRARY_TABLE_ID,
                         PartitionedData::new(
                             SequenceNumber::new(42),
                             tables_by_name
-                                .remove(TABLE_NAME)
+                                .remove(ARBITRARY_TABLE_NAME.as_ref())
                                 .expect("table does not exist in LP"),
                         ),
                     ),
@@ -196,7 +194,7 @@ mod tests {
             ]
             .into_iter()
            .collect(),
-            PartitionKey::from("p1"),
+            ARBITRARY_PARTITION_KEY.clone(),
             None,
         );

@@ -240,7 +238,7 @@ mod tests {
         assert_eq!(read_op.sequence_number, 42);
         assert_eq!(
             read_op.table_write_sequence_numbers,
-            [(TABLE_ID, 42), (SECOND_TABLE_ID, 42)]
+            [(ARBITRARY_TABLE_ID, 42), (SECOND_TABLE_ID, 42)]
                 .into_iter()
                 .collect::<std::collections::HashMap<TableId, u64>>()
         );
@@ -249,7 +247,7 @@ mod tests {

         // The payload should match the serialised form of the "op" originally
         // wrote above.
-        let want = encode_write_op(NAMESPACE_ID, &op);
+        let want = encode_write_op(ARBITRARY_NAMESPACE_ID, &op);

         assert_eq!(want, *payload);
     }
@@ -279,12 +277,15 @@ mod tests {

         // Generate the test op
         let op = make_write_op(
-            &PartitionKey::from("p1"),
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            r#"bananas,region=Madrid temp=35 4242424242"#,
+            &format!(
+                r#"{},region=Madrid temp=35 4242424242"#,
+                &*ARBITRARY_TABLE_NAME
+            ),
             None,
         );

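One idiom that recurs throughout the diff is `&*ARBITRARY_TABLE_NAME` (and `ARBITRARY_PARTITION_KEY.clone()`): `lazy_static` wraps each value in a generated type that only yields the inner value through `Deref`, so tests explicitly deref and re-borrow to get a plain reference. A self-contained sketch, where this `TableName` newtype is a stand-in for the ingester's real type:

```rust
use lazy_static::lazy_static;

// Stand-in for the ingester's TableName newtype; only the Display impl
// matters for building line protocol with format!.
#[derive(Debug)]
struct TableName(String);

impl std::fmt::Display for TableName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

lazy_static! {
    // The macro generates a hidden wrapper type implementing
    // Deref<Target = TableName>, not a plain TableName.
    static ref ARBITRARY_TABLE_NAME: TableName = TableName("bananas".into());
}

fn main() {
    // `&*` derefs the wrapper to the inner TableName and re-borrows it,
    // which is what format! (via Display) and helper fns expect.
    let lp = format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME);
    assert!(lp.starts_with("bananas,"));
}
```

The explicit `&*` re-borrow is what lets `format!` and helpers like `make_write_op` receive an ordinary `&TableName` or `&PartitionKey` rather than the generated wrapper type.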