Merge pull request #3203 from influxdata/crepererum/issue2980i
refactor: remove methods from `ManagementService` that were moved to `RemoteService`pull/24376/head
commit
febc686583
|
@ -40,27 +40,6 @@ service ManagementService {
|
|||
// List chunks available on this database
|
||||
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
|
||||
|
||||
// List remote IOx servers we know about.
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc ListRemotes(ListRemotesRequest) returns (ListRemotesResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// Update information about a remote IOx server (upsert).
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc UpdateRemote(UpdateRemoteRequest) returns (UpdateRemoteResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// Delete a reference to remote IOx server.
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc DeleteRemote(DeleteRemoteRequest) returns (DeleteRemoteResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// Creates a dummy job that for each value of the nanos field
|
||||
// spawns a task that sleeps for that number of nanoseconds before returning
|
||||
rpc CreateDummyJob(CreateDummyJobRequest) returns (CreateDummyJobResponse) {
|
||||
|
@ -216,39 +195,6 @@ message CreateDummyJobResponse {
|
|||
google.longrunning.Operation operation = 1;
|
||||
}
|
||||
|
||||
message ListRemotesRequest {}
|
||||
|
||||
message ListRemotesResponse {
|
||||
repeated Remote remotes = 1;
|
||||
}
|
||||
|
||||
// This resource represents a remote IOx server.
|
||||
message Remote {
|
||||
// The server ID associated with a remote IOx server.
|
||||
uint32 id = 1;
|
||||
|
||||
// The address of the remote IOx server gRPC endpoint.
|
||||
string connection_string = 2;
|
||||
}
|
||||
|
||||
// Updates information about a remote IOx server.
|
||||
//
|
||||
// If a remote for a given `id` already exists, it is updated in place.
|
||||
message UpdateRemoteRequest {
|
||||
// If omitted, the remote associated with `id` will be removed.
|
||||
Remote remote = 1;
|
||||
|
||||
// TODO(#917): add an optional flag to test the connection or not before adding it.
|
||||
}
|
||||
|
||||
message UpdateRemoteResponse {}
|
||||
|
||||
message DeleteRemoteRequest{
|
||||
uint32 id = 1;
|
||||
}
|
||||
|
||||
message DeleteRemoteResponse {}
|
||||
|
||||
// Request to list all partitions from a named database
|
||||
message ListPartitionsRequest {
|
||||
// the name of the database
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::TABLE_STYLE_SINGLE_LINE_BORDERS;
|
||||
use comfy_table::{Cell, Table};
|
||||
use influxdb_iox_client::{connection::Connection, management};
|
||||
use influxdb_iox_client::{connection::Connection, remote};
|
||||
use structopt::StructOpt;
|
||||
use thiserror::Error;
|
||||
|
||||
|
@ -8,10 +8,10 @@ use thiserror::Error;
|
|||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Update remote error: {0}")]
|
||||
UpdateError(#[from] management::UpdateRemoteError),
|
||||
UpdateError(#[from] remote::UpdateRemoteError),
|
||||
|
||||
#[error("List remote error: {0}")]
|
||||
ListError(#[from] management::ListRemotesError),
|
||||
ListError(#[from] remote::ListRemotesError),
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -24,8 +24,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
pub enum Config {
|
||||
/// Set connection parameters for a remote IOx server.
|
||||
Set { id: u32, connection_string: String },
|
||||
|
||||
/// Remove a reference to a remote IOx server.
|
||||
Remove { id: u32 },
|
||||
|
||||
/// List configured remote IOx server.
|
||||
List,
|
||||
}
|
||||
|
@ -36,15 +38,15 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
id,
|
||||
connection_string,
|
||||
} => {
|
||||
let mut client = management::Client::new(connection);
|
||||
let mut client = remote::Client::new(connection);
|
||||
client.update_remote(id, connection_string).await?;
|
||||
}
|
||||
Config::Remove { id } => {
|
||||
let mut client = management::Client::new(connection);
|
||||
let mut client = remote::Client::new(connection);
|
||||
client.delete_remote(id).await?;
|
||||
}
|
||||
Config::List => {
|
||||
let mut client = management::Client::new(connection);
|
||||
let mut client = remote::Client::new(connection);
|
||||
|
||||
let remotes = client.list_remotes().await?;
|
||||
if remotes.is_empty() {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use data_types::{chunk_metadata::ChunkId, server_id::ServerId, DatabaseName};
|
||||
use data_types::{chunk_metadata::ChunkId, DatabaseName};
|
||||
use generated_types::{
|
||||
google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound},
|
||||
influxdata::iox::management::v1::{Error as ProtobufError, *},
|
||||
|
@ -231,56 +231,6 @@ where
|
|||
Ok(Response::new(CreateDummyJobResponse { operation }))
|
||||
}
|
||||
|
||||
async fn list_remotes(
|
||||
&self,
|
||||
_: Request<ListRemotesRequest>,
|
||||
) -> Result<Response<ListRemotesResponse>, Status> {
|
||||
let remotes = self
|
||||
.server
|
||||
.remotes_sorted()
|
||||
.into_iter()
|
||||
.map(|(id, connection_string)| Remote {
|
||||
id: id.get_u32(),
|
||||
connection_string,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Response::new(ListRemotesResponse { remotes }))
|
||||
}
|
||||
|
||||
async fn update_remote(
|
||||
&self,
|
||||
request: Request<UpdateRemoteRequest>,
|
||||
) -> Result<Response<UpdateRemoteResponse>, Status> {
|
||||
let remote = request
|
||||
.into_inner()
|
||||
.remote
|
||||
.ok_or_else(|| FieldViolation::required("remote"))?;
|
||||
let remote_id = ServerId::try_from(remote.id)
|
||||
.map_err(|_| FieldViolation::required("id").scope("remote"))?;
|
||||
|
||||
self.server
|
||||
.update_remote(remote_id, remote.connection_string);
|
||||
|
||||
Ok(Response::new(UpdateRemoteResponse {}))
|
||||
}
|
||||
|
||||
async fn delete_remote(
|
||||
&self,
|
||||
request: Request<DeleteRemoteRequest>,
|
||||
) -> Result<Response<DeleteRemoteResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
let remote_id =
|
||||
ServerId::try_from(request.id).map_err(|_| FieldViolation::required("id"))?;
|
||||
|
||||
match self.server.delete_remote(remote_id) {
|
||||
Some(_) => {}
|
||||
None => return Err(NotFound::default().into()),
|
||||
}
|
||||
|
||||
Ok(Response::new(DeleteRemoteResponse {}))
|
||||
}
|
||||
|
||||
async fn list_partitions(
|
||||
&self,
|
||||
request: Request<ListPartitionsRequest>,
|
||||
|
|
|
@ -24,66 +24,6 @@ use crate::{
|
|||
use std::time::Instant;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_update_remotes() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
const TEST_REMOTE_ID_1: u32 = 42;
|
||||
const TEST_REMOTE_ADDR_1: &str = "1.2.3.4:1234";
|
||||
const TEST_REMOTE_ID_2: u32 = 84;
|
||||
const TEST_REMOTE_ADDR_2: &str = "4.3.2.1:4321";
|
||||
const TEST_REMOTE_ADDR_2_UPDATED: &str = "40.30.20.10:4321";
|
||||
|
||||
let res = client.list_remotes().await.expect("list remotes failed");
|
||||
assert_eq!(res.len(), 0);
|
||||
|
||||
client
|
||||
.update_remote(TEST_REMOTE_ID_1, TEST_REMOTE_ADDR_1)
|
||||
.await
|
||||
.expect("update failed");
|
||||
|
||||
let res = client.list_remotes().await.expect("list remotes failed");
|
||||
assert_eq!(res.len(), 1);
|
||||
|
||||
client
|
||||
.update_remote(TEST_REMOTE_ID_2, TEST_REMOTE_ADDR_2)
|
||||
.await
|
||||
.expect("update failed");
|
||||
|
||||
let res = client.list_remotes().await.expect("list remotes failed");
|
||||
assert_eq!(res.len(), 2);
|
||||
assert_eq!(res[0].id, TEST_REMOTE_ID_1);
|
||||
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_1);
|
||||
assert_eq!(res[1].id, TEST_REMOTE_ID_2);
|
||||
assert_eq!(res[1].connection_string, TEST_REMOTE_ADDR_2);
|
||||
|
||||
client
|
||||
.delete_remote(TEST_REMOTE_ID_1)
|
||||
.await
|
||||
.expect("delete failed");
|
||||
|
||||
client
|
||||
.delete_remote(TEST_REMOTE_ID_1)
|
||||
.await
|
||||
.expect_err("expected delete to fail");
|
||||
|
||||
let res = client.list_remotes().await.expect("list remotes failed");
|
||||
assert_eq!(res.len(), 1);
|
||||
assert_eq!(res[0].id, TEST_REMOTE_ID_2);
|
||||
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2);
|
||||
|
||||
client
|
||||
.update_remote(TEST_REMOTE_ID_2, TEST_REMOTE_ADDR_2_UPDATED)
|
||||
.await
|
||||
.expect("update failed");
|
||||
|
||||
let res = client.list_remotes().await.expect("list remotes failed");
|
||||
assert_eq!(res.len(), 1);
|
||||
assert_eq!(res[0].id, TEST_REMOTE_ID_2);
|
||||
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2_UPDATED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_database_duplicate_name() {
|
||||
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||
|
|
|
@ -116,6 +116,7 @@ async fn test_write_routed() {
|
|||
|
||||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_remote = router.remote_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
|
@ -131,7 +132,7 @@ async fn test_write_routed() {
|
|||
.expect("set ID failed");
|
||||
target_1.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_1, target_1.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
@ -144,7 +145,7 @@ async fn test_write_routed() {
|
|||
.expect("set ID failed");
|
||||
target_2.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_2, target_2.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
@ -157,7 +158,7 @@ async fn test_write_routed() {
|
|||
.expect("set ID failed");
|
||||
target_3.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
@ -487,6 +488,7 @@ async fn test_write_routed_no_shard() {
|
|||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
let mut router_remote = router.remote_client();
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
.await
|
||||
|
@ -501,7 +503,7 @@ async fn test_write_routed_no_shard() {
|
|||
.expect("set ID failed");
|
||||
target_1.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_1, target_1.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
@ -514,7 +516,7 @@ async fn test_write_routed_no_shard() {
|
|||
.expect("set ID failed");
|
||||
target_2.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_2, target_2.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
@ -527,7 +529,7 @@ async fn test_write_routed_no_shard() {
|
|||
.expect("set ID failed");
|
||||
target_3.wait_server_initialized().await;
|
||||
|
||||
router_mgmt
|
||||
router_remote
|
||||
.update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())
|
||||
.await
|
||||
.expect("set remote failed");
|
||||
|
|
|
@ -14,34 +14,6 @@ pub mod generated_types {
|
|||
pub use generated_types::influxdata::iox::write_buffer::v1::*;
|
||||
}
|
||||
|
||||
/// Errors returned by Client::update_server_id
|
||||
#[derive(Debug, Error)]
|
||||
pub enum UpdateServerIdError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::get_server_id
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GetServerIdError {
|
||||
/// Server ID is not set
|
||||
#[error("Server ID not set")]
|
||||
NoServerId,
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::set_serving_readiness
|
||||
#[derive(Debug, Error)]
|
||||
pub enum SetServingReadinessError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::create_database
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CreateDatabaseError {
|
||||
|
@ -188,22 +160,6 @@ pub enum ListChunksError {
|
|||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::list_remotes
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ListRemotesError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::update_remote
|
||||
#[derive(Debug, Error)]
|
||||
pub enum UpdateRemoteError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::create_dummy_job
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CreateDummyJobError {
|
||||
|
@ -681,43 +637,6 @@ impl Client {
|
|||
Ok(response.into_inner().chunks)
|
||||
}
|
||||
|
||||
/// List remotes.
|
||||
pub async fn list_remotes(&mut self) -> Result<Vec<generated_types::Remote>, ListRemotesError> {
|
||||
let response = self
|
||||
.inner
|
||||
.list_remotes(ListRemotesRequest {})
|
||||
.await
|
||||
.map_err(ListRemotesError::ServerError)?;
|
||||
Ok(response.into_inner().remotes)
|
||||
}
|
||||
|
||||
/// Update remote
|
||||
pub async fn update_remote(
|
||||
&mut self,
|
||||
id: u32,
|
||||
connection_string: impl Into<String> + Send,
|
||||
) -> Result<(), UpdateRemoteError> {
|
||||
self.inner
|
||||
.update_remote(UpdateRemoteRequest {
|
||||
remote: Some(generated_types::Remote {
|
||||
id,
|
||||
connection_string: connection_string.into(),
|
||||
}),
|
||||
})
|
||||
.await
|
||||
.map_err(UpdateRemoteError::ServerError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete remote
|
||||
pub async fn delete_remote(&mut self, id: u32) -> Result<(), UpdateRemoteError> {
|
||||
self.inner
|
||||
.delete_remote(DeleteRemoteRequest { id })
|
||||
.await
|
||||
.map_err(UpdateRemoteError::ServerError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all partitions of the database
|
||||
pub async fn list_partitions(
|
||||
&mut self,
|
||||
|
|
Loading…
Reference in New Issue