test: Write an e2e test of namespace soft-deletion (#7002)
Co-authored-by: Dom <dom@itsallbroken.com>pull/24376/head
parent
95736ddd1a
commit
b11228c72e
|
@ -1,5 +1,8 @@
|
|||
use futures::FutureExt;
|
||||
use http::StatusCode;
|
||||
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, TestConfig};
|
||||
use test_helpers_end_to_end::{
|
||||
maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig,
|
||||
};
|
||||
|
||||
/// Test the namespace client
|
||||
#[tokio::test]
|
||||
|
@ -38,3 +41,169 @@ async fn querier_namespace_client() {
|
|||
.iter()
|
||||
.any(|ns| ns.name == cluster.namespace()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn soft_deletion() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up the cluster ====================================
|
||||
// cannot use shared cluster because we're going to restart services
|
||||
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
|
||||
|
||||
let namespace_name = cluster.namespace().to_string();
|
||||
let table_name = "ananas";
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
// Create the namespace, verify it's in the list, then update its retention period
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = influxdb_iox_client::namespace::Client::new(
|
||||
state.cluster().router().router_grpc_connection(),
|
||||
);
|
||||
let namespace_name = state.cluster().namespace();
|
||||
client.create_namespace(namespace_name, None).await.unwrap();
|
||||
let namespaces = client.get_namespaces().await.unwrap();
|
||||
let created_namespace = namespaces
|
||||
.iter()
|
||||
.find(|ns| ns.name == namespace_name)
|
||||
.unwrap();
|
||||
assert_eq!(created_namespace.retention_period_ns, None);
|
||||
|
||||
let hour_in_ns = 60 * 60 * 1_000_000_000;
|
||||
client
|
||||
.update_namespace_retention(namespace_name, Some(hour_in_ns))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let namespaces = client.get_namespaces().await.unwrap();
|
||||
let updated_namespace = namespaces
|
||||
.iter()
|
||||
.find(|ns| ns.name == namespace_name)
|
||||
.unwrap();
|
||||
assert_eq!(updated_namespace.retention_period_ns, Some(hour_in_ns));
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
// Writing data outside the retention period isn't allowed
|
||||
Step::WriteLineProtocolExpectingError {
|
||||
line_protocol: format!("{table_name},tag1=A,tag2=B val=42i 123456"),
|
||||
expected_error_code: StatusCode::FORBIDDEN,
|
||||
},
|
||||
// Update the retention period again to infinite retention
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = influxdb_iox_client::namespace::Client::new(
|
||||
state.cluster().router().router_grpc_connection(),
|
||||
);
|
||||
let namespace_name = state.cluster().namespace();
|
||||
client
|
||||
.update_namespace_retention(namespace_name, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let namespaces = client.get_namespaces().await.unwrap();
|
||||
let updated_namespace = namespaces
|
||||
.iter()
|
||||
.find(|ns| ns.name == namespace_name)
|
||||
.unwrap();
|
||||
assert_eq!(updated_namespace.retention_period_ns, None);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
// This write still fails because of caching in the router
|
||||
Step::WriteLineProtocolExpectingError {
|
||||
line_protocol: format!("{table_name},tag1=A,tag2=B val=42i 123456"),
|
||||
expected_error_code: StatusCode::FORBIDDEN,
|
||||
},
|
||||
// Restart the router
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
state.cluster_mut().restart_router().await;
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
// This write will now be allowed
|
||||
Step::WriteLineProtocol(format!("{table_name},tag1=A,tag2=B val=42i 123456")),
|
||||
Step::Query {
|
||||
sql: format!("select * from {table_name}"),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
// Delete the namespace; it no longer appears in the list
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = influxdb_iox_client::namespace::Client::new(
|
||||
state.cluster().router().router_grpc_connection(),
|
||||
);
|
||||
let namespace_name = state.cluster().namespace();
|
||||
client.delete_namespace(namespace_name).await.unwrap();
|
||||
|
||||
let namespaces = client.get_namespaces().await.unwrap();
|
||||
assert!(!namespaces.iter().any(|ns| ns.name == namespace_name));
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
// Query should work because of caching in the querier
|
||||
Step::Query {
|
||||
sql: format!("select * from {table_name}"),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
// Writing data should work because of caching in the router
|
||||
Step::WriteLineProtocol(format!("{table_name},tag1=B,tag2=A val=84i 1234567")),
|
||||
// Restart the router and querier
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
state.cluster_mut().restart_router().await;
|
||||
state.cluster_mut().restart_querier().await;
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
// Query now fails
|
||||
Step::QueryExpectingError {
|
||||
sql: format!("select * from {table_name}"),
|
||||
expected_error_code: tonic::Code::NotFound,
|
||||
expected_message: format!("Namespace '{namespace_name}' not found"),
|
||||
},
|
||||
// Writing now fails
|
||||
Step::WriteLineProtocolExpectingError {
|
||||
line_protocol: format!("{table_name},tag1=A,tag2=B val=126i 12345678"),
|
||||
expected_error_code: StatusCode::INTERNAL_SERVER_ERROR,
|
||||
},
|
||||
// Recreating the same namespace errors
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = influxdb_iox_client::namespace::Client::new(
|
||||
state.cluster().router().router_grpc_connection(),
|
||||
);
|
||||
let namespace_name = state.cluster().namespace();
|
||||
|
||||
let error = client
|
||||
.create_namespace(namespace_name, None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
format!("Internal error: name {namespace_name} already exists"),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -76,4 +76,15 @@ impl Client {
|
|||
|
||||
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
|
||||
}
|
||||
|
||||
/// Delete a namespace
|
||||
pub async fn delete_namespace(&mut self, namespace: &str) -> Result<(), Error> {
|
||||
self.inner
|
||||
.delete_namespace(DeleteNamespaceRequest {
|
||||
name: namespace.to_string(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,6 +268,15 @@ impl MiniCluster {
|
|||
&self.ingesters
|
||||
}
|
||||
|
||||
/// Restart router.
|
||||
///
|
||||
/// This will break all currently connected clients!
|
||||
pub async fn restart_router(&mut self) {
|
||||
let router = self.router.take().unwrap();
|
||||
let router = router.restart_server().await;
|
||||
self.router = Some(router);
|
||||
}
|
||||
|
||||
/// Restart ingesters.
|
||||
///
|
||||
/// This will break all currently connected clients!
|
||||
|
@ -279,6 +288,15 @@ impl MiniCluster {
|
|||
self.ingesters = restarted;
|
||||
}
|
||||
|
||||
/// Restart querier.
|
||||
///
|
||||
/// This will break all currently connected clients!
|
||||
pub async fn restart_querier(&mut self) {
|
||||
let querier = self.querier.take().unwrap();
|
||||
let querier = querier.restart_server().await;
|
||||
self.querier = Some(querier);
|
||||
}
|
||||
|
||||
/// Retrieve the underlying querier server, if set
|
||||
pub fn querier(&self) -> &ServerFixture {
|
||||
self.querier.as_ref().expect("querier not initialized")
|
||||
|
|
|
@ -135,6 +135,13 @@ pub enum Step {
|
|||
/// endpoint, assert the data was written successfully
|
||||
WriteLineProtocol(String),
|
||||
|
||||
/// Writes the specified line protocol to the `/api/v2/write` endpoint; assert the request
|
||||
/// returned an error with the given code
|
||||
WriteLineProtocolExpectingError {
|
||||
line_protocol: String,
|
||||
expected_error_code: StatusCode,
|
||||
},
|
||||
|
||||
/// Ask the catalog service how many Parquet files it has for this cluster's namespace. Do this
|
||||
/// before a write where you're interested in when the write has been persisted to Parquet;
|
||||
/// then after the write use `WaitForPersisted2` to observe the change in the number of Parquet
|
||||
|
@ -272,6 +279,18 @@ where
|
|||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
info!("====Done writing line protocol");
|
||||
}
|
||||
Step::WriteLineProtocolExpectingError {
|
||||
line_protocol,
|
||||
expected_error_code,
|
||||
} => {
|
||||
info!(
|
||||
"====Begin writing line protocol expecting error to v2 HTTP API:\n{}",
|
||||
line_protocol
|
||||
);
|
||||
let response = state.cluster.write_to_router(line_protocol).await;
|
||||
assert_eq!(response.status(), *expected_error_code);
|
||||
info!("====Done writing line protocol expecting error");
|
||||
}
|
||||
// Get the current number of Parquet files in the cluster's namespace before
|
||||
// starting a new write so we can observe a change when waiting for persistence.
|
||||
Step::RecordNumParquetFiles => {
|
||||
|
|
Loading…
Reference in New Issue