test: Check catalog for new Parquet files to know when data is persisted

pull/24376/head
Carol (Nichols || Goulding) 2023-01-02 15:00:01 -05:00
parent e49bee0c26
commit 08ceb4ee48
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
5 changed files with 104 additions and 3 deletions

View File

@ -129,6 +129,40 @@ async fn basic_on_parquet() {
.await
}
#[tokio::test]
async fn basic_on_parquet2() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();
    let table_name = "the_table";
    // Set up the cluster ====================================
    let mut cluster = MiniCluster::create_shared2(database_url).await;
    StepTest::new(
        &mut cluster,
        vec![
            Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
            // Wait for data to be persisted to parquet, detected by observing a new Parquet
            // file in the catalog for this table
            Step::WaitForPersisted2 {
                table_name: table_name.into(),
            },
            Step::Query {
                sql: format!("select * from {}", table_name),
                // Cell contents are padded to match the border widths (6/6/32/5), as produced
                // by arrow's pretty printer; unpadded rows would never match.
                expected: vec![
                    "+------+------+--------------------------------+-----+",
                    "| tag1 | tag2 | time                           | val |",
                    "+------+------+--------------------------------+-----+",
                    "| A    | B    | 1970-01-01T00:00:00.000123456Z | 42  |",
                    "+------+------+--------------------------------+-----+",
                ],
            },
        ],
    )
    .run()
    .await
}
#[tokio::test]
async fn basic_no_ingester_connection() {
test_helpers::maybe_start_logging();

View File

@ -103,6 +103,51 @@ pub async fn token_is_persisted(
const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
/// Waits for the number of Parquet files in the catalog for the specified namespace and table to
/// increase beyond the count observed at the moment this function is called.
///
/// Polls the catalog (via the given gRPC connection) roughly once per second.
///
/// # Panics
///
/// Panics if the baseline catalog query fails, or if no additional Parquet file appears within
/// `MAX_QUERY_RETRY_TIME_SEC` seconds.
pub async fn wait_for_new_parquet_file(
    connection: Connection,
    namespace_name: &str,
    table_name: &str,
) {
    let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
    let mut catalog_client = influxdb_iox_client::catalog::Client::new(connection);

    // Baseline: persistence is detected as the file count rising above this value. A failure
    // here means the catalog service isn't reachable at all, which is a test-setup bug.
    let initial_count = catalog_client
        .get_parquet_files_by_namespace_table(namespace_name.into(), table_name.into())
        .await
        .expect("catalog must be queryable to establish baseline Parquet file count")
        .len();

    info!("Initial count of Parquet files: {initial_count}");

    tokio::time::timeout(retry_duration, async move {
        let mut interval = tokio::time::interval(Duration::from_millis(1000));
        loop {
            // Tick at the top of the loop: the first tick of a fresh `interval` completes
            // immediately, so the first re-check still happens right away. (Ticking at the
            // bottom would issue two back-to-back catalog queries before any real wait.)
            interval.tick().await;
            match catalog_client
                .get_parquet_files_by_namespace_table(namespace_name.into(), table_name.into())
                .await
            {
                Ok(parquet_files) => {
                    let current_count = parquet_files.len();
                    if current_count > initial_count {
                        info!("Success; Parquet file count is now {current_count}");
                        return;
                    }
                    info!("Retrying; Parquet file count is still {current_count}");
                }
                // Transient errors (e.g. service still starting) are retried until the
                // overall timeout expires.
                Err(e) => {
                    info!("Retrying; Got error getting catalog info: {e}");
                }
            };
        }
    })
    .await
    .expect("did not get additional Parquet files in the catalog");
}
/// Waits for the specified predicate to return true
pub async fn wait_for_token<F>(write_token: impl Into<String>, connection: Connection, f: F)
where

View File

@ -176,7 +176,10 @@ impl TestConfig {
)
.with_existing_object_store(ingester_config)
.with_env("INFLUXDB_IOX_RPC_MODE", "2")
.with_env("INFLUXDB_IOX_INGESTER_ADDRESSES", ingester_config.ingester_base().as_ref())
.with_env(
"INFLUXDB_IOX_INGESTER_ADDRESSES",
ingester_config.ingester_base().as_ref(),
)
}
/// Create a minimal all in one configuration

View File

@ -196,7 +196,7 @@ impl MiniCluster {
pub async fn create_non_shared2(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
// let querier_config = TestConfig::new_querier2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
// Set up the cluster ====================================
Self::new()
@ -204,6 +204,8 @@ impl MiniCluster {
.await
.with_router(router_config)
.await
.with_querier(querier_config)
.await
}
/// Create an all-(minus compactor)-in-one server with the specified configuration

View File

@ -1,6 +1,6 @@
use crate::{
check_flight_error, get_write_token, run_sql, token_is_persisted, try_run_influxql,
try_run_sql, wait_for_persisted, wait_for_readable, MiniCluster,
try_run_sql, wait_for_new_parquet_file, wait_for_persisted, wait_for_readable, MiniCluster,
};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
@ -86,6 +86,11 @@ pub enum Step {
/// Wait for all previously written data to be persisted
WaitForPersisted,
/// Wait for all previously written data to be persisted by observing an increase in the number
/// of Parquet files in the catalog for this cluster's namespace and the specified table name.
/// Needed for router2/ingester2/querier2.
WaitForPersisted2 { table_name: String },
/// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't
/// know about the ingester, so the test needs to ask the ingester directly.
WaitForPersistedAccordingToIngester,
@ -188,6 +193,18 @@ impl<'a> StepTest<'a> {
}
info!("====Done waiting for all write tokens to be persisted");
}
Step::WaitForPersisted2 { table_name } => {
info!("====Begin waiting for a new Parquet file to be persisted");
let querier_grpc_connection =
state.cluster().querier().querier_grpc_connection();
wait_for_new_parquet_file(
querier_grpc_connection,
state.cluster().namespace(),
&table_name,
)
.await;
info!("====Done waiting for a new Parquet file to be persisted");
}
// Specifically for cases when the querier doesn't know about the ingester so the
// test needs to ask the ingester directly.
Step::WaitForPersistedAccordingToIngester => {