fix: Remove test steps and helper functions only used in Kafkaful arch
parent
38f8e18357
commit
2204630d90
|
@ -1,14 +1,9 @@
|
|||
//! Client helpers for writing end to end ng tests
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
|
||||
use futures::TryStreamExt;
|
||||
use http::Response;
|
||||
use hyper::{Body, Client, Request};
|
||||
use influxdb_iox_client::{
|
||||
connection::Connection,
|
||||
write_info::generated_types::{merge_responses, GetWriteInfoResponse, ShardStatus},
|
||||
};
|
||||
use observability_deps::tracing::info;
|
||||
use std::time::Duration;
|
||||
use influxdb_iox_client::connection::Connection;
|
||||
|
||||
/// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
|
||||
pub async fn write_to_router(
|
||||
|
@ -37,157 +32,6 @@ pub async fn write_to_router(
|
|||
.expect("http error sending write")
|
||||
}
|
||||
|
||||
/// Extracts the write token from the specified response (to the /api/v2/write api)
|
||||
pub fn get_write_token(response: &Response<Body>) -> String {
|
||||
let message = format!("no write token in {response:?}");
|
||||
response
|
||||
.headers()
|
||||
.get("X-IOx-Write-Token")
|
||||
.expect(&message)
|
||||
.to_str()
|
||||
.expect("Value not a string")
|
||||
.to_string()
|
||||
}
|
||||
|
||||
/// returns the write info from the connection (to either an ingester or a querier) for this token
|
||||
pub async fn token_info(
|
||||
write_token: impl AsRef<str>,
|
||||
connection: Connection,
|
||||
) -> Result<GetWriteInfoResponse, influxdb_iox_client::error::Error> {
|
||||
influxdb_iox_client::write_info::Client::new(connection)
|
||||
.get_write_info(write_token.as_ref())
|
||||
.await
|
||||
}
|
||||
|
||||
/// returns a combined write info that contains the combined
|
||||
/// information across all ingester_connections for all the specified
|
||||
/// tokens
|
||||
pub async fn combined_token_info(
|
||||
write_tokens: Vec<String>,
|
||||
ingester_connections: Vec<Connection>,
|
||||
) -> Result<GetWriteInfoResponse, influxdb_iox_client::error::Error> {
|
||||
let responses = write_tokens
|
||||
.into_iter()
|
||||
.flat_map(|write_token| {
|
||||
ingester_connections
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(move |ingester_connection| {
|
||||
token_info(write_token.clone(), ingester_connection)
|
||||
})
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
// check for errors
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
info!("combining response: {:#?}", responses);
|
||||
|
||||
// merge them together
|
||||
Ok(merge_responses(responses))
|
||||
}
|
||||
|
||||
/// returns true if the write for this token is persisted, false if it
|
||||
/// is not persisted. panic's on error
|
||||
pub async fn token_is_persisted(
|
||||
write_token: impl AsRef<str>,
|
||||
ingester_connection: Connection,
|
||||
) -> bool {
|
||||
let res = token_info(write_token, ingester_connection)
|
||||
.await
|
||||
.expect("Error fetching write info for token");
|
||||
all_persisted(&res)
|
||||
}
|
||||
|
||||
const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
|
||||
|
||||
/// Waits for the specified predicate to return true
|
||||
pub async fn wait_for_token<F>(write_token: impl Into<String>, connection: Connection, f: F)
|
||||
where
|
||||
F: Fn(&GetWriteInfoResponse) -> bool,
|
||||
{
|
||||
let write_token = write_token.into();
|
||||
assert!(!write_token.is_empty());
|
||||
|
||||
info!(" write token: {}", write_token);
|
||||
|
||||
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
|
||||
let mut write_info_client = influxdb_iox_client::write_info::Client::new(connection);
|
||||
tokio::time::timeout(retry_duration, async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(500));
|
||||
loop {
|
||||
match write_info_client.get_write_info(&write_token).await {
|
||||
Ok(res) => {
|
||||
if f(&res) {
|
||||
return;
|
||||
}
|
||||
info!("Retrying; predicate not satistified: {:?}", res);
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
info!("Retrying; Got error getting write_info: {}", e);
|
||||
}
|
||||
};
|
||||
interval.tick().await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("did not get passing predicate on token");
|
||||
}
|
||||
|
||||
/// Waits for the specified write token to be readable
|
||||
pub async fn wait_for_readable(write_token: impl Into<String>, connection: Connection) {
|
||||
info!("Waiting for write token to be readable");
|
||||
|
||||
wait_for_token(write_token, connection, |res| {
|
||||
if all_readable(res) {
|
||||
info!("Write is readable: {:?}", res);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Waits for the write token to be persisted
|
||||
pub async fn wait_for_persisted(write_token: impl Into<String>, connection: Connection) {
|
||||
info!("Waiting for write token to be persisted");
|
||||
|
||||
wait_for_token(write_token, connection, |res| {
|
||||
if all_persisted(res) {
|
||||
info!("Write is persisted: {:?}", res);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// returns true if all shards in the response are readable
|
||||
/// TODO: maybe put this in the influxdb_iox_client library / make a
|
||||
/// proper public facing client API. For now, iterate in the end to end tests.
|
||||
pub fn all_readable(res: &GetWriteInfoResponse) -> bool {
|
||||
res.shard_infos.iter().all(|info| {
|
||||
matches!(
|
||||
info.status(),
|
||||
ShardStatus::Readable | ShardStatus::Persisted
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// returns true if all shards in the response are persisted
|
||||
/// TODO: maybe put this in the influxdb_iox_client library / make a
|
||||
/// proper public facing client API. For now, iterate in the end to end tests.
|
||||
pub fn all_persisted(res: &GetWriteInfoResponse) -> bool {
|
||||
res.shard_infos
|
||||
.iter()
|
||||
.all(|info| matches!(info.status(), ShardStatus::Persisted))
|
||||
}
|
||||
|
||||
/// Runs a SQL query using the flight API on the specified connection.
|
||||
pub async fn try_run_sql(
|
||||
sql_query: impl Into<String>,
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use crate::snapshot_comparison::Language;
|
||||
use crate::{
|
||||
check_flight_error, get_write_token, run_influxql, run_sql, snapshot_comparison,
|
||||
token_is_persisted, try_run_influxql, try_run_sql, wait_for_persisted, wait_for_readable,
|
||||
check_flight_error, run_influxql, run_sql, snapshot_comparison, try_run_influxql, try_run_sql,
|
||||
MiniCluster,
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
|
@ -26,9 +25,6 @@ pub struct StepTestState<'a> {
|
|||
/// The mini cluster
|
||||
cluster: &'a mut MiniCluster,
|
||||
|
||||
/// Tokens for all data written in WriteLineProtocol steps
|
||||
write_tokens: Vec<String>,
|
||||
|
||||
/// How many Parquet files the catalog service knows about for the mini cluster's namespace,
|
||||
/// for tracking when persistence has happened. If this is `None`, we haven't ever checked with
|
||||
/// the catalog service.
|
||||
|
@ -48,12 +44,6 @@ impl<'a> StepTestState<'a> {
|
|||
&mut self.cluster
|
||||
}
|
||||
|
||||
/// Get a reference to the step test state's write tokens.
|
||||
#[must_use]
|
||||
pub fn write_tokens(&self) -> &[String] {
|
||||
self.write_tokens.as_ref()
|
||||
}
|
||||
|
||||
/// Store the number of Parquet files the catalog has for the mini cluster's namespace.
|
||||
/// Call this before a write to be able to tell when a write has been persisted by checking for
|
||||
/// a change in this count.
|
||||
|
@ -145,18 +135,6 @@ pub enum Step {
|
|||
/// endpoint, assert the data was written successfully
|
||||
WriteLineProtocol(String),
|
||||
|
||||
/// Wait for all previously written data to be readable
|
||||
WaitForReadable,
|
||||
|
||||
/// Assert that all previously written data is NOT persisted yet
|
||||
AssertNotPersisted,
|
||||
|
||||
/// Assert that last previously written data is NOT persisted yet.
|
||||
AssertLastNotPersisted,
|
||||
|
||||
/// Wait for all previously written data to be persisted
|
||||
WaitForPersisted,
|
||||
|
||||
/// Ask the catalog service how many Parquet files it has for this cluster's namespace. Do this
|
||||
/// before a write where you're interested in when the write has been persisted to Parquet;
|
||||
/// then after the write use `WaitForPersisted2` to observe the change in the number of Parquet
|
||||
|
@ -171,10 +149,6 @@ pub enum Step {
|
|||
/// router2/ingester2/querier2.
|
||||
WaitForPersisted2 { expected_increase: usize },
|
||||
|
||||
/// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't
|
||||
/// know about the ingester, so the test needs to ask the ingester directly.
|
||||
WaitForPersistedAccordingToIngester,
|
||||
|
||||
/// Set the namespace retention interval to a retention period,
|
||||
/// specified in ns relative to `now()`. `None` represents infinite retention
|
||||
/// (i.e. never drop data).
|
||||
|
@ -226,16 +200,16 @@ pub enum Step {
|
|||
expected: Vec<&'static str>,
|
||||
},
|
||||
|
||||
/// Read the InfluxQL queries in the specified file and verify that the results match the expected
|
||||
/// results in the corresponding expected file
|
||||
/// Read the InfluxQL queries in the specified file and verify that the results match the
|
||||
/// expected results in the corresponding expected file
|
||||
InfluxQLQueryAndCompare {
|
||||
input_path: PathBuf,
|
||||
setup_name: String,
|
||||
contents: String,
|
||||
},
|
||||
|
||||
/// Run an InfluxQL query that's expected to fail using the FlightSQL interface and verify that the
|
||||
/// request returns the expected error code and message
|
||||
/// Run an InfluxQL query that's expected to fail using the FlightSQL interface and verify that
|
||||
/// the request returns the expected error code and message
|
||||
InfluxQLExpectingError {
|
||||
query: String,
|
||||
expected_error_code: tonic::Code,
|
||||
|
@ -283,7 +257,6 @@ where
|
|||
|
||||
let mut state = StepTestState {
|
||||
cluster,
|
||||
write_tokens: vec![],
|
||||
num_parquet_files: Default::default(),
|
||||
};
|
||||
|
||||
|
@ -297,27 +270,7 @@ where
|
|||
);
|
||||
let response = state.cluster.write_to_router(line_protocol).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
let write_token = get_write_token(&response);
|
||||
info!("====Done writing line protocol, got token {}", write_token);
|
||||
state.write_tokens.push(write_token);
|
||||
}
|
||||
Step::WaitForReadable => {
|
||||
info!("====Begin waiting for all write tokens to be readable");
|
||||
let querier_grpc_connection =
|
||||
state.cluster().querier().querier_grpc_connection();
|
||||
for write_token in &state.write_tokens {
|
||||
wait_for_readable(write_token, querier_grpc_connection.clone()).await;
|
||||
}
|
||||
info!("====Done waiting for all write tokens to be readable");
|
||||
}
|
||||
Step::WaitForPersisted => {
|
||||
info!("====Begin waiting for all write tokens to be persisted");
|
||||
let querier_grpc_connection =
|
||||
state.cluster().querier().querier_grpc_connection();
|
||||
for write_token in &state.write_tokens {
|
||||
wait_for_persisted(write_token, querier_grpc_connection.clone()).await;
|
||||
}
|
||||
info!("====Done waiting for all write tokens to be persisted");
|
||||
info!("====Done writing line protocol");
|
||||
}
|
||||
// Get the current number of Parquet files in the cluster's namespace before
|
||||
// starting a new write so we can observe a change when waiting for persistence.
|
||||
|
@ -335,38 +288,6 @@ where
|
|||
.await;
|
||||
info!("====Done waiting for a change in the number of Parquet files");
|
||||
}
|
||||
// Specifically for cases when the querier doesn't know about the ingester so the
|
||||
// test needs to ask the ingester directly.
|
||||
Step::WaitForPersistedAccordingToIngester => {
|
||||
info!("====Begin waiting for all write tokens to be persisted");
|
||||
let ingester_grpc_connection =
|
||||
state.cluster().ingester().ingester_grpc_connection();
|
||||
for write_token in &state.write_tokens {
|
||||
wait_for_persisted(write_token, ingester_grpc_connection.clone()).await;
|
||||
}
|
||||
info!("====Done waiting for all write tokens to be persisted");
|
||||
}
|
||||
Step::AssertNotPersisted => {
|
||||
info!("====Begin checking all tokens not persisted");
|
||||
let querier_grpc_connection =
|
||||
state.cluster().querier().querier_grpc_connection();
|
||||
for write_token in &state.write_tokens {
|
||||
let persisted =
|
||||
token_is_persisted(write_token, querier_grpc_connection.clone()).await;
|
||||
assert!(!persisted);
|
||||
}
|
||||
info!("====Done checking all tokens not persisted");
|
||||
}
|
||||
Step::AssertLastNotPersisted => {
|
||||
info!("====Begin checking last tokens not persisted");
|
||||
let querier_grpc_connection =
|
||||
state.cluster().querier().querier_grpc_connection();
|
||||
let write_token = state.write_tokens.last().expect("No data written yet");
|
||||
let persisted =
|
||||
token_is_persisted(write_token, querier_grpc_connection.clone()).await;
|
||||
assert!(!persisted);
|
||||
info!("====Done checking last tokens not persisted");
|
||||
}
|
||||
Step::Compact => {
|
||||
info!("====Begin running compaction");
|
||||
state.cluster.run_compaction();
|
||||
|
|
Loading…
Reference in New Issue