fix: Update end-to-end.rs to have moe specific error messages (#674)

pull/24376/head
Andrew Lamb 2021-01-19 15:54:13 -05:00 committed by GitHub
parent 7969808f09
commit 4b5204cc8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 76 additions and 46 deletions

View File

@ -54,7 +54,7 @@ async fn read_data_as_sql(
org_id: &str,
bucket_id: &str,
sql_query: &str,
) -> Result<Vec<String>> {
) -> Vec<String> {
let url = format!("{}{}", API_BASE, path);
let lines = client
.get(&url)
@ -64,15 +64,16 @@ async fn read_data_as_sql(
("sql_query", sql_query),
])
.send()
.await?
.error_for_status()?
.await
.unwrap()
.text()
.await?
.await
.unwrap()
.trim()
.split('\n')
.map(str::to_string)
.collect();
Ok(lines)
lines
}
async fn write_data(
@ -88,8 +89,8 @@ async fn write_data(
}
#[tokio::test]
async fn read_and_write_data() -> Result<()> {
let server = TestServer::new()?;
async fn read_and_write_data() {
let server = TestServer::new().unwrap();
server.wait_until_ready().await;
let org_id_str = "0000111100001111";
@ -134,52 +135,63 @@ async fn read_and_write_data() -> Result<()> {
.tag("region", "us-west")
.field("value", 0.64)
.timestamp(ns_since_epoch)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.field("value", 27.99)
.timestamp(ns_since_epoch + 1)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("cpu_load_short")
.tag("host", "server02")
.tag("region", "us-west")
.field("value", 3.89)
.timestamp(ns_since_epoch + 2)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.tag("region", "us-east")
.field("value", 1234567.891011)
.timestamp(ns_since_epoch + 3)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.tag("region", "us-west")
.field("value", 0.000003)
.timestamp(ns_since_epoch + 4)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("system")
.tag("host", "server03")
.field("uptime", 1303385)
.timestamp(ns_since_epoch + 5)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("swap")
.tag("host", "server01")
.tag("name", "disk0")
.field("in", 3)
.field("out", 4)
.timestamp(ns_since_epoch + 6)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("status")
.field("active", true)
.timestamp(ns_since_epoch + 7)
.build()?,
.build()
.unwrap(),
influxdb2_client::DataPoint::builder("attributes")
.field("color", "blue")
.timestamp(ns_since_epoch + 8)
.build()?,
.build()
.unwrap(),
];
write_data(&client2, org_id_str, bucket_id_str, points).await?;
write_data(&client2, org_id_str, bucket_id_str, points)
.await
.unwrap();
let expected_read_data = substitute_nanos(
ns_since_epoch,
@ -203,7 +215,8 @@ async fn read_and_write_data() -> Result<()> {
bucket_id_str,
"select * from cpu_load_short",
)
.await?;
.await;
assert_eq!(
text, expected_read_data,
"Actual:\n{:#?}\nExpected:\n{:#?}",
@ -213,12 +226,12 @@ async fn read_and_write_data() -> Result<()> {
// Make an invalid organization WAL dir to test that the server ignores it
// instead of crashing
let invalid_org_dir = server.dir.path().join("not-an-org-id");
fs::create_dir(invalid_org_dir)?;
fs::create_dir(invalid_org_dir).unwrap();
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await?;
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
// Validate that capabilities rpc endpoint is hooked up
let capabilities_response = storage_client.capabilities(()).await?;
let capabilities_response = storage_client.capabilities(()).await.unwrap();
let capabilities_response = capabilities_response.into_inner();
assert_eq!(
capabilities_response.caps.len(),
@ -234,7 +247,7 @@ async fn read_and_write_data() -> Result<()> {
partition_id,
};
let mut d = Vec::new();
read_source.encode(&mut d)?;
read_source.encode(&mut d).unwrap();
let read_source = prost_types::Any {
type_url: "/TODO".to_string(),
value: d,
@ -255,9 +268,12 @@ async fn read_and_write_data() -> Result<()> {
range: range.clone(),
predicate: predicate.clone(),
});
let read_response = storage_client.read_filter(read_filter_request).await?;
let read_response = storage_client
.read_filter(read_filter_request)
.await
.unwrap();
let responses: Vec<_> = read_response.into_inner().try_collect().await?;
let responses: Vec<_> = read_response.into_inner().try_collect().await.unwrap();
let frames: Vec<Data> = responses
.into_iter()
.flat_map(|r| r.frames)
@ -293,8 +309,8 @@ async fn read_and_write_data() -> Result<()> {
predicate: predicate.clone(),
});
let tag_keys_response = storage_client.tag_keys(tag_keys_request).await?;
let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await?;
let tag_keys_response = storage_client.tag_keys(tag_keys_request).await.unwrap();
let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await.unwrap();
let keys = &responses[0].values;
let keys: Vec<_> = keys
@ -311,8 +327,12 @@ async fn read_and_write_data() -> Result<()> {
tag_key: b"host".to_vec(),
});
let tag_values_response = storage_client.tag_values(tag_values_request).await?;
let responses: Vec<_> = tag_values_response.into_inner().try_collect().await?;
let tag_values_response = storage_client.tag_values(tag_values_request).await.unwrap();
let responses: Vec<_> = tag_values_response
.into_inner()
.try_collect()
.await
.unwrap();
let values = &responses[0].values;
let values: Vec<_> = values
@ -337,11 +357,13 @@ async fn read_and_write_data() -> Result<()> {
let measurement_names_response = storage_client
.measurement_names(measurement_names_request)
.await?;
.await
.unwrap();
let responses: Vec<_> = measurement_names_response
.into_inner()
.try_collect()
.await?;
.await
.unwrap();
let values = &responses[0].values;
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
@ -360,11 +382,13 @@ async fn read_and_write_data() -> Result<()> {
let measurement_tag_keys_response = storage_client
.measurement_tag_keys(measurement_tag_keys_request)
.await?;
.await
.unwrap();
let responses: Vec<_> = measurement_tag_keys_response
.into_inner()
.try_collect()
.await?;
.await
.unwrap();
let values = &responses[0].values;
let values: Vec<_> = values
@ -384,11 +408,13 @@ async fn read_and_write_data() -> Result<()> {
let measurement_tag_values_response = storage_client
.measurement_tag_values(measurement_tag_values_request)
.await?;
.await
.unwrap();
let responses: Vec<_> = measurement_tag_values_response
.into_inner()
.try_collect()
.await?;
.await
.unwrap();
let values = &responses[0].values;
let values: Vec<_> = values
@ -407,11 +433,13 @@ async fn read_and_write_data() -> Result<()> {
let measurement_fields_response = storage_client
.measurement_fields(measurement_fields_request)
.await?;
.await
.unwrap();
let responses: Vec<_> = measurement_fields_response
.into_inner()
.try_collect()
.await?;
.await
.unwrap();
let fields = &responses[0].fields;
assert_eq!(fields.len(), 1);
@ -421,7 +449,7 @@ async fn read_and_write_data() -> Result<()> {
assert_eq!(field.r#type, DataType::Float as i32);
assert_eq!(field.timestamp, ns_since_epoch + 4);
test_http_error_messages(&client2).await?;
test_http_error_messages(&client2).await.unwrap();
test_read_window_aggregate(
&mut storage_client,
@ -431,8 +459,6 @@ async fn read_and_write_data() -> Result<()> {
bucket_id_str,
)
.await;
Ok(())
}
// Don't make a separate #test function so that we can reuse the same
@ -861,14 +887,16 @@ struct TestServer {
impl TestServer {
fn new() -> Result<Self> {
let dir = test_helpers::tmp_dir()?;
let dir = test_helpers::tmp_dir().unwrap();
let server_process = Command::cargo_bin("influxdb_iox")?
let server_process = Command::cargo_bin("influxdb_iox")
.unwrap()
// Can enable for debbugging
//.arg("-vv")
.env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()?;
.spawn()
.unwrap();
Ok(Self {
dir,
@ -878,14 +906,16 @@ impl TestServer {
#[allow(dead_code)]
fn restart(&mut self) -> Result<()> {
self.server_process.kill()?;
self.server_process.wait()?;
self.server_process = Command::cargo_bin("influxdb_iox")?
self.server_process.kill().unwrap();
self.server_process.wait().unwrap();
self.server_process = Command::cargo_bin("influxdb_iox")
.unwrap()
// Can enable for debbugging
//.arg("-vv")
.env("INFLUXDB_IOX_DB_DIR", self.dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()?;
.spawn()
.unwrap();
Ok(())
}