chore: remove kafka full querier end to end tests (#6834)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-02-03 16:25:24 +01:00 committed by GitHub
parent 1535366666
commit b9ab09e944
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 0 additions and 245 deletions

View File

@ -30,237 +30,6 @@ use test_helpers_end_to_end::{
mod with_kafka {
use super::*;
#[tokio::test]
async fn basic_ingester() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{},tag1=A,tag2=B val=42i 123456\n\
{},tag1=A,tag2=C val=43i 123457",
table_name, table_name
)),
Step::WaitForReadable,
Step::AssertNotPersisted,
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
"+------+------+--------------------------------+-----+",
],
},
],
)
.run()
.await
}
#[tokio::test]
async fn basic_empty() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{},tag1=A,tag2=B val=42i 123456\n\
{},tag1=A,tag2=C val=43i 123457",
table_name, table_name
)),
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
// query returns no results
let sql = format!("select * from {} where time > '2023-01-12'", table_name);
let querier_connection =
state.cluster().querier().querier_grpc_connection();
let namespace = state.cluster().namespace();
let mut client =
influxdb_iox_client::flight::Client::new(querier_connection);
let result_stream = client.sql(namespace, &sql).await.unwrap();
let mut flight_stream = result_stream.into_inner();
// no data is returned
assert!(flight_stream.next().await.is_none());
// even though there are no results, we should have still got the schema
// otherwise other clients may complain
// https://github.com/influxdata/influxdb_iox/pull/6668
assert!(flight_stream.got_schema());
// run the query again and ensure there are no dictionaries
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
// run a query that does return results and ensure there are no dictionaries
let sql = format!("select * from {table_name}");
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
async fn basic_on_parquet() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
// Wait for data to be persisted to parquet
Step::WaitForPersisted,
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
],
},
],
)
.run()
.await
}
#[tokio::test]
async fn basic_no_ingester_connection() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
let router_config = TestConfig::new_router(&database_url);
// fast parquet
let ingester_config = TestConfig::new_ingester(&router_config);
// specially create a querier config that is NOT connected to the ingester
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
// Set up the cluster ====================================
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await;
// Write some data into the v2 HTTP API ==============
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
// Wait for data to be persisted to parquet, ask the ingester directly because the
// querier is not connected to the ingester
Step::WaitForPersistedAccordingToIngester,
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
],
},
],
)
.run()
.await
}
#[tokio::test]
async fn query_after_persist_sees_new_files() {
// https://github.com/influxdata/influxdb_iox/issues/4634 added
// caching of tombstones and parquet files in the querier. This
// test ensures that a query issued after new parquet files are
// persisted correctly picks up the new parquet files
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut setup = ForcePersistenceSetup::new(database_url).await;
let steps = vec![
// Write data to a parquet file
Step::WriteLineProtocol(setup.lp_to_force_persistence()),
Step::WaitForPersisted,
Step::Query {
sql: setup.count_star_sql(),
expected: vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 1 |",
"+-----------------+",
],
},
// second query, should be the same result
Step::Query {
sql: setup.count_star_sql(),
expected: vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 1 |",
"+-----------------+",
],
},
// write another parquet file
// that has non duplicated data
Step::WriteLineProtocol(setup.lp_to_force_persistence().replace("tag=A", "tag=B")),
Step::WaitForPersisted,
// query should correctly see the data in the second parquet file
Step::Query {
sql: setup.count_star_sql(),
expected: vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 2 |",
"+-----------------+",
],
},
];
StepTest::new(&mut setup.cluster, steps).run().await
}
#[tokio::test]
async fn table_not_found_on_ingester() {
test_helpers::maybe_start_logging();
@ -850,20 +619,6 @@ mod with_kafka {
"the_table"
}
/// return `SELECT COUNT(*) FROM table_name` query
pub fn count_star_sql(&self) -> String {
format!("select count(*) from {}", self.table_name())
}
/// Return line protocol that is so large it will be persisted
pub fn lp_to_force_persistence(&self) -> String {
format!(
"{},tag=A val=\"{}\" 2\n",
self.table_name(),
self.super_long_string()
)
}
/// We need a trigger for persistence that is not time so the test
/// is as stable as possible. We use a long string to cross the
/// persistence memory threshold.