feat: implement gRPC API and migrate influxdb_iox_client to use it (#853)
* feat: implement gRPC management API * feat: migrate influxdb_iox_client to use gRPC API * fix: review comments * refactor: separate influxdb_iox_client error types Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
c7c4977e53
commit
51981c92f5
|
|
@ -172,19 +172,6 @@ dependencies = [
|
|||
"wait-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"futures-core",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.0"
|
||||
|
|
@ -1587,11 +1574,11 @@ name = "influxdb_iox_client"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow_deps",
|
||||
"data_types",
|
||||
"futures-util",
|
||||
"generated_types",
|
||||
"http",
|
||||
"hyper",
|
||||
"rand 0.8.3",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
|
|
@ -2897,7 +2884,6 @@ version = "0.11.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0460542b551950620a3648c6aa23318ac6b3cd779114bd873209e6e8b5eb1c34"
|
||||
dependencies = [
|
||||
"async-compression",
|
||||
"base64 0.13.0",
|
||||
"bytes",
|
||||
"encoding_rs",
|
||||
|
|
@ -2923,7 +2909,6 @@ dependencies = [
|
|||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-rustls",
|
||||
"tokio-util",
|
||||
"url",
|
||||
"wasm-bindgen",
|
||||
"wasm-bindgen-futures",
|
||||
|
|
|
|||
|
|
@ -259,6 +259,8 @@ impl MutableBufferConfig {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Remove this when deprecating HTTP API - cannot be used in gRPC as no
|
||||
// explicit NULL support
|
||||
impl Default for MutableBufferConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
|
@ -388,7 +390,7 @@ pub enum PartitionSort {
|
|||
|
||||
impl Default for PartitionSort {
|
||||
fn default() -> Self {
|
||||
Self::LastWriteTime
|
||||
Self::CreatedAtTime
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -446,7 +448,7 @@ pub enum Order {
|
|||
|
||||
impl Default for Order {
|
||||
fn default() -> Self {
|
||||
Self::Asc
|
||||
Self::Desc
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,12 +10,12 @@ flight = ["arrow_deps", "serde/derive", "serde_json", "futures-util"]
|
|||
[dependencies]
|
||||
# Workspace dependencies, in alphabetical order
|
||||
arrow_deps = { path = "../arrow_deps", optional = true }
|
||||
data_types = { path = "../data_types" }
|
||||
generated_types = { path = "../generated_types" }
|
||||
|
||||
# Crates.io dependencies, in alphabetical order
|
||||
futures-util = { version = "0.3.1", optional = true }
|
||||
reqwest = { version = "0.11.0", features = ["gzip", "json"] }
|
||||
http = "0.2.3"
|
||||
hyper = "0.14"
|
||||
serde = "1.0.118"
|
||||
serde_json = { version = "1.0.44", optional = true }
|
||||
thiserror = "1.0.23"
|
||||
|
|
|
|||
|
|
@ -1,161 +0,0 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use reqwest::Url;
|
||||
|
||||
use crate::Client;
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
use crate::FlightClient;
|
||||
|
||||
/// The default User-Agent header sent by the HTTP client.
|
||||
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
|
||||
|
||||
/// Configure and construct a new [`Client`] instance for using the IOx HTTP
|
||||
/// API.
|
||||
///
|
||||
/// ```
|
||||
/// # use influxdb_iox_client::ClientBuilder;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let c = ClientBuilder::default()
|
||||
/// .timeout(Duration::from_secs(42))
|
||||
/// .user_agent("my_awesome_client")
|
||||
/// .build("http://127.0.0.1:8080/");
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct ClientBuilder {
|
||||
user_agent: String,
|
||||
connect_timeout: Duration,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl std::default::Default for ClientBuilder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
user_agent: USER_AGENT.into(),
|
||||
connect_timeout: Duration::from_secs(1),
|
||||
timeout: Duration::from_secs(30),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientBuilder {
|
||||
/// Construct the [`Client`] instance using the specified base URL.
|
||||
pub fn build<T>(self, base_url: T) -> Result<Client, Box<dyn std::error::Error>>
|
||||
where
|
||||
T: AsRef<str>,
|
||||
{
|
||||
let http = reqwest::ClientBuilder::new()
|
||||
.user_agent(self.user_agent)
|
||||
.gzip(true)
|
||||
.referer(false)
|
||||
.connect_timeout(self.connect_timeout)
|
||||
.timeout(self.timeout)
|
||||
.build()
|
||||
.map_err(Box::new)?;
|
||||
|
||||
// Construct a base URL.
|
||||
//
|
||||
// This MUST end in a trailing slash, otherwise the last portion of the
|
||||
// path is interpreted as being a filename and removed when joining
|
||||
// paths to it. This assumes the user is specifying a URL to the
|
||||
// endpoint, and not a file path and as a result provides the same
|
||||
// behaviour to the user with and without the slash (avoiding some
|
||||
// confusion!)
|
||||
let base: Url = format!("{}/", base_url.as_ref().trim_end_matches('/')).parse()?;
|
||||
if base.cannot_be_a_base() {
|
||||
// This is the case if the scheme and : delimiter are not followed
|
||||
// by a / slash, as is typically the case of data: and mailto: URLs.
|
||||
return Err(format!("endpoint URL {} is invalid", base).into());
|
||||
}
|
||||
|
||||
Ok(Client { http, base })
|
||||
}
|
||||
|
||||
/// Set the `User-Agent` header sent by this client.
|
||||
pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
|
||||
Self {
|
||||
user_agent: user_agent.into(),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the maximum duration of time the client will wait for the IOx
|
||||
/// server to accept the TCP connection before aborting the request.
|
||||
///
|
||||
/// Note this does not bound the request duration - see
|
||||
/// [`timeout`][Self::timeout].
|
||||
pub fn connect_timeout(self, timeout: Duration) -> Self {
|
||||
Self {
|
||||
connect_timeout: timeout,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Bounds the total amount of time a single client HTTP request take before
|
||||
/// being aborted.
|
||||
///
|
||||
/// This timeout includes:
|
||||
///
|
||||
/// - Establishing the TCP connection (see [`connect_timeout`])
|
||||
/// - Sending the HTTP request
|
||||
/// - Waiting for, and receiving the entire HTTP response
|
||||
///
|
||||
/// [`connect_timeout`]: Self::connect_timeout
|
||||
pub fn timeout(self, timeout: Duration) -> Self {
|
||||
Self { timeout, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
#[derive(Debug)]
|
||||
/// Configure and construct a new [`FlightClient`] instance for using the IOx
|
||||
/// Arrow Flight API.
|
||||
///
|
||||
/// ```
|
||||
/// # use influxdb_iox_client::FlightClientBuilder;
|
||||
///
|
||||
/// let c = FlightClientBuilder::default()
|
||||
/// .build("http://127.0.0.1:8080/");
|
||||
/// ```
|
||||
pub struct FlightClientBuilder {}
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
impl Default for FlightClientBuilder {
|
||||
fn default() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
impl FlightClientBuilder {
|
||||
/// Construct the [`FlightClient`] instance using the specified URL to the
|
||||
/// server and port where the Arrow Flight API is available.
|
||||
pub async fn build<T>(self, flight_url: T) -> Result<FlightClient, Box<dyn std::error::Error>>
|
||||
where
|
||||
T: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
T::Error: Into<tonic::codegen::StdError>,
|
||||
{
|
||||
Ok(FlightClient::connect(flight_url).await.map_err(Box::new)?)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_base_url() {
|
||||
let c = ClientBuilder::default()
|
||||
.build("http://127.0.0.1/proxy")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(c.base.as_str(), "http://127.0.0.1/proxy/");
|
||||
|
||||
let c = ClientBuilder::default()
|
||||
.build("http://127.0.0.1/proxy/")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(c.base.as_str(), "http://127.0.0.1/proxy/");
|
||||
}
|
||||
}
|
||||
|
|
@ -1,370 +1,9 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use data_types::database_rules::DatabaseRules;
|
||||
use reqwest::{Method, Url};
|
||||
|
||||
use crate::errors::{ClientError, CreateDatabaseError, Error, ServerErrorResponse};
|
||||
use data_types::{http::ListDatabasesResponse, DatabaseName};
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
mod flight;
|
||||
|
||||
/// Client for the gRPC health checking API
|
||||
pub mod health;
|
||||
|
||||
// can't combine these into one statement that uses `{}` because of this bug in
|
||||
// the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762
|
||||
/// Client for the management API
|
||||
pub mod management;
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
pub use flight::FlightClient;
|
||||
#[cfg(feature = "flight")]
|
||||
pub use flight::PerformQuery;
|
||||
|
||||
// TODO: move DatabaseRules / WriterId to the API client
|
||||
|
||||
/// An IOx HTTP API client.
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// use data_types::database_rules::DatabaseRules;
|
||||
/// use influxdb_iox_client::ClientBuilder;
|
||||
///
|
||||
/// let client = ClientBuilder::default()
|
||||
/// .build("http://127.0.0.1:8080")
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Ping the IOx server
|
||||
/// client.ping().await.expect("server is down :(");
|
||||
///
|
||||
/// // Create a new database!
|
||||
/// client
|
||||
/// .create_database("bananas", &DatabaseRules::new())
|
||||
/// .await
|
||||
/// .expect("failed to create database");
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
pub(crate) http: reqwest::Client,
|
||||
|
||||
/// The base URL to which request paths are joined.
|
||||
///
|
||||
/// A base path of:
|
||||
///
|
||||
/// ```text
|
||||
/// https://www.influxdata.com/maybe-proxy/
|
||||
/// ```
|
||||
///
|
||||
/// Joined with a request path of `/a/reg/` would result in:
|
||||
///
|
||||
/// ```text
|
||||
/// https://www.influxdata.com/maybe-proxy/a/req/
|
||||
/// ```
|
||||
///
|
||||
/// Paths joined to this `base` MUST be relative to be appended to the base
|
||||
/// path. Absolute paths joined to `base` are still absolute.
|
||||
pub(crate) base: Url,
|
||||
}
|
||||
|
||||
impl std::default::Default for Client {
|
||||
fn default() -> Self {
|
||||
crate::ClientBuilder::default()
|
||||
.build("http://127.0.0.1:8080")
|
||||
.expect("default client builder is invalid")
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Ping the IOx server, checking for a HTTP 200 response.
|
||||
pub async fn ping(&self) -> Result<(), Error> {
|
||||
const PING_PATH: &str = "ping";
|
||||
|
||||
let r = self
|
||||
.http
|
||||
.request(Method::GET, self.url_for(PING_PATH))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
match r {
|
||||
r if r.status() == 200 => Ok(()),
|
||||
r => Err(ServerErrorResponse::from_response(r).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new IOx database.
|
||||
pub async fn create_database(
|
||||
&self,
|
||||
name: impl AsRef<str>,
|
||||
rules: &DatabaseRules,
|
||||
) -> Result<(), CreateDatabaseError> {
|
||||
let url = self.db_url(name.as_ref())?;
|
||||
|
||||
let r = self
|
||||
.http
|
||||
.request(Method::PUT, url)
|
||||
.json(rules)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// Filter out the good states, and convert all others into errors.
|
||||
match r {
|
||||
r if r.status() == 200 => Ok(()),
|
||||
r => Err(ServerErrorResponse::from_response(r).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the server's writer ID.
|
||||
pub async fn set_writer_id(&self, id: NonZeroU32) -> Result<(), Error> {
|
||||
const SET_WRITER_PATH: &str = "iox/api/v1/id";
|
||||
|
||||
let url = self.url_for(SET_WRITER_PATH);
|
||||
|
||||
// TODO: move this into a shared type
|
||||
#[derive(serde::Serialize)]
|
||||
struct WriterIdBody {
|
||||
id: u32,
|
||||
}
|
||||
|
||||
let r = self
|
||||
.http
|
||||
.request(Method::PUT, url)
|
||||
.json(&WriterIdBody { id: id.get() })
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
match r {
|
||||
r if r.status() == 200 => Ok(()),
|
||||
r => Err(ServerErrorResponse::from_response(r).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the server's writer ID.
|
||||
pub async fn get_writer_id(&self) -> Result<u32, Error> {
|
||||
const GET_WRITER_PATH: &str = "iox/api/v1/id";
|
||||
|
||||
let url = self.url_for(GET_WRITER_PATH);
|
||||
|
||||
// TODO: move this into a shared type
|
||||
#[derive(serde::Deserialize)]
|
||||
struct WriterIdBody {
|
||||
id: u32,
|
||||
}
|
||||
|
||||
let r = self.http.request(Method::GET, url).send().await?;
|
||||
|
||||
match r {
|
||||
r if r.status() == 200 => Ok(r.json::<WriterIdBody>().await?.id),
|
||||
r => Err(ServerErrorResponse::from_response(r).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// List databases.
|
||||
pub async fn list_databases(&self) -> Result<ListDatabasesResponse, Error> {
|
||||
const LIST_DATABASES_PATH: &str = "iox/api/v1/databases";
|
||||
let url = self.url_for(LIST_DATABASES_PATH);
|
||||
|
||||
let r = self.http.request(Method::GET, url).send().await?;
|
||||
|
||||
match r {
|
||||
r if r.status() == 200 => Ok(r.json::<ListDatabasesResponse>().await?),
|
||||
r => Err(ServerErrorResponse::from_response(r).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the request path for relative `path`.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// Panics in debug builds if `path` contains an absolute path.
|
||||
fn url_for(&self, path: &str) -> Url {
|
||||
// In non-release builds, assert the path is not an absolute path.
|
||||
//
|
||||
// Paths should be relative so the full base path is used.
|
||||
debug_assert_ne!(
|
||||
path.chars().next().unwrap(),
|
||||
'/',
|
||||
"should not join absolute paths to base URL"
|
||||
);
|
||||
self.base
|
||||
.join(path)
|
||||
.expect("failed to construct request URL")
|
||||
}
|
||||
|
||||
fn db_url(&self, database: &str) -> Result<Url, ClientError> {
|
||||
const DB_PATH: &str = "iox/api/v1/databases/";
|
||||
|
||||
// Perform validation in the client as URL parser silently drops invalid
|
||||
// characters
|
||||
let name = DatabaseName::new(database).map_err(|_| ClientError::InvalidDatabaseName)?;
|
||||
|
||||
self.url_for(DB_PATH)
|
||||
.join(name.as_ref())
|
||||
.map_err(|_| ClientError::InvalidDatabaseName)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::ClientBuilder;
|
||||
use rand::{distributions::Alphanumeric, thread_rng, Rng};
|
||||
|
||||
use super::*;
|
||||
|
||||
/// If `TEST_IOX_ENDPOINT` is set, load the value and return it to the
|
||||
/// caller.
|
||||
///
|
||||
/// If `TEST_IOX_ENDPOINT` is not set, skip the calling test by returning
|
||||
/// early. Additionally if `TEST_INTEGRATION` is set, turn this early return
|
||||
/// into a panic to force a hard fail for skipped integration tests.
|
||||
macro_rules! maybe_skip_integration {
|
||||
() => {
|
||||
match (
|
||||
std::env::var("TEST_IOX_ENDPOINT").is_ok(),
|
||||
std::env::var("TEST_INTEGRATION").is_ok(),
|
||||
) {
|
||||
(true, _) => std::env::var("TEST_IOX_ENDPOINT").unwrap(),
|
||||
(false, true) => {
|
||||
panic!("TEST_INTEGRATION is set which requires running integration tests, but TEST_IOX_ENDPOINT is not")
|
||||
}
|
||||
_ => {
|
||||
eprintln!("skipping integration test - set TEST_IOX_ENDPOINT to run");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping() {
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
c.ping().await.expect("ping failed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_get_writer_id() {
|
||||
const TEST_ID: u32 = 42;
|
||||
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
|
||||
c.set_writer_id(NonZeroU32::new(TEST_ID).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let got = c.get_writer_id().await.expect("get ID failed");
|
||||
|
||||
assert_eq!(got, TEST_ID);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_database() {
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
|
||||
c.set_writer_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
c.create_database(rand_name(), &DatabaseRules::new())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_database_duplicate_name() {
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
|
||||
c.set_writer_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let db_name = rand_name();
|
||||
|
||||
c.create_database(db_name.clone(), &DatabaseRules::new())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
let err = c
|
||||
.create_database(db_name, &DatabaseRules::new())
|
||||
.await
|
||||
.expect_err("create database failed");
|
||||
|
||||
assert!(matches!(dbg!(err), CreateDatabaseError::AlreadyExists))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_database_invalid_name() {
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
|
||||
c.set_writer_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let err = c
|
||||
.create_database("my_example\ndb", &DatabaseRules::new())
|
||||
.await
|
||||
.expect_err("expected request to fail");
|
||||
|
||||
assert!(matches!(
|
||||
dbg!(err),
|
||||
CreateDatabaseError::ClientError(ClientError::InvalidDatabaseName)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_databases() {
|
||||
let endpoint = maybe_skip_integration!();
|
||||
let c = ClientBuilder::default().build(endpoint).unwrap();
|
||||
|
||||
c.set_writer_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let name = rand_name();
|
||||
c.create_database(&name, &DatabaseRules::default())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
let r = c.list_databases().await.expect("list databases failed");
|
||||
assert!(r.names.contains(&name));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default() {
|
||||
// Ensures the Default impl does not panic
|
||||
let c = Client::default();
|
||||
assert_eq!(c.base.as_str(), "http://127.0.0.1:8080/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_paths() {
|
||||
let c = ClientBuilder::default()
|
||||
.build("http://127.0.0.2:8081/proxy")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
c.url_for("bananas").as_str(),
|
||||
"http://127.0.0.2:8081/proxy/bananas"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "absolute paths")]
|
||||
fn test_absolute_path_panics() {
|
||||
let c = ClientBuilder::default()
|
||||
.build("http://127.0.0.2:8081/proxy")
|
||||
.unwrap();
|
||||
|
||||
c.url_for("/bananas");
|
||||
}
|
||||
|
||||
fn rand_name() -> String {
|
||||
thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(10)
|
||||
.map(char::from)
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
/// Client for the flight API
|
||||
pub mod flight;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,10 @@
|
|||
use std::{convert::TryFrom, sync::Arc};
|
||||
|
||||
use futures_util::stream::StreamExt;
|
||||
use serde::Serialize;
|
||||
use thiserror::Error;
|
||||
use tonic::Streaming;
|
||||
|
||||
use arrow_deps::{
|
||||
arrow::{
|
||||
array::Array,
|
||||
|
|
@ -10,26 +17,56 @@ use arrow_deps::{
|
|||
Ticket,
|
||||
},
|
||||
};
|
||||
use futures_util::stream::StreamExt;
|
||||
use serde::Serialize;
|
||||
use std::{convert::TryFrom, sync::Arc};
|
||||
use tonic::Streaming;
|
||||
|
||||
use crate::errors::{GrpcError, GrpcQueryError};
|
||||
use crate::connection::Connection;
|
||||
|
||||
/// Error responses when querying an IOx database using the Arrow Flight gRPC
|
||||
/// API.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// An error occurred while serializing the query.
|
||||
#[error(transparent)]
|
||||
QuerySerializeError(#[from] serde_json::Error),
|
||||
|
||||
/// There were no FlightData messages returned when we expected to get one
|
||||
/// containing a Schema.
|
||||
#[error("no FlightData containing a Schema returned")]
|
||||
NoSchema,
|
||||
|
||||
/// An error involving an Arrow operation occurred.
|
||||
#[error(transparent)]
|
||||
ArrowError(#[from] arrow_deps::arrow::error::ArrowError),
|
||||
|
||||
/// The data contained invalid Flatbuffers.
|
||||
#[error("Invalid Flatbuffer: `{0}`")]
|
||||
InvalidFlatbuffer(String),
|
||||
|
||||
/// The message header said it was a dictionary batch, but interpreting the
|
||||
/// message as a dictionary batch returned `None`. Indicates malformed
|
||||
/// Flight data from the server.
|
||||
#[error("Message with header of type dictionary batch could not return a dictionary batch")]
|
||||
CouldNotGetDictionaryBatch,
|
||||
|
||||
/// An unknown server error occurred. Contains the `tonic::Status` returned
|
||||
/// from the server.
|
||||
#[error(transparent)]
|
||||
GrpcError(#[from] tonic::Status),
|
||||
}
|
||||
|
||||
/// An IOx Arrow Flight gRPC API client.
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// use data_types::database_rules::DatabaseRules;
|
||||
/// use influxdb_iox_client::FlightClientBuilder;
|
||||
/// use influxdb_iox_client::{connection::Builder, flight::Client};
|
||||
///
|
||||
/// let mut client = FlightClientBuilder::default()
|
||||
/// let connection = Builder::default()
|
||||
/// .build("http://127.0.0.1:8082")
|
||||
/// .await
|
||||
/// .expect("client should be valid");
|
||||
///
|
||||
/// let mut client = Client::new(connection);
|
||||
///
|
||||
/// let mut query_results = client
|
||||
/// .perform_query("my_database", "select * from cpu_load")
|
||||
/// .await
|
||||
|
|
@ -43,19 +80,16 @@ use crate::errors::{GrpcError, GrpcQueryError};
|
|||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct FlightClient {
|
||||
inner: FlightServiceClient<tonic::transport::Channel>,
|
||||
pub struct Client {
|
||||
inner: FlightServiceClient<Connection>,
|
||||
}
|
||||
|
||||
impl FlightClient {
|
||||
pub(crate) async fn connect<D>(dst: D) -> Result<Self, GrpcError>
|
||||
where
|
||||
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<tonic::codegen::StdError>,
|
||||
{
|
||||
Ok(Self {
|
||||
inner: FlightServiceClient::connect(dst).await?,
|
||||
})
|
||||
impl Client {
|
||||
/// Creates a new client with the provided connection
|
||||
pub fn new(channel: Connection) -> Self {
|
||||
Self {
|
||||
inner: FlightServiceClient::new(channel),
|
||||
}
|
||||
}
|
||||
|
||||
/// Query the given database with the given SQL query, and return a
|
||||
|
|
@ -64,7 +98,7 @@ impl FlightClient {
|
|||
&mut self,
|
||||
database_name: impl Into<String>,
|
||||
sql_query: impl Into<String>,
|
||||
) -> Result<PerformQuery, GrpcQueryError> {
|
||||
) -> Result<PerformQuery, Error> {
|
||||
PerformQuery::new(self, database_name.into(), sql_query.into()).await
|
||||
}
|
||||
}
|
||||
|
|
@ -88,10 +122,10 @@ pub struct PerformQuery {
|
|||
|
||||
impl PerformQuery {
|
||||
pub(crate) async fn new(
|
||||
flight: &mut FlightClient,
|
||||
flight: &mut Client,
|
||||
database_name: String,
|
||||
sql_query: String,
|
||||
) -> Result<Self, GrpcQueryError> {
|
||||
) -> Result<Self, Error> {
|
||||
let query = ReadInfo {
|
||||
database_name,
|
||||
sql_query,
|
||||
|
|
@ -102,7 +136,7 @@ impl PerformQuery {
|
|||
};
|
||||
let mut response = flight.inner.do_get(t).await?.into_inner();
|
||||
|
||||
let flight_data_schema = response.next().await.ok_or(GrpcQueryError::NoSchema)??;
|
||||
let flight_data_schema = response.next().await.ok_or(Error::NoSchema)??;
|
||||
let schema = Arc::new(Schema::try_from(&flight_data_schema)?);
|
||||
|
||||
let dictionaries_by_field = vec![None; schema.fields().len()];
|
||||
|
|
@ -116,7 +150,7 @@ impl PerformQuery {
|
|||
|
||||
/// Returns the next `RecordBatch` available for this query, or `None` if
|
||||
/// there are no further results available.
|
||||
pub async fn next(&mut self) -> Result<Option<RecordBatch>, GrpcQueryError> {
|
||||
pub async fn next(&mut self) -> Result<Option<RecordBatch>, Error> {
|
||||
let Self {
|
||||
schema,
|
||||
dictionaries_by_field,
|
||||
|
|
@ -129,14 +163,14 @@ impl PerformQuery {
|
|||
};
|
||||
|
||||
let mut message = ipc::root_as_message(&data.data_header[..])
|
||||
.map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
|
||||
.map_err(|e| Error::InvalidFlatbuffer(e.to_string()))?;
|
||||
|
||||
while message.header_type() == ipc::MessageHeader::DictionaryBatch {
|
||||
reader::read_dictionary(
|
||||
&data.data_body,
|
||||
message
|
||||
.header_as_dictionary_batch()
|
||||
.ok_or(GrpcQueryError::CouldNotGetDictionaryBatch)?,
|
||||
.ok_or(Error::CouldNotGetDictionaryBatch)?,
|
||||
&schema,
|
||||
dictionaries_by_field,
|
||||
)?;
|
||||
|
|
@ -147,7 +181,7 @@ impl PerformQuery {
|
|||
};
|
||||
|
||||
message = ipc::root_as_message(&data.data_header[..])
|
||||
.map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
|
||||
.map_err(|e| Error::InvalidFlatbuffer(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(Some(flight_data_to_arrow_batch(
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
use generated_types::grpc::health::v1::*;
|
||||
use thiserror::Error;
|
||||
|
||||
use generated_types::grpc::health::v1::*;
|
||||
|
||||
use crate::connection::Connection;
|
||||
|
||||
/// Error type for the health check client
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
|
|
@ -29,19 +32,15 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// Allows checking the status of a given service
|
||||
#[derive(Debug)]
|
||||
pub struct Client {
|
||||
inner: health_client::HealthClient<tonic::transport::Channel>,
|
||||
inner: health_client::HealthClient<Connection>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Create a new client with the provided endpoint
|
||||
pub async fn connect<D>(dst: D) -> Result<Self>
|
||||
where
|
||||
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<tonic::codegen::StdError>,
|
||||
{
|
||||
Ok(Self {
|
||||
inner: health_client::HealthClient::connect(dst).await?,
|
||||
})
|
||||
/// Creates a new client with the provided connection
|
||||
pub fn new(channel: Connection) -> Self {
|
||||
Self {
|
||||
inner: health_client::HealthClient::new(channel),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `Ok()` if the corresponding service is serving
|
||||
|
|
|
|||
|
|
@ -0,0 +1,201 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
use self::generated_types::{management_service_client::ManagementServiceClient, *};
|
||||
|
||||
use crate::connection::Connection;
|
||||
use std::convert::TryInto;
|
||||
|
||||
/// Re-export generated_types
|
||||
pub mod generated_types {
|
||||
pub use generated_types::influxdata::iox::management::v1::*;
|
||||
}
|
||||
|
||||
/// Errors returned by Client::update_writer_id
|
||||
#[derive(Debug, Error)]
|
||||
pub enum UpdateWriterIdError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::get_writer_id
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GetWriterIdError {
|
||||
/// Writer ID is not set
|
||||
#[error("Writer ID not set")]
|
||||
NoWriterId,
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::create_database
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CreateDatabaseError {
|
||||
/// Writer ID is not set
|
||||
#[error("Writer ID not set")]
|
||||
NoWriterId,
|
||||
|
||||
/// Database already exists
|
||||
#[error("Database already exists")]
|
||||
DatabaseAlreadyExists,
|
||||
|
||||
/// Server returned an invalid argument error
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
InvalidArgument(tonic::Status),
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::list_databases
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ListDatabaseError {
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by Client::get_database
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GetDatabaseError {
|
||||
/// Writer ID is not set
|
||||
#[error("Writer ID not set")]
|
||||
NoWriterId,
|
||||
|
||||
/// Database not found
|
||||
#[error("Database not found")]
|
||||
DatabaseNotFound,
|
||||
|
||||
/// Response contained no payload
|
||||
#[error("Server returned an empty response")]
|
||||
EmptyResponse,
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// An IOx Management API client.
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// use influxdb_iox_client::{
|
||||
/// management::{Client, generated_types::DatabaseRules},
|
||||
/// connection::Builder,
|
||||
/// };
|
||||
///
|
||||
/// let mut connection = Builder::default()
|
||||
/// .build("http://127.0.0.1:8082")
|
||||
/// .await
|
||||
/// .unwrap();
|
||||
///
|
||||
/// let mut client = Client::new(connection);
|
||||
///
|
||||
/// // Create a new database!
|
||||
/// client
|
||||
/// .create_database(DatabaseRules{
|
||||
/// name: "bananas".to_string(),
|
||||
/// ..Default::default()
|
||||
/// })
|
||||
/// .await
|
||||
/// .expect("failed to create database");
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
inner: ManagementServiceClient<Connection>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Creates a new client with the provided connection
|
||||
pub fn new(channel: tonic::transport::Channel) -> Self {
|
||||
Self {
|
||||
inner: ManagementServiceClient::new(channel),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the server's writer ID.
|
||||
pub async fn update_writer_id(&mut self, id: NonZeroU32) -> Result<(), UpdateWriterIdError> {
|
||||
self.inner
|
||||
.update_writer_id(UpdateWriterIdRequest { id: id.into() })
|
||||
.await
|
||||
.map_err(UpdateWriterIdError::ServerError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the server's writer ID.
|
||||
pub async fn get_writer_id(&mut self) -> Result<NonZeroU32, GetWriterIdError> {
|
||||
let response = self
|
||||
.inner
|
||||
.get_writer_id(GetWriterIdRequest {})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => GetWriterIdError::NoWriterId,
|
||||
_ => GetWriterIdError::ServerError(status),
|
||||
})?;
|
||||
|
||||
let id = response
|
||||
.get_ref()
|
||||
.id
|
||||
.try_into()
|
||||
.map_err(|_| GetWriterIdError::NoWriterId)?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Creates a new IOx database.
|
||||
pub async fn create_database(
|
||||
&mut self,
|
||||
rules: DatabaseRules,
|
||||
) -> Result<(), CreateDatabaseError> {
|
||||
self.inner
|
||||
.create_database(CreateDatabaseRequest { rules: Some(rules) })
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::AlreadyExists => CreateDatabaseError::DatabaseAlreadyExists,
|
||||
tonic::Code::FailedPrecondition => CreateDatabaseError::NoWriterId,
|
||||
tonic::Code::InvalidArgument => CreateDatabaseError::InvalidArgument(status),
|
||||
_ => CreateDatabaseError::ServerError(status),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List databases.
|
||||
pub async fn list_databases(&mut self) -> Result<Vec<String>, ListDatabaseError> {
|
||||
let response = self
|
||||
.inner
|
||||
.list_databases(ListDatabasesRequest {})
|
||||
.await
|
||||
.map_err(ListDatabaseError::ServerError)?;
|
||||
Ok(response.into_inner().names)
|
||||
}
|
||||
|
||||
/// Get database configuration
|
||||
pub async fn get_database(
|
||||
&mut self,
|
||||
name: impl Into<String>,
|
||||
) -> Result<DatabaseRules, GetDatabaseError> {
|
||||
let response = self
|
||||
.inner
|
||||
.get_database(GetDatabaseRequest { name: name.into() })
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => GetDatabaseError::DatabaseNotFound,
|
||||
tonic::Code::FailedPrecondition => GetDatabaseError::NoWriterId,
|
||||
_ => GetDatabaseError::ServerError(status),
|
||||
})?;
|
||||
|
||||
let rules = response
|
||||
.into_inner()
|
||||
.rules
|
||||
.ok_or(GetDatabaseError::EmptyResponse)?;
|
||||
Ok(rules)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
use http::{uri::InvalidUri, Uri};
|
||||
use std::convert::TryInto;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tonic::transport::Endpoint;
|
||||
|
||||
/// The connection type used for clients
|
||||
pub type Connection = tonic::transport::Channel;
|
||||
|
||||
/// The default User-Agent header sent by the HTTP client.
|
||||
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
|
||||
/// The default connection timeout
|
||||
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
/// The default request timeout
|
||||
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Errors returned by the ConnectionBuilder
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// Server returned an invalid argument error
|
||||
#[error("Connection error: {}", .0)]
|
||||
TransportError(#[from] tonic::transport::Error),
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Invalid URI: {}", .0)]
|
||||
InvalidUri(#[from] InvalidUri),
|
||||
}
|
||||
|
||||
/// Result type for the ConnectionBuilder
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// A builder that produces a connection that can be used with any of the gRPC
|
||||
/// clients
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// use influxdb_iox_client::{connection::Builder, management::Client};
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let connection = Builder::default()
|
||||
/// .timeout(Duration::from_secs(42))
|
||||
/// .user_agent("my_awesome_client")
|
||||
/// .build("http://127.0.0.1:8082/")
|
||||
/// .await
|
||||
/// .expect("connection must succeed");
|
||||
///
|
||||
/// let client = Client::new(connection);
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct Builder {
|
||||
user_agent: String,
|
||||
connect_timeout: Duration,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl std::default::Default for Builder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
user_agent: USER_AGENT.into(),
|
||||
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
/// Construct the [`Client`] instance using the specified base URL.
|
||||
pub async fn build<D>(self, dst: D) -> Result<Connection>
|
||||
where
|
||||
D: TryInto<Uri, Error = InvalidUri>,
|
||||
{
|
||||
let endpoint = Endpoint::from(dst.try_into()?)
|
||||
.user_agent(self.user_agent)?
|
||||
.timeout(self.timeout);
|
||||
|
||||
// Manually construct connector to workaround https://github.com/hyperium/tonic/issues/498
|
||||
let mut connector = hyper::client::HttpConnector::new();
|
||||
connector.set_connect_timeout(Some(self.connect_timeout));
|
||||
|
||||
// Defaults from from tonic::channel::Endpoint
|
||||
connector.enforce_http(false);
|
||||
connector.set_nodelay(true);
|
||||
connector.set_keepalive(None);
|
||||
|
||||
Ok(endpoint.connect_with_connector(connector).await?)
|
||||
}
|
||||
|
||||
/// Set the `User-Agent` header sent by this client.
|
||||
pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
|
||||
Self {
|
||||
user_agent: user_agent.into(),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the maximum duration of time the client will wait for the IOx
|
||||
/// server to accept the TCP connection before aborting the request.
|
||||
///
|
||||
/// Note this does not bound the request duration - see
|
||||
/// [`timeout`][Self::timeout].
|
||||
pub fn connect_timeout(self, timeout: Duration) -> Self {
|
||||
Self {
|
||||
connect_timeout: timeout,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Bounds the total amount of time a single client HTTP request take before
|
||||
/// being aborted.
|
||||
///
|
||||
/// This timeout includes:
|
||||
///
|
||||
/// - Establishing the TCP connection (see [`connect_timeout`])
|
||||
/// - Sending the HTTP request
|
||||
/// - Waiting for, and receiving the entire HTTP response
|
||||
///
|
||||
/// [`connect_timeout`]: Self::connect_timeout
|
||||
pub fn timeout(self, timeout: Duration) -> Self {
|
||||
Self { timeout, ..self }
|
||||
}
|
||||
}
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
use thiserror::Error;
|
||||
|
||||
/// Error responses when creating a new IOx database.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ClientError {
|
||||
/// The database name contains an invalid character.
|
||||
#[error("the database name contains an invalid character")]
|
||||
InvalidDatabaseName,
|
||||
}
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
use thiserror::Error;
|
||||
|
||||
use super::{ApiErrorCode, ClientError, HttpError, ServerErrorResponse};
|
||||
|
||||
/// Error responses when creating a new IOx database.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CreateDatabaseError {
|
||||
/// The database name contains an invalid character.
|
||||
#[error("the database name contains an invalid character")]
|
||||
InvalidName,
|
||||
|
||||
/// The database being created already exists.
|
||||
#[error("a database with the requested name already exists")]
|
||||
AlreadyExists,
|
||||
|
||||
/// An unknown server error occurred.
|
||||
///
|
||||
/// The error string contains the error string returned by the server.
|
||||
#[error(transparent)]
|
||||
ServerError(ServerErrorResponse),
|
||||
|
||||
/// A non-application HTTP request/response error occurred.
|
||||
#[error(transparent)]
|
||||
HttpError(#[from] HttpError),
|
||||
|
||||
/// An error occurred in the client.
|
||||
#[error(transparent)]
|
||||
ClientError(#[from] ClientError),
|
||||
}
|
||||
|
||||
/// Convert a [`ServerErrorResponse`] into a [`CreateDatabaseError`].
|
||||
///
|
||||
/// This conversion plucks any errors with API error codes that are applicable
|
||||
/// to [`CreateDatabaseError`] types, and everything else becomes a
|
||||
/// `ServerError`.
|
||||
impl From<ServerErrorResponse> for CreateDatabaseError {
|
||||
fn from(err: ServerErrorResponse) -> Self {
|
||||
match err.error_code() {
|
||||
Some(c) if c == ApiErrorCode::DB_INVALID_NAME as u32 => Self::InvalidName,
|
||||
Some(c) if c == ApiErrorCode::DB_ALREADY_EXISTS as u32 => Self::AlreadyExists,
|
||||
_ => Self::ServerError(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert errors from the underlying HTTP client into `HttpError` instances.
|
||||
impl From<reqwest::Error> for CreateDatabaseError {
|
||||
fn from(err: reqwest::Error) -> Self {
|
||||
Self::HttpError(err.into())
|
||||
}
|
||||
}
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
/// A gRPC request error.
|
||||
///
|
||||
/// This is a non-application level error returned when a gRPC request to the
|
||||
/// IOx server has failed.
|
||||
#[derive(Debug)]
|
||||
pub struct GrpcError(tonic::transport::Error);
|
||||
|
||||
impl std::fmt::Display for GrpcError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for GrpcError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert errors from the underlying gRPC client into `GrpcError` instances.
|
||||
impl From<tonic::transport::Error> for GrpcError {
|
||||
fn from(v: tonic::transport::Error) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,34 +0,0 @@
|
|||
use thiserror::Error;
|
||||
|
||||
/// Error responses when querying an IOx database using the Arrow Flight gRPC
|
||||
/// API.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GrpcQueryError {
|
||||
/// An error occurred while serializing the query.
|
||||
#[error(transparent)]
|
||||
QuerySerializeError(#[from] serde_json::Error),
|
||||
|
||||
/// There were no FlightData messages returned when we expected to get one
|
||||
/// containing a Schema.
|
||||
#[error("no FlightData containing a Schema returned")]
|
||||
NoSchema,
|
||||
|
||||
/// An error involving an Arrow operation occurred.
|
||||
#[error(transparent)]
|
||||
ArrowError(#[from] arrow_deps::arrow::error::ArrowError),
|
||||
|
||||
/// The data contained invalid Flatbuffers.
|
||||
#[error("Invalid Flatbuffer: `{0}`")]
|
||||
InvalidFlatbuffer(String),
|
||||
|
||||
/// The message header said it was a dictionary batch, but interpreting the
|
||||
/// message as a dictionary batch returned `None`. Indicates malformed
|
||||
/// Flight data from the server.
|
||||
#[error("Message with header of type dictionary batch could not return a dictionary batch")]
|
||||
CouldNotGetDictionaryBatch,
|
||||
|
||||
/// An unknown server error occurred. Contains the `tonic::Status` returned
|
||||
/// from the server.
|
||||
#[error(transparent)]
|
||||
GrpcError(#[from] tonic::Status),
|
||||
}
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
/// An HTTP request error.
|
||||
///
|
||||
/// This is a non-application level error returned when an HTTP request to the
|
||||
/// IOx server has failed.
|
||||
#[derive(Debug)]
|
||||
pub struct HttpError(reqwest::Error);
|
||||
|
||||
// This wrapper type decouples the underlying HTTP client, ensuring the HTTP
|
||||
// library error doesn't become part of the public API. This makes bumping the
|
||||
// internal http client version / swapping it for something else a backwards
|
||||
// compatible change.
|
||||
//
|
||||
// The reqwest error isn't exactly full of useful information beyond the Display
|
||||
// impl (which this wrapper passes through) anyway.
|
||||
|
||||
impl std::fmt::Display for HttpError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for HttpError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
Some(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert errors from the underlying HTTP client into `HttpError` instances.
|
||||
impl From<reqwest::Error> for HttpError {
|
||||
fn from(v: reqwest::Error) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,91 +0,0 @@
|
|||
//! Error types returned by a client.
|
||||
//!
|
||||
//! A request to an IOx server can fail in three main ways:
|
||||
//!
|
||||
//! - an HTTP transport error (network error)
|
||||
//! - a known API handler error response
|
||||
//! - an unknown error returned by the server
|
||||
//!
|
||||
//! The first is converted into a [`HttpError`] and contains details of the
|
||||
//! failed request.
|
||||
//!
|
||||
//! The second case is when the "business logic" of the API handler returns a
|
||||
//! defined, meaningful error to the client. Examples of this include "the
|
||||
//! database name is invalid" or "the database already exists". These are mapped
|
||||
//! to per-handler error types (see [`CreateDatabaseError`] as an example).
|
||||
//!
|
||||
//! The last case is a generic error returned by the IOx server. These become
|
||||
//! [`ServerErrorResponse`] instances and contain the error string, optional
|
||||
//! error code and HTTP status code sent by the server.
|
||||
//!
|
||||
//! If using the Arrow Flight API, errors from gRPC requests will be converted
|
||||
//! into a [`GrpcError`] containing details of the failed request.
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
mod http_error;
|
||||
pub use http_error::*;
|
||||
|
||||
mod client_error;
|
||||
pub use client_error::*;
|
||||
|
||||
mod server_error_response;
|
||||
pub use server_error_response::*;
|
||||
|
||||
mod create_database;
|
||||
pub use create_database::*;
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
mod grpc_error;
|
||||
#[cfg(feature = "flight")]
|
||||
pub use grpc_error::*;
|
||||
|
||||
#[cfg(feature = "flight")]
|
||||
mod grpc_query_error;
|
||||
#[cfg(feature = "flight")]
|
||||
pub use grpc_query_error::*;
|
||||
|
||||
/// Constants used in API error codes.
|
||||
///
|
||||
/// Expressing this as a enum prevents reuse of discriminants, and as they're
|
||||
/// effectively consts this uses UPPER_SNAKE_CASE.
|
||||
#[allow(non_camel_case_types)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ApiErrorCode {
|
||||
/// An unknown/unhandled error
|
||||
UNKNOWN = 100,
|
||||
|
||||
/// The database name in the request is invalid.
|
||||
DB_INVALID_NAME = 101,
|
||||
|
||||
/// The database referenced already exists.
|
||||
DB_ALREADY_EXISTS = 102,
|
||||
|
||||
/// The database referenced does not exist.
|
||||
DB_NOT_FOUND = 103,
|
||||
}
|
||||
|
||||
impl From<ApiErrorCode> for u32 {
|
||||
fn from(v: ApiErrorCode) -> Self {
|
||||
v as u32
|
||||
}
|
||||
}
|
||||
/// `Error` defines the generic error type for API methods that do not have
|
||||
/// specific error types.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// A non-application HTTP request/response error has occurred.
|
||||
#[error("http request/response error: {0}")]
|
||||
HttpError(#[from] HttpError),
|
||||
|
||||
/// The IOx server has responded with an error.
|
||||
#[error("error response from server: {0}")]
|
||||
ServerError(#[from] ServerErrorResponse),
|
||||
}
|
||||
|
||||
/// Convert errors from the underlying HTTP client into `HttpError` instances.
|
||||
impl From<reqwest::Error> for Error {
|
||||
fn from(err: reqwest::Error) -> Self {
|
||||
Self::HttpError(err.into())
|
||||
}
|
||||
}
|
||||
|
|
@ -1,83 +0,0 @@
|
|||
use reqwest::Response;
|
||||
|
||||
/// A generic error message from the IOx server.
|
||||
///
|
||||
/// All IOx error responses are first deserialised into this type. API methods
|
||||
/// that have specific error types pick out known error codes (using
|
||||
/// [`ServerErrorResponse::error_code()`]) and map them to their respective
|
||||
/// error instances to provide more context to the caller.
|
||||
///
|
||||
/// API methods without specific error types freely return this type as a
|
||||
/// generic "server error" response.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
pub struct ServerErrorResponse {
|
||||
error: String,
|
||||
error_code: Option<u32>,
|
||||
http_status: Option<u16>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ServerErrorResponse {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(
|
||||
format!(
|
||||
"{} (HTTP status {:?}, IOx error code {:?})",
|
||||
&self.error, &self.http_status, &self.error_code,
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ServerErrorResponse {}
|
||||
|
||||
impl ServerErrorResponse {
|
||||
/// Try and parse a JSON "error" field from the [reqwest::Response].
|
||||
pub(crate) async fn from_response(r: Response) -> Self {
|
||||
let status = r.status().as_u16();
|
||||
match r.json::<ServerErrorResponse>().await {
|
||||
Ok(e) => e,
|
||||
Err(e) => Self {
|
||||
error: format!("error decoding JSON body: {}", e),
|
||||
error_code: None,
|
||||
http_status: Some(status),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the IOx error code in the response, if any.
|
||||
pub fn error_code(&self) -> Option<u32> {
|
||||
self.error_code
|
||||
}
|
||||
|
||||
/// Return the HTTP status code sent by the server.
|
||||
pub fn http_status_code(&self) -> Option<u16> {
|
||||
self.http_status
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_generic_error_response() {
|
||||
let body = serde_json::json!({"error": "something terrible", "error_code": 42}).to_string();
|
||||
|
||||
let response: ServerErrorResponse = serde_json::from_str(&body).unwrap();
|
||||
|
||||
assert_eq!(response.error, "something terrible");
|
||||
assert_eq!(response.error_code, Some(42));
|
||||
assert_eq!(response.http_status, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_generic_error_no_code() {
|
||||
let body = serde_json::json!({"error": "something terrible"}).to_string();
|
||||
|
||||
let response: ServerErrorResponse = serde_json::from_str(&body).unwrap();
|
||||
|
||||
assert_eq!(response.error, "something terrible");
|
||||
assert_eq!(response.error_code, None);
|
||||
assert_eq!(response.http_status, None);
|
||||
}
|
||||
}
|
||||
|
|
@ -8,17 +8,9 @@
|
|||
)]
|
||||
#![allow(clippy::missing_docs_in_private_items)]
|
||||
|
||||
mod builder;
|
||||
pub use builder::*;
|
||||
pub use client::{flight, health, management};
|
||||
|
||||
/// Builder for constructing connections for use with the various gRPC clients
|
||||
pub mod connection;
|
||||
|
||||
mod client;
|
||||
pub use client::*;
|
||||
|
||||
// can't combine these into one statement that uses `{}` because of this bug in
|
||||
// the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762
|
||||
#[cfg(feature = "flight")]
|
||||
pub use client::FlightClient;
|
||||
#[cfg(feature = "flight")]
|
||||
pub use client::PerformQuery;
|
||||
|
||||
pub mod errors;
|
||||
|
|
|
|||
|
|
@ -39,6 +39,32 @@ use std::{fmt::Debug, str, sync::Arc};
|
|||
mod format;
|
||||
use format::QueryOutputFormat;
|
||||
|
||||
/// Constants used in API error codes.
|
||||
///
|
||||
/// Expressing this as a enum prevents reuse of discriminants, and as they're
|
||||
/// effectively consts this uses UPPER_SNAKE_CASE.
|
||||
#[allow(non_camel_case_types)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ApiErrorCode {
|
||||
/// An unknown/unhandled error
|
||||
UNKNOWN = 100,
|
||||
|
||||
/// The database name in the request is invalid.
|
||||
DB_INVALID_NAME = 101,
|
||||
|
||||
/// The database referenced already exists.
|
||||
DB_ALREADY_EXISTS = 102,
|
||||
|
||||
/// The database referenced does not exist.
|
||||
DB_NOT_FOUND = 103,
|
||||
}
|
||||
|
||||
impl From<ApiErrorCode> for u32 {
|
||||
fn from(v: ApiErrorCode) -> Self {
|
||||
v as Self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ApplicationError {
|
||||
// Internal (unexpected) errors
|
||||
|
|
@ -237,8 +263,6 @@ impl ApplicationError {
|
|||
|
||||
/// Map the error type into an API error code.
|
||||
fn api_error_code(&self) -> u32 {
|
||||
use influxdb_iox_client::errors::ApiErrorCode;
|
||||
|
||||
match self {
|
||||
Self::DatabaseNameError { .. } => ApiErrorCode::DB_INVALID_NAME,
|
||||
Self::DatabaseNotFound { .. } => ApiErrorCode::DB_NOT_FOUND,
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ use data_types::error::ErrorLogger;
|
|||
use server::{ConnectionManager, Server};
|
||||
|
||||
mod flight;
|
||||
mod management;
|
||||
mod storage;
|
||||
mod testing;
|
||||
|
||||
|
|
@ -48,7 +49,8 @@ where
|
|||
.add_service(health_service)
|
||||
.add_service(testing::make_server())
|
||||
.add_service(storage::make_server(Arc::clone(&server)))
|
||||
.add_service(flight::make_server(server))
|
||||
.add_service(flight::make_server(Arc::clone(&server)))
|
||||
.add_service(management::make_server(server))
|
||||
.serve_with_incoming(stream)
|
||||
.await
|
||||
.context(ServerError {})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,127 @@
|
|||
use std::convert::TryInto;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tonic::{Request, Response, Status};
|
||||
use tracing::error;
|
||||
|
||||
use data_types::database_rules::DatabaseRules;
|
||||
use data_types::DatabaseName;
|
||||
use generated_types::google::{
|
||||
AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound,
|
||||
PreconditionViolation,
|
||||
};
|
||||
use generated_types::influxdata::iox::management::v1::*;
|
||||
use query::DatabaseStore;
|
||||
use server::{ConnectionManager, Error, Server};
|
||||
|
||||
struct ManagementService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
}
|
||||
|
||||
fn default_error_handler(error: Error) -> tonic::Status {
|
||||
match error {
|
||||
Error::IdNotSet => PreconditionViolation {
|
||||
category: "Writer ID".to_string(),
|
||||
subject: "influxdata.com/iox".to_string(),
|
||||
description: "Writer ID must be set".to_string(),
|
||||
}
|
||||
.into(),
|
||||
error => {
|
||||
error!(?error, "Unexpected error");
|
||||
InternalError {}.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> management_service_server::ManagementService for ManagementService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
async fn get_writer_id(
|
||||
&self,
|
||||
_: Request<GetWriterIdRequest>,
|
||||
) -> Result<Response<GetWriterIdResponse>, Status> {
|
||||
match self.server.require_id().ok() {
|
||||
Some(id) => Ok(Response::new(GetWriterIdResponse { id })),
|
||||
None => return Err(NotFound::default().into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_writer_id(
|
||||
&self,
|
||||
request: Request<UpdateWriterIdRequest>,
|
||||
) -> Result<Response<UpdateWriterIdResponse>, Status> {
|
||||
self.server.set_id(request.get_ref().id);
|
||||
Ok(Response::new(UpdateWriterIdResponse {}))
|
||||
}
|
||||
|
||||
async fn list_databases(
|
||||
&self,
|
||||
_: Request<ListDatabasesRequest>,
|
||||
) -> Result<Response<ListDatabasesResponse>, Status> {
|
||||
let names = self.server.db_names_sorted().await;
|
||||
Ok(Response::new(ListDatabasesResponse { names }))
|
||||
}
|
||||
|
||||
async fn get_database(
|
||||
&self,
|
||||
request: Request<GetDatabaseRequest>,
|
||||
) -> Result<Response<GetDatabaseResponse>, Status> {
|
||||
let name = DatabaseName::new(request.into_inner().name).field("name")?;
|
||||
|
||||
match self.server.db_rules(&name).await {
|
||||
Some(rules) => Ok(Response::new(GetDatabaseResponse {
|
||||
rules: Some(rules.into()),
|
||||
})),
|
||||
None => {
|
||||
return Err(NotFound {
|
||||
resource_type: "database".to_string(),
|
||||
resource_name: name.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_database(
|
||||
&self,
|
||||
request: Request<CreateDatabaseRequest>,
|
||||
) -> Result<Response<CreateDatabaseResponse>, Status> {
|
||||
let rules: DatabaseRules = request
|
||||
.into_inner()
|
||||
.rules
|
||||
.ok_or_else(|| FieldViolation::required(""))
|
||||
.and_then(TryInto::try_into)
|
||||
.map_err(|e| e.scope("rules"))?;
|
||||
|
||||
let name =
|
||||
DatabaseName::new(rules.name.clone()).expect("protobuf mapping didn't validate name");
|
||||
|
||||
match self.server.create_database(name, rules).await {
|
||||
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
|
||||
Err(Error::DatabaseAlreadyExists { db_name }) => {
|
||||
return Err(AlreadyExists {
|
||||
resource_type: "database".to_string(),
|
||||
resource_name: db_name,
|
||||
..Default::default()
|
||||
}
|
||||
.into())
|
||||
}
|
||||
Err(e) => Err(default_error_handler(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
server: Arc<Server<M>>,
|
||||
) -> management_service_server::ManagementServiceServer<
|
||||
impl management_service_server::ManagementService,
|
||||
>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
management_service_server::ManagementServiceServer::new(ManagementService { server })
|
||||
}
|
||||
|
|
@ -17,18 +17,24 @@
|
|||
// - Creating a unique org_id per test
|
||||
// - Stopping the server after all relevant tests are run
|
||||
|
||||
use assert_cmd::prelude::*;
|
||||
use data_types::{database_rules::DatabaseRules, names::org_and_bucket_to_database, DatabaseName};
|
||||
use futures::prelude::*;
|
||||
use generated_types::{storage_client::StorageClient, ReadSource, TimestampRange};
|
||||
use prost::Message;
|
||||
use std::convert::TryInto;
|
||||
use std::process::{Child, Command};
|
||||
use std::str;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::u32;
|
||||
|
||||
use assert_cmd::prelude::*;
|
||||
use futures::prelude::*;
|
||||
use prost::Message;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use data_types::{names::org_and_bucket_to_database, DatabaseName};
|
||||
use end_to_end_cases::*;
|
||||
use generated_types::{
|
||||
influxdata::iox::management::v1::DatabaseRules, storage_client::StorageClient, ReadSource,
|
||||
TimestampRange,
|
||||
};
|
||||
|
||||
// These port numbers are chosen to not collide with a development ioxd server
|
||||
// running locally.
|
||||
// TODO(786): allocate random free ports instead of hardcoding.
|
||||
|
|
@ -57,7 +63,6 @@ type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
mod end_to_end_cases;
|
||||
use end_to_end_cases::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_and_write_data() {
|
||||
|
|
@ -66,7 +71,12 @@ async fn read_and_write_data() {
|
|||
|
||||
let http_client = reqwest::Client::new();
|
||||
let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN);
|
||||
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
|
||||
let grpc = influxdb_iox_client::connection::Builder::default()
|
||||
.build(GRPC_URL_BASE)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut storage_client = StorageClient::new(grpc.clone());
|
||||
let mut management_client = influxdb_iox_client::management::Client::new(grpc);
|
||||
|
||||
// These tests share data; TODO: a better way to indicate this
|
||||
{
|
||||
|
|
@ -74,19 +84,25 @@ async fn read_and_write_data() {
|
|||
.set_org_id("0000111100001111")
|
||||
.set_bucket_id("1111000011110000");
|
||||
|
||||
create_database(&http_client, &scenario.database_name()).await;
|
||||
create_database(&mut management_client, &scenario.database_name()).await;
|
||||
|
||||
let expected_read_data = load_data(&influxdb2, &scenario).await;
|
||||
let sql_query = "select * from cpu_load_short";
|
||||
|
||||
read_api::test(&http_client, &scenario, sql_query, &expected_read_data).await;
|
||||
grpc_api::test(&mut storage_client, &scenario).await;
|
||||
storage_api::test(&mut storage_client, &scenario).await;
|
||||
flight_api::test(&scenario, sql_query, &expected_read_data).await;
|
||||
}
|
||||
|
||||
// These tests manage their own data
|
||||
grpc_api::read_group_test(&http_client, &influxdb2, &mut storage_client).await;
|
||||
grpc_api::read_window_aggregate_test(&http_client, &influxdb2, &mut storage_client).await;
|
||||
storage_api::read_group_test(&mut management_client, &influxdb2, &mut storage_client).await;
|
||||
storage_api::read_window_aggregate_test(
|
||||
&mut management_client,
|
||||
&influxdb2,
|
||||
&mut storage_client,
|
||||
)
|
||||
.await;
|
||||
management_api::test(&mut management_client).await;
|
||||
test_http_error_messages(&influxdb2).await.unwrap();
|
||||
}
|
||||
|
||||
|
|
@ -178,17 +194,16 @@ impl Scenario {
|
|||
}
|
||||
}
|
||||
|
||||
async fn create_database(client: &reqwest::Client, database_name: &str) {
|
||||
let rules = DatabaseRules::new();
|
||||
let data = serde_json::to_vec(&rules).unwrap();
|
||||
|
||||
async fn create_database(
|
||||
client: &mut influxdb_iox_client::management::Client,
|
||||
database_name: &str,
|
||||
) {
|
||||
client
|
||||
.put(&format!(
|
||||
"{}/iox/api/v1/databases/{}",
|
||||
HTTP_BASE, database_name
|
||||
))
|
||||
.body(data)
|
||||
.send()
|
||||
.create_database(DatabaseRules {
|
||||
name: database_name.to_string(),
|
||||
mutable_buffer_config: Some(Default::default()),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
|
@ -379,16 +394,23 @@ impl TestServer {
|
|||
let mut interval = tokio::time::interval(Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
match influxdb_iox_client::health::Client::connect(GRPC_URL_BASE).await {
|
||||
Ok(mut client) => {
|
||||
match influxdb_iox_client::connection::Builder::default()
|
||||
.build(GRPC_URL_BASE)
|
||||
.await
|
||||
{
|
||||
Ok(connection) => {
|
||||
println!("Successfully connected to server");
|
||||
|
||||
match client.check_storage().await {
|
||||
let mut health = influxdb_iox_client::health::Client::new(connection);
|
||||
|
||||
match health.check_storage().await {
|
||||
Ok(_) => {
|
||||
println!("Storage service is running");
|
||||
break;
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error checking storage service status: {}", e);
|
||||
}
|
||||
Err(e) => println!("Error checking storage service status: {}", e),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -397,22 +419,6 @@ impl TestServer {
|
|||
}
|
||||
interval.tick().await;
|
||||
}
|
||||
|
||||
loop {
|
||||
match StorageClient::connect(GRPC_URL_BASE).await {
|
||||
Ok(storage_client) => {
|
||||
println!(
|
||||
"Successfully connected storage_client: {:?}",
|
||||
storage_client
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Failed to create storage client: {}", e)
|
||||
}
|
||||
}
|
||||
interval.tick().await;
|
||||
}
|
||||
};
|
||||
|
||||
let try_http_connect = async {
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
use crate::{Scenario, GRPC_URL_BASE};
|
||||
use arrow_deps::assert_table_eq;
|
||||
use influxdb_iox_client::FlightClientBuilder;
|
||||
use influxdb_iox_client::{connection::Builder, flight::Client};
|
||||
|
||||
pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[String]) {
|
||||
let mut client = FlightClientBuilder::default()
|
||||
.build(GRPC_URL_BASE)
|
||||
.await
|
||||
.unwrap();
|
||||
let connection = Builder::default().build(GRPC_URL_BASE).await.unwrap();
|
||||
|
||||
let mut client = Client::new(connection);
|
||||
|
||||
let mut query_results = client
|
||||
.perform_query(scenario.database_name(), sql_query)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -0,0 +1,159 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use rand::{distributions::Alphanumeric, thread_rng, Rng};
|
||||
|
||||
use generated_types::google::protobuf::Empty;
|
||||
use generated_types::{google::protobuf::Duration, influxdata::iox::management::v1::*};
|
||||
use influxdb_iox_client::management::{Client, CreateDatabaseError};
|
||||
|
||||
pub async fn test(client: &mut Client) {
|
||||
test_set_get_writer_id(client).await;
|
||||
test_create_database_duplicate_name(client).await;
|
||||
test_create_database_invalid_name(client).await;
|
||||
test_list_databases(client).await;
|
||||
test_create_get_database(client).await;
|
||||
}
|
||||
|
||||
async fn test_set_get_writer_id(client: &mut Client) {
|
||||
const TEST_ID: u32 = 42;
|
||||
|
||||
client
|
||||
.update_writer_id(NonZeroU32::new(TEST_ID).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let got = client.get_writer_id().await.expect("get ID failed");
|
||||
|
||||
assert_eq!(got.get(), TEST_ID);
|
||||
}
|
||||
|
||||
async fn test_create_database_duplicate_name(client: &mut Client) {
|
||||
let db_name = rand_name();
|
||||
|
||||
client
|
||||
.create_database(DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
let err = client
|
||||
.create_database(DatabaseRules {
|
||||
name: db_name,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect_err("create database failed");
|
||||
|
||||
assert!(matches!(
|
||||
dbg!(err),
|
||||
CreateDatabaseError::DatabaseAlreadyExists
|
||||
))
|
||||
}
|
||||
|
||||
async fn test_create_database_invalid_name(client: &mut Client) {
|
||||
let err = client
|
||||
.create_database(DatabaseRules {
|
||||
name: "my_example\ndb".to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect_err("expected request to fail");
|
||||
|
||||
assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_)));
|
||||
}
|
||||
|
||||
async fn test_list_databases(client: &mut Client) {
|
||||
let name = rand_name();
|
||||
client
|
||||
.create_database(DatabaseRules {
|
||||
name: name.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
let names = client
|
||||
.list_databases()
|
||||
.await
|
||||
.expect("list databases failed");
|
||||
assert!(names.contains(&name));
|
||||
}
|
||||
|
||||
async fn test_create_get_database(client: &mut Client) {
|
||||
let db_name = rand_name();
|
||||
|
||||
// Specify everything to allow direct comparison between request and response
|
||||
// Otherwise would expect difference due to server-side defaulting
|
||||
let rules = DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![partition_template::Part {
|
||||
part: Some(partition_template::part::Part::Table(Empty {})),
|
||||
}],
|
||||
}),
|
||||
replication_config: Some(ReplicationConfig {
|
||||
replications: vec!["cupcakes".to_string()],
|
||||
replication_count: 3,
|
||||
replication_queue_max_size: 20,
|
||||
}),
|
||||
subscription_config: Some(SubscriptionConfig {
|
||||
subscriptions: vec![subscription_config::Subscription {
|
||||
name: "subscription".to_string(),
|
||||
host_group_id: "hostgroup".to_string(),
|
||||
matcher: Some(Matcher {
|
||||
predicate: "pred".to_string(),
|
||||
table_matcher: Some(matcher::TableMatcher::All(Empty {})),
|
||||
}),
|
||||
}],
|
||||
}),
|
||||
query_config: Some(QueryConfig {
|
||||
query_local: true,
|
||||
primary: Default::default(),
|
||||
secondaries: vec![],
|
||||
read_only_partitions: vec![],
|
||||
}),
|
||||
wal_buffer_config: Some(WalBufferConfig {
|
||||
buffer_size: 24,
|
||||
segment_size: 2,
|
||||
buffer_rollover: wal_buffer_config::Rollover::DropIncoming as _,
|
||||
persist_segments: true,
|
||||
close_segment_after: Some(Duration {
|
||||
seconds: 324,
|
||||
nanos: 2,
|
||||
}),
|
||||
}),
|
||||
mutable_buffer_config: Some(MutableBufferConfig {
|
||||
buffer_size: 553,
|
||||
reject_if_not_persisted: true,
|
||||
partition_drop_order: Some(mutable_buffer_config::PartitionDropOrder {
|
||||
order: Order::Asc as _,
|
||||
sort: Some(
|
||||
mutable_buffer_config::partition_drop_order::Sort::CreatedAtTime(Empty {}),
|
||||
),
|
||||
}),
|
||||
persist_after_cold_seconds: 34,
|
||||
}),
|
||||
};
|
||||
|
||||
client
|
||||
.create_database(rules.clone())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
let response = client
|
||||
.get_database(db_name)
|
||||
.await
|
||||
.expect("get database failed");
|
||||
|
||||
assert_eq!(response, rules);
|
||||
}
|
||||
|
||||
fn rand_name() -> String {
|
||||
thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(10)
|
||||
.map(char::from)
|
||||
.collect()
|
||||
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
pub mod flight_api;
|
||||
pub mod grpc_api;
|
||||
pub mod management_api;
|
||||
pub mod read_api;
|
||||
pub mod storage_api;
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use generated_types::{
|
|||
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest,
|
||||
ReadWindowAggregateRequest, Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
|
||||
};
|
||||
use influxdb_iox_client::management;
|
||||
use std::str;
|
||||
use test_helpers::tag_key_bytes_to_strings;
|
||||
use tonic::transport::Channel;
|
||||
|
|
@ -281,7 +282,7 @@ async fn measurement_fields_endpoint(
|
|||
}
|
||||
|
||||
pub async fn read_group_test(
|
||||
client: &reqwest::Client,
|
||||
management: &mut management::Client,
|
||||
influxdb2: &influxdb2_client::Client,
|
||||
storage_client: &mut StorageClient<Channel>,
|
||||
) {
|
||||
|
|
@ -289,7 +290,7 @@ pub async fn read_group_test(
|
|||
.set_org_id("0000111100001110")
|
||||
.set_bucket_id("1111000011110001");
|
||||
|
||||
create_database(&client, &scenario.database_name()).await;
|
||||
create_database(management, &scenario.database_name()).await;
|
||||
|
||||
load_read_group_data(&influxdb2, &scenario).await;
|
||||
|
||||
|
|
@ -534,7 +535,7 @@ async fn test_read_group_last_agg(
|
|||
|
||||
// Standalone test that all the pipes are hooked up for read window aggregate
|
||||
pub async fn read_window_aggregate_test(
|
||||
client: &reqwest::Client,
|
||||
management: &mut management::Client,
|
||||
influxdb2: &influxdb2_client::Client,
|
||||
storage_client: &mut StorageClient<Channel>,
|
||||
) {
|
||||
|
|
@ -543,7 +544,7 @@ pub async fn read_window_aggregate_test(
|
|||
.set_bucket_id("1111000011110011");
|
||||
let read_source = scenario.read_source();
|
||||
|
||||
create_database(&client, &scenario.database_name()).await;
|
||||
create_database(management, &scenario.database_name()).await;
|
||||
|
||||
let line_protocol = vec![
|
||||
"h2o,state=MA,city=Boston temp=70.0 100",
|
||||
Loading…
Reference in New Issue