feat: implement gRPC API and migrate influxdb_iox_client to use it (#853)

* feat: implement gRPC management API

* feat: migrate influxdb_iox_client to use gRPC API

* fix: review comments

* refactor: separate influxdb_iox_client error types

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-03-02 17:51:46 +00:00 committed by GitHub
parent c7c4977e53
commit 51981c92f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 787 additions and 979 deletions

19
Cargo.lock generated
View File

@ -172,19 +172,6 @@ dependencies = [
"wait-timeout",
]
[[package]]
name = "async-compression"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537"
dependencies = [
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.0"
@ -1587,11 +1574,11 @@ name = "influxdb_iox_client"
version = "0.1.0"
dependencies = [
"arrow_deps",
"data_types",
"futures-util",
"generated_types",
"http",
"hyper",
"rand 0.8.3",
"reqwest",
"serde",
"serde_json",
"thiserror",
@ -2897,7 +2884,6 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0460542b551950620a3648c6aa23318ac6b3cd779114bd873209e6e8b5eb1c34"
dependencies = [
"async-compression",
"base64 0.13.0",
"bytes",
"encoding_rs",
@ -2923,7 +2909,6 @@ dependencies = [
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",

View File

@ -259,6 +259,8 @@ impl MutableBufferConfig {
}
}
// TODO: Remove this when deprecating HTTP API - cannot be used in gRPC as no
// explicit NULL support
impl Default for MutableBufferConfig {
fn default() -> Self {
Self {
@ -388,7 +390,7 @@ pub enum PartitionSort {
impl Default for PartitionSort {
fn default() -> Self {
Self::LastWriteTime
Self::CreatedAtTime
}
}
@ -446,7 +448,7 @@ pub enum Order {
impl Default for Order {
fn default() -> Self {
Self::Asc
Self::Desc
}
}

View File

@ -10,12 +10,12 @@ flight = ["arrow_deps", "serde/derive", "serde_json", "futures-util"]
[dependencies]
# Workspace dependencies, in alphabetical order
arrow_deps = { path = "../arrow_deps", optional = true }
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order
futures-util = { version = "0.3.1", optional = true }
reqwest = { version = "0.11.0", features = ["gzip", "json"] }
http = "0.2.3"
hyper = "0.14"
serde = "1.0.118"
serde_json = { version = "1.0.44", optional = true }
thiserror = "1.0.23"

View File

@ -1,161 +0,0 @@
use std::time::Duration;
use reqwest::Url;
use crate::Client;
#[cfg(feature = "flight")]
use crate::FlightClient;
/// The default User-Agent header sent by the HTTP client.
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
/// Configure and construct a new [`Client`] instance for using the IOx HTTP
/// API.
///
/// ```
/// # use influxdb_iox_client::ClientBuilder;
/// use std::time::Duration;
///
/// let c = ClientBuilder::default()
/// .timeout(Duration::from_secs(42))
/// .user_agent("my_awesome_client")
/// .build("http://127.0.0.1:8080/");
/// ```
#[derive(Debug)]
pub struct ClientBuilder {
user_agent: String,
connect_timeout: Duration,
timeout: Duration,
}
impl std::default::Default for ClientBuilder {
fn default() -> Self {
Self {
user_agent: USER_AGENT.into(),
connect_timeout: Duration::from_secs(1),
timeout: Duration::from_secs(30),
}
}
}
impl ClientBuilder {
/// Construct the [`Client`] instance using the specified base URL.
pub fn build<T>(self, base_url: T) -> Result<Client, Box<dyn std::error::Error>>
where
T: AsRef<str>,
{
let http = reqwest::ClientBuilder::new()
.user_agent(self.user_agent)
.gzip(true)
.referer(false)
.connect_timeout(self.connect_timeout)
.timeout(self.timeout)
.build()
.map_err(Box::new)?;
// Construct a base URL.
//
// This MUST end in a trailing slash, otherwise the last portion of the
// path is interpreted as being a filename and removed when joining
// paths to it. This assumes the user is specifying a URL to the
// endpoint, and not a file path and as a result provides the same
// behaviour to the user with and without the slash (avoiding some
// confusion!)
let base: Url = format!("{}/", base_url.as_ref().trim_end_matches('/')).parse()?;
if base.cannot_be_a_base() {
// This is the case if the scheme and : delimiter are not followed
// by a / slash, as is typically the case of data: and mailto: URLs.
return Err(format!("endpoint URL {} is invalid", base).into());
}
Ok(Client { http, base })
}
/// Set the `User-Agent` header sent by this client.
pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
Self {
user_agent: user_agent.into(),
..self
}
}
/// Sets the maximum duration of time the client will wait for the IOx
/// server to accept the TCP connection before aborting the request.
///
/// Note this does not bound the request duration - see
/// [`timeout`][Self::timeout].
pub fn connect_timeout(self, timeout: Duration) -> Self {
Self {
connect_timeout: timeout,
..self
}
}
/// Bounds the total amount of time a single client HTTP request take before
/// being aborted.
///
/// This timeout includes:
///
/// - Establishing the TCP connection (see [`connect_timeout`])
/// - Sending the HTTP request
/// - Waiting for, and receiving the entire HTTP response
///
/// [`connect_timeout`]: Self::connect_timeout
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout, ..self }
}
}
#[cfg(feature = "flight")]
#[derive(Debug)]
/// Configure and construct a new [`FlightClient`] instance for using the IOx
/// Arrow Flight API.
///
/// ```
/// # use influxdb_iox_client::FlightClientBuilder;
///
/// let c = FlightClientBuilder::default()
/// .build("http://127.0.0.1:8080/");
/// ```
pub struct FlightClientBuilder {}
#[cfg(feature = "flight")]
impl Default for FlightClientBuilder {
fn default() -> Self {
Self {}
}
}
#[cfg(feature = "flight")]
impl FlightClientBuilder {
/// Construct the [`FlightClient`] instance using the specified URL to the
/// server and port where the Arrow Flight API is available.
pub async fn build<T>(self, flight_url: T) -> Result<FlightClient, Box<dyn std::error::Error>>
where
T: std::convert::TryInto<tonic::transport::Endpoint>,
T::Error: Into<tonic::codegen::StdError>,
{
Ok(FlightClient::connect(flight_url).await.map_err(Box::new)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base_url() {
let c = ClientBuilder::default()
.build("http://127.0.0.1/proxy")
.unwrap();
assert_eq!(c.base.as_str(), "http://127.0.0.1/proxy/");
let c = ClientBuilder::default()
.build("http://127.0.0.1/proxy/")
.unwrap();
assert_eq!(c.base.as_str(), "http://127.0.0.1/proxy/");
}
}

View File

@ -1,370 +1,9 @@
use std::num::NonZeroU32;
use data_types::database_rules::DatabaseRules;
use reqwest::{Method, Url};
use crate::errors::{ClientError, CreateDatabaseError, Error, ServerErrorResponse};
use data_types::{http::ListDatabasesResponse, DatabaseName};
#[cfg(feature = "flight")]
mod flight;
/// Client for the gRPC health checking API
pub mod health;
// can't combine these into one statement that uses `{}` because of this bug in
// the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762
/// Client for the management API
pub mod management;
#[cfg(feature = "flight")]
pub use flight::FlightClient;
#[cfg(feature = "flight")]
pub use flight::PerformQuery;
// TODO: move DatabaseRules / WriterId to the API client
/// An IOx HTTP API client.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use data_types::database_rules::DatabaseRules;
/// use influxdb_iox_client::ClientBuilder;
///
/// let client = ClientBuilder::default()
/// .build("http://127.0.0.1:8080")
/// .unwrap();
///
/// // Ping the IOx server
/// client.ping().await.expect("server is down :(");
///
/// // Create a new database!
/// client
/// .create_database("bananas", &DatabaseRules::new())
/// .await
/// .expect("failed to create database");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
pub(crate) http: reqwest::Client,
/// The base URL to which request paths are joined.
///
/// A base path of:
///
/// ```text
/// https://www.influxdata.com/maybe-proxy/
/// ```
///
/// Joined with a request path of `/a/reg/` would result in:
///
/// ```text
/// https://www.influxdata.com/maybe-proxy/a/req/
/// ```
///
/// Paths joined to this `base` MUST be relative to be appended to the base
/// path. Absolute paths joined to `base` are still absolute.
pub(crate) base: Url,
}
impl std::default::Default for Client {
fn default() -> Self {
crate::ClientBuilder::default()
.build("http://127.0.0.1:8080")
.expect("default client builder is invalid")
}
}
impl Client {
/// Ping the IOx server, checking for a HTTP 200 response.
pub async fn ping(&self) -> Result<(), Error> {
const PING_PATH: &str = "ping";
let r = self
.http
.request(Method::GET, self.url_for(PING_PATH))
.send()
.await?;
match r {
r if r.status() == 200 => Ok(()),
r => Err(ServerErrorResponse::from_response(r).await.into()),
}
}
/// Creates a new IOx database.
pub async fn create_database(
&self,
name: impl AsRef<str>,
rules: &DatabaseRules,
) -> Result<(), CreateDatabaseError> {
let url = self.db_url(name.as_ref())?;
let r = self
.http
.request(Method::PUT, url)
.json(rules)
.send()
.await?;
// Filter out the good states, and convert all others into errors.
match r {
r if r.status() == 200 => Ok(()),
r => Err(ServerErrorResponse::from_response(r).await.into()),
}
}
/// Set the server's writer ID.
pub async fn set_writer_id(&self, id: NonZeroU32) -> Result<(), Error> {
const SET_WRITER_PATH: &str = "iox/api/v1/id";
let url = self.url_for(SET_WRITER_PATH);
// TODO: move this into a shared type
#[derive(serde::Serialize)]
struct WriterIdBody {
id: u32,
}
let r = self
.http
.request(Method::PUT, url)
.json(&WriterIdBody { id: id.get() })
.send()
.await?;
match r {
r if r.status() == 200 => Ok(()),
r => Err(ServerErrorResponse::from_response(r).await.into()),
}
}
/// Get the server's writer ID.
pub async fn get_writer_id(&self) -> Result<u32, Error> {
const GET_WRITER_PATH: &str = "iox/api/v1/id";
let url = self.url_for(GET_WRITER_PATH);
// TODO: move this into a shared type
#[derive(serde::Deserialize)]
struct WriterIdBody {
id: u32,
}
let r = self.http.request(Method::GET, url).send().await?;
match r {
r if r.status() == 200 => Ok(r.json::<WriterIdBody>().await?.id),
r => Err(ServerErrorResponse::from_response(r).await.into()),
}
}
/// List databases.
pub async fn list_databases(&self) -> Result<ListDatabasesResponse, Error> {
const LIST_DATABASES_PATH: &str = "iox/api/v1/databases";
let url = self.url_for(LIST_DATABASES_PATH);
let r = self.http.request(Method::GET, url).send().await?;
match r {
r if r.status() == 200 => Ok(r.json::<ListDatabasesResponse>().await?),
r => Err(ServerErrorResponse::from_response(r).await.into()),
}
}
/// Build the request path for relative `path`.
///
/// # Safety
///
/// Panics in debug builds if `path` contains an absolute path.
fn url_for(&self, path: &str) -> Url {
// In non-release builds, assert the path is not an absolute path.
//
// Paths should be relative so the full base path is used.
debug_assert_ne!(
path.chars().next().unwrap(),
'/',
"should not join absolute paths to base URL"
);
self.base
.join(path)
.expect("failed to construct request URL")
}
fn db_url(&self, database: &str) -> Result<Url, ClientError> {
const DB_PATH: &str = "iox/api/v1/databases/";
// Perform validation in the client as URL parser silently drops invalid
// characters
let name = DatabaseName::new(database).map_err(|_| ClientError::InvalidDatabaseName)?;
self.url_for(DB_PATH)
.join(name.as_ref())
.map_err(|_| ClientError::InvalidDatabaseName)
}
}
#[cfg(test)]
mod tests {
use crate::ClientBuilder;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use super::*;
/// If `TEST_IOX_ENDPOINT` is set, load the value and return it to the
/// caller.
///
/// If `TEST_IOX_ENDPOINT` is not set, skip the calling test by returning
/// early. Additionally if `TEST_INTEGRATION` is set, turn this early return
/// into a panic to force a hard fail for skipped integration tests.
macro_rules! maybe_skip_integration {
() => {
match (
std::env::var("TEST_IOX_ENDPOINT").is_ok(),
std::env::var("TEST_INTEGRATION").is_ok(),
) {
(true, _) => std::env::var("TEST_IOX_ENDPOINT").unwrap(),
(false, true) => {
panic!("TEST_INTEGRATION is set which requires running integration tests, but TEST_IOX_ENDPOINT is not")
}
_ => {
eprintln!("skipping integration test - set TEST_IOX_ENDPOINT to run");
return;
}
}
};
}
#[tokio::test]
async fn test_ping() {
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.ping().await.expect("ping failed");
}
#[tokio::test]
async fn test_set_get_writer_id() {
const TEST_ID: u32 = 42;
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.set_writer_id(NonZeroU32::new(TEST_ID).unwrap())
.await
.expect("set ID failed");
let got = c.get_writer_id().await.expect("get ID failed");
assert_eq!(got, TEST_ID);
}
#[tokio::test]
async fn test_create_database() {
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.set_writer_id(NonZeroU32::new(42).unwrap())
.await
.expect("set ID failed");
c.create_database(rand_name(), &DatabaseRules::new())
.await
.expect("create database failed");
}
#[tokio::test]
async fn test_create_database_duplicate_name() {
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.set_writer_id(NonZeroU32::new(42).unwrap())
.await
.expect("set ID failed");
let db_name = rand_name();
c.create_database(db_name.clone(), &DatabaseRules::new())
.await
.expect("create database failed");
let err = c
.create_database(db_name, &DatabaseRules::new())
.await
.expect_err("create database failed");
assert!(matches!(dbg!(err), CreateDatabaseError::AlreadyExists))
}
#[tokio::test]
async fn test_create_database_invalid_name() {
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.set_writer_id(NonZeroU32::new(42).unwrap())
.await
.expect("set ID failed");
let err = c
.create_database("my_example\ndb", &DatabaseRules::new())
.await
.expect_err("expected request to fail");
assert!(matches!(
dbg!(err),
CreateDatabaseError::ClientError(ClientError::InvalidDatabaseName)
));
}
#[tokio::test]
async fn test_list_databases() {
let endpoint = maybe_skip_integration!();
let c = ClientBuilder::default().build(endpoint).unwrap();
c.set_writer_id(NonZeroU32::new(42).unwrap())
.await
.expect("set ID failed");
let name = rand_name();
c.create_database(&name, &DatabaseRules::default())
.await
.expect("create database failed");
let r = c.list_databases().await.expect("list databases failed");
assert!(r.names.contains(&name));
}
#[test]
fn test_default() {
// Ensures the Default impl does not panic
let c = Client::default();
assert_eq!(c.base.as_str(), "http://127.0.0.1:8080/");
}
#[test]
fn test_paths() {
let c = ClientBuilder::default()
.build("http://127.0.0.2:8081/proxy")
.unwrap();
assert_eq!(
c.url_for("bananas").as_str(),
"http://127.0.0.2:8081/proxy/bananas"
);
}
#[test]
#[should_panic(expected = "absolute paths")]
fn test_absolute_path_panics() {
let c = ClientBuilder::default()
.build("http://127.0.0.2:8081/proxy")
.unwrap();
c.url_for("/bananas");
}
fn rand_name() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect()
}
}
/// Client for the flight API
pub mod flight;

View File

@ -1,3 +1,10 @@
use std::{convert::TryFrom, sync::Arc};
use futures_util::stream::StreamExt;
use serde::Serialize;
use thiserror::Error;
use tonic::Streaming;
use arrow_deps::{
arrow::{
array::Array,
@ -10,26 +17,56 @@ use arrow_deps::{
Ticket,
},
};
use futures_util::stream::StreamExt;
use serde::Serialize;
use std::{convert::TryFrom, sync::Arc};
use tonic::Streaming;
use crate::errors::{GrpcError, GrpcQueryError};
use crate::connection::Connection;
/// Error responses when querying an IOx database using the Arrow Flight gRPC
/// API.
#[derive(Debug, Error)]
pub enum Error {
/// An error occurred while serializing the query.
#[error(transparent)]
QuerySerializeError(#[from] serde_json::Error),
/// There were no FlightData messages returned when we expected to get one
/// containing a Schema.
#[error("no FlightData containing a Schema returned")]
NoSchema,
/// An error involving an Arrow operation occurred.
#[error(transparent)]
ArrowError(#[from] arrow_deps::arrow::error::ArrowError),
/// The data contained invalid Flatbuffers.
#[error("Invalid Flatbuffer: `{0}`")]
InvalidFlatbuffer(String),
/// The message header said it was a dictionary batch, but interpreting the
/// message as a dictionary batch returned `None`. Indicates malformed
/// Flight data from the server.
#[error("Message with header of type dictionary batch could not return a dictionary batch")]
CouldNotGetDictionaryBatch,
/// An unknown server error occurred. Contains the `tonic::Status` returned
/// from the server.
#[error(transparent)]
GrpcError(#[from] tonic::Status),
}
/// An IOx Arrow Flight gRPC API client.
///
/// ```rust,no_run
/// #[tokio::main]
/// # async fn main() {
/// use data_types::database_rules::DatabaseRules;
/// use influxdb_iox_client::FlightClientBuilder;
/// use influxdb_iox_client::{connection::Builder, flight::Client};
///
/// let mut client = FlightClientBuilder::default()
/// let connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .expect("client should be valid");
///
/// let mut client = Client::new(connection);
///
/// let mut query_results = client
/// .perform_query("my_database", "select * from cpu_load")
/// .await
@ -43,19 +80,16 @@ use crate::errors::{GrpcError, GrpcQueryError};
/// # }
/// ```
#[derive(Debug)]
pub struct FlightClient {
inner: FlightServiceClient<tonic::transport::Channel>,
pub struct Client {
inner: FlightServiceClient<Connection>,
}
impl FlightClient {
pub(crate) async fn connect<D>(dst: D) -> Result<Self, GrpcError>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<tonic::codegen::StdError>,
{
Ok(Self {
inner: FlightServiceClient::connect(dst).await?,
})
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: FlightServiceClient::new(channel),
}
}
/// Query the given database with the given SQL query, and return a
@ -64,7 +98,7 @@ impl FlightClient {
&mut self,
database_name: impl Into<String>,
sql_query: impl Into<String>,
) -> Result<PerformQuery, GrpcQueryError> {
) -> Result<PerformQuery, Error> {
PerformQuery::new(self, database_name.into(), sql_query.into()).await
}
}
@ -88,10 +122,10 @@ pub struct PerformQuery {
impl PerformQuery {
pub(crate) async fn new(
flight: &mut FlightClient,
flight: &mut Client,
database_name: String,
sql_query: String,
) -> Result<Self, GrpcQueryError> {
) -> Result<Self, Error> {
let query = ReadInfo {
database_name,
sql_query,
@ -102,7 +136,7 @@ impl PerformQuery {
};
let mut response = flight.inner.do_get(t).await?.into_inner();
let flight_data_schema = response.next().await.ok_or(GrpcQueryError::NoSchema)??;
let flight_data_schema = response.next().await.ok_or(Error::NoSchema)??;
let schema = Arc::new(Schema::try_from(&flight_data_schema)?);
let dictionaries_by_field = vec![None; schema.fields().len()];
@ -116,7 +150,7 @@ impl PerformQuery {
/// Returns the next `RecordBatch` available for this query, or `None` if
/// there are no further results available.
pub async fn next(&mut self) -> Result<Option<RecordBatch>, GrpcQueryError> {
pub async fn next(&mut self) -> Result<Option<RecordBatch>, Error> {
let Self {
schema,
dictionaries_by_field,
@ -129,14 +163,14 @@ impl PerformQuery {
};
let mut message = ipc::root_as_message(&data.data_header[..])
.map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
.map_err(|e| Error::InvalidFlatbuffer(e.to_string()))?;
while message.header_type() == ipc::MessageHeader::DictionaryBatch {
reader::read_dictionary(
&data.data_body,
message
.header_as_dictionary_batch()
.ok_or(GrpcQueryError::CouldNotGetDictionaryBatch)?,
.ok_or(Error::CouldNotGetDictionaryBatch)?,
&schema,
dictionaries_by_field,
)?;
@ -147,7 +181,7 @@ impl PerformQuery {
};
message = ipc::root_as_message(&data.data_header[..])
.map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
.map_err(|e| Error::InvalidFlatbuffer(e.to_string()))?;
}
Ok(Some(flight_data_to_arrow_batch(

View File

@ -1,6 +1,9 @@
use generated_types::grpc::health::v1::*;
use thiserror::Error;
use generated_types::grpc::health::v1::*;
use crate::connection::Connection;
/// Error type for the health check client
#[derive(Debug, Error)]
pub enum Error {
@ -29,19 +32,15 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Allows checking the status of a given service
#[derive(Debug)]
pub struct Client {
inner: health_client::HealthClient<tonic::transport::Channel>,
inner: health_client::HealthClient<Connection>,
}
impl Client {
/// Create a new client with the provided endpoint
pub async fn connect<D>(dst: D) -> Result<Self>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<tonic::codegen::StdError>,
{
Ok(Self {
inner: health_client::HealthClient::connect(dst).await?,
})
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: health_client::HealthClient::new(channel),
}
}
/// Returns `Ok()` if the corresponding service is serving

View File

@ -0,0 +1,201 @@
use std::num::NonZeroU32;
use thiserror::Error;
use self::generated_types::{management_service_client::ManagementServiceClient, *};
use crate::connection::Connection;
use std::convert::TryInto;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::management::v1::*;
}
/// Errors returned by Client::update_writer_id
#[derive(Debug, Error)]
pub enum UpdateWriterIdError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::get_writer_id
#[derive(Debug, Error)]
pub enum GetWriterIdError {
/// Writer ID is not set
#[error("Writer ID not set")]
NoWriterId,
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::create_database
#[derive(Debug, Error)]
pub enum CreateDatabaseError {
/// Writer ID is not set
#[error("Writer ID not set")]
NoWriterId,
/// Database already exists
#[error("Database already exists")]
DatabaseAlreadyExists,
/// Server returned an invalid argument error
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
InvalidArgument(tonic::Status),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::list_databases
#[derive(Debug, Error)]
pub enum ListDatabaseError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::get_database
#[derive(Debug, Error)]
pub enum GetDatabaseError {
/// Writer ID is not set
#[error("Writer ID not set")]
NoWriterId,
/// Database not found
#[error("Database not found")]
DatabaseNotFound,
/// Response contained no payload
#[error("Server returned an empty response")]
EmptyResponse,
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// An IOx Management API client.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// management::{Client, generated_types::DatabaseRules},
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // Create a new database!
/// client
/// .create_database(DatabaseRules{
/// name: "bananas".to_string(),
/// ..Default::default()
/// })
/// .await
/// .expect("failed to create database");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: ManagementServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: tonic::transport::Channel) -> Self {
Self {
inner: ManagementServiceClient::new(channel),
}
}
/// Set the server's writer ID.
pub async fn update_writer_id(&mut self, id: NonZeroU32) -> Result<(), UpdateWriterIdError> {
self.inner
.update_writer_id(UpdateWriterIdRequest { id: id.into() })
.await
.map_err(UpdateWriterIdError::ServerError)?;
Ok(())
}
/// Get the server's writer ID.
pub async fn get_writer_id(&mut self) -> Result<NonZeroU32, GetWriterIdError> {
let response = self
.inner
.get_writer_id(GetWriterIdRequest {})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => GetWriterIdError::NoWriterId,
_ => GetWriterIdError::ServerError(status),
})?;
let id = response
.get_ref()
.id
.try_into()
.map_err(|_| GetWriterIdError::NoWriterId)?;
Ok(id)
}
/// Creates a new IOx database.
pub async fn create_database(
&mut self,
rules: DatabaseRules,
) -> Result<(), CreateDatabaseError> {
self.inner
.create_database(CreateDatabaseRequest { rules: Some(rules) })
.await
.map_err(|status| match status.code() {
tonic::Code::AlreadyExists => CreateDatabaseError::DatabaseAlreadyExists,
tonic::Code::FailedPrecondition => CreateDatabaseError::NoWriterId,
tonic::Code::InvalidArgument => CreateDatabaseError::InvalidArgument(status),
_ => CreateDatabaseError::ServerError(status),
})?;
Ok(())
}
/// List databases.
pub async fn list_databases(&mut self) -> Result<Vec<String>, ListDatabaseError> {
let response = self
.inner
.list_databases(ListDatabasesRequest {})
.await
.map_err(ListDatabaseError::ServerError)?;
Ok(response.into_inner().names)
}
/// Get database configuration
pub async fn get_database(
&mut self,
name: impl Into<String>,
) -> Result<DatabaseRules, GetDatabaseError> {
let response = self
.inner
.get_database(GetDatabaseRequest { name: name.into() })
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => GetDatabaseError::DatabaseNotFound,
tonic::Code::FailedPrecondition => GetDatabaseError::NoWriterId,
_ => GetDatabaseError::ServerError(status),
})?;
let rules = response
.into_inner()
.rules
.ok_or(GetDatabaseError::EmptyResponse)?;
Ok(rules)
}
}

View File

@ -0,0 +1,123 @@
use http::{uri::InvalidUri, Uri};
use std::convert::TryInto;
use std::time::Duration;
use thiserror::Error;
use tonic::transport::Endpoint;
/// The connection type used for clients
pub type Connection = tonic::transport::Channel;
/// The default User-Agent header sent by the HTTP client.
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
/// The default connection timeout
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
/// The default request timeout
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Errors returned by the ConnectionBuilder
#[derive(Debug, Error)]
pub enum Error {
/// Server returned an invalid argument error
#[error("Connection error: {}", .0)]
TransportError(#[from] tonic::transport::Error),
/// Client received an unexpected error from the server
#[error("Invalid URI: {}", .0)]
InvalidUri(#[from] InvalidUri),
}
/// Result type for the ConnectionBuilder
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A builder that produces a connection that can be used with any of the gRPC
/// clients
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{connection::Builder, management::Client};
/// use std::time::Duration;
///
/// let connection = Builder::default()
/// .timeout(Duration::from_secs(42))
/// .user_agent("my_awesome_client")
/// .build("http://127.0.0.1:8082/")
/// .await
/// .expect("connection must succeed");
///
/// let client = Client::new(connection);
/// # }
/// ```
#[derive(Debug)]
pub struct Builder {
user_agent: String,
connect_timeout: Duration,
timeout: Duration,
}
impl std::default::Default for Builder {
fn default() -> Self {
Self {
user_agent: USER_AGENT.into(),
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
timeout: DEFAULT_TIMEOUT,
}
}
}
impl Builder {
/// Construct the [`Client`] instance using the specified base URL.
pub async fn build<D>(self, dst: D) -> Result<Connection>
where
D: TryInto<Uri, Error = InvalidUri>,
{
let endpoint = Endpoint::from(dst.try_into()?)
.user_agent(self.user_agent)?
.timeout(self.timeout);
// Manually construct connector to workaround https://github.com/hyperium/tonic/issues/498
let mut connector = hyper::client::HttpConnector::new();
connector.set_connect_timeout(Some(self.connect_timeout));
// Defaults from from tonic::channel::Endpoint
connector.enforce_http(false);
connector.set_nodelay(true);
connector.set_keepalive(None);
Ok(endpoint.connect_with_connector(connector).await?)
}
/// Set the `User-Agent` header sent by this client.
pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
Self {
user_agent: user_agent.into(),
..self
}
}
/// Sets the maximum duration of time the client will wait for the IOx
/// server to accept the TCP connection before aborting the request.
///
/// Note this does not bound the request duration - see
/// [`timeout`][Self::timeout].
pub fn connect_timeout(self, timeout: Duration) -> Self {
Self {
connect_timeout: timeout,
..self
}
}
/// Bounds the total amount of time a single client HTTP request take before
/// being aborted.
///
/// This timeout includes:
///
/// - Establishing the TCP connection (see [`connect_timeout`])
/// - Sending the HTTP request
/// - Waiting for, and receiving the entire HTTP response
///
/// [`connect_timeout`]: Self::connect_timeout
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout, ..self }
}
}

View File

@ -1,9 +0,0 @@
use thiserror::Error;
/// Error responses when creating a new IOx database.
#[derive(Debug, Error)]
pub enum ClientError {
/// The database name contains an invalid character.
#[error("the database name contains an invalid character")]
InvalidDatabaseName,
}

View File

@ -1,51 +0,0 @@
use thiserror::Error;
use super::{ApiErrorCode, ClientError, HttpError, ServerErrorResponse};
/// Error responses when creating a new IOx database.
#[derive(Debug, Error)]
pub enum CreateDatabaseError {
/// The database name contains an invalid character.
#[error("the database name contains an invalid character")]
InvalidName,
/// The database being created already exists.
#[error("a database with the requested name already exists")]
AlreadyExists,
/// An unknown server error occurred.
///
/// The error string contains the error string returned by the server.
#[error(transparent)]
ServerError(ServerErrorResponse),
/// A non-application HTTP request/response error occurred.
#[error(transparent)]
HttpError(#[from] HttpError),
/// An error occurred in the client.
#[error(transparent)]
ClientError(#[from] ClientError),
}
/// Convert a [`ServerErrorResponse`] into a [`CreateDatabaseError`].
///
/// This conversion plucks any errors with API error codes that are applicable
/// to [`CreateDatabaseError`] types, and everything else becomes a
/// `ServerError`.
impl From<ServerErrorResponse> for CreateDatabaseError {
fn from(err: ServerErrorResponse) -> Self {
match err.error_code() {
Some(c) if c == ApiErrorCode::DB_INVALID_NAME as u32 => Self::InvalidName,
Some(c) if c == ApiErrorCode::DB_ALREADY_EXISTS as u32 => Self::AlreadyExists,
_ => Self::ServerError(err),
}
}
}
/// Convert errors from the underlying HTTP client into `HttpError` instances.
impl From<reqwest::Error> for CreateDatabaseError {
fn from(err: reqwest::Error) -> Self {
Self::HttpError(err.into())
}
}

View File

@ -1,25 +0,0 @@
/// A gRPC request error.
///
/// This is a non-application level error returned when a gRPC request to the
/// IOx server has failed.
#[derive(Debug)]
pub struct GrpcError(tonic::transport::Error);
impl std::fmt::Display for GrpcError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for GrpcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}
/// Convert errors from the underlying gRPC client into `GrpcError` instances.
impl From<tonic::transport::Error> for GrpcError {
fn from(v: tonic::transport::Error) -> Self {
Self(v)
}
}

View File

@ -1,34 +0,0 @@
use thiserror::Error;
/// Error responses when querying an IOx database using the Arrow Flight gRPC
/// API.
#[derive(Debug, Error)]
pub enum GrpcQueryError {
/// An error occurred while serializing the query.
#[error(transparent)]
QuerySerializeError(#[from] serde_json::Error),
/// There were no FlightData messages returned when we expected to get one
/// containing a Schema.
#[error("no FlightData containing a Schema returned")]
NoSchema,
/// An error involving an Arrow operation occurred.
#[error(transparent)]
ArrowError(#[from] arrow_deps::arrow::error::ArrowError),
/// The data contained invalid Flatbuffers.
#[error("Invalid Flatbuffer: `{0}`")]
InvalidFlatbuffer(String),
/// The message header said it was a dictionary batch, but interpreting the
/// message as a dictionary batch returned `None`. Indicates malformed
/// Flight data from the server.
#[error("Message with header of type dictionary batch could not return a dictionary batch")]
CouldNotGetDictionaryBatch,
/// An unknown server error occurred. Contains the `tonic::Status` returned
/// from the server.
#[error(transparent)]
GrpcError(#[from] tonic::Status),
}

View File

@ -1,33 +0,0 @@
/// An HTTP request error.
///
/// This is a non-application level error returned when an HTTP request to the
/// IOx server has failed.
#[derive(Debug)]
pub struct HttpError(reqwest::Error);
// This wrapper type decouples the underlying HTTP client, ensuring the HTTP
// library error doesn't become part of the public API. This makes bumping the
// internal http client version / swapping it for something else a backwards
// compatible change.
//
// The reqwest error isn't exactly full of useful information beyond the Display
// impl (which this wrapper passes through) anyway.
impl std::fmt::Display for HttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for HttpError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}
/// Convert errors from the underlying HTTP client into `HttpError` instances.
impl From<reqwest::Error> for HttpError {
fn from(v: reqwest::Error) -> Self {
Self(v)
}
}

View File

@ -1,91 +0,0 @@
//! Error types returned by a client.
//!
//! A request to an IOx server can fail in three main ways:
//!
//! - an HTTP transport error (network error)
//! - a known API handler error response
//! - an unknown error returned by the server
//!
//! The first is converted into a [`HttpError`] and contains details of the
//! failed request.
//!
//! The second case is when the "business logic" of the API handler returns a
//! defined, meaningful error to the client. Examples of this include "the
//! database name is invalid" or "the database already exists". These are mapped
//! to per-handler error types (see [`CreateDatabaseError`] as an example).
//!
//! The last case is a generic error returned by the IOx server. These become
//! [`ServerErrorResponse`] instances and contain the error string, optional
//! error code and HTTP status code sent by the server.
//!
//! If using the Arrow Flight API, errors from gRPC requests will be converted
//! into a [`GrpcError`] containing details of the failed request.
use thiserror::Error;
mod http_error;
pub use http_error::*;
mod client_error;
pub use client_error::*;
mod server_error_response;
pub use server_error_response::*;
mod create_database;
pub use create_database::*;
#[cfg(feature = "flight")]
mod grpc_error;
#[cfg(feature = "flight")]
pub use grpc_error::*;
#[cfg(feature = "flight")]
mod grpc_query_error;
#[cfg(feature = "flight")]
pub use grpc_query_error::*;
/// Constants used in API error codes.
///
/// Expressing this as a enum prevents reuse of discriminants, and as they're
/// effectively consts this uses UPPER_SNAKE_CASE.
#[allow(non_camel_case_types)]
#[derive(Debug, PartialEq)]
pub enum ApiErrorCode {
/// An unknown/unhandled error
UNKNOWN = 100,
/// The database name in the request is invalid.
DB_INVALID_NAME = 101,
/// The database referenced already exists.
DB_ALREADY_EXISTS = 102,
/// The database referenced does not exist.
DB_NOT_FOUND = 103,
}
impl From<ApiErrorCode> for u32 {
fn from(v: ApiErrorCode) -> Self {
v as u32
}
}
/// `Error` defines the generic error type for API methods that do not have
/// specific error types.
#[derive(Debug, Error)]
pub enum Error {
/// A non-application HTTP request/response error has occurred.
#[error("http request/response error: {0}")]
HttpError(#[from] HttpError),
/// The IOx server has responded with an error.
#[error("error response from server: {0}")]
ServerError(#[from] ServerErrorResponse),
}
/// Convert errors from the underlying HTTP client into `HttpError` instances.
impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Self::HttpError(err.into())
}
}

View File

@ -1,83 +0,0 @@
use reqwest::Response;
/// A generic error message from the IOx server.
///
/// All IOx error responses are first deserialised into this type. API methods
/// that have specific error types pick out known error codes (using
/// [`ServerErrorResponse::error_code()`]) and map them to their respective
/// error instances to provide more context to the caller.
///
/// API methods without specific error types freely return this type as a
/// generic "server error" response.
#[derive(Debug, serde::Deserialize)]
pub struct ServerErrorResponse {
error: String,
error_code: Option<u32>,
http_status: Option<u16>,
}
impl std::fmt::Display for ServerErrorResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(
format!(
"{} (HTTP status {:?}, IOx error code {:?})",
&self.error, &self.http_status, &self.error_code,
)
.as_str(),
)
}
}
impl std::error::Error for ServerErrorResponse {}
impl ServerErrorResponse {
/// Try and parse a JSON "error" field from the [reqwest::Response].
pub(crate) async fn from_response(r: Response) -> Self {
let status = r.status().as_u16();
match r.json::<ServerErrorResponse>().await {
Ok(e) => e,
Err(e) => Self {
error: format!("error decoding JSON body: {}", e),
error_code: None,
http_status: Some(status),
},
}
}
/// Return the IOx error code in the response, if any.
pub fn error_code(&self) -> Option<u32> {
self.error_code
}
/// Return the HTTP status code sent by the server.
pub fn http_status_code(&self) -> Option<u16> {
self.http_status
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_generic_error_response() {
let body = serde_json::json!({"error": "something terrible", "error_code": 42}).to_string();
let response: ServerErrorResponse = serde_json::from_str(&body).unwrap();
assert_eq!(response.error, "something terrible");
assert_eq!(response.error_code, Some(42));
assert_eq!(response.http_status, None);
}
#[test]
fn test_parse_generic_error_no_code() {
let body = serde_json::json!({"error": "something terrible"}).to_string();
let response: ServerErrorResponse = serde_json::from_str(&body).unwrap();
assert_eq!(response.error, "something terrible");
assert_eq!(response.error_code, None);
assert_eq!(response.http_status, None);
}
}

View File

@ -8,17 +8,9 @@
)]
#![allow(clippy::missing_docs_in_private_items)]
mod builder;
pub use builder::*;
pub use client::{flight, health, management};
/// Builder for constructing connections for use with the various gRPC clients
pub mod connection;
mod client;
pub use client::*;
// can't combine these into one statement that uses `{}` because of this bug in
// the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "flight")]
pub use client::FlightClient;
#[cfg(feature = "flight")]
pub use client::PerformQuery;
pub mod errors;

View File

@ -39,6 +39,32 @@ use std::{fmt::Debug, str, sync::Arc};
mod format;
use format::QueryOutputFormat;
/// Constants used in API error codes.
///
/// Expressing this as a enum prevents reuse of discriminants, and as they're
/// effectively consts this uses UPPER_SNAKE_CASE.
#[allow(non_camel_case_types)]
#[derive(Debug, PartialEq)]
pub enum ApiErrorCode {
/// An unknown/unhandled error
UNKNOWN = 100,
/// The database name in the request is invalid.
DB_INVALID_NAME = 101,
/// The database referenced already exists.
DB_ALREADY_EXISTS = 102,
/// The database referenced does not exist.
DB_NOT_FOUND = 103,
}
impl From<ApiErrorCode> for u32 {
fn from(v: ApiErrorCode) -> Self {
v as Self
}
}
#[derive(Debug, Snafu)]
pub enum ApplicationError {
// Internal (unexpected) errors
@ -237,8 +263,6 @@ impl ApplicationError {
/// Map the error type into an API error code.
fn api_error_code(&self) -> u32 {
use influxdb_iox_client::errors::ApiErrorCode;
match self {
Self::DatabaseNameError { .. } => ApiErrorCode::DB_INVALID_NAME,
Self::DatabaseNotFound { .. } => ApiErrorCode::DB_NOT_FOUND,

View File

@ -9,6 +9,7 @@ use data_types::error::ErrorLogger;
use server::{ConnectionManager, Server};
mod flight;
mod management;
mod storage;
mod testing;
@ -48,7 +49,8 @@ where
.add_service(health_service)
.add_service(testing::make_server())
.add_service(storage::make_server(Arc::clone(&server)))
.add_service(flight::make_server(server))
.add_service(flight::make_server(Arc::clone(&server)))
.add_service(management::make_server(server))
.serve_with_incoming(stream)
.await
.context(ServerError {})

View File

@ -0,0 +1,127 @@
use std::convert::TryInto;
use std::fmt::Debug;
use std::sync::Arc;
use tonic::{Request, Response, Status};
use tracing::error;
use data_types::database_rules::DatabaseRules;
use data_types::DatabaseName;
use generated_types::google::{
AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound,
PreconditionViolation,
};
use generated_types::influxdata::iox::management::v1::*;
use query::DatabaseStore;
use server::{ConnectionManager, Error, Server};
struct ManagementService<M: ConnectionManager> {
server: Arc<Server<M>>,
}
fn default_error_handler(error: Error) -> tonic::Status {
match error {
Error::IdNotSet => PreconditionViolation {
category: "Writer ID".to_string(),
subject: "influxdata.com/iox".to_string(),
description: "Writer ID must be set".to_string(),
}
.into(),
error => {
error!(?error, "Unexpected error");
InternalError {}.into()
}
}
}
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
async fn get_writer_id(
&self,
_: Request<GetWriterIdRequest>,
) -> Result<Response<GetWriterIdResponse>, Status> {
match self.server.require_id().ok() {
Some(id) => Ok(Response::new(GetWriterIdResponse { id })),
None => return Err(NotFound::default().into()),
}
}
async fn update_writer_id(
&self,
request: Request<UpdateWriterIdRequest>,
) -> Result<Response<UpdateWriterIdResponse>, Status> {
self.server.set_id(request.get_ref().id);
Ok(Response::new(UpdateWriterIdResponse {}))
}
async fn list_databases(
&self,
_: Request<ListDatabasesRequest>,
) -> Result<Response<ListDatabasesResponse>, Status> {
let names = self.server.db_names_sorted().await;
Ok(Response::new(ListDatabasesResponse { names }))
}
async fn get_database(
&self,
request: Request<GetDatabaseRequest>,
) -> Result<Response<GetDatabaseResponse>, Status> {
let name = DatabaseName::new(request.into_inner().name).field("name")?;
match self.server.db_rules(&name).await {
Some(rules) => Ok(Response::new(GetDatabaseResponse {
rules: Some(rules.into()),
})),
None => {
return Err(NotFound {
resource_type: "database".to_string(),
resource_name: name.to_string(),
..Default::default()
}
.into())
}
}
}
async fn create_database(
&self,
request: Request<CreateDatabaseRequest>,
) -> Result<Response<CreateDatabaseResponse>, Status> {
let rules: DatabaseRules = request
.into_inner()
.rules
.ok_or_else(|| FieldViolation::required(""))
.and_then(TryInto::try_into)
.map_err(|e| e.scope("rules"))?;
let name =
DatabaseName::new(rules.name.clone()).expect("protobuf mapping didn't validate name");
match self.server.create_database(name, rules).await {
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
Err(Error::DatabaseAlreadyExists { db_name }) => {
return Err(AlreadyExists {
resource_type: "database".to_string(),
resource_name: db_name,
..Default::default()
}
.into())
}
Err(e) => Err(default_error_handler(e)),
}
}
}
pub fn make_server<M>(
server: Arc<Server<M>>,
) -> management_service_server::ManagementServiceServer<
impl management_service_server::ManagementService,
>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
management_service_server::ManagementServiceServer::new(ManagementService { server })
}

View File

@ -17,18 +17,24 @@
// - Creating a unique org_id per test
// - Stopping the server after all relevant tests are run
use assert_cmd::prelude::*;
use data_types::{database_rules::DatabaseRules, names::org_and_bucket_to_database, DatabaseName};
use futures::prelude::*;
use generated_types::{storage_client::StorageClient, ReadSource, TimestampRange};
use prost::Message;
use std::convert::TryInto;
use std::process::{Child, Command};
use std::str;
use std::time::{Duration, SystemTime};
use std::u32;
use assert_cmd::prelude::*;
use futures::prelude::*;
use prost::Message;
use tempfile::TempDir;
use data_types::{names::org_and_bucket_to_database, DatabaseName};
use end_to_end_cases::*;
use generated_types::{
influxdata::iox::management::v1::DatabaseRules, storage_client::StorageClient, ReadSource,
TimestampRange,
};
// These port numbers are chosen to not collide with a development ioxd server
// running locally.
// TODO(786): allocate random free ports instead of hardcoding.
@ -57,7 +63,6 @@ type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
mod end_to_end_cases;
use end_to_end_cases::*;
#[tokio::test]
async fn read_and_write_data() {
@ -66,7 +71,12 @@ async fn read_and_write_data() {
let http_client = reqwest::Client::new();
let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN);
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
let grpc = influxdb_iox_client::connection::Builder::default()
.build(GRPC_URL_BASE)
.await
.unwrap();
let mut storage_client = StorageClient::new(grpc.clone());
let mut management_client = influxdb_iox_client::management::Client::new(grpc);
// These tests share data; TODO: a better way to indicate this
{
@ -74,19 +84,25 @@ async fn read_and_write_data() {
.set_org_id("0000111100001111")
.set_bucket_id("1111000011110000");
create_database(&http_client, &scenario.database_name()).await;
create_database(&mut management_client, &scenario.database_name()).await;
let expected_read_data = load_data(&influxdb2, &scenario).await;
let sql_query = "select * from cpu_load_short";
read_api::test(&http_client, &scenario, sql_query, &expected_read_data).await;
grpc_api::test(&mut storage_client, &scenario).await;
storage_api::test(&mut storage_client, &scenario).await;
flight_api::test(&scenario, sql_query, &expected_read_data).await;
}
// These tests manage their own data
grpc_api::read_group_test(&http_client, &influxdb2, &mut storage_client).await;
grpc_api::read_window_aggregate_test(&http_client, &influxdb2, &mut storage_client).await;
storage_api::read_group_test(&mut management_client, &influxdb2, &mut storage_client).await;
storage_api::read_window_aggregate_test(
&mut management_client,
&influxdb2,
&mut storage_client,
)
.await;
management_api::test(&mut management_client).await;
test_http_error_messages(&influxdb2).await.unwrap();
}
@ -178,17 +194,16 @@ impl Scenario {
}
}
async fn create_database(client: &reqwest::Client, database_name: &str) {
let rules = DatabaseRules::new();
let data = serde_json::to_vec(&rules).unwrap();
async fn create_database(
client: &mut influxdb_iox_client::management::Client,
database_name: &str,
) {
client
.put(&format!(
"{}/iox/api/v1/databases/{}",
HTTP_BASE, database_name
))
.body(data)
.send()
.create_database(DatabaseRules {
name: database_name.to_string(),
mutable_buffer_config: Some(Default::default()),
..Default::default()
})
.await
.unwrap();
}
@ -379,16 +394,23 @@ impl TestServer {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
match influxdb_iox_client::health::Client::connect(GRPC_URL_BASE).await {
Ok(mut client) => {
match influxdb_iox_client::connection::Builder::default()
.build(GRPC_URL_BASE)
.await
{
Ok(connection) => {
println!("Successfully connected to server");
match client.check_storage().await {
let mut health = influxdb_iox_client::health::Client::new(connection);
match health.check_storage().await {
Ok(_) => {
println!("Storage service is running");
break;
return;
}
Err(e) => {
println!("Error checking storage service status: {}", e);
}
Err(e) => println!("Error checking storage service status: {}", e),
}
}
Err(e) => {
@ -397,22 +419,6 @@ impl TestServer {
}
interval.tick().await;
}
loop {
match StorageClient::connect(GRPC_URL_BASE).await {
Ok(storage_client) => {
println!(
"Successfully connected storage_client: {:?}",
storage_client
);
return;
}
Err(e) => {
println!("Failed to create storage client: {}", e)
}
}
interval.tick().await;
}
};
let try_http_connect = async {

View File

@ -1,12 +1,12 @@
use crate::{Scenario, GRPC_URL_BASE};
use arrow_deps::assert_table_eq;
use influxdb_iox_client::FlightClientBuilder;
use influxdb_iox_client::{connection::Builder, flight::Client};
pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[String]) {
let mut client = FlightClientBuilder::default()
.build(GRPC_URL_BASE)
.await
.unwrap();
let connection = Builder::default().build(GRPC_URL_BASE).await.unwrap();
let mut client = Client::new(connection);
let mut query_results = client
.perform_query(scenario.database_name(), sql_query)
.await

View File

@ -0,0 +1,159 @@
use std::num::NonZeroU32;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use generated_types::google::protobuf::Empty;
use generated_types::{google::protobuf::Duration, influxdata::iox::management::v1::*};
use influxdb_iox_client::management::{Client, CreateDatabaseError};
pub async fn test(client: &mut Client) {
test_set_get_writer_id(client).await;
test_create_database_duplicate_name(client).await;
test_create_database_invalid_name(client).await;
test_list_databases(client).await;
test_create_get_database(client).await;
}
async fn test_set_get_writer_id(client: &mut Client) {
const TEST_ID: u32 = 42;
client
.update_writer_id(NonZeroU32::new(TEST_ID).unwrap())
.await
.expect("set ID failed");
let got = client.get_writer_id().await.expect("get ID failed");
assert_eq!(got.get(), TEST_ID);
}
async fn test_create_database_duplicate_name(client: &mut Client) {
let db_name = rand_name();
client
.create_database(DatabaseRules {
name: db_name.clone(),
..Default::default()
})
.await
.expect("create database failed");
let err = client
.create_database(DatabaseRules {
name: db_name,
..Default::default()
})
.await
.expect_err("create database failed");
assert!(matches!(
dbg!(err),
CreateDatabaseError::DatabaseAlreadyExists
))
}
async fn test_create_database_invalid_name(client: &mut Client) {
let err = client
.create_database(DatabaseRules {
name: "my_example\ndb".to_string(),
..Default::default()
})
.await
.expect_err("expected request to fail");
assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_)));
}
async fn test_list_databases(client: &mut Client) {
let name = rand_name();
client
.create_database(DatabaseRules {
name: name.clone(),
..Default::default()
})
.await
.expect("create database failed");
let names = client
.list_databases()
.await
.expect("list databases failed");
assert!(names.contains(&name));
}
async fn test_create_get_database(client: &mut Client) {
let db_name = rand_name();
// Specify everything to allow direct comparison between request and response
// Otherwise would expect difference due to server-side defaulting
let rules = DatabaseRules {
name: db_name.clone(),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Table(Empty {})),
}],
}),
replication_config: Some(ReplicationConfig {
replications: vec!["cupcakes".to_string()],
replication_count: 3,
replication_queue_max_size: 20,
}),
subscription_config: Some(SubscriptionConfig {
subscriptions: vec![subscription_config::Subscription {
name: "subscription".to_string(),
host_group_id: "hostgroup".to_string(),
matcher: Some(Matcher {
predicate: "pred".to_string(),
table_matcher: Some(matcher::TableMatcher::All(Empty {})),
}),
}],
}),
query_config: Some(QueryConfig {
query_local: true,
primary: Default::default(),
secondaries: vec![],
read_only_partitions: vec![],
}),
wal_buffer_config: Some(WalBufferConfig {
buffer_size: 24,
segment_size: 2,
buffer_rollover: wal_buffer_config::Rollover::DropIncoming as _,
persist_segments: true,
close_segment_after: Some(Duration {
seconds: 324,
nanos: 2,
}),
}),
mutable_buffer_config: Some(MutableBufferConfig {
buffer_size: 553,
reject_if_not_persisted: true,
partition_drop_order: Some(mutable_buffer_config::PartitionDropOrder {
order: Order::Asc as _,
sort: Some(
mutable_buffer_config::partition_drop_order::Sort::CreatedAtTime(Empty {}),
),
}),
persist_after_cold_seconds: 34,
}),
};
client
.create_database(rules.clone())
.await
.expect("create database failed");
let response = client
.get_database(db_name)
.await
.expect("get database failed");
assert_eq!(response, rules);
}
fn rand_name() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect()
}

View File

@ -1,3 +1,4 @@
pub mod flight_api;
pub mod grpc_api;
pub mod management_api;
pub mod read_api;
pub mod storage_api;

View File

@ -11,6 +11,7 @@ use generated_types::{
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest,
ReadWindowAggregateRequest, Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
};
use influxdb_iox_client::management;
use std::str;
use test_helpers::tag_key_bytes_to_strings;
use tonic::transport::Channel;
@ -281,7 +282,7 @@ async fn measurement_fields_endpoint(
}
pub async fn read_group_test(
client: &reqwest::Client,
management: &mut management::Client,
influxdb2: &influxdb2_client::Client,
storage_client: &mut StorageClient<Channel>,
) {
@ -289,7 +290,7 @@ pub async fn read_group_test(
.set_org_id("0000111100001110")
.set_bucket_id("1111000011110001");
create_database(&client, &scenario.database_name()).await;
create_database(management, &scenario.database_name()).await;
load_read_group_data(&influxdb2, &scenario).await;
@ -534,7 +535,7 @@ async fn test_read_group_last_agg(
// Standalone test that all the pipes are hooked up for read window aggregate
pub async fn read_window_aggregate_test(
client: &reqwest::Client,
management: &mut management::Client,
influxdb2: &influxdb2_client::Client,
storage_client: &mut StorageClient<Channel>,
) {
@ -543,7 +544,7 @@ pub async fn read_window_aggregate_test(
.set_bucket_id("1111000011110011");
let read_source = scenario.read_source();
create_database(&client, &scenario.database_name()).await;
create_database(management, &scenario.database_name()).await;
let line_protocol = vec![
"h2o,state=MA,city=Boston temp=70.0 100",