feat: add `DeploymentService`

Ref #2980.
pull/24376/head
Marco Neumann 2021-11-01 14:49:59 +01:00
parent 9935f6d855
commit 011af2d6ba
11 changed files with 381 additions and 3 deletions

View File

@ -21,6 +21,7 @@ fn main() -> Result<()> {
/// - `com.github.influxdata.idpe.storage.read.rs`
/// - `influxdata.iox.catalog.v1.rs`
/// - `influxdata.iox.delete.v1.rs`
/// - `influxdata.iox.deployment.v1.rs`
/// - `influxdata.iox.management.v1.rs`
/// - `influxdata.iox.remote.v1.rs`
/// - `influxdata.iox.router.v1.rs`
@ -29,6 +30,7 @@ fn main() -> Result<()> {
fn generate_grpc_types(root: &Path) -> Result<()> {
let catalog_path = root.join("influxdata/iox/catalog/v1");
let delete_path = root.join("influxdata/iox/delete/v1");
let deployment_path = root.join("influxdata/iox/deployment/v1");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1");
let remote_path = root.join("influxdata/iox/remote/v1");
@ -41,6 +43,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
catalog_path.join("parquet_metadata.proto"),
catalog_path.join("predicate.proto"),
delete_path.join("service.proto"),
deployment_path.join("service.proto"),
idpe_path.join("source.proto"),
management_path.join("chunk.proto"),
management_path.join("database_rules.proto"),

View File

@ -0,0 +1,45 @@
syntax = "proto3";
package influxdata.iox.deployment.v1;
option go_package = "github.com/influxdata/iox/deployment/v1";
service DeploymentService {
// Get server ID.
rpc GetServerId(GetServerIdRequest) returns (GetServerIdResponse);
// Update server ID.
rpc UpdateServerId(UpdateServerIdRequest) returns (UpdateServerIdResponse);
// Set serving readiness.
rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse);
// Get serving readiness.
rpc GetServingReadiness(GetServingReadinessRequest) returns (GetServingReadinessResponse);
}
message GetServerIdRequest {}
message GetServerIdResponse {
// Must be non-zero
uint32 id = 1;
}
message UpdateServerIdRequest {
// Must be non-zero
uint32 id = 1;
}
message UpdateServerIdResponse {}
message SetServingReadinessRequest {
// If false, the IOx server will respond with UNAVAILABLE to all data plane requests.
bool ready = 1;
}
message SetServingReadinessResponse {}
message GetServingReadinessRequest {}
message GetServingReadinessResponse {
// If false, the IOx server will respond with UNAVAILABLE to all data plane requests.
bool ready = 1;
}

View File

@ -9,11 +9,26 @@ import "influxdata/iox/management/v1/chunk.proto";
import "influxdata/iox/management/v1/partition.proto";
service ManagementService {
rpc GetServerId(GetServerIdRequest) returns (GetServerIdResponse);
// Get server ID.
//
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
rpc GetServerId(GetServerIdRequest) returns (GetServerIdResponse) {
option deprecated = true;
};
rpc UpdateServerId(UpdateServerIdRequest) returns (UpdateServerIdResponse);
// Update server ID.
//
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
rpc UpdateServerId(UpdateServerIdRequest) returns (UpdateServerIdResponse) {
option deprecated = true;
};
rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse);
// Set serving readiness.
//
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse) {
option deprecated = true;
};
// List all databases on this server.
//

View File

@ -48,6 +48,16 @@ pub mod influxdata {
}
}
pub mod deployment {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.deployment.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.deployment.v1.serde.rs"
));
}
}
pub mod management {
pub mod v1 {
/// Operation metadata type

View File

@ -0,0 +1,85 @@
use data_types::server_id::ServerId;
use generated_types::{
google::{FieldViolation, NotFound},
influxdata::iox::deployment::v1::*,
};
use server::{connection::ConnectionManager, Error, Server};
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
use tonic::{Request, Response, Status};
struct DeploymentService<M: ConnectionManager> {
server: Arc<Server<M>>,
serving_readiness: ServingReadiness,
}
use crate::influxdb_ioxd::rpc::error::default_server_error_handler;
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
#[tonic::async_trait]
impl<M> deployment_service_server::DeploymentService for DeploymentService<M>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
async fn get_server_id(
&self,
_: Request<GetServerIdRequest>,
) -> Result<Response<GetServerIdResponse>, Status> {
match self.server.server_id() {
Some(id) => Ok(Response::new(GetServerIdResponse { id: id.get_u32() })),
None => return Err(NotFound::default().into()),
}
}
async fn update_server_id(
&self,
request: Request<UpdateServerIdRequest>,
) -> Result<Response<UpdateServerIdResponse>, Status> {
let id =
ServerId::try_from(request.get_ref().id).map_err(|_| FieldViolation::required("id"))?;
match self.server.set_id(id) {
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
Err(e @ Error::IdAlreadySet) => {
return Err(FieldViolation {
field: "id".to_string(),
description: e.to_string(),
}
.into())
}
Err(e) => Err(default_server_error_handler(e)),
}
}
async fn set_serving_readiness(
&self,
request: Request<SetServingReadinessRequest>,
) -> Result<Response<SetServingReadinessResponse>, Status> {
let SetServingReadinessRequest { ready } = request.into_inner();
self.serving_readiness.set(ready.into());
Ok(Response::new(SetServingReadinessResponse {}))
}
async fn get_serving_readiness(
&self,
_request: Request<GetServingReadinessRequest>,
) -> Result<Response<GetServingReadinessResponse>, Status> {
Ok(Response::new(GetServingReadinessResponse {
ready: self.serving_readiness.get().into(),
}))
}
}
pub fn make_server<M>(
server: Arc<Server<M>>,
serving_readiness: ServingReadiness,
) -> deployment_service_server::DeploymentServiceServer<
impl deployment_service_server::DeploymentService,
>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
deployment_service_server::DeploymentServiceServer::new(DeploymentService {
server,
serving_readiness,
})
}

View File

@ -8,6 +8,7 @@ use crate::influxdb_ioxd::{
};
mod delete;
mod deployment;
mod flight;
mod management;
mod operations;
@ -52,6 +53,13 @@ where
server_type.serving_readiness.clone(),
)
);
add_service!(
builder,
deployment::make_server(
Arc::clone(&server_type.server),
server_type.serving_readiness.clone(),
)
);
add_service!(
builder,
operations::make_server(Arc::clone(server_type.application.job_registry()))

View File

@ -189,6 +189,12 @@ impl ServerFixture {
influxdb_iox_client::delete::Client::new(self.grpc_channel())
}
/// Return a deployment client suitable for communicating with this
/// server
pub fn deployment_client(&self) -> influxdb_iox_client::deployment::Client {
influxdb_iox_client::deployment::Client::new(self.grpc_channel())
}
/// Return an a http client suitable suitable for communicating with this
/// server
pub fn influxdb2_client(&self) -> influxdb2_client::Client {

View File

@ -0,0 +1,64 @@
use influxdb_iox_client::{management::generated_types::DatabaseRules, write::WriteError};
use tonic::Code;
use crate::common::server_fixture::ServerFixture;
#[tokio::test]
async fn test_serving_readiness() {
let server_fixture = ServerFixture::create_single_use().await;
let mut deployment_client = server_fixture.deployment_client();
let mut mgmt_client = server_fixture.management_client();
let mut write_client = server_fixture.write_client();
let name = "foo";
let lp_data = "bar baz=1 10";
deployment_client
.update_server_id(42)
.await
.expect("set ID failed");
server_fixture.wait_server_initialized().await;
mgmt_client
.create_database(DatabaseRules {
name: name.to_string(),
..Default::default()
})
.await
.expect("create database failed");
assert!(deployment_client.get_serving_readiness().await.unwrap());
deployment_client
.set_serving_readiness(false)
.await
.unwrap();
let err = write_client.write(name, lp_data).await.unwrap_err();
assert!(
matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
"{}",
&err
);
assert!(!deployment_client.get_serving_readiness().await.unwrap());
deployment_client.set_serving_readiness(true).await.unwrap();
assert!(deployment_client.get_serving_readiness().await.unwrap());
write_client.write(name, lp_data).await.unwrap();
}
#[tokio::test]
async fn test_set_get_writer_id() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.deployment_client();
const TEST_ID: u32 = 42;
client
.update_server_id(TEST_ID)
.await
.expect("set ID failed");
let got = client.get_server_id().await.expect("get ID failed");
assert_eq!(got.get(), TEST_ID);
}

View File

@ -1,6 +1,7 @@
mod debug_cli;
mod delete_api;
mod deletes;
mod deployment_api;
mod flight_api;
mod freeze;
mod http;

View File

@ -4,6 +4,9 @@ pub mod health;
/// Client for delete API
pub mod delete;
/// Client for deployment API
pub mod deployment;
/// Client for management API
pub mod management;

View File

@ -0,0 +1,138 @@
use self::generated_types::{deployment_service_client::DeploymentServiceClient, *};
use crate::connection::Connection;
use std::{convert::TryInto, num::NonZeroU32};
use thiserror::Error;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::deployment::v1::*;
}
/// Errors returned by Client::update_server_id
#[derive(Debug, Error)]
pub enum UpdateServerIdError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::get_server_id
#[derive(Debug, Error)]
pub enum GetServerIdError {
/// Server ID is not set
#[error("Server ID not set")]
NoServerId,
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::set_serving_readiness
#[derive(Debug, Error)]
pub enum SetServingReadinessError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::get_serving_readiness
#[derive(Debug, Error)]
pub enum GetServingReadinessError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// An IOx Deployment API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// deployment::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // Update server ID.
/// client
/// .update_server_id(42)
/// .await
/// .expect("could not update server ID");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: DeploymentServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: DeploymentServiceClient::new(channel),
}
}
/// Set the server's ID.
pub async fn update_server_id(&mut self, id: u32) -> Result<(), UpdateServerIdError> {
self.inner
.update_server_id(UpdateServerIdRequest { id })
.await
.map_err(UpdateServerIdError::ServerError)?;
Ok(())
}
/// Get the server's ID.
pub async fn get_server_id(&mut self) -> Result<NonZeroU32, GetServerIdError> {
let response = self
.inner
.get_server_id(GetServerIdRequest {})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => GetServerIdError::NoServerId,
_ => GetServerIdError::ServerError(status),
})?;
let id = response
.get_ref()
.id
.try_into()
.map_err(|_| GetServerIdError::NoServerId)?;
Ok(id)
}
/// Set serving readiness.
pub async fn set_serving_readiness(
&mut self,
ready: bool,
) -> Result<(), SetServingReadinessError> {
self.inner
.set_serving_readiness(SetServingReadinessRequest { ready })
.await
.map_err(SetServingReadinessError::ServerError)?;
Ok(())
}
/// Get serving readiness.
pub async fn get_serving_readiness(&mut self) -> Result<bool, GetServingReadinessError> {
let response = self
.inner
.get_serving_readiness(GetServingReadinessRequest {})
.await
.map_err(GetServingReadinessError::ServerError)?;
Ok(response.get_ref().ready)
}
}