Merge pull request #6480 from influxdata/cn/groundwork
test: Lay the groundwork for porting query_tests to the RPC write pathpull/24376/head
commit
d4e890d01f
influxdb_iox/tests/end_to_end_cases
test_helpers_end_to_end/src
|
@ -10,247 +10,334 @@ use test_helpers_end_to_end::{
|
|||
get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
/// Temporary duplication: These tests should be kept in sync (as far as what they're logically
|
||||
/// testing, not the exact implementation) with the corresponding tests in the
|
||||
/// `kafkaless_rpc_write` module.
|
||||
///
|
||||
/// When we switch to the RPC write path, this module can be deleted.
|
||||
mod with_kafka {
|
||||
use super::*;
|
||||
|
||||
let table_name = "mytable";
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
let table_name = "mytable";
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
|
||||
// wait for the write to become visible
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection());
|
||||
// wait for the write to become visible
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
cluster.table_id(table_name).await,
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection());
|
||||
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let mut performed_query = querier_flight
|
||||
.into_inner()
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
|
||||
let partition_id = app_metadata.partition_id;
|
||||
assert_eq!(
|
||||
app_metadata,
|
||||
IngesterQueryResponseMetadata {
|
||||
partition_id,
|
||||
status: Some(proto::PartitionStatus {
|
||||
parquet_max_sequence_number: None,
|
||||
}),
|
||||
ingester_uuid: String::new(),
|
||||
completed_persistence_count: 0,
|
||||
},
|
||||
);
|
||||
|
||||
let (msg, _) = next_message(&mut performed_query).await.unwrap();
|
||||
let schema = unwrap_schema(msg);
|
||||
|
||||
let mut query_results = vec![];
|
||||
while let Some((msg, _md)) = next_message(&mut performed_query).await {
|
||||
let batch = unwrap_record_batch(msg);
|
||||
query_results.push(batch);
|
||||
}
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &query_results);
|
||||
|
||||
// Also ensure that the schema of the batches matches what is
|
||||
// reported by the performed_query.
|
||||
query_results.iter().enumerate().for_each(|(i, b)| {
|
||||
assert_eq!(
|
||||
schema,
|
||||
b.schema(),
|
||||
"Schema mismatch for returned batch {}",
|
||||
i
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
cluster.table_id(table_name).await,
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester2_flight_api() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let table_name = "mytable";
|
||||
|
||||
// Set up cluster
|
||||
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
let mut performed_query = querier_flight
|
||||
.into_inner()
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
cluster.table_id(table_name).await,
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
let query = query.encode_to_vec();
|
||||
|
||||
let mut performed_query = querier_flight
|
||||
.do_get(query.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
|
||||
let ingester_uuid = app_metadata.ingester_uuid.clone();
|
||||
assert!(!ingester_uuid.is_empty());
|
||||
|
||||
let (msg, _) = next_message(&mut performed_query).await.unwrap();
|
||||
let schema = unwrap_schema(msg);
|
||||
|
||||
let mut query_results = vec![];
|
||||
while let Some((msg, _md)) = next_message(&mut performed_query).await {
|
||||
let batch = unwrap_record_batch(msg);
|
||||
query_results.push(batch);
|
||||
}
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &query_results);
|
||||
|
||||
// Also ensure that the schema of the batches matches what is
|
||||
// reported by the performed_query.
|
||||
query_results.iter().enumerate().for_each(|(i, b)| {
|
||||
let partition_id = app_metadata.partition_id;
|
||||
assert_eq!(
|
||||
schema,
|
||||
b.schema(),
|
||||
"Schema mismatch for returned batch {}",
|
||||
i
|
||||
app_metadata,
|
||||
IngesterQueryResponseMetadata {
|
||||
partition_id,
|
||||
status: Some(proto::PartitionStatus {
|
||||
parquet_max_sequence_number: None,
|
||||
}),
|
||||
ingester_uuid: String::new(),
|
||||
completed_persistence_count: 0,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
// Ensure the ingester UUID is the same in the next query
|
||||
let mut performed_query = querier_flight
|
||||
.do_get(query.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
let (msg, _) = next_message(&mut performed_query).await.unwrap();
|
||||
let schema = unwrap_schema(msg);
|
||||
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
assert_eq!(app_metadata.ingester_uuid, ingester_uuid);
|
||||
let mut query_results = vec![];
|
||||
while let Some((msg, _md)) = next_message(&mut performed_query).await {
|
||||
let batch = unwrap_record_batch(msg);
|
||||
query_results.push(batch);
|
||||
}
|
||||
|
||||
// Restart the ingester and ensure it gets a new UUID
|
||||
cluster.restart_ingester().await;
|
||||
let mut performed_query = querier_flight.do_get(query).await.unwrap().into_inner();
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
assert_ne!(app_metadata.ingester_uuid, ingester_uuid);
|
||||
}
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &query_results);
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_namespace_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
// Also ensure that the schema of the batches matches what is
|
||||
// reported by the performed_query.
|
||||
query_results.iter().enumerate().for_each(|(i, b)| {
|
||||
assert_eq!(
|
||||
schema,
|
||||
b.schema(),
|
||||
"Schema mismatch for returned batch {}",
|
||||
i
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_namespace_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
NamespaceId::new(i64::MAX),
|
||||
TableId::new(42),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
let query = IngesterQueryRequest::new(
|
||||
NamespaceId::new(i64::MAX),
|
||||
TableId::new(42),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_table_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// wait for the write to become visible
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
TableId::new(i64::MAX),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_table_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
/// Temporary duplication: These tests should be kept in sync (as far as what they're logically
|
||||
/// testing, not the exact implementation) with the corresponding tests in the
|
||||
/// `with_kafka` module.
|
||||
///
|
||||
/// When we switch to the RPC write path, the code in this module can be unwrapped into its super
|
||||
/// scope and unindented.
|
||||
mod kafkaless_rpc_write {
|
||||
use super::*;
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared(database_url).await;
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
let table_name = "mytable";
|
||||
|
||||
// wait for the write to become visible
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
// Set up cluster
|
||||
// Don't use a shared cluster because the ingester is going to be restarted
|
||||
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
cluster.table_id(table_name).await,
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
let query = query.encode_to_vec();
|
||||
|
||||
let mut performed_query = querier_flight
|
||||
.do_get(query.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
TableId::new(i64::MAX),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
let ingester_uuid = app_metadata.ingester_uuid.clone();
|
||||
assert!(!ingester_uuid.is_empty());
|
||||
|
||||
let (msg, _) = next_message(&mut performed_query).await.unwrap();
|
||||
let schema = unwrap_schema(msg);
|
||||
|
||||
let mut query_results = vec![];
|
||||
while let Some((msg, _md)) = next_message(&mut performed_query).await {
|
||||
let batch = unwrap_record_batch(msg);
|
||||
query_results.push(batch);
|
||||
}
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &query_results);
|
||||
|
||||
// Also ensure that the schema of the batches matches what is
|
||||
// reported by the performed_query.
|
||||
query_results.iter().enumerate().for_each(|(i, b)| {
|
||||
assert_eq!(
|
||||
schema,
|
||||
b.schema(),
|
||||
"Schema mismatch for returned batch {}",
|
||||
i
|
||||
);
|
||||
});
|
||||
|
||||
// Ensure the ingester UUID is the same in the next query
|
||||
let mut performed_query = querier_flight
|
||||
.do_get(query.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
assert_eq!(app_metadata.ingester_uuid, ingester_uuid);
|
||||
|
||||
// Restart the ingester and ensure it gets a new UUID
|
||||
cluster.restart_ingester().await;
|
||||
let mut performed_query = querier_flight.do_get(query).await.unwrap().into_inner();
|
||||
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
|
||||
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
|
||||
assert_ne!(app_metadata.ingester_uuid, ingester_uuid);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_namespace_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
NamespaceId::new(i64::MAX),
|
||||
TableId::new(42),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_table_not_found() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up cluster
|
||||
let cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let mut querier_flight =
|
||||
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
|
||||
.into_inner();
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace_id().await,
|
||||
TableId::new(i64::MAX),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
|
||||
|
||||
let err = querier_flight
|
||||
.do_get(query.encode_to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let iox_arrow_flight::FlightError::Tonic(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -103,6 +103,51 @@ pub async fn token_is_persisted(
|
|||
|
||||
const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
|
||||
|
||||
/// Waits for the number of Parquet files in the catalog for the specified namespace and table to
|
||||
/// change
|
||||
pub async fn wait_for_new_parquet_file(
|
||||
connection: Connection,
|
||||
namespace_name: &str,
|
||||
table_name: &str,
|
||||
) {
|
||||
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
|
||||
let mut catalog_client = influxdb_iox_client::catalog::Client::new(connection);
|
||||
|
||||
let initial_count = catalog_client
|
||||
.get_parquet_files_by_namespace_table(namespace_name.into(), table_name.into())
|
||||
.await
|
||||
.unwrap()
|
||||
.len();
|
||||
|
||||
info!("Initial count of Parquet files: {initial_count}");
|
||||
|
||||
tokio::time::timeout(retry_duration, async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(1000));
|
||||
loop {
|
||||
match catalog_client
|
||||
.get_parquet_files_by_namespace_table(namespace_name.into(), table_name.into())
|
||||
.await
|
||||
{
|
||||
Ok(parquet_files) => {
|
||||
let current_count = parquet_files.len();
|
||||
if current_count > initial_count {
|
||||
info!("Success; Parquet file count is now {current_count}");
|
||||
return;
|
||||
}
|
||||
info!("Retrying; Parquet file count is still {current_count}");
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
info!("Retrying; Got error getting catalog info: {e}");
|
||||
}
|
||||
};
|
||||
interval.tick().await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("did not get additional Parquet files in the catalog");
|
||||
}
|
||||
|
||||
/// Waits for the specified predicate to return true
|
||||
pub async fn wait_for_token<F>(write_token: impl Into<String>, connection: Connection, f: F)
|
||||
where
|
||||
|
|
|
@ -99,13 +99,28 @@ impl TestConfig {
|
|||
.with_default_ingester_options()
|
||||
}
|
||||
|
||||
/// Create a minimal ingester2 configuration, using the dsn configuration specified
|
||||
/// Create a minimal ingester2 configuration, using the dsn configuration specified. Set the
|
||||
/// persistence options such that it will persist as quickly as possible.
|
||||
pub fn new_ingester2(dsn: impl Into<String>) -> Self {
|
||||
let dsn = Some(dsn.into());
|
||||
Self::new(ServerType::Ingester2, dsn, random_catalog_schema_name())
|
||||
.with_new_object_store()
|
||||
.with_new_wal()
|
||||
.with_default_ingester_options()
|
||||
.with_env("INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS", "1")
|
||||
}
|
||||
|
||||
/// Create a minimal ingester2 configuration, using the dsn configuration specified. Set the
|
||||
/// persistence options such that it will likely never persist, to be able to test when data
|
||||
/// only exists in the ingester's memory.
|
||||
pub fn new_ingester2_never_persist(dsn: impl Into<String>) -> Self {
|
||||
let dsn = Some(dsn.into());
|
||||
Self::new(ServerType::Ingester2, dsn, random_catalog_schema_name())
|
||||
.with_new_object_store()
|
||||
.with_new_wal()
|
||||
.with_default_ingester_options()
|
||||
// I didn't run my tests for a day, because that would be too long
|
||||
.with_env("INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS", "86400")
|
||||
}
|
||||
|
||||
/// Create a minimal querier configuration from the specified
|
||||
|
@ -119,6 +134,17 @@ impl TestConfig {
|
|||
.with_ingester_mapping(ingester_config.ingester_base().as_ref())
|
||||
}
|
||||
|
||||
/// Create a minimal querier2 configuration from the specified ingester2 configuration, using
|
||||
/// the same dsn and object store, and pointing at the specified ingester.
|
||||
pub fn new_querier2(ingester_config: &TestConfig) -> Self {
|
||||
assert_eq!(ingester_config.server_type(), ServerType::Ingester2);
|
||||
|
||||
Self::new_querier2_without_ingester2(ingester_config).with_env(
|
||||
"INFLUXDB_IOX_INGESTER_ADDRESSES",
|
||||
ingester_config.ingester_base().as_ref(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a minimal compactor configuration, using the dsn
|
||||
/// configuration from other
|
||||
pub fn new_compactor(other: &TestConfig) -> Self {
|
||||
|
@ -143,6 +169,18 @@ impl TestConfig {
|
|||
.with_shard_to_ingesters_mapping("{\"ignoreAll\": true}")
|
||||
}
|
||||
|
||||
/// Create a minimal querier2 configuration from the specified ingester2 configuration, using
|
||||
/// the same dsn and object store, but without specifying the ingester2 addresses
|
||||
pub fn new_querier2_without_ingester2(ingester_config: &TestConfig) -> Self {
|
||||
Self::new(
|
||||
ServerType::Querier2,
|
||||
ingester_config.dsn().to_owned(),
|
||||
ingester_config.catalog_schema_name(),
|
||||
)
|
||||
.with_existing_object_store(ingester_config)
|
||||
.with_env("INFLUXDB_IOX_RPC_MODE", "2")
|
||||
}
|
||||
|
||||
/// Create a minimal all in one configuration
|
||||
pub fn new_all_in_one(dsn: Option<String>) -> Self {
|
||||
Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name())
|
||||
|
|
|
@ -132,6 +132,46 @@ impl MiniCluster {
|
|||
new_cluster
|
||||
}
|
||||
|
||||
/// Create a "standard" shared MiniCluster that has a router, ingester, and querier (but no
|
||||
/// compactor as that should be run on-demand in tests)
|
||||
///
|
||||
/// Note: Because the underlying server processes are shared across multiple tests, all users
|
||||
/// of this `MiniCluster` instance should only modify their own unique namespace.
|
||||
pub async fn create_shared2(database_url: String) -> Self {
|
||||
let start = Instant::now();
|
||||
let mut shared_servers = GLOBAL_SHARED_SERVERS2.lock().await;
|
||||
debug!(mutex_wait=?start.elapsed(), "creating standard2 cluster");
|
||||
|
||||
// try to reuse existing server processes
|
||||
if let Some(shared) = shared_servers.take() {
|
||||
if let Some(cluster) = shared.creatable_cluster().await {
|
||||
debug!("Reusing existing cluster");
|
||||
|
||||
// Put the server back
|
||||
*shared_servers = Some(shared);
|
||||
let start = Instant::now();
|
||||
// drop the lock prior to calling `create()` to allow others to proceed
|
||||
std::mem::drop(shared_servers);
|
||||
let new_self = cluster.create().await;
|
||||
info!(
|
||||
total_wait=?start.elapsed(),
|
||||
"created new mini cluster2 from existing cluster"
|
||||
);
|
||||
return new_self;
|
||||
} else {
|
||||
info!("some server proceses of previous cluster2 have already returned");
|
||||
}
|
||||
}
|
||||
|
||||
// Have to make a new one
|
||||
info!("Create a new server2");
|
||||
let new_cluster = Self::create_non_shared2(database_url).await;
|
||||
|
||||
// Update the shared servers to point at the newly created server proesses
|
||||
*shared_servers = Some(SharedServers::new(&new_cluster));
|
||||
new_cluster
|
||||
}
|
||||
|
||||
/// Create a non shared "standard" MiniCluster that has a router, ingester, querier. Save
|
||||
/// config for a compactor, but the compactor should be run on-demand in tests using `compactor
|
||||
/// run-once` rather than using `run compactor`.
|
||||
|
@ -152,9 +192,11 @@ impl MiniCluster {
|
|||
.with_compactor_config(compactor_config)
|
||||
}
|
||||
|
||||
/// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester, querier.
|
||||
pub async fn create_non_shared2(database_url: String) -> Self {
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
Self::new()
|
||||
|
@ -162,6 +204,8 @@ impl MiniCluster {
|
|||
.await
|
||||
.with_router(router_config)
|
||||
.await
|
||||
.with_querier(querier_config)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Create an all-(minus compactor)-in-one server with the specified configuration
|
||||
|
@ -485,3 +529,6 @@ fn server_from_weak(server: Option<&Weak<TestServer>>) -> Option<Option<Arc<Test
|
|||
}
|
||||
|
||||
static GLOBAL_SHARED_SERVERS: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
|
||||
// For the new server versions. `GLOBAL_SHARED_SERVERS` can be removed and this can be renamed
|
||||
// when the migration to router2/etc is complete.
|
||||
static GLOBAL_SHARED_SERVERS2: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
|
||||
|
|
|
@ -213,7 +213,7 @@ impl Connections {
|
|||
};
|
||||
|
||||
self.querier_grpc_connection = match server_type {
|
||||
ServerType::AllInOne | ServerType::Querier => {
|
||||
ServerType::AllInOne | ServerType::Querier | ServerType::Querier2 => {
|
||||
let client_base = test_config.addrs().querier_grpc_api().client_base();
|
||||
Some(
|
||||
grpc_channel(test_config, client_base.as_ref())
|
||||
|
@ -509,7 +509,7 @@ impl TestServer {
|
|||
return;
|
||||
}
|
||||
}
|
||||
ServerType::Querier => {
|
||||
ServerType::Querier | ServerType::Querier2 => {
|
||||
if check_arrow_service_health(
|
||||
server_type,
|
||||
connections.querier_grpc_connection(),
|
||||
|
|
|
@ -8,6 +8,7 @@ pub enum ServerType {
|
|||
Router,
|
||||
Router2,
|
||||
Querier,
|
||||
Querier2,
|
||||
Compactor,
|
||||
}
|
||||
|
||||
|
@ -21,6 +22,7 @@ impl ServerType {
|
|||
Self::Router => "router",
|
||||
Self::Router2 => "router2",
|
||||
Self::Querier => "querier",
|
||||
Self::Querier2 => "querier",
|
||||
Self::Compactor => "compactor",
|
||||
}
|
||||
}
|
||||
|
@ -123,6 +125,17 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
|
|||
addrs.querier_grpc_api().bind_addr().to_string(),
|
||||
),
|
||||
],
|
||||
ServerType::Querier2 => vec![
|
||||
(
|
||||
"INFLUXDB_IOX_BIND_ADDR",
|
||||
addrs.router_http_api().bind_addr().to_string(),
|
||||
),
|
||||
(
|
||||
"INFLUXDB_IOX_GRPC_BIND_ADDR",
|
||||
addrs.querier_grpc_api().bind_addr().to_string(),
|
||||
),
|
||||
("INFLUXDB_IOX_RPC_MODE", "2".to_string()),
|
||||
],
|
||||
ServerType::Compactor => vec![
|
||||
(
|
||||
"INFLUXDB_IOX_BIND_ADDR",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
check_flight_error, get_write_token, run_sql, token_is_persisted, try_run_influxql,
|
||||
try_run_sql, wait_for_persisted, wait_for_readable, MiniCluster,
|
||||
try_run_sql, wait_for_new_parquet_file, wait_for_persisted, wait_for_readable, MiniCluster,
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
|
@ -86,6 +86,11 @@ pub enum Step {
|
|||
/// Wait for all previously written data to be persisted
|
||||
WaitForPersisted,
|
||||
|
||||
/// Wait for all previously written data to be persisted by observing an increase in the number
|
||||
/// of Parquet files in the catalog for this cluster's namespace and the specified table name.
|
||||
/// Needed for router2/ingester2/querier2.
|
||||
WaitForPersisted2 { table_name: String },
|
||||
|
||||
/// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't
|
||||
/// know about the ingester, so the test needs to ask the ingester directly.
|
||||
WaitForPersistedAccordingToIngester,
|
||||
|
@ -188,6 +193,18 @@ impl<'a> StepTest<'a> {
|
|||
}
|
||||
info!("====Done waiting for all write tokens to be persisted");
|
||||
}
|
||||
Step::WaitForPersisted2 { table_name } => {
|
||||
info!("====Begin waiting for a new Parquet file to be persisted");
|
||||
let querier_grpc_connection =
|
||||
state.cluster().querier().querier_grpc_connection();
|
||||
wait_for_new_parquet_file(
|
||||
querier_grpc_connection,
|
||||
state.cluster().namespace(),
|
||||
&table_name,
|
||||
)
|
||||
.await;
|
||||
info!("====Done waiting for a new Parquet file to be persisted");
|
||||
}
|
||||
// Specifically for cases when the querier doesn't know about the ingester so the
|
||||
// test needs to ask the ingester directly.
|
||||
Step::WaitForPersistedAccordingToIngester => {
|
||||
|
|
Loading…
Reference in New Issue