test: query limits (#4769)

* test: query limits

This was left out of #4760.

* test: additional debugging

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
pull/24376/head
Marco Neumann 2022-06-03 09:30:30 +02:00 committed by GitHub
parent 81730fd0ff
commit f7cbd5d490
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 298 additions and 1 deletions

View File

@ -425,3 +425,58 @@ impl Stream for GetStream {
}
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use service_common::test_util::TestDatabaseStore;
use tokio::pin;
use super::*;
#[tokio::test]
async fn test_query_semaphore() {
let semaphore_size = 2;
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
// add some data
test_storage.db_or_create("my_db").await;
let service = FlightService {
server: Arc::clone(&test_storage),
};
let ticket = Ticket {
ticket: br#"{"database_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
};
let streaming_resp1 = service
.do_get(tonic::Request::new(ticket.clone()))
.await
.unwrap();
let _streaming_resp2 = service
.do_get(tonic::Request::new(ticket.clone()))
.await
.unwrap();
// 3rd request is pending
let fut = service.do_get(tonic::Request::new(ticket.clone()));
pin!(fut);
assert_fut_pending(&mut fut).await;
// free permit
drop(streaming_resp1);
let _streaming_resp3 = fut.await;
}
/// Assert that given future is pending.
///
/// This will try to poll the future a bit to ensure that it is not stuck in tokios task preemption.
async fn assert_fut_pending<F>(fut: &mut F)
where
F: Future + Send + Unpin,
{
tokio::select! {
_ = fut => panic!("future is not pending, yielded"),
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
};
}
}

View File

@ -1470,6 +1470,7 @@ mod tests {
use super::*;
use data_types::ChunkId;
use datafusion::logical_plan::{col, lit, Expr};
use futures::Future;
use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value};
use influxdb_storage_client::{
connection::{Builder as ConnectionBuilder, Connection},
@ -1482,12 +1483,13 @@ mod tests {
use predicate::{PredicateBuilder, PredicateMatch};
use service_common::test_util::TestDatabaseStore;
use std::{
any::Any,
net::{IpAddr, Ipv4Addr, SocketAddr},
num::NonZeroU64,
sync::Arc,
};
use test_helpers::{assert_contains, tracing::TracingCapture};
use tokio::task::JoinHandle;
use tokio::{pin, task::JoinHandle};
use tokio_stream::wrappers::TcpListenerStream;
fn to_str_vec(s: &[&str]) -> Vec<String> {
@ -2922,6 +2924,233 @@ mod tests {
assert_contains!(response_string, "Sugar we are going down");
}
#[derive(Debug, Clone)]
enum SemaphoredRequest {
MeasurementFields,
MeasurementNames,
MeasurementTagKeys,
MeasurementTagValues,
ReadFilter,
ReadGroup,
ReadWindowAggregate,
TagKeys,
TagValues,
TagValuesGroupedByMeasurementAndTagKey,
}
impl SemaphoredRequest {
async fn request(&self, service: &StorageService<TestDatabaseStore>) -> Box<dyn Any> {
let db_info = org_and_bucket();
let source = Some(StorageClient::read_source(&db_info, 1));
match self {
Self::MeasurementFields => {
let request = MeasurementFieldsRequest {
source: source.clone(),
measurement: "TheMeasurement".into(),
range: Some(make_timestamp_range(0, 2000)),
predicate: Some(make_state_eq_ma_predicate()),
};
let streaming_resp = service
.measurement_fields(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::MeasurementNames => {
let request = MeasurementNamesRequest {
source: source.clone(),
range: None,
predicate: None,
};
let streaming_resp = service
.measurement_names(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::MeasurementTagKeys => {
let request = MeasurementTagKeysRequest {
measurement: "TheMeasurement".into(),
source: source.clone(),
range: Some(make_timestamp_range(0, 200)),
predicate: Some(make_state_eq_ma_predicate()),
};
let streaming_resp = service
.measurement_tag_keys(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::MeasurementTagValues => {
let request = MeasurementTagValuesRequest {
measurement: "TheMeasurement".into(),
source: source.clone(),
range: Some(make_timestamp_range(150, 2000)),
predicate: Some(make_state_eq_ma_predicate()),
tag_key: "state".into(),
};
let streaming_resp = service
.measurement_tag_values(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::ReadFilter => {
let request = ReadFilterRequest {
read_source: source.clone(),
range: Some(make_timestamp_range(0, 10000)),
..Default::default()
};
let streaming_resp = service
.read_filter(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::ReadGroup => {
let group = generated_types::read_group_request::Group::By as i32;
let request = ReadGroupRequest {
read_source: source.clone(),
range: Some(make_timestamp_range(0, 2000)),
predicate: Some(make_state_eq_ma_predicate()),
group_keys: vec!["state".into()],
group,
aggregate: Some(Aggregate {
r#type: aggregate::AggregateType::Sum as i32,
}),
};
let streaming_resp = service
.read_group(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::ReadWindowAggregate => {
let request = ReadWindowAggregateRequest {
read_source: source.clone(),
range: Some(make_timestamp_range(150, 200)),
predicate: Some(make_state_eq_ma_predicate()),
window_every: 0,
offset: 0,
aggregate: vec![Aggregate {
r#type: aggregate::AggregateType::Sum as i32,
}],
// old skool window definition
window: Some(Window {
every: Some(Duration {
nsecs: 1122,
months: 0,
negative: false,
}),
offset: Some(Duration {
nsecs: 0,
months: 4,
negative: false,
}),
}),
};
let streaming_resp = service
.read_window_aggregate(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::TagKeys => {
let request = TagKeysRequest {
tags_source: source.clone(),
range: Some(make_timestamp_range(0, 2000)),
predicate: Some(make_state_eq_ma_predicate()),
};
let streaming_resp = service
.tag_keys(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::TagValues => {
let request = TagValuesRequest {
tags_source: source.clone(),
range: Some(make_timestamp_range(0, 2000)),
predicate: Some(make_state_eq_ma_predicate()),
tag_key: [255].into(),
};
let streaming_resp = service
.tag_values(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
Self::TagValuesGroupedByMeasurementAndTagKey => {
let request = TagValuesGroupedByMeasurementAndTagKeyRequest {
source: source.clone(),
measurement_patterns: vec![],
tag_key_predicate: Some(TagKeyPredicate {
value: Some(Value::Eq("state".into())),
}),
condition: None,
};
let streaming_resp = service
.tag_values_grouped_by_measurement_and_tag_key(tonic::Request::new(request))
.await
.unwrap();
Box::new(streaming_resp) as _
}
}
}
fn all() -> Vec<Self> {
vec![
Self::MeasurementFields,
Self::MeasurementNames,
Self::MeasurementTagKeys,
Self::MeasurementTagValues,
Self::ReadFilter,
Self::ReadGroup,
Self::ReadWindowAggregate,
Self::TagKeys,
Self::TagValues,
Self::TagValuesGroupedByMeasurementAndTagKey,
]
}
}
#[tokio::test]
async fn test_query_semaphore() {
let semaphore_size = 2;
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
// add some data
let db_info = org_and_bucket();
let chunk = TestChunk::new("TheMeasurement")
.with_time_column()
.with_tag_column("state")
.with_one_row_of_data();
test_storage
.db_or_create(db_info.db_name())
.await
.add_chunk("my_partition_key", Arc::new(chunk));
// construct request
for t in SemaphoredRequest::all() {
println!("Testing with request: {:?}", t);
let service = StorageService {
db_store: Arc::clone(&test_storage),
};
let streaming_resp1 = t.request(&service).await;
let _streaming_resp2 = t.request(&service).await;
// 3rd request is pending
let fut = t.request(&service);
pin!(fut);
assert_fut_pending(&mut fut).await;
// free permit
drop(streaming_resp1);
let _streaming_resp3 = fut.await;
}
}
fn make_timestamp_range(start: i64, end: i64) -> TimestampRange {
TimestampRange { start, end }
}
@ -3103,4 +3332,17 @@ mod tests {
self.join_handle.abort();
}
}
/// Assert that given future is pending.
///
/// This will try to poll the future a bit to ensure that it is not stuck in tokios task preemption.
async fn assert_fut_pending<F>(fut: &mut F)
where
F: Future + Send + Unpin,
{
tokio::select! {
_ = fut => panic!("future is not pending, yielded"),
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
};
}
}