Merge pull request #7074 from influxdata/cn/more-querier-tests-to-kafkaless

test: Change more querier tests to only use Kafkaless
pull/24376/head
kodiakhq[bot] 2023-03-01 16:12:43 +00:00 committed by GitHub
commit 59c14fc6bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 28 deletions

View File

@ -1483,6 +1483,7 @@ mod tests {
#[tokio::test]
async fn test_flight_unknown_partitions() {
let ingester_uuid = Uuid::new_v4();
let record_batch = lp_to_record_batch("table foo=1 1");
let schema = record_batch.schema();
@ -1497,12 +1498,16 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid.to_string(),
3,
),
metadata(
1001,
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid.to_string(),
4,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema)),
@ -1513,6 +1518,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid.to_string(),
5,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema)),
@ -1548,6 +1555,8 @@ mod tests {
#[tokio::test]
async fn test_flight_no_batches() {
let ingester_uuid = Uuid::new_v4();
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
@ -1557,6 +1566,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: None,
}),
ingester_uuid.to_string(),
5,
)],
}),
)])
@ -1573,20 +1584,19 @@ mod tests {
assert_eq!(p.parquet_max_sequence_number, None);
assert_eq!(p.tombstone_max_sequence_number, None);
assert_eq!(p.chunks.len(), 0);
// When using the write buffer path, there should never be a UUID present and the
// completed_persistence_count should always be 0.
assert!(p.ingester_uuid.is_none());
assert_eq!(p.completed_persistence_count, 0);
assert_eq!(p.ingester_uuid.unwrap(), ingester_uuid);
assert_eq!(p.completed_persistence_count, 5);
}
#[tokio::test]
async fn test_flight_err_partition_status_missing() {
let ingester_uuid = Uuid::new_v4();
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![metadata(1, None)],
results: vec![metadata(1, None, ingester_uuid.to_string(), 5)],
}),
)])
.await,
@ -1598,6 +1608,8 @@ mod tests {
#[tokio::test]
async fn test_flight_err_duplicate_partition_info() {
let ingester_uuid = Uuid::new_v4();
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
@ -1608,18 +1620,24 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: None,
}),
ingester_uuid.to_string(),
3,
),
metadata(
2,
Some(PartitionStatus {
parquet_max_sequence_number: None,
}),
ingester_uuid.to_string(),
4,
),
metadata(
1,
Some(PartitionStatus {
parquet_max_sequence_number: None,
}),
ingester_uuid.to_string(),
5,
),
],
}),
@ -1673,6 +1691,9 @@ mod tests {
#[tokio::test]
async fn test_flight_many_batches_no_shard() {
let ingester_uuid1 = Uuid::new_v4();
let ingester_uuid2 = Uuid::new_v4();
let record_batch_1_1_1 = lp_to_record_batch("table foo=1 1");
let record_batch_1_1_2 = lp_to_record_batch("table foo=2 2");
let record_batch_1_2 = lp_to_record_batch("table bar=20,foo=2 2");
@ -1696,6 +1717,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid1.to_string(),
3,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema_1_1)),
@ -1722,6 +1745,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(21),
}),
ingester_uuid1.to_string(),
4,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema_2_1)),
@ -1743,6 +1768,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(31),
}),
ingester_uuid2.to_string(),
5,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema_3_1)),
@ -1808,12 +1835,12 @@ mod tests {
}
#[tokio::test]
async fn ingester2_rpc_write_path_expects_valid_uuid() {
async fn invalid_uuid_errors() {
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![ingester2_metadata(
results: vec![metadata(
1,
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
@ -1853,7 +1880,7 @@ mod tests {
"addr1",
Ok(MockQueryData {
results: vec![
ingester2_metadata(
metadata(
1,
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
@ -1861,7 +1888,7 @@ mod tests {
ingester_uuid1.to_string(),
0,
),
ingester2_metadata(
metadata(
2,
Some(PartitionStatus {
parquet_max_sequence_number: Some(21),
@ -1875,7 +1902,7 @@ mod tests {
(
"addr2",
Ok(MockQueryData {
results: vec![ingester2_metadata(
results: vec![metadata(
3,
Some(PartitionStatus {
parquet_max_sequence_number: Some(31),
@ -2036,6 +2063,8 @@ mod tests {
#[tokio::test]
async fn test_flight_per_shard_querying() {
let ingester_uuid = Uuid::new_v4();
let record_batch_1_1 = lp_to_record_batch("table foo=1 1");
let schema_1_1 = record_batch_1_1.schema();
@ -2050,6 +2079,8 @@ mod tests {
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid.to_string(),
5,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema_1_1)),
@ -2132,20 +2163,7 @@ mod tests {
type MockFlightResult = Result<(DecodedPayload, IngesterQueryResponseMetadata), FlightError>;
fn metadata(partition_id: i64, status: Option<PartitionStatus>) -> MockFlightResult {
Ok((
DecodedPayload::None,
IngesterQueryResponseMetadata {
partition_id,
status,
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
},
))
}
fn ingester2_metadata(
fn metadata(
partition_id: i64,
status: Option<PartitionStatus>,
ingester_uuid: impl Into<String>,
@ -2273,6 +2291,7 @@ mod tests {
#[test]
fn test_ingester_partition_type_cast() {
let expected_schema = SchemaBuilder::new().tag("t").timestamp().build().unwrap();
let ingester_uuid = Uuid::new_v4();
let cases = vec![
// send a batch that matches the schema exactly
@ -2287,7 +2306,7 @@ mod tests {
// Construct a partition and ensure it doesn't error
let ingester_partition = IngesterPartition::new(
"ingester".into(),
None,
Some(ingester_uuid),
PartitionId::new(1),
ShardId::new(1),
0,
@ -2306,6 +2325,7 @@ mod tests {
#[test]
fn test_ingester_partition_fail_type_cast() {
let ingester_uuid = Uuid::new_v4();
let expected_schema = SchemaBuilder::new()
.field("b", DataType::Boolean)
.unwrap()
@ -2319,7 +2339,7 @@ mod tests {
let tombstone_max_sequence_number = None;
let err = IngesterPartition::new(
"ingester".into(),
None,
Some(ingester_uuid),
PartitionId::new(1),
ShardId::new(1),
0,

View File

@ -12,6 +12,7 @@ use schema::{sort::SortKey, Projection, Schema};
use sharder::JumpHash;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Handle;
use uuid::Uuid;
/// Create a [`QuerierTable`] for testing.
pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -> QuerierTable {
@ -133,7 +134,7 @@ impl IngesterPartitionBuilder {
IngesterPartition::new(
Arc::clone(&self.ingester_name),
None,
Some(Uuid::new_v4()),
self.partition.partition.id,
self.shard.shard.id,
0,