From a630c119abe0731c9059d3230bb3d4763d005c29 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 30 Mar 2021 12:57:11 -0400 Subject: [PATCH] feat: make it easy to get OperationMetadata from Operation --- Cargo.lock | 1 + influxdb_iox_client/Cargo.toml | 1 + influxdb_iox_client/src/client/operations.rs | 65 +++++++++++++++++--- src/commands/operations.rs | 1 + tests/end_to_end_cases/management_api.rs | 14 ++--- tests/end_to_end_cases/operations_api.rs | 29 +++------ 6 files changed, 77 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc764449d4..e330654ff2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1492,6 +1492,7 @@ dependencies = [ "generated_types", "http", "hyper", + "prost", "rand 0.8.3", "serde", "serde_json", diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 12f2383f44..3974911f0d 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -17,6 +17,7 @@ generated_types = { path = "../generated_types" } futures-util = { version = "0.3.1", optional = true } http = "0.2.3" hyper = "0.14" +prost = "0.7" serde = "1.0.118" serde_json = { version = "1.0.44", optional = true } thiserror = "1.0.23" diff --git a/influxdb_iox_client/src/client/operations.rs b/influxdb_iox_client/src/client/operations.rs index 462baeb0b3..716603faef 100644 --- a/influxdb_iox_client/src/client/operations.rs +++ b/influxdb_iox_client/src/client/operations.rs @@ -1,11 +1,11 @@ use thiserror::Error; -use ::generated_types::google::FieldViolation; - -use crate::connection::Connection; +use ::generated_types::{ + google::FieldViolation, influxdata::iox::management::v1 as management, protobuf_type_url_eq, +}; use self::generated_types::{operations_client::OperationsClient, *}; - +use crate::connection::Connection; /// Re-export generated_types pub mod generated_types { pub use generated_types::google::longrunning::*; @@ -25,6 +25,10 @@ pub enum Error { /// Client received an unexpected error from the server #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] ServerError(tonic::Status), + + /// Operation is not type url + #[error("Operation metadata is not type_url")] + WrongOperationMetaData, } /// Result type for the operations Client @@ -61,15 +65,18 @@ impl Client { } } - /// Get information about all operations - pub async fn list_operations(&mut self) -> Result> { + /// Get information of all client operation + pub async fn list_operations(&mut self) -> Result> { Ok(self .inner .list_operations(ListOperationsRequest::default()) .await .map_err(Error::ServerError)? .into_inner() - .operations) + .operations + .into_iter() + .map(|o| ClientOperation::try_new(o).unwrap()) + .collect()) } /// Get information about a specific operation @@ -122,4 +129,48 @@ impl Client { })? .into_inner()) } + + /// Return the Client Operation + pub async fn client_operation(&mut self, id: usize) -> Result { + let operation = self.get_operation(id).await?; + ClientOperation::try_new(operation) + } +} + +/// IOx's Client Operation +#[derive(Debug, Clone)] +pub struct ClientOperation { + inner: generated_types::Operation, +} + +impl ClientOperation { + /// Create a new Cient Operation + pub fn try_new(operation: generated_types::Operation) -> Result { + if operation.metadata.is_some() { + let metadata = operation.metadata.clone().unwrap(); + if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) { + return Err(Error::WrongOperationMetaData); + } + } else { + return Err(Error::NotFound(0)); + } + + Ok(Self { inner: operation }) + } + + /// Return Metadata for this client operation + pub fn metadata(&self) -> management::OperationMetadata { + prost::Message::decode(self.inner.metadata.clone().unwrap().value) + .expect("failed to decode metadata") + } + + /// Return name of this operation + pub fn name(&self) -> String { + self.inner.name.clone() + } + + /// Return the inner's Operation + pub fn operation(self) -> Operation { + self.inner + } } diff --git a/src/commands/operations.rs b/src/commands/operations.rs index 037353c5dd..e221a87f80 100644 --- a/src/commands/operations.rs +++ b/src/commands/operations.rs @@ -76,6 +76,7 @@ pub async fn command(url: String, config: Config) -> Result<()> { .list_operations() .await? .into_iter() + .map(|c| c.operation()) .map(TryInto::try_into) .collect(); let operations = result?; diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index b0e4d683af..8ac3b78c63 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -4,15 +4,11 @@ use generated_types::{ google::protobuf::{Duration, Empty}, influxdata::iox::management::v1::*, }; -use influxdb_iox_client::management::CreateDatabaseError; +use influxdb_iox_client::{management::CreateDatabaseError, operations}; use test_helpers::assert_contains; -use super::{ - operations_api::get_operation_metadata, - scenario::{ - create_readable_database, create_two_partition_database, create_unreadable_database, - rand_name, - }, +use super::scenario::{ + create_readable_database, create_two_partition_database, create_unreadable_database, rand_name, }; use crate::common::server_fixture::ServerFixture; @@ -559,7 +555,9 @@ async fn test_close_partition_chunk() { println!("Operation response is {:?}", operation); let operation_id = operation.name.parse().expect("not an integer"); - let meta = get_operation_metadata(operation.metadata); + let meta = operations::ClientOperation::try_new(operation) + .unwrap() + .metadata(); // ensure we got a legit job description back if let Some(Job::CloseChunk(close_chunk)) = meta.job { diff --git a/tests/end_to_end_cases/operations_api.rs b/tests/end_to_end_cases/operations_api.rs index 0fde9ea716..35b9cd55ba 100644 --- a/tests/end_to_end_cases/operations_api.rs +++ b/tests/end_to_end_cases/operations_api.rs @@ -1,17 +1,7 @@ use crate::common::server_fixture::ServerFixture; -use generated_types::google::protobuf::Any; -use influxdb_iox_client::{management::generated_types::*, operations, protobuf_type_url_eq}; +use influxdb_iox_client::{management::generated_types::*, operations}; use std::time::Duration; -// TODO remove after #1001 and use something directly in the influxdb_iox_client -// crate -pub fn get_operation_metadata(metadata: Option) -> OperationMetadata { - assert!(metadata.is_some()); - let metadata = metadata.unwrap(); - assert!(protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA)); - prost::Message::decode(metadata.value).expect("failed to decode metadata") -} - #[tokio::test] async fn test_operations() { let server_fixture = ServerFixture::create_single_use().await; @@ -38,22 +28,21 @@ async fn test_operations() { .expect("list operations failed"); assert_eq!(running_ops.len(), 1); - assert_eq!(running_ops[0].name, operation.name); + assert_eq!(running_ops[0].name(), operation.name); let id = operation.name.parse().expect("not an integer"); - // Check we can fetch metadata for Operation - let fetched = operations_client - .get_operation(id) + let meta = operations_client + .client_operation(id) .await - .expect("get operation failed"); - let meta = get_operation_metadata(fetched.metadata); + .unwrap() + .metadata(); + let job = meta.job.expect("expected a job"); assert_eq!(meta.task_count, 2); assert_eq!(meta.pending_count, 1); assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos })); - assert!(!fetched.done); // Check wait times out correctly let fetched = operations_client @@ -79,7 +68,9 @@ async fn test_operations() { .expect("failed to cancel operation"); let waited = wait.await.unwrap(); - let meta = get_operation_metadata(waited.metadata); + let meta = operations::ClientOperation::try_new(waited.clone()) + .unwrap() + .metadata(); assert!(waited.done); assert!(meta.wall_nanos > 0);