refactor: Extract more helper functions
parent
c4be42324b
commit
daa8b7ef0a
|
@ -61,30 +61,67 @@ async fn read_and_write_data() {
|
||||||
let http_client = reqwest::Client::new();
|
let http_client = reqwest::Client::new();
|
||||||
let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN);
|
let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN);
|
||||||
|
|
||||||
|
create_database(&http_client, &database_name).await;
|
||||||
|
|
||||||
|
let ns_since_epoch = ns_since_epoch();
|
||||||
|
let expected_read_data = load_data(&influxdb2, org_id_str, bucket_id_str, ns_since_epoch).await;
|
||||||
|
let sql_query = "select * from cpu_load_short";
|
||||||
|
|
||||||
|
test_read_api(
|
||||||
|
&http_client,
|
||||||
|
org_id_str,
|
||||||
|
bucket_id_str,
|
||||||
|
sql_query,
|
||||||
|
&expected_read_data,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
test_grpc_api(
|
||||||
|
&influxdb2,
|
||||||
|
org_id_str,
|
||||||
|
org_id,
|
||||||
|
bucket_id_str,
|
||||||
|
bucket_id,
|
||||||
|
ns_since_epoch,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
test_http_error_messages(&influxdb2).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_database(client: &reqwest::Client, database_name: &str) {
|
||||||
let rules = DatabaseRules {
|
let rules = DatabaseRules {
|
||||||
store_locally: true,
|
store_locally: true,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let data = serde_json::to_vec(&rules).unwrap();
|
let data = serde_json::to_vec(&rules).unwrap();
|
||||||
|
|
||||||
http_client
|
client
|
||||||
.put(&format!(
|
.put(&format!(
|
||||||
"{}/iox/api/v1/databases/{}",
|
"{}/iox/api/v1/databases/{}",
|
||||||
HTTP_BASE, &database_name
|
HTTP_BASE, database_name
|
||||||
))
|
))
|
||||||
.body(data)
|
.body(data)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
let start_time = SystemTime::now();
|
fn ns_since_epoch() -> i64 {
|
||||||
let ns_since_epoch: i64 = start_time
|
SystemTime::now()
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
.expect("System time should have been after the epoch")
|
.expect("System time should have been after the epoch")
|
||||||
.as_nanos()
|
.as_nanos()
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("Unable to represent system time");
|
.expect("Unable to represent system time")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_data(
|
||||||
|
influxdb2: &influxdb2_client::Client,
|
||||||
|
org_id_str: &str,
|
||||||
|
bucket_id_str: &str,
|
||||||
|
ns_since_epoch: i64,
|
||||||
|
) -> Vec<String> {
|
||||||
// TODO: make a more extensible way to manage data for tests, such as in
|
// TODO: make a more extensible way to manage data for tests, such as in
|
||||||
// external fixture files or with factories.
|
// external fixture files or with factories.
|
||||||
let points = vec![
|
let points = vec![
|
||||||
|
@ -151,7 +188,7 @@ async fn read_and_write_data() {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let expected_read_data = substitute_nanos(
|
substitute_nanos(
|
||||||
ns_since_epoch,
|
ns_since_epoch,
|
||||||
&[
|
&[
|
||||||
"+----------+---------+---------------------+----------------+",
|
"+----------+---------+---------------------+----------------+",
|
||||||
|
@ -164,23 +201,33 @@ async fn read_and_write_data() {
|
||||||
"| server01 | us-west | ns4 | 0.000003 |",
|
"| server01 | us-west | ns4 | 0.000003 |",
|
||||||
"+----------+---------+---------------------+----------------+",
|
"+----------+---------+---------------------+----------------+",
|
||||||
],
|
],
|
||||||
);
|
|
||||||
|
|
||||||
let text = read_data_as_sql(
|
|
||||||
&http_client,
|
|
||||||
"/read",
|
|
||||||
org_id_str,
|
|
||||||
bucket_id_str,
|
|
||||||
"select * from cpu_load_short",
|
|
||||||
)
|
)
|
||||||
.await;
|
}
|
||||||
|
|
||||||
|
async fn test_read_api(
|
||||||
|
client: &reqwest::Client,
|
||||||
|
org_id_str: &str,
|
||||||
|
bucket_id_str: &str,
|
||||||
|
sql_query: &str,
|
||||||
|
expected_read_data: &[String],
|
||||||
|
) {
|
||||||
|
let text = read_data_as_sql(&client, "/read", org_id_str, bucket_id_str, sql_query).await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
text, expected_read_data,
|
text, expected_read_data,
|
||||||
"Actual:\n{:#?}\nExpected:\n{:#?}",
|
"Actual:\n{:#?}\nExpected:\n{:#?}",
|
||||||
text, expected_read_data
|
text, expected_read_data
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn test_grpc_api(
|
||||||
|
influxdb2: &influxdb2_client::Client,
|
||||||
|
org_id_str: &str,
|
||||||
|
org_id: u64,
|
||||||
|
bucket_id_str: &str,
|
||||||
|
bucket_id: u64,
|
||||||
|
ns_since_epoch: i64,
|
||||||
|
) {
|
||||||
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
|
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
|
||||||
|
|
||||||
// Validate that capabilities rpc endpoint is hooked up
|
// Validate that capabilities rpc endpoint is hooked up
|
||||||
|
@ -402,8 +449,6 @@ async fn read_and_write_data() {
|
||||||
assert_eq!(field.r#type, DataType::Float as i32);
|
assert_eq!(field.r#type, DataType::Float as i32);
|
||||||
assert_eq!(field.timestamp, ns_since_epoch + 4);
|
assert_eq!(field.timestamp, ns_since_epoch + 4);
|
||||||
|
|
||||||
test_http_error_messages(&influxdb2).await.unwrap();
|
|
||||||
|
|
||||||
test_read_window_aggregate(
|
test_read_window_aggregate(
|
||||||
&mut storage_client,
|
&mut storage_client,
|
||||||
&influxdb2,
|
&influxdb2,
|
||||||
|
|
Loading…
Reference in New Issue