diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 4bb4727759..b20b79bacc 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -54,7 +54,7 @@ async fn read_data_as_sql( org_id: &str, bucket_id: &str, sql_query: &str, -) -> Result> { +) -> Vec { let url = format!("{}{}", API_BASE, path); let lines = client .get(&url) @@ -64,15 +64,16 @@ async fn read_data_as_sql( ("sql_query", sql_query), ]) .send() - .await? - .error_for_status()? + .await + .unwrap() .text() - .await? + .await + .unwrap() .trim() .split('\n') .map(str::to_string) .collect(); - Ok(lines) + lines } async fn write_data( @@ -88,8 +89,8 @@ async fn write_data( } #[tokio::test] -async fn read_and_write_data() -> Result<()> { - let server = TestServer::new()?; +async fn read_and_write_data() { + let server = TestServer::new().unwrap(); server.wait_until_ready().await; let org_id_str = "0000111100001111"; @@ -134,52 +135,63 @@ async fn read_and_write_data() -> Result<()> { .tag("region", "us-west") .field("value", 0.64) .timestamp(ns_since_epoch) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("cpu_load_short") .tag("host", "server01") .field("value", 27.99) .timestamp(ns_since_epoch + 1) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("cpu_load_short") .tag("host", "server02") .tag("region", "us-west") .field("value", 3.89) .timestamp(ns_since_epoch + 2) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("cpu_load_short") .tag("host", "server01") .tag("region", "us-east") .field("value", 1234567.891011) .timestamp(ns_since_epoch + 3) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("cpu_load_short") .tag("host", "server01") .tag("region", "us-west") .field("value", 0.000003) .timestamp(ns_since_epoch + 4) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("system") .tag("host", "server03") .field("uptime", 1303385) .timestamp(ns_since_epoch + 5) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("swap") .tag("host", "server01") .tag("name", "disk0") .field("in", 3) .field("out", 4) .timestamp(ns_since_epoch + 6) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("status") .field("active", true) .timestamp(ns_since_epoch + 7) - .build()?, + .build() + .unwrap(), influxdb2_client::DataPoint::builder("attributes") .field("color", "blue") .timestamp(ns_since_epoch + 8) - .build()?, + .build() + .unwrap(), ]; - write_data(&client2, org_id_str, bucket_id_str, points).await?; + write_data(&client2, org_id_str, bucket_id_str, points) + .await + .unwrap(); let expected_read_data = substitute_nanos( ns_since_epoch, @@ -203,7 +215,8 @@ async fn read_and_write_data() -> Result<()> { bucket_id_str, "select * from cpu_load_short", ) - .await?; + .await; + assert_eq!( text, expected_read_data, "Actual:\n{:#?}\nExpected:\n{:#?}", @@ -213,12 +226,12 @@ async fn read_and_write_data() -> Result<()> { // Make an invalid organization WAL dir to test that the server ignores it // instead of crashing let invalid_org_dir = server.dir.path().join("not-an-org-id"); - fs::create_dir(invalid_org_dir)?; + fs::create_dir(invalid_org_dir).unwrap(); - let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await?; + let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap(); // Validate that capabilities rpc endpoint is hooked up - let capabilities_response = storage_client.capabilities(()).await?; + let capabilities_response = storage_client.capabilities(()).await.unwrap(); let capabilities_response = capabilities_response.into_inner(); assert_eq!( capabilities_response.caps.len(), @@ -234,7 +247,7 @@ async fn read_and_write_data() -> Result<()> { partition_id, }; let mut d = Vec::new(); - read_source.encode(&mut d)?; + read_source.encode(&mut d).unwrap(); let read_source = prost_types::Any { type_url: "/TODO".to_string(), value: d, @@ -255,9 +268,12 @@ async fn read_and_write_data() -> Result<()> { range: range.clone(), predicate: predicate.clone(), }); - let read_response = storage_client.read_filter(read_filter_request).await?; + let read_response = storage_client + .read_filter(read_filter_request) + .await + .unwrap(); - let responses: Vec<_> = read_response.into_inner().try_collect().await?; + let responses: Vec<_> = read_response.into_inner().try_collect().await.unwrap(); let frames: Vec = responses .into_iter() .flat_map(|r| r.frames) @@ -293,8 +309,8 @@ async fn read_and_write_data() -> Result<()> { predicate: predicate.clone(), }); - let tag_keys_response = storage_client.tag_keys(tag_keys_request).await?; - let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await?; + let tag_keys_response = storage_client.tag_keys(tag_keys_request).await.unwrap(); + let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await.unwrap(); let keys = &responses[0].values; let keys: Vec<_> = keys @@ -311,8 +327,12 @@ async fn read_and_write_data() -> Result<()> { tag_key: b"host".to_vec(), }); - let tag_values_response = storage_client.tag_values(tag_values_request).await?; - let responses: Vec<_> = tag_values_response.into_inner().try_collect().await?; + let tag_values_response = storage_client.tag_values(tag_values_request).await.unwrap(); + let responses: Vec<_> = tag_values_response + .into_inner() + .try_collect() + .await + .unwrap(); let values = &responses[0].values; let values: Vec<_> = values @@ -337,11 +357,13 @@ async fn read_and_write_data() -> Result<()> { let measurement_names_response = storage_client .measurement_names(measurement_names_request) - .await?; + .await + .unwrap(); let responses: Vec<_> = measurement_names_response .into_inner() .try_collect() - .await?; + .await + .unwrap(); let values = &responses[0].values; let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect(); @@ -360,11 +382,13 @@ async fn read_and_write_data() -> Result<()> { let measurement_tag_keys_response = storage_client .measurement_tag_keys(measurement_tag_keys_request) - .await?; + .await + .unwrap(); let responses: Vec<_> = measurement_tag_keys_response .into_inner() .try_collect() - .await?; + .await + .unwrap(); let values = &responses[0].values; let values: Vec<_> = values @@ -384,11 +408,13 @@ async fn read_and_write_data() -> Result<()> { let measurement_tag_values_response = storage_client .measurement_tag_values(measurement_tag_values_request) - .await?; + .await + .unwrap(); let responses: Vec<_> = measurement_tag_values_response .into_inner() .try_collect() - .await?; + .await + .unwrap(); let values = &responses[0].values; let values: Vec<_> = values @@ -407,11 +433,13 @@ async fn read_and_write_data() -> Result<()> { let measurement_fields_response = storage_client .measurement_fields(measurement_fields_request) - .await?; + .await + .unwrap(); let responses: Vec<_> = measurement_fields_response .into_inner() .try_collect() - .await?; + .await + .unwrap(); let fields = &responses[0].fields; assert_eq!(fields.len(), 1); @@ -421,7 +449,7 @@ async fn read_and_write_data() -> Result<()> { assert_eq!(field.r#type, DataType::Float as i32); assert_eq!(field.timestamp, ns_since_epoch + 4); - test_http_error_messages(&client2).await?; + test_http_error_messages(&client2).await.unwrap(); test_read_window_aggregate( &mut storage_client, @@ -431,8 +459,6 @@ async fn read_and_write_data() -> Result<()> { bucket_id_str, ) .await; - - Ok(()) } // Don't make a separate #test function so that we can reuse the same @@ -861,14 +887,16 @@ struct TestServer { impl TestServer { fn new() -> Result { - let dir = test_helpers::tmp_dir()?; + let dir = test_helpers::tmp_dir().unwrap(); - let server_process = Command::cargo_bin("influxdb_iox")? + let server_process = Command::cargo_bin("influxdb_iox") + .unwrap() // Can enable for debbugging //.arg("-vv") .env("INFLUXDB_IOX_DB_DIR", dir.path()) .env("INFLUXDB_IOX_ID", "1") - .spawn()?; + .spawn() + .unwrap(); Ok(Self { dir, @@ -878,14 +906,16 @@ impl TestServer { #[allow(dead_code)] fn restart(&mut self) -> Result<()> { - self.server_process.kill()?; - self.server_process.wait()?; - self.server_process = Command::cargo_bin("influxdb_iox")? + self.server_process.kill().unwrap(); + self.server_process.wait().unwrap(); + self.server_process = Command::cargo_bin("influxdb_iox") + .unwrap() // Can enable for debbugging //.arg("-vv") .env("INFLUXDB_IOX_DB_DIR", self.dir.path()) .env("INFLUXDB_IOX_ID", "1") - .spawn()?; + .spawn() + .unwrap(); Ok(()) }