feat: Rewrite the NG end-to-end metrics test (#4404)
* refactor: Expose data generation tool for wider use * feat: Add a step for retrieving the server metrics * refactor: Copy the OG end-to-end metrics test to NG * feat: Rewrite the NG end-to-end metrics test This is still broken because the the row timestamp metrics don't exist in NG. * fix: Test metrics relevant to NG * refactor: Move the data generator to the test helper crate * refactor: Extract a ReadFilter request builder into the test helper crate * refactor: Make test helper request builder able to build other gRPC requests Co-authored-by: Carol (Nichols || Goulding) <carol.nichols@gmail.com>pull/24376/head
parent
86e8f05ed1
commit
e3caf24954
|
@ -6057,7 +6057,9 @@ dependencies = [
|
|||
"arrow",
|
||||
"arrow_util",
|
||||
"assert_cmd",
|
||||
"bytes",
|
||||
"futures",
|
||||
"generated_types",
|
||||
"http",
|
||||
"hyper",
|
||||
"influxdb_iox_client",
|
||||
|
@ -6065,6 +6067,7 @@ dependencies = [
|
|||
"observability_deps",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.0",
|
||||
"prost",
|
||||
"rand",
|
||||
"reqwest",
|
||||
"sqlx",
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
use test_helpers_end_to_end_ng::{
|
||||
maybe_skip_integration, DataGenerator, MiniCluster, Step, StepTest, TestConfig,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_metrics() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
let test_config = TestConfig::new_all_in_one(database_url);
|
||||
let mut cluster = MiniCluster::create_all_in_one(test_config).await;
|
||||
|
||||
let lp = DataGenerator::new().line_protocol().to_owned();
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(lp),
|
||||
Step::WaitForReadable,
|
||||
Step::VerifiedMetrics(Box::new(|_state, metrics| {
|
||||
assert_eq!(
|
||||
metrics
|
||||
.trim()
|
||||
.split('\n')
|
||||
.filter(|x| x.starts_with("catalog_op_duration_ms_bucket"))
|
||||
.count(),
|
||||
180
|
||||
);
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(feature = "jemalloc_replacing_malloc")]
|
||||
#[tokio::test]
|
||||
pub async fn test_jemalloc_metrics() {
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
let database_url = maybe_skip_integration!();
|
||||
let test_config = TestConfig::new_all_in_one(database_url);
|
||||
let mut cluster = MiniCluster::create_all_in_one(test_config).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![Step::VerifiedMetrics(Box::new(|_state, metrics| {
|
||||
let lines: Vec<_> = metrics
|
||||
.trim()
|
||||
.split('\n')
|
||||
.filter(|x| x.starts_with("jemalloc_memstats_bytes"))
|
||||
.collect();
|
||||
|
||||
assert_eq!(lines.len(), 6);
|
||||
assert_contains!(lines[0], r#"jemalloc_memstats_bytes{stat="active"}"#);
|
||||
assert_contains!(lines[1], r#"jemalloc_memstats_bytes{stat="alloc"}"#);
|
||||
assert_contains!(lines[2], r#"jemalloc_memstats_bytes{stat="metadata"}"#);
|
||||
assert_contains!(lines[3], r#"jemalloc_memstats_bytes{stat="mapped"}"#);
|
||||
assert_contains!(lines[4], r#"jemalloc_memstats_bytes{stat="resident"}"#);
|
||||
assert_contains!(lines[5], r#"jemalloc_memstats_bytes{stat="retained"}"#);
|
||||
}))],
|
||||
)
|
||||
.run()
|
||||
.await;
|
||||
}
|
|
@ -3,6 +3,7 @@ mod cli;
|
|||
mod debug;
|
||||
mod ingester;
|
||||
mod logging;
|
||||
mod metrics;
|
||||
mod namespace;
|
||||
mod querier;
|
||||
mod router;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
mod influxrpc;
|
||||
pub(crate) mod influxrpc;
|
||||
mod multi_ingester;
|
||||
|
||||
use futures::FutureExt;
|
||||
|
|
|
@ -1,19 +1,13 @@
|
|||
//! Tests for influxrpc / Storage gRPC endpoints
|
||||
|
||||
mod data;
|
||||
mod dump;
|
||||
mod exprs;
|
||||
mod metadata;
|
||||
mod read_filter;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use generated_types::ReadSource;
|
||||
use prost::Message;
|
||||
use test_helpers_end_to_end_ng::FCustom;
|
||||
use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster, Step, StepTest};
|
||||
|
||||
use self::data::DataGenerator;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
maybe_skip_integration, DataGenerator, FCustom, MiniCluster, Step, StepTest,
|
||||
};
|
||||
|
||||
/// Runs the specified custom function on a cluster with no data
|
||||
pub(crate) async fn run_no_data_test(custom: FCustom) {
|
||||
|
@ -45,31 +39,3 @@ pub(crate) async fn run_data_test(generator: Arc<DataGenerator>, custom: FCustom
|
|||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Creates the appropriate `Any` protobuf magic for a read source
|
||||
/// with a specified org and bucket name
|
||||
pub(crate) fn make_read_source(
|
||||
cluster: &MiniCluster,
|
||||
) -> Option<generated_types::google::protobuf::Any> {
|
||||
let org_id = cluster.org_id();
|
||||
let bucket_id = cluster.bucket_id();
|
||||
let org_id = u64::from_str_radix(org_id, 16).unwrap();
|
||||
let bucket_id = u64::from_str_radix(bucket_id, 16).unwrap();
|
||||
|
||||
let partition_id = u64::from(u32::MAX);
|
||||
let read_source = ReadSource {
|
||||
org_id,
|
||||
bucket_id,
|
||||
partition_id,
|
||||
};
|
||||
|
||||
// Do the magic to-any conversion
|
||||
let mut d = bytes::BytesMut::new();
|
||||
read_source.encode(&mut d).unwrap();
|
||||
let read_source = generated_types::google::protobuf::Any {
|
||||
type_url: "/TODO".to_string(),
|
||||
value: d.freeze(),
|
||||
};
|
||||
|
||||
Some(read_source)
|
||||
}
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
use generated_types::{
|
||||
node::{Comparison, Type as NodeType, Value},
|
||||
Node, Predicate,
|
||||
};
|
||||
|
||||
/// Create a predicate representing tag_name=tag_value in the horrible gRPC
|
||||
/// structs
|
||||
pub(crate) fn make_tag_predicate(
|
||||
tag_name: impl Into<String>,
|
||||
tag_value: impl Into<String>,
|
||||
) -> Predicate {
|
||||
Predicate {
|
||||
root: Some(Node {
|
||||
node_type: NodeType::ComparisonExpression as i32,
|
||||
children: vec![
|
||||
Node {
|
||||
node_type: NodeType::TagRef as i32,
|
||||
children: vec![],
|
||||
value: Some(Value::TagRefValue(tag_name.into().into())),
|
||||
},
|
||||
Node {
|
||||
node_type: NodeType::Literal as i32,
|
||||
children: vec![],
|
||||
value: Some(Value::StringValue(tag_value.into())),
|
||||
},
|
||||
],
|
||||
value: Some(Value::Comparison(Comparison::Equal as _)),
|
||||
}),
|
||||
}
|
||||
}
|
|
@ -1,21 +1,12 @@
|
|||
use super::make_read_source;
|
||||
use super::run_data_test;
|
||||
use super::run_no_data_test;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::FutureExt;
|
||||
use super::{run_data_test, run_no_data_test};
|
||||
use futures::{prelude::*, FutureExt};
|
||||
use generated_types::{
|
||||
google::protobuf::Empty, measurement_fields_response::FieldType,
|
||||
offsets_response::PartitionOffsetResponse, storage_client::StorageClient,
|
||||
MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
|
||||
MeasurementTagValuesRequest, OffsetsResponse, TagKeysRequest, TagValuesRequest,
|
||||
offsets_response::PartitionOffsetResponse, storage_client::StorageClient, OffsetsResponse,
|
||||
};
|
||||
use influxdb_storage_client::tag_key_bytes_to_strings;
|
||||
use test_helpers_end_to_end_ng::StepTestState;
|
||||
|
||||
use super::{data::DataGenerator, exprs};
|
||||
use std::sync::Arc;
|
||||
use test_helpers_end_to_end_ng::{DataGenerator, GrpcRequestBuilder, StepTestState};
|
||||
|
||||
#[tokio::test]
|
||||
/// Validate that capabilities storage endpoint is hooked up
|
||||
|
@ -68,16 +59,12 @@ async fn tag_keys() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let tag_keys_request = tonic::Request::new(TagKeysRequest {
|
||||
tags_source: read_source,
|
||||
range,
|
||||
predicate,
|
||||
});
|
||||
let tag_keys_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_tag_keys();
|
||||
|
||||
let tag_keys_response = storage_client.tag_keys(tag_keys_request).await.unwrap();
|
||||
let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await.unwrap();
|
||||
|
@ -105,17 +92,12 @@ async fn tag_values() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let tag_values_request = tonic::Request::new(TagValuesRequest {
|
||||
tags_source: read_source,
|
||||
range,
|
||||
predicate,
|
||||
tag_key: b"host".to_vec(),
|
||||
});
|
||||
let tag_values_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_tag_values("host");
|
||||
|
||||
let tag_values_response =
|
||||
storage_client.tag_values(tag_values_request).await.unwrap();
|
||||
|
@ -148,14 +130,11 @@ async fn measurement_names() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
|
||||
let measurement_names_request = tonic::Request::new(MeasurementNamesRequest {
|
||||
source: read_source,
|
||||
range,
|
||||
predicate: None,
|
||||
});
|
||||
let measurement_names_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.build_measurement_names();
|
||||
|
||||
let measurement_names_response = storage_client
|
||||
.measurement_names(measurement_names_request)
|
||||
|
@ -193,18 +172,12 @@ async fn measurement_tag_keys() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let measurement_tag_keys_request = tonic::Request::new(MeasurementTagKeysRequest {
|
||||
source: read_source,
|
||||
measurement: String::from("cpu_load_short"),
|
||||
range,
|
||||
predicate,
|
||||
});
|
||||
let measurement_tag_keys_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_measurement_tag_keys("cpu_load_short");
|
||||
|
||||
let measurement_tag_keys_response = storage_client
|
||||
.measurement_tag_keys(measurement_tag_keys_request)
|
||||
|
@ -239,20 +212,12 @@ async fn measurement_tag_values() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let measurement_tag_values_request =
|
||||
tonic::Request::new(MeasurementTagValuesRequest {
|
||||
source: read_source,
|
||||
measurement: String::from("cpu_load_short"),
|
||||
tag_key: String::from("host"),
|
||||
range,
|
||||
predicate,
|
||||
});
|
||||
let measurement_tag_values_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_measurement_tag_values("cpu_load_short", "host");
|
||||
|
||||
let measurement_tag_values_response = storage_client
|
||||
.measurement_tag_values(measurement_tag_values_request)
|
||||
|
@ -287,18 +252,12 @@ async fn measurement_fields() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let measurement_fields_request = tonic::Request::new(MeasurementFieldsRequest {
|
||||
source: read_source,
|
||||
measurement: String::from("cpu_load_short"),
|
||||
range,
|
||||
predicate,
|
||||
});
|
||||
let measurement_fields_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_measurement_fields("cpu_load_short");
|
||||
|
||||
let ns_since_epoch = generator.ns_since_epoch();
|
||||
let measurement_fields_response = storage_client
|
||||
|
|
|
@ -1,17 +1,8 @@
|
|||
use super::{dump::dump_data_frames, run_data_test};
|
||||
use futures::{prelude::*, FutureExt};
|
||||
use generated_types::{read_response::frame::Data, storage_client::StorageClient};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::FutureExt;
|
||||
use generated_types::{
|
||||
read_response::frame::Data, storage_client::StorageClient, ReadFilterRequest,
|
||||
};
|
||||
use test_helpers_end_to_end_ng::StepTestState;
|
||||
|
||||
use crate::end_to_end_ng_cases::querier::influxrpc::dump::dump_data_frames;
|
||||
|
||||
use super::make_read_source;
|
||||
use super::run_data_test;
|
||||
use super::{data::DataGenerator, exprs};
|
||||
use test_helpers_end_to_end_ng::{DataGenerator, GrpcRequestBuilder, StepTestState};
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_filter() {
|
||||
|
@ -20,31 +11,11 @@ async fn read_filter() {
|
|||
async move {
|
||||
let mut storage_client =
|
||||
StorageClient::new(state.cluster().querier().querier_grpc_connection());
|
||||
let read_source = make_read_source(state.cluster());
|
||||
let range = generator.timestamp_range();
|
||||
|
||||
let predicate = exprs::make_tag_predicate("host", "server01");
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let read_filter_request = tonic::Request::new(ReadFilterRequest {
|
||||
read_source,
|
||||
range,
|
||||
predicate,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let expected_frames = generator.substitute_nanos(&[
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns1], values: \"27.99\"",
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-east,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns3], values: \"1234567.891011\"",
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-west,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns0, ns4], values: \"0.64,0.000003\"",
|
||||
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=in, type: 1",
|
||||
"IntegerPointsFrame, timestamps: [ns6], values: \"3\"",
|
||||
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=out, type: 1",
|
||||
"IntegerPointsFrame, timestamps: [ns6], values: \"4\""
|
||||
]);
|
||||
let read_filter_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.timestamp_range(generator.min_time(), generator.max_time())
|
||||
.tag_predicate("host", "server01")
|
||||
.build_read_filter();
|
||||
|
||||
let read_response = storage_client
|
||||
.read_filter(read_filter_request)
|
||||
|
@ -60,6 +31,19 @@ async fn read_filter() {
|
|||
|
||||
let actual_frames = dump_data_frames(&frames);
|
||||
|
||||
let expected_frames = generator.substitute_nanos(&[
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns1], values: \"27.99\"",
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-east,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns3], values: \"1234567.891011\"",
|
||||
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-west,_field=value, type: 0",
|
||||
"FloatPointsFrame, timestamps: [ns0, ns4], values: \"0.64,0.000003\"",
|
||||
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=in, type: 1",
|
||||
"IntegerPointsFrame, timestamps: [ns6], values: \"3\"",
|
||||
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=out, type: 1",
|
||||
"IntegerPointsFrame, timestamps: [ns6], values: \"4\""
|
||||
]);
|
||||
|
||||
assert_eq!(
|
||||
expected_frames,
|
||||
actual_frames,
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
use futures::{prelude::*, FutureExt};
|
||||
use generated_types::{storage_client::StorageClient, ReadFilterRequest, ReadSource};
|
||||
use prost::Message;
|
||||
use generated_types::storage_client::StorageClient;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig, UdpCapture,
|
||||
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
|
||||
TestConfig, UdpCapture,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -73,35 +73,9 @@ pub async fn test_tracing_storage_api() {
|
|||
let mut storage_client =
|
||||
StorageClient::new(cluster.querier().querier_grpc_connection());
|
||||
|
||||
let org_id = cluster.org_id();
|
||||
let bucket_id = cluster.bucket_id();
|
||||
let org_id = u64::from_str_radix(org_id, 16).unwrap();
|
||||
let bucket_id = u64::from_str_radix(bucket_id, 16).unwrap();
|
||||
|
||||
let partition_id = u64::from(u32::MAX);
|
||||
let read_source = ReadSource {
|
||||
org_id,
|
||||
bucket_id,
|
||||
partition_id,
|
||||
};
|
||||
|
||||
// Do the magic to-any conversion
|
||||
let mut d = bytes::BytesMut::new();
|
||||
read_source.encode(&mut d).unwrap();
|
||||
let read_source = generated_types::google::protobuf::Any {
|
||||
type_url: "/TODO".to_string(),
|
||||
value: d.freeze(),
|
||||
};
|
||||
|
||||
let range = None;
|
||||
let predicate = None;
|
||||
|
||||
let read_filter_request = tonic::Request::new(ReadFilterRequest {
|
||||
read_source: Some(read_source),
|
||||
range,
|
||||
predicate,
|
||||
..Default::default()
|
||||
});
|
||||
let read_filter_request = GrpcRequestBuilder::new()
|
||||
.source(state.cluster())
|
||||
.build_read_filter();
|
||||
|
||||
async move {
|
||||
let read_response = storage_client
|
||||
|
|
|
@ -7,7 +7,9 @@ edition = "2021"
|
|||
arrow = { version = "12", features = ["prettyprint"] }
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
assert_cmd = "2.0.2"
|
||||
bytes = "1.0"
|
||||
futures = "0.3"
|
||||
generated_types = { path = "../generated_types" }
|
||||
http = "0.2.0"
|
||||
hyper = "0.14"
|
||||
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
|
||||
|
@ -15,6 +17,7 @@ nix = "0.24"
|
|||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = { version = "1.10.0", features = ["parking_lot"] }
|
||||
parking_lot = "0.12"
|
||||
prost = "0.10"
|
||||
rand = "0.8.3"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
|
||||
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
|
||||
|
@ -22,5 +25,5 @@ tempfile = "3.1.0"
|
|||
test_helpers = { path = "../test_helpers" }
|
||||
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
|
||||
tokio-util = "0.7"
|
||||
tonic = { version = "0.7" }
|
||||
tonic = "0.7"
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
|
|
@ -1,15 +1,13 @@
|
|||
use std::time::SystemTime;
|
||||
|
||||
use generated_types::TimestampRange;
|
||||
|
||||
/// Manages a dataset for writing / reading
|
||||
pub(crate) struct DataGenerator {
|
||||
pub struct DataGenerator {
|
||||
ns_since_epoch: i64,
|
||||
line_protocol: String,
|
||||
}
|
||||
|
||||
impl DataGenerator {
|
||||
pub(crate) fn new() -> Self {
|
||||
pub fn new() -> Self {
|
||||
let ns_since_epoch = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("System time should have been after the epoch")
|
||||
|
@ -56,16 +54,8 @@ impl DataGenerator {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return a timestamp range that covers the entire data range
|
||||
pub(crate) fn timestamp_range(&self) -> Option<TimestampRange> {
|
||||
Some(TimestampRange {
|
||||
start: self.ns_since_epoch,
|
||||
end: self.ns_since_epoch + 10,
|
||||
})
|
||||
}
|
||||
|
||||
/// substitutes "ns" --> ns_since_epoch, ns1-->ns_since_epoch+1, etc
|
||||
pub(crate) fn substitute_nanos(&self, lines: &[&str]) -> Vec<String> {
|
||||
pub fn substitute_nanos(&self, lines: &[&str]) -> Vec<String> {
|
||||
let ns_since_epoch = self.ns_since_epoch;
|
||||
let substitutions = vec![
|
||||
("ns0", format!("{}", ns_since_epoch)),
|
||||
|
@ -91,13 +81,31 @@ impl DataGenerator {
|
|||
|
||||
/// Get a reference to the data generator's line protocol.
|
||||
#[must_use]
|
||||
pub(crate) fn line_protocol(&self) -> &str {
|
||||
pub fn line_protocol(&self) -> &str {
|
||||
self.line_protocol.as_ref()
|
||||
}
|
||||
|
||||
/// Get the data generator's ns since epoch.
|
||||
#[must_use]
|
||||
pub(crate) fn ns_since_epoch(&self) -> i64 {
|
||||
pub fn ns_since_epoch(&self) -> i64 {
|
||||
self.ns_since_epoch
|
||||
}
|
||||
|
||||
/// Get the minimum time of the range of this data for querying.
|
||||
#[must_use]
|
||||
pub fn min_time(&self) -> i64 {
|
||||
self.ns_since_epoch
|
||||
}
|
||||
|
||||
/// Get the maximum time of the range of this data for querying.
|
||||
#[must_use]
|
||||
pub fn max_time(&self) -> i64 {
|
||||
self.ns_since_epoch + 10
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DataGenerator {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
use crate::MiniCluster;
|
||||
use generated_types::{
|
||||
node::{Comparison, Type as NodeType, Value},
|
||||
MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
|
||||
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadSource, TagKeysRequest,
|
||||
TagValuesRequest, TimestampRange,
|
||||
};
|
||||
use prost::Message;
|
||||
|
||||
pub struct GrpcRequestBuilder {
|
||||
read_source: Option<generated_types::google::protobuf::Any>,
|
||||
range: Option<TimestampRange>,
|
||||
predicate: Option<Predicate>,
|
||||
}
|
||||
|
||||
impl Default for GrpcRequestBuilder {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcRequestBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
read_source: None,
|
||||
range: None,
|
||||
predicate: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates the appropriate `Any` protobuf magic for a read source with the cluster's org and
|
||||
/// bucket name
|
||||
pub fn source(self, cluster: &MiniCluster) -> Self {
|
||||
let org_id = cluster.org_id();
|
||||
let bucket_id = cluster.bucket_id();
|
||||
let org_id = u64::from_str_radix(org_id, 16).unwrap();
|
||||
let bucket_id = u64::from_str_radix(bucket_id, 16).unwrap();
|
||||
|
||||
let partition_id = u64::from(u32::MAX);
|
||||
let read_source = ReadSource {
|
||||
org_id,
|
||||
bucket_id,
|
||||
partition_id,
|
||||
};
|
||||
|
||||
// Do the magic to-any conversion
|
||||
let mut d = bytes::BytesMut::new();
|
||||
read_source.encode(&mut d).unwrap();
|
||||
let read_source = generated_types::google::protobuf::Any {
|
||||
type_url: "/TODO".to_string(),
|
||||
value: d.freeze(),
|
||||
};
|
||||
|
||||
Self {
|
||||
read_source: Some(read_source),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timestamp_range(self, start: i64, end: i64) -> Self {
|
||||
Self {
|
||||
range: Some(TimestampRange { start, end }),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a predicate representing tag_name=tag_value in the horrible gRPC
|
||||
/// structs
|
||||
pub fn tag_predicate(self, tag_name: impl Into<String>, tag_value: impl Into<String>) -> Self {
|
||||
let predicate = Predicate {
|
||||
root: Some(Node {
|
||||
node_type: NodeType::ComparisonExpression as i32,
|
||||
children: vec![
|
||||
Node {
|
||||
node_type: NodeType::TagRef as i32,
|
||||
children: vec![],
|
||||
value: Some(Value::TagRefValue(tag_name.into().into())),
|
||||
},
|
||||
Node {
|
||||
node_type: NodeType::Literal as i32,
|
||||
children: vec![],
|
||||
value: Some(Value::StringValue(tag_value.into())),
|
||||
},
|
||||
],
|
||||
value: Some(Value::Comparison(Comparison::Equal as _)),
|
||||
}),
|
||||
};
|
||||
Self {
|
||||
predicate: Some(predicate),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_read_filter(self) -> tonic::Request<ReadFilterRequest> {
|
||||
tonic::Request::new(ReadFilterRequest {
|
||||
read_source: self.read_source,
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_tag_keys(self) -> tonic::Request<TagKeysRequest> {
|
||||
tonic::Request::new(TagKeysRequest {
|
||||
tags_source: self.read_source,
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_tag_values(self, tag_key: &str) -> tonic::Request<TagValuesRequest> {
|
||||
tonic::Request::new(TagValuesRequest {
|
||||
tags_source: self.read_source,
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
tag_key: tag_key.as_bytes().to_vec(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_measurement_names(self) -> tonic::Request<MeasurementNamesRequest> {
|
||||
tonic::Request::new(MeasurementNamesRequest {
|
||||
source: self.read_source,
|
||||
range: self.range,
|
||||
predicate: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_measurement_tag_keys(
|
||||
self,
|
||||
measurement: &str,
|
||||
) -> tonic::Request<MeasurementTagKeysRequest> {
|
||||
tonic::Request::new(MeasurementTagKeysRequest {
|
||||
source: self.read_source,
|
||||
measurement: measurement.to_string(),
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_measurement_tag_values(
|
||||
self,
|
||||
measurement: &str,
|
||||
tag_key: &str,
|
||||
) -> tonic::Request<MeasurementTagValuesRequest> {
|
||||
tonic::Request::new(MeasurementTagValuesRequest {
|
||||
source: self.read_source,
|
||||
measurement: measurement.to_string(),
|
||||
tag_key: tag_key.to_string(),
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_measurement_fields(
|
||||
self,
|
||||
measurement: &str,
|
||||
) -> tonic::Request<MeasurementFieldsRequest> {
|
||||
tonic::Request::new(MeasurementFieldsRequest {
|
||||
source: self.read_source,
|
||||
measurement: measurement.to_string(),
|
||||
range: self.range,
|
||||
predicate: self.predicate,
|
||||
})
|
||||
}
|
||||
}
|
|
@ -6,7 +6,9 @@ use rand::{
|
|||
mod addrs;
|
||||
mod client;
|
||||
mod config;
|
||||
mod data_generator;
|
||||
mod database;
|
||||
mod grpc;
|
||||
mod mini_cluster;
|
||||
mod server_fixture;
|
||||
mod server_type;
|
||||
|
@ -15,6 +17,8 @@ mod udp_listener;
|
|||
|
||||
pub use client::*;
|
||||
pub use config::TestConfig;
|
||||
pub use data_generator::DataGenerator;
|
||||
pub use grpc::GrpcRequestBuilder;
|
||||
pub use mini_cluster::MiniCluster;
|
||||
pub use server_fixture::{ServerFixture, TestServer};
|
||||
pub use server_type::ServerType;
|
||||
|
|
|
@ -103,6 +103,13 @@ pub enum Step {
|
|||
verify: Box<dyn Fn(Vec<RecordBatch>)>,
|
||||
},
|
||||
|
||||
/// Retrieve the metrics and verify the results using the provided
|
||||
/// validation function.
|
||||
///
|
||||
/// The validation function is expected to panic on validation
|
||||
/// failure.
|
||||
VerifiedMetrics(Box<dyn Fn(&mut StepTestState, String)>),
|
||||
|
||||
/// A custom step that can be used to implement special cases that
|
||||
/// are only used once.
|
||||
Custom(FCustom),
|
||||
|
@ -198,6 +205,20 @@ impl<'a> StepTest<'a> {
|
|||
verify(batches);
|
||||
info!("====Done running");
|
||||
}
|
||||
Step::VerifiedMetrics(verify) => {
|
||||
info!("====Begin validating metrics");
|
||||
|
||||
let cluster = state.cluster();
|
||||
let http_base = cluster.router2().router_http_base();
|
||||
let url = format!("{http_base}/metrics");
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let metrics = client.get(&url).send().await.unwrap().text().await.unwrap();
|
||||
|
||||
verify(&mut state, metrics);
|
||||
|
||||
info!("====Done validating metrics");
|
||||
}
|
||||
Step::Custom(f) => {
|
||||
info!("====Begin custom step");
|
||||
f(&mut state).await;
|
||||
|
|
Loading…
Reference in New Issue