feat: make it easy to get OperationMetadata from Operation

pull/24376/head
Nga Tran 2021-03-30 12:57:11 -04:00
parent d8d5b1568a
commit a630c119ab
6 changed files with 77 additions and 34 deletions

1
Cargo.lock generated
View File

@ -1492,6 +1492,7 @@ dependencies = [
"generated_types",
"http",
"hyper",
"prost",
"rand 0.8.3",
"serde",
"serde_json",

View File

@ -17,6 +17,7 @@ generated_types = { path = "../generated_types" }
futures-util = { version = "0.3.1", optional = true }
http = "0.2.3"
hyper = "0.14"
prost = "0.7"
serde = "1.0.118"
serde_json = { version = "1.0.44", optional = true }
thiserror = "1.0.23"

View File

@ -1,11 +1,11 @@
use thiserror::Error;
use ::generated_types::google::FieldViolation;
use crate::connection::Connection;
use ::generated_types::{
google::FieldViolation, influxdata::iox::management::v1 as management, protobuf_type_url_eq,
};
use self::generated_types::{operations_client::OperationsClient, *};
use crate::connection::Connection;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::google::longrunning::*;
@ -25,6 +25,10 @@ pub enum Error {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
/// Operation is not type url
#[error("Operation metadata is not type_url")]
WrongOperationMetaData,
}
/// Result type for the operations Client
@ -61,15 +65,18 @@ impl Client {
}
}
/// Get information about all operations
pub async fn list_operations(&mut self) -> Result<Vec<Operation>> {
/// Get information of all client operation
pub async fn list_operations(&mut self) -> Result<Vec<ClientOperation>> {
Ok(self
.inner
.list_operations(ListOperationsRequest::default())
.await
.map_err(Error::ServerError)?
.into_inner()
.operations)
.operations
.into_iter()
.map(|o| ClientOperation::try_new(o).unwrap())
.collect())
}
/// Get information about a specific operation
@ -122,4 +129,48 @@ impl Client {
})?
.into_inner())
}
/// Return the Client Operation
pub async fn client_operation(&mut self, id: usize) -> Result<ClientOperation> {
let operation = self.get_operation(id).await?;
ClientOperation::try_new(operation)
}
}
/// IOx's Client Operation
#[derive(Debug, Clone)]
pub struct ClientOperation {
inner: generated_types::Operation,
}
impl ClientOperation {
/// Create a new Cient Operation
pub fn try_new(operation: generated_types::Operation) -> Result<Self> {
if operation.metadata.is_some() {
let metadata = operation.metadata.clone().unwrap();
if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
return Err(Error::WrongOperationMetaData);
}
} else {
return Err(Error::NotFound(0));
}
Ok(Self { inner: operation })
}
/// Return Metadata for this client operation
pub fn metadata(&self) -> management::OperationMetadata {
prost::Message::decode(self.inner.metadata.clone().unwrap().value)
.expect("failed to decode metadata")
}
/// Return name of this operation
pub fn name(&self) -> String {
self.inner.name.clone()
}
/// Return the inner's Operation
pub fn operation(self) -> Operation {
self.inner
}
}

View File

@ -76,6 +76,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
.list_operations()
.await?
.into_iter()
.map(|c| c.operation())
.map(TryInto::try_into)
.collect();
let operations = result?;

View File

@ -4,15 +4,11 @@ use generated_types::{
google::protobuf::{Duration, Empty},
influxdata::iox::management::v1::*,
};
use influxdb_iox_client::management::CreateDatabaseError;
use influxdb_iox_client::{management::CreateDatabaseError, operations};
use test_helpers::assert_contains;
use super::{
operations_api::get_operation_metadata,
scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database,
rand_name,
},
use super::scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
};
use crate::common::server_fixture::ServerFixture;
@ -559,7 +555,9 @@ async fn test_close_partition_chunk() {
println!("Operation response is {:?}", operation);
let operation_id = operation.name.parse().expect("not an integer");
let meta = get_operation_metadata(operation.metadata);
let meta = operations::ClientOperation::try_new(operation)
.unwrap()
.metadata();
// ensure we got a legit job description back
if let Some(Job::CloseChunk(close_chunk)) = meta.job {

View File

@ -1,17 +1,7 @@
use crate::common::server_fixture::ServerFixture;
use generated_types::google::protobuf::Any;
use influxdb_iox_client::{management::generated_types::*, operations, protobuf_type_url_eq};
use influxdb_iox_client::{management::generated_types::*, operations};
use std::time::Duration;
// TODO remove after #1001 and use something directly in the influxdb_iox_client
// crate
pub fn get_operation_metadata(metadata: Option<Any>) -> OperationMetadata {
assert!(metadata.is_some());
let metadata = metadata.unwrap();
assert!(protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA));
prost::Message::decode(metadata.value).expect("failed to decode metadata")
}
#[tokio::test]
async fn test_operations() {
let server_fixture = ServerFixture::create_single_use().await;
@ -38,22 +28,21 @@ async fn test_operations() {
.expect("list operations failed");
assert_eq!(running_ops.len(), 1);
assert_eq!(running_ops[0].name, operation.name);
assert_eq!(running_ops[0].name(), operation.name);
let id = operation.name.parse().expect("not an integer");
// Check we can fetch metadata for Operation
let fetched = operations_client
.get_operation(id)
let meta = operations_client
.client_operation(id)
.await
.expect("get operation failed");
let meta = get_operation_metadata(fetched.metadata);
.unwrap()
.metadata();
let job = meta.job.expect("expected a job");
assert_eq!(meta.task_count, 2);
assert_eq!(meta.pending_count, 1);
assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos }));
assert!(!fetched.done);
// Check wait times out correctly
let fetched = operations_client
@ -79,7 +68,9 @@ async fn test_operations() {
.expect("failed to cancel operation");
let waited = wait.await.unwrap();
let meta = get_operation_metadata(waited.metadata);
let meta = operations::ClientOperation::try_new(waited.clone())
.unwrap()
.metadata();
assert!(waited.done);
assert!(meta.wall_nanos > 0);