Merge pull request #1078 from influxdata/ntran/for1001
feat: make it easy to get OperationMetadata from Operationpull/24376/head
commit
a51b1008a2
|
@ -1492,6 +1492,7 @@ dependencies = [
|
||||||
"generated_types",
|
"generated_types",
|
||||||
"http",
|
"http",
|
||||||
"hyper",
|
"hyper",
|
||||||
|
"prost",
|
||||||
"rand 0.8.3",
|
"rand 0.8.3",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
|
|
@ -17,6 +17,7 @@ generated_types = { path = "../generated_types" }
|
||||||
futures-util = { version = "0.3.1", optional = true }
|
futures-util = { version = "0.3.1", optional = true }
|
||||||
http = "0.2.3"
|
http = "0.2.3"
|
||||||
hyper = "0.14"
|
hyper = "0.14"
|
||||||
|
prost = "0.7"
|
||||||
serde = "1.0.118"
|
serde = "1.0.118"
|
||||||
serde_json = { version = "1.0.44", optional = true }
|
serde_json = { version = "1.0.44", optional = true }
|
||||||
thiserror = "1.0.23"
|
thiserror = "1.0.23"
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use ::generated_types::google::FieldViolation;
|
use ::generated_types::{
|
||||||
|
google::FieldViolation, influxdata::iox::management::v1 as management, protobuf_type_url_eq,
|
||||||
use crate::connection::Connection;
|
};
|
||||||
|
|
||||||
use self::generated_types::{operations_client::OperationsClient, *};
|
use self::generated_types::{operations_client::OperationsClient, *};
|
||||||
|
use crate::connection::Connection;
|
||||||
/// Re-export generated_types
|
/// Re-export generated_types
|
||||||
pub mod generated_types {
|
pub mod generated_types {
|
||||||
pub use generated_types::google::longrunning::*;
|
pub use generated_types::google::longrunning::*;
|
||||||
|
@ -25,6 +25,10 @@ pub enum Error {
|
||||||
/// Client received an unexpected error from the server
|
/// Client received an unexpected error from the server
|
||||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||||
ServerError(tonic::Status),
|
ServerError(tonic::Status),
|
||||||
|
|
||||||
|
/// Operation is not type url
|
||||||
|
#[error("Operation metadata is not type_url")]
|
||||||
|
WrongOperationMetaData,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result type for the operations Client
|
/// Result type for the operations Client
|
||||||
|
@ -61,15 +65,18 @@ impl Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get information about all operations
|
/// Get information of all client operation
|
||||||
pub async fn list_operations(&mut self) -> Result<Vec<Operation>> {
|
pub async fn list_operations(&mut self) -> Result<Vec<ClientOperation>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.inner
|
.inner
|
||||||
.list_operations(ListOperationsRequest::default())
|
.list_operations(ListOperationsRequest::default())
|
||||||
.await
|
.await
|
||||||
.map_err(Error::ServerError)?
|
.map_err(Error::ServerError)?
|
||||||
.into_inner()
|
.into_inner()
|
||||||
.operations)
|
.operations
|
||||||
|
.into_iter()
|
||||||
|
.map(|o| ClientOperation::try_new(o).unwrap())
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get information about a specific operation
|
/// Get information about a specific operation
|
||||||
|
@ -122,4 +129,48 @@ impl Client {
|
||||||
})?
|
})?
|
||||||
.into_inner())
|
.into_inner())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the Client Operation
|
||||||
|
pub async fn client_operation(&mut self, id: usize) -> Result<ClientOperation> {
|
||||||
|
let operation = self.get_operation(id).await?;
|
||||||
|
ClientOperation::try_new(operation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// IOx's Client Operation
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ClientOperation {
|
||||||
|
inner: generated_types::Operation,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientOperation {
|
||||||
|
/// Create a new Cient Operation
|
||||||
|
pub fn try_new(operation: generated_types::Operation) -> Result<Self> {
|
||||||
|
if operation.metadata.is_some() {
|
||||||
|
let metadata = operation.metadata.clone().unwrap();
|
||||||
|
if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
|
||||||
|
return Err(Error::WrongOperationMetaData);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(Error::NotFound(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self { inner: operation })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return Metadata for this client operation
|
||||||
|
pub fn metadata(&self) -> management::OperationMetadata {
|
||||||
|
prost::Message::decode(self.inner.metadata.clone().unwrap().value)
|
||||||
|
.expect("failed to decode metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return name of this operation
|
||||||
|
pub fn name(&self) -> String {
|
||||||
|
self.inner.name.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the inner's Operation
|
||||||
|
pub fn operation(self) -> Operation {
|
||||||
|
self.inner
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
|
||||||
.list_operations()
|
.list_operations()
|
||||||
.await?
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
.map(|c| c.operation())
|
||||||
.map(TryInto::try_into)
|
.map(TryInto::try_into)
|
||||||
.collect();
|
.collect();
|
||||||
let operations = result?;
|
let operations = result?;
|
||||||
|
|
|
@ -4,15 +4,11 @@ use generated_types::{
|
||||||
google::protobuf::{Duration, Empty},
|
google::protobuf::{Duration, Empty},
|
||||||
influxdata::iox::management::v1::*,
|
influxdata::iox::management::v1::*,
|
||||||
};
|
};
|
||||||
use influxdb_iox_client::management::CreateDatabaseError;
|
use influxdb_iox_client::{management::CreateDatabaseError, operations};
|
||||||
use test_helpers::assert_contains;
|
use test_helpers::assert_contains;
|
||||||
|
|
||||||
use super::{
|
use super::scenario::{
|
||||||
operations_api::get_operation_metadata,
|
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
|
||||||
scenario::{
|
|
||||||
create_readable_database, create_two_partition_database, create_unreadable_database,
|
|
||||||
rand_name,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use crate::common::server_fixture::ServerFixture;
|
use crate::common::server_fixture::ServerFixture;
|
||||||
|
|
||||||
|
@ -559,7 +555,9 @@ async fn test_close_partition_chunk() {
|
||||||
println!("Operation response is {:?}", operation);
|
println!("Operation response is {:?}", operation);
|
||||||
let operation_id = operation.name.parse().expect("not an integer");
|
let operation_id = operation.name.parse().expect("not an integer");
|
||||||
|
|
||||||
let meta = get_operation_metadata(operation.metadata);
|
let meta = operations::ClientOperation::try_new(operation)
|
||||||
|
.unwrap()
|
||||||
|
.metadata();
|
||||||
|
|
||||||
// ensure we got a legit job description back
|
// ensure we got a legit job description back
|
||||||
if let Some(Job::CloseChunk(close_chunk)) = meta.job {
|
if let Some(Job::CloseChunk(close_chunk)) = meta.job {
|
||||||
|
|
|
@ -1,17 +1,7 @@
|
||||||
use crate::common::server_fixture::ServerFixture;
|
use crate::common::server_fixture::ServerFixture;
|
||||||
use generated_types::google::protobuf::Any;
|
use influxdb_iox_client::{management::generated_types::*, operations};
|
||||||
use influxdb_iox_client::{management::generated_types::*, operations, protobuf_type_url_eq};
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
// TODO remove after #1001 and use something directly in the influxdb_iox_client
|
|
||||||
// crate
|
|
||||||
pub fn get_operation_metadata(metadata: Option<Any>) -> OperationMetadata {
|
|
||||||
assert!(metadata.is_some());
|
|
||||||
let metadata = metadata.unwrap();
|
|
||||||
assert!(protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA));
|
|
||||||
prost::Message::decode(metadata.value).expect("failed to decode metadata")
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_operations() {
|
async fn test_operations() {
|
||||||
let server_fixture = ServerFixture::create_single_use().await;
|
let server_fixture = ServerFixture::create_single_use().await;
|
||||||
|
@ -38,22 +28,21 @@ async fn test_operations() {
|
||||||
.expect("list operations failed");
|
.expect("list operations failed");
|
||||||
|
|
||||||
assert_eq!(running_ops.len(), 1);
|
assert_eq!(running_ops.len(), 1);
|
||||||
assert_eq!(running_ops[0].name, operation.name);
|
assert_eq!(running_ops[0].name(), operation.name);
|
||||||
|
|
||||||
let id = operation.name.parse().expect("not an integer");
|
let id = operation.name.parse().expect("not an integer");
|
||||||
|
|
||||||
// Check we can fetch metadata for Operation
|
let meta = operations_client
|
||||||
let fetched = operations_client
|
.client_operation(id)
|
||||||
.get_operation(id)
|
|
||||||
.await
|
.await
|
||||||
.expect("get operation failed");
|
.unwrap()
|
||||||
let meta = get_operation_metadata(fetched.metadata);
|
.metadata();
|
||||||
|
|
||||||
let job = meta.job.expect("expected a job");
|
let job = meta.job.expect("expected a job");
|
||||||
|
|
||||||
assert_eq!(meta.task_count, 2);
|
assert_eq!(meta.task_count, 2);
|
||||||
assert_eq!(meta.pending_count, 1);
|
assert_eq!(meta.pending_count, 1);
|
||||||
assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos }));
|
assert_eq!(job, operation_metadata::Job::Dummy(Dummy { nanos }));
|
||||||
assert!(!fetched.done);
|
|
||||||
|
|
||||||
// Check wait times out correctly
|
// Check wait times out correctly
|
||||||
let fetched = operations_client
|
let fetched = operations_client
|
||||||
|
@ -79,7 +68,9 @@ async fn test_operations() {
|
||||||
.expect("failed to cancel operation");
|
.expect("failed to cancel operation");
|
||||||
|
|
||||||
let waited = wait.await.unwrap();
|
let waited = wait.await.unwrap();
|
||||||
let meta = get_operation_metadata(waited.metadata);
|
let meta = operations::ClientOperation::try_new(waited.clone())
|
||||||
|
.unwrap()
|
||||||
|
.metadata();
|
||||||
|
|
||||||
assert!(waited.done);
|
assert!(waited.done);
|
||||||
assert!(meta.wall_nanos > 0);
|
assert!(meta.wall_nanos > 0);
|
||||||
|
|
Loading…
Reference in New Issue