test: Add router tests that set templates and verify writes
parent
17219d71fe
commit
fe07e34714
|
@ -2,9 +2,14 @@ use std::time::Duration;
|
|||
|
||||
use assert_matches::assert_matches;
|
||||
use data_types::NamespaceId;
|
||||
use generated_types::influxdata::iox::{
|
||||
namespace::v1::{namespace_service_server::NamespaceService, *},
|
||||
table::v1::{table_service_server::TableService, *},
|
||||
use generated_types::influxdata::{
|
||||
iox::{
|
||||
ingester::v1::WriteRequest,
|
||||
namespace::v1::{namespace_service_server::NamespaceService, *},
|
||||
partition_template::v1::*,
|
||||
table::v1::{table_service_server::TableService, *},
|
||||
},
|
||||
pbdata::v1::DatabaseBatch,
|
||||
};
|
||||
use hyper::StatusCode;
|
||||
use iox_catalog::interface::{Error as CatalogError, SoftDeletedRows};
|
||||
|
@ -914,3 +919,252 @@ async fn test_table_create() {
|
|||
let response = ctx.write_lp("bananas", "test", lp).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_partition_template_implicit_table_creation() {
|
||||
// Initialise a TestContext without a namespace autocreation policy.
|
||||
let ctx = TestContextBuilder::default().build().await;
|
||||
|
||||
// Explicitly create a namespace with a custom partition template.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![TemplatePart {
|
||||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.namespace
|
||||
.unwrap();
|
||||
|
||||
// Write, which implicitly creates the table with the namespace's custom partition template
|
||||
let lp = "plantains,tag1=A,tag2=B val=42i".to_string();
|
||||
let response = ctx.write_lp("bananas", "test", lp).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check the ingester observed the correct write that uses the namespace's template.
|
||||
let writes = ctx.write_calls();
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(
|
||||
writes.as_slice(),
|
||||
[
|
||||
WriteRequest {
|
||||
payload: Some(DatabaseBatch {
|
||||
table_batches,
|
||||
partition_key,
|
||||
..
|
||||
}),
|
||||
},
|
||||
] => {
|
||||
let table_id = ctx.table_id("bananas_test", "plantains").await.get();
|
||||
assert_eq!(table_batches.len(), 1);
|
||||
assert_eq!(table_batches[0].table_id, table_id);
|
||||
assert_eq!(partition_key, "tag1_A");
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_partition_template_explicit_table_creation_without_partition_template() {
|
||||
// Initialise a TestContext without a namespace autocreation policy.
|
||||
let ctx = TestContextBuilder::default().build().await;
|
||||
|
||||
// Explicitly create a namespace with a custom partition template.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![TemplatePart {
|
||||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.namespace
|
||||
.unwrap();
|
||||
|
||||
// Explicitly create a table *without* a custom partition template.
|
||||
let req = CreateTableRequest {
|
||||
name: "plantains".to_string(),
|
||||
namespace: "bananas_test".to_string(),
|
||||
partition_template: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.table_service()
|
||||
.create_table(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.table
|
||||
.unwrap();
|
||||
|
||||
// Write to the just-created table
|
||||
let lp = "plantains,tag1=A,tag2=B val=42i".to_string();
|
||||
let response = ctx.write_lp("bananas", "test", lp).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check the ingester observed the correct write that uses the namespace's template.
|
||||
let writes = ctx.write_calls();
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(
|
||||
writes.as_slice(),
|
||||
[
|
||||
WriteRequest {
|
||||
payload: Some(DatabaseBatch {
|
||||
table_batches,
|
||||
partition_key,
|
||||
..
|
||||
}),
|
||||
},
|
||||
] => {
|
||||
let table_id = ctx.table_id("bananas_test", "plantains").await.get();
|
||||
assert_eq!(table_batches.len(), 1);
|
||||
assert_eq!(table_batches[0].table_id, table_id);
|
||||
assert_eq!(partition_key, "tag1_A");
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_partition_template_explicit_table_creation_with_partition_template() {
|
||||
// Initialise a TestContext without a namespace autocreation policy.
|
||||
let ctx = TestContextBuilder::default().build().await;
|
||||
|
||||
// Explicitly create a namespace with a custom partition template.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![TemplatePart {
|
||||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.namespace
|
||||
.unwrap();
|
||||
|
||||
// Explicitly create a table with a *different* custom partition template.
|
||||
let req = CreateTableRequest {
|
||||
name: "plantains".to_string(),
|
||||
namespace: "bananas_test".to_string(),
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![TemplatePart {
|
||||
part: Some(template_part::Part::TagValue("tag2".into())),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.table_service()
|
||||
.create_table(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.table
|
||||
.unwrap();
|
||||
|
||||
// Write to the just-created table
|
||||
let lp = "plantains,tag1=A,tag2=B val=42i".to_string();
|
||||
let response = ctx.write_lp("bananas", "test", lp).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check the ingester observed the correct write that uses the table's template.
|
||||
let writes = ctx.write_calls();
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(
|
||||
writes.as_slice(),
|
||||
[
|
||||
WriteRequest {
|
||||
payload: Some(DatabaseBatch {
|
||||
table_batches,
|
||||
partition_key,
|
||||
..
|
||||
}),
|
||||
},
|
||||
] => {
|
||||
let table_id = ctx.table_id("bananas_test", "plantains").await.get();
|
||||
assert_eq!(table_batches.len(), 1);
|
||||
assert_eq!(table_batches[0].table_id, table_id);
|
||||
assert_eq!(partition_key, "tag2_B");
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_without_partition_template_table_with_partition_template() {
|
||||
// Initialise a TestContext without a namespace autocreation policy.
|
||||
let ctx = TestContextBuilder::default().build().await;
|
||||
|
||||
// Explicitly create a namespace _without_ a custom partition template.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.namespace
|
||||
.unwrap();
|
||||
|
||||
// Explicitly create a table _with_ a custom partition template.
|
||||
let req = CreateTableRequest {
|
||||
name: "plantains".to_string(),
|
||||
namespace: "bananas_test".to_string(),
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![TemplatePart {
|
||||
part: Some(template_part::Part::TagValue("tag2".into())),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.table_service()
|
||||
.create_table(Request::new(req))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.table
|
||||
.unwrap();
|
||||
|
||||
// Write to the just-created table
|
||||
let lp = "plantains,tag1=A,tag2=B val=42i".to_string();
|
||||
let response = ctx.write_lp("bananas", "test", lp).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check the ingester observed the correct write that uses the table's template.
|
||||
let writes = ctx.write_calls();
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(
|
||||
writes.as_slice(),
|
||||
[
|
||||
WriteRequest {
|
||||
payload: Some(DatabaseBatch {
|
||||
table_batches,
|
||||
partition_key,
|
||||
..
|
||||
}),
|
||||
},
|
||||
] => {
|
||||
let table_id = ctx.table_id("bananas_test", "plantains").await.get();
|
||||
assert_eq!(table_batches.len(), 1);
|
||||
assert_eq!(table_batches[0].table_id, table_id);
|
||||
assert_eq!(partition_key, "tag2_B");
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue