influxdb/influxdb3_client/src/lib.rs

1613 lines
50 KiB
Rust

use bytes::Bytes;
use hashbrown::HashMap;
use influxdb3_catalog::log::{OrderedCatalogBatch, TriggerSettings};
use iox_query_params::StatementParam;
use reqwest::{
Body, Certificate, IntoUrl, Method, StatusCode,
header::{CONTENT_TYPE, HeaderMap, HeaderValue},
tls::Version,
};
use secrecy::{ExposeSecret, Secret};
use serde::{Serialize, de::DeserializeOwned};
use std::{fmt::Display, num::NonZeroUsize, path::PathBuf, string::FromUtf8Error, time::Duration};
use url::Url;
use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The base URL handed to [`Client::new`] could not be converted into a URL
    #[error("base URL error: {0}")]
    BaseUrl(#[source] reqwest::Error),
    /// Joining an API path onto the base URL failed
    #[error("request URL error: {0}")]
    RequestUrl(#[from] url::ParseError),
    /// The response body could not be read as raw bytes
    #[error("failed to read the API response bytes: {0}")]
    Bytes(#[source] reqwest::Error),
    /// The request body could not be serialized to JSON
    #[error("failed to serialize the request body: {0}")]
    RequestSerialization(#[source] serde_json::Error),
    /// A value supplied as a query parameter could not be converted
    #[error(
        "provided parameter ('{name}') could not be converted \
         to a statement parameter"
    )]
    ConvertQueryParam {
        /// Name of the offending parameter
        name: String,
        /// The underlying conversion error
        #[source]
        source: iox_query_params::Error,
    },
    /// An error response body was not valid UTF-8
    #[error("invalid UTF8 in response: {0}")]
    InvalidUtf8(#[from] FromUtf8Error),
    /// The response body could not be deserialized from JSON
    #[error("failed to parse JSON response: {0}")]
    Json(#[source] reqwest::Error),
    /// The response body could not be read as text
    #[error("failed to parse plaintext response: {0}")]
    Text(#[source] reqwest::Error),
    /// The server answered with a status code the client treats as a failure
    #[error("server responded with error [{code}]: {message}")]
    ApiError {
        /// HTTP status code of the response
        code: StatusCode,
        /// Body of the response
        message: String,
    },
    /// The HTTP request could not be sent
    #[error("failed to send {method} {url} request: {source}")]
    RequestSend {
        /// HTTP method of the failed request
        method: Method,
        /// Target of the failed request
        url: String,
        /// The underlying transport error
        #[source]
        source: reqwest::Error,
    },
    /// Building the HTTP client (or parsing a root certificate for it) failed
    #[error("failed to build an http client: {0}")]
    Builder(#[source] reqwest::Error),
    /// An I/O operation failed
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
impl Error {
    /// Build an [`Error::RequestSend`] from the request's method, its target
    /// (anything convertible into a `String`), and the transport error.
    fn request_send(method: Method, url: impl Into<String>, source: reqwest::Error) -> Self {
        let url: String = url.into();
        Self::RequestSend {
            method,
            url,
            source,
        }
    }
}
pub type Result<T> = std::result::Result<T, Error>;
/// The InfluxDB 3 Core Client
///
/// Gives programmatic access to the HTTP API of an InfluxDB 3 Core server.
#[derive(Debug, Clone)]
pub struct Client {
    /// Base URL of the running InfluxDB 3 Core server; API paths are joined onto it
    base_url: Url,
    /// Optional `Bearer` token attached to each request for authentication
    auth_token: Option<Secret<String>>,
    /// The underlying [`reqwest::Client`] that performs the HTTP requests
    http_client: reqwest::Client,
}
impl Client {
/// Create a new [`Client`]
pub fn new<U: IntoUrl>(base_url: U, ca_cert: Option<PathBuf>) -> Result<Self> {
let client = reqwest::Client::builder()
.min_tls_version(Version::TLS_1_3)
.use_rustls_tls();
let http_client = if let Some(ca_cert) = ca_cert {
let cert = std::fs::read(&ca_cert)?;
let cert = match ca_cert.extension().and_then(|s| s.to_str()) {
Some("der") => Certificate::from_der(&cert),
Some("pem") | Some(_) | None => Certificate::from_pem(&cert),
}
.map_err(Error::Builder)?;
client
.add_root_certificate(cert)
.build()
.map_err(Error::Builder)?
} else {
client.build().map_err(Error::Builder)?
};
Ok(Self {
base_url: base_url.into_url().map_err(Error::BaseUrl)?,
auth_token: None,
http_client,
})
}
/// Set the `Bearer` token that will be sent with each request to the server
///
/// Consumes the client and returns it with the token stored as a secret.
///
/// # Example
/// ```
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let token = "secret-token-string";
/// let client = Client::new("http://localhost:8181", None)?
///     .with_auth_token(token);
/// # Ok(())
/// # }
/// ```
pub fn with_auth_token<S: Into<String>>(mut self, auth_token: S) -> Self {
    self.auth_token = Some(Secret::new(auth_token.into()));
    self
}
/// Compose a request to the `/api/v3/write_lp` API
///
/// The returned builder targets database `db` and has no optional
/// parameters set and no body attached yet.
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # use influxdb3_client::Precision;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181", None)?;
/// client
///     .api_v3_write_lp("db_name")
///     .precision(Precision::Millisecond)
///     .accept_partial(true)
///     .body("cpu,host=s1 usage=0.5")
///     .send()
///     .await
///     .expect("send write_lp request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_write_lp<S: Into<String>>(&self, db: S) -> WriteRequestBuilder<'_, NoBody> {
    let params = WriteParams {
        db: db.into(),
        precision: None,
        accept_partial: None,
        no_sync: None,
    };
    WriteRequestBuilder {
        client: self,
        params,
        body: NoBody,
    }
}
/// Compose a request to the `/api/v3/query_sql` API
///
/// The returned builder carries the database name and the SQL text; the
/// response format and statement parameters start out unset.
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181", None)?;
/// let response_bytes = client
///     .api_v3_query_sql("db_name", "SELECT * FROM foo")
///     .send()
///     .await
///     .expect("send query_sql request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_query_sql<D: Into<String>, Q: Into<String>>(
    &self,
    db: D,
    query: Q,
) -> QueryRequestBuilder<'_> {
    let request = ClientQueryRequest {
        database: db.into(),
        query_str: query.into(),
        format: None,
        params: None,
    };
    QueryRequestBuilder {
        client: self,
        kind: QueryKind::Sql,
        request,
    }
}
/// Compose a request to the `/api/v3/query_influxql` API
///
/// The returned builder carries the database name and the InfluxQL text;
/// the response format and statement parameters start out unset.
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181", None)?;
/// let response_bytes = client
///     .api_v3_query_influxql("db_name", "SELECT * FROM foo")
///     .send()
///     .await
///     .expect("send query_influxql request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_query_influxql<D: Into<String>, Q: Into<String>>(
    &self,
    db: D,
    query: Q,
) -> QueryRequestBuilder<'_> {
    let request = ClientQueryRequest {
        database: db.into(),
        query_str: query.into(),
        format: None,
        params: None,
    };
    QueryRequestBuilder {
        client: self,
        kind: QueryKind::InfluxQl,
        request,
    }
}
/// Compose a request to the `POST /api/v3/configure/last_cache` API
///
/// Returns a builder for a last cache on `table` in database `db`.
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181", None)?;
/// let resp = client
///     .api_v3_configure_last_cache_create("db_name", "table_name")
///     .ttl(120)
///     .name("cache_name")
///     .count(5)
///     .key_columns(["col1", "col2"])
///     .send()
///     .await
///     .expect("send create last cache request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_configure_last_cache_create(
    &self,
    db: impl Into<String>,
    table: impl Into<String>,
) -> CreateLastCacheRequestBuilder<'_> {
    CreateLastCacheRequestBuilder::new(self, db, table)
}
/// Make a request to the `DELETE /api/v3/configure/last_cache` API
///
/// Sends `db`, `table` and `name` as a JSON body and discards the response
/// body on success.
pub async fn api_v3_configure_last_cache_delete(
    &self,
    db: impl Into<String> + Send,
    table: impl Into<String> + Send,
    name: impl Into<String> + Send,
) -> Result<()> {
    let payload = LastCacheDeleteRequest {
        db: db.into(),
        table: table.into(),
        name: name.into(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/last_cache",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Compose a request to the `POST /api/v3/configure/distinct_cache` API
///
/// Returns a builder for a distinct cache over `columns` of `table` in
/// database `db`.
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # use std::num::NonZeroUsize;
/// # use std::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181", None)?;
/// let resp = client
///     .api_v3_configure_distinct_cache_create("db_name", "table_name", ["col1", "col2"])
///     .name("cache_name")
///     .max_cardinality(NonZeroUsize::new(1_000).unwrap())
///     .max_age(Duration::from_secs(3_600))
///     .send()
///     .await
///     .expect("send create distinct cache request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_configure_distinct_cache_create(
    &self,
    db: impl Into<String>,
    table: impl Into<String>,
    columns: impl IntoIterator<Item: Into<String>>,
) -> CreateDistinctCacheRequestBuilder<'_> {
    CreateDistinctCacheRequestBuilder::new(self, db, table, columns)
}
/// Make a request to the `DELETE /api/v3/configure/distinct_cache` API
///
/// Sends `db`, `table` and `name` as a JSON body and discards the response
/// body on success.
pub async fn api_v3_configure_distinct_cache_delete(
    &self,
    db: impl Into<String> + Send,
    table: impl Into<String> + Send,
    name: impl Into<String> + Send,
) -> Result<()> {
    let payload = DistinctCacheDeleteRequest {
        db: db.into(),
        table: table.into(),
        name: name.into(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/distinct_cache",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Compose a request to the `GET /api/v3/configure/database` API
///
/// The builder starts with deleted databases hidden and JSON output.
pub fn api_v3_configure_db_show(&self) -> ShowDatabasesRequestBuilder<'_> {
    let request = ShowDatabasesRequest {
        show_deleted: false,
        format: QueryFormat::Json,
    };
    ShowDatabasesRequestBuilder {
        client: self,
        request,
    }
}
/// Make a request to the `POST /api/v3/configure/database` API
///
/// Sends the database name as a JSON body and discards the response body
/// on success.
pub async fn api_v3_configure_db_create(&self, db: impl Into<String> + Send) -> Result<()> {
    let payload = CreateDatabaseRequest { db: db.into() };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/database",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `DELETE /api/v3/configure/database?db=foo` API
///
/// The database name travels in the query string; no body is sent and the
/// response body is discarded on success.
pub async fn api_v3_configure_db_delete(&self, db: impl AsRef<str> + Send) -> Result<()> {
    let params = DeleteDatabaseRequest {
        db: db.as_ref().to_owned(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/database",
        None::<()>,
        Some(params),
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `DELETE /api/v3/configure/table?db=foo&table=bar` API
///
/// Database and table names travel in the query string; no body is sent and
/// the response body is discarded on success.
pub async fn api_v3_configure_table_delete<T: AsRef<str> + Send>(
    &self,
    db: T,
    table: T,
) -> Result<()> {
    let params = DeleteTableRequest {
        db: db.as_ref().to_owned(),
        table: table.as_ref().to_owned(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/table",
        None::<()>,
        Some(params),
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `POST /api/v3/configure/table` API
///
/// `tags` lists the tag column names; `fields` lists `(name, type)` pairs
/// for the field columns. Everything is sent as a JSON body and the response
/// body is discarded on success.
pub async fn api_v3_configure_table_create(
    &self,
    db: impl Into<String> + Send,
    table: impl Into<String> + Send,
    tags: Vec<impl Into<String> + Send>,
    fields: Vec<(impl Into<String> + Send, impl Into<FieldType> + Send)>,
) -> Result<()> {
    let payload = CreateTableRequest {
        db: db.into(),
        table: table.into(),
        tags: tags.into_iter().map(|tag| tag.into()).collect(),
        fields: fields
            .into_iter()
            .map(|(field_name, field_type)| CreateTableField {
                name: field_name.into(),
                r#type: field_type.into(),
            })
            .collect(),
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/table",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `POST /api/v3/configure/processing_engine_plugin` API
///
/// All arguments are sent as a JSON body; the response body is discarded on
/// success.
pub async fn api_v3_configure_processing_engine_plugin_create(
    &self,
    db: impl Into<String> + Send,
    plugin_name: impl Into<String> + Send,
    file_name: impl Into<String> + Send,
    plugin_type: impl Into<String> + Send,
) -> Result<()> {
    let payload = ProcessingEnginePluginCreateRequest {
        db: db.into(),
        plugin_name: plugin_name.into(),
        file_name: file_name.into(),
        plugin_type: plugin_type.into(),
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/processing_engine_plugin",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `DELETE /api/v3/configure/processing_engine_plugin` API
///
/// The database and plugin names are sent as a JSON body; the response body
/// is discarded on success.
pub async fn api_v3_configure_processing_engine_plugin_delete(
    &self,
    db: impl Into<String> + Send,
    plugin_name: impl Into<String> + Send,
) -> Result<()> {
    let payload = ProcessingEnginePluginDeleteRequest {
        db: db.into(),
        plugin_name: plugin_name.into(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/processing_engine_plugin",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `POST /api/v3/configure/processing_engine_trigger`
///
/// All arguments are sent as a JSON body (`trigger_spec` goes out as the
/// request's `trigger_specification`); the response body is discarded on
/// success.
#[allow(clippy::too_many_arguments)]
pub async fn api_v3_configure_processing_engine_trigger_create(
    &self,
    db: impl Into<String> + Send,
    trigger_name: impl Into<String> + Send,
    plugin_filename: impl Into<String> + Send,
    trigger_spec: impl Into<String> + Send,
    trigger_arguments: Option<HashMap<String, String>>,
    disabled: bool,
    trigger_settings: TriggerSettings,
) -> Result<()> {
    let payload = ProcessingEngineTriggerCreateRequest {
        db: db.into(),
        trigger_name: trigger_name.into(),
        plugin_filename: plugin_filename.into(),
        trigger_specification: trigger_spec.into(),
        trigger_settings,
        trigger_arguments,
        disabled,
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/processing_engine_trigger",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `DELETE /api/v3/configure/processing_engine_trigger`
///
/// The database name, trigger name and `force` flag are sent as a JSON body;
/// the response body is discarded on success.
pub async fn api_v3_configure_processing_engine_trigger_delete(
    &self,
    db: impl Into<String> + Send,
    trigger_name: impl Into<String> + Send,
    force: bool,
) -> Result<()> {
    let payload = ProcessingEngineTriggerDeleteRequest {
        db: db.into(),
        trigger_name: trigger_name.into(),
        force,
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/processing_engine_trigger",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/enable`
///
/// The trigger is identified through the query string (database and trigger
/// name); no body is sent and the response body is discarded on success.
pub async fn api_v3_configure_processing_engine_trigger_enable(
    &self,
    db: impl Into<String> + Send,
    trigger_name: impl Into<String> + Send,
) -> Result<()> {
    let identifier = ProcessingEngineTriggerIdentifier {
        db: db.into(),
        trigger_name: trigger_name.into(),
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/processing_engine_trigger/enable",
        None::<()>,
        Some(identifier),
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `POST /api/v3/configure/plugin_environment/install_packages`
///
/// The package list is sent as a JSON body; the response body is discarded
/// on success.
pub async fn api_v3_configure_plugin_environment_install_packages(
    &self,
    packages: Vec<String>,
) -> Result<()> {
    let payload = ProcessingEngineInstallPackagesRequest { packages };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/plugin_environment/install_packages",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `POST /api/v3/configure/plugin_environment/install_requirements`
///
/// The requirements location is sent as a JSON body; the response body is
/// discarded on success.
pub async fn api_v3_configure_processing_engine_trigger_install_requirements(
    &self,
    requirements_location: impl Into<String> + Send,
) -> Result<()> {
    let payload = ProcessingEngineInstallRequirementsRequest {
        requirements_location: requirements_location.into(),
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/plugin_environment/install_requirements",
        Some(payload),
        None::<()>,
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/disable`
///
/// The trigger is identified through the query string (database and trigger
/// name); no body is sent and the response body is discarded on success.
pub async fn api_v3_configure_processing_engine_trigger_disable(
    &self,
    db: impl Into<String> + Send,
    trigger_name: impl Into<String> + Send,
) -> Result<()> {
    let identifier = ProcessingEngineTriggerIdentifier {
        db: db.into(),
        trigger_name: trigger_name.into(),
    };
    self.send_json_get_bytes(
        Method::POST,
        "/api/v3/configure/processing_engine_trigger/disable",
        None::<()>,
        Some(identifier),
        None,
    )
    .await
    .map(|_| ())
}
/// Make a request to the `POST /api/v3/plugin_test/wal` API
///
/// Sends the request as a JSON body and deserializes the JSON response.
pub async fn wal_plugin_test(
    &self,
    wal_plugin_test_request: WalPluginTestRequest,
) -> Result<WalPluginTestResponse> {
    let payload = Some(wal_plugin_test_request);
    self.send_json(Method::POST, "/api/v3/plugin_test/wal", payload, None::<()>)
        .await
}
/// Make a request to the `POST /api/v3/plugin_test/schedule` API
///
/// Sends the request as a JSON body and deserializes the JSON response.
pub async fn schedule_plugin_test(
    &self,
    schedule_plugin_test_request: SchedulePluginTestRequest,
) -> Result<SchedulePluginTestResponse> {
    let payload = Some(schedule_plugin_test_request);
    self.send_json(
        Method::POST,
        "/api/v3/plugin_test/schedule",
        payload,
        None::<()>,
    )
    .await
}
/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {
let url = self.base_url.join("/ping")?;
let mut req = self.http_client.get(url);
if let Some(t) = &self.auth_token {
req = req.bearer_auth(t.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::GET, "/ping", src))?;
if resp.status().is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}
/// Create an admin token
///
/// Issues `POST /api/v3/configure/token/admin` without body or query string.
pub async fn api_v3_configure_create_admin_token(
    &self,
) -> Result<Option<CreateTokenWithPermissionsResponse>> {
    self.send_create(
        Method::POST,
        "/api/v3/configure/token/admin",
        None::<()>,
        None::<()>,
    )
    .await
}
/// Create "named" admin tokens
///
/// Issues `POST /api/v3/configure/token/named_admin` with the token name and
/// optional expiry (in seconds) as a JSON body.
pub async fn api_v3_configure_create_named_admin_token(
    &self,
    token_name: impl Into<String> + Send,
    expiry_secs: Option<u64>,
) -> Result<Option<CreateTokenWithPermissionsResponse>> {
    let payload = CreateNamedAdminTokenRequest {
        token_name: token_name.into(),
        expiry_secs,
    };
    self.send_create(
        Method::POST,
        "/api/v3/configure/token/named_admin",
        Some(payload),
        None::<()>,
    )
    .await
}
/// Regenerate the admin token
///
/// Issues `POST /api/v3/configure/token/admin/regenerate` without body or
/// query string.
pub async fn api_v3_configure_regenerate_admin_token(
    &self,
) -> Result<Option<CreateTokenWithPermissionsResponse>> {
    self.send_create(
        Method::POST,
        "/api/v3/configure/token/admin/regenerate",
        None::<()>,
        None::<()>,
    )
    .await
}
/// Delete token `DELETE /api/v3/configure/token?token_name=foo` API
///
/// The token name travels in the query string; no body is sent and the
/// response body is discarded on success.
pub async fn api_v3_configure_token_delete(
    &self,
    token_name: impl AsRef<str> + Send,
) -> Result<()> {
    let params = TokenDeleteRequest {
        token_name: token_name.as_ref().to_string(),
    };
    self.send_json_get_bytes(
        Method::DELETE,
        "/api/v3/configure/token",
        None::<()>,
        Some(params),
        None,
    )
    .await
    .map(|_| ())
}
/// Serialize the given `B` to json then send the request and return the resulting bytes.
async fn send_json_get_bytes<B, Q>(
&self,
method: Method,
url_path: &str,
body: Option<B>,
query: Option<Q>,
mut headers: Option<HeaderMap>,
) -> Result<Bytes>
where
B: Serialize + Send + Sync,
Q: Serialize + Send + Sync,
{
let b = body
.map(|body| serde_json::to_string(&body))
.transpose()
.map_err(Error::RequestSerialization)?
.map(Into::into);
let hs = headers.get_or_insert_default();
hs.insert(
CONTENT_TYPE,
HeaderValue::from_str("application/json").unwrap(),
);
self.send_get_bytes(method, url_path, b, query, headers)
.await
}
/// Send an HTTP request with the specified parameters, return the bytes read from the response
/// body.
async fn send_get_bytes<Q>(
&self,
method: Method,
url_path: &str,
body: Option<Body>,
query: Option<Q>,
headers: Option<HeaderMap>,
) -> Result<Bytes>
where
Q: Serialize + Send + Sync,
{
let url = self.base_url.join(url_path)?;
let mut req = self.http_client.request(method.clone(), url.clone());
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
if let Some(body) = body {
req = req.body(body);
}
if let Some(query) = query {
req = req.query(&query);
}
if let Some(headers) = headers {
req = req.headers(headers);
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(method, url, src))?;
let status = resp.status();
let content = resp.bytes().await.map_err(Error::Bytes)?;
match status {
s if s.is_success() => Ok(content),
code => Err(Error::ApiError {
code,
message: String::from_utf8(content.to_vec()).map_err(Error::InvalidUtf8)?,
}),
}
}
/// Send an HTTP request and return `Some(O)` if the response status is HTTP 201 Created.
async fn send_create<B, Q, O>(
&self,
method: Method,
url_path: &str,
body: Option<B>,
query: Option<Q>,
) -> Result<Option<O>>
where
B: Serialize + Send + Sync,
Q: Serialize + Send + Sync,
O: DeserializeOwned + Send + Sync,
{
let url = self.base_url.join(url_path)?;
let mut req = self.http_client.request(method.clone(), url.clone());
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
if let Some(body) = body {
req = req.json(&body);
}
if let Some(query) = query {
req = req.query(&query);
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(method, url, src))?;
let status = resp.status();
match status {
StatusCode::CREATED => {
let content = resp.json::<O>().await.map_err(Error::Json)?;
Ok(Some(content))
}
StatusCode::NO_CONTENT => Ok(None),
code => Err(Error::ApiError {
code,
message: resp.text().await.map_err(Error::Text)?,
}),
}
}
/// Send an HTTP request and return `O` on success.
async fn send_json<B, Q, O>(
&self,
method: Method,
url_path: &str,
body: Option<B>,
query: Option<Q>,
) -> Result<O>
where
B: Serialize + Send + Sync,
Q: Serialize + Send + Sync,
O: DeserializeOwned + Send + Sync,
{
let url = self.base_url.join(url_path)?;
let mut req = self.http_client.request(method.clone(), url.clone());
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
if let Some(body) = body {
req = req.json(&body);
}
if let Some(query) = query {
req = req.query(&query);
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(method, url, src))?;
let status = resp.status();
if status.is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}
}
/// Builder type for composing a request to `/api/v3/write_lp`
///
/// Produced by [`Client::api_v3_write_lp`]. The `B` parameter tracks whether
/// a body has been attached: [`NoBody`] before, `Body` after.
#[derive(Debug)]
pub struct WriteRequestBuilder<'c, B> {
    /// Client used to send the request
    client: &'c Client,
    /// Parameters sent in the query string
    params: WriteParams,
    /// The request body, or the [`NoBody`] marker
    body: B,
}

impl<B> WriteRequestBuilder<'_, B> {
    /// Set the precision
    pub fn precision(self, set_to: Precision) -> Self {
        let mut builder = self;
        builder.params.precision = Some(set_to);
        builder
    }

    /// Set the `accept_partial` parameter
    pub fn accept_partial(self, set_to: bool) -> Self {
        let mut builder = self;
        builder.params.accept_partial = Some(set_to);
        builder
    }
}

impl<'c> WriteRequestBuilder<'c, NoBody> {
    /// Set the body of the request to the `/api/v3/write_lp` API
    ///
    /// This essentially wraps `reqwest`'s [`body`][reqwest::RequestBuilder::body]
    /// method, and puts the responsibility on the caller for now.
    pub fn body<T: Into<Body>>(self, body: T) -> WriteRequestBuilder<'c, Body> {
        let WriteRequestBuilder { client, params, .. } = self;
        WriteRequestBuilder {
            client,
            params,
            body: body.into(),
        }
    }
}

impl WriteRequestBuilder<'_, Body> {
    /// Send the request to the server
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            params,
            body,
        } = self;
        // No response body is expected, so whatever comes back is dropped.
        client
            .send_get_bytes(
                Method::POST,
                "/api/v3/write_lp",
                Some(body),
                Some(params),
                None,
            )
            .await
            .map(|_| ())
    }
}
/// Typestate type for [`WriteRequestBuilder`]
///
/// Marks a builder that has not been given a request body yet.
#[doc(hidden)]
#[derive(Debug, Clone, Copy)]
pub struct NoBody;
/// Used to compose a request to the `/api/v3/query_sql` API
///
/// Produced by [`Client::api_v3_query_sql`] method.
#[derive(Debug)]
pub struct QueryRequestBuilder<'c> {
client: &'c Client,
kind: QueryKind,
request: ClientQueryRequest,
}
// TODO - for now the send method just returns the bytes from the response.
// It may be nicer to have the format parameter dictate how we return from
// send, e.g., using types more specific to the format selected.
impl QueryRequestBuilder<'_> {
/// Specify the format, `json`, `csv`, `pretty`, or `parquet`
pub fn format(mut self, format: QueryFormat) -> Self {
self.request.format = Some(format);
self
}
/// Set a query parameter value with the given `name`
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181")?;
/// let response_bytes = client
/// .api_v3_query_sql("db_name", "SELECT * FROM foo WHERE bar = $bar AND baz > $baz")
/// .with_param("bar", "bop")
/// .with_param("baz", 0.5)
/// .send()
/// .await
/// .expect("send query_sql request");
/// # Ok(())
/// # }
/// ```
pub fn with_param<S: Into<String>, P: Into<StatementParam>>(
mut self,
name: S,
param: P,
) -> Self {
self.request
.params
.get_or_insert_with(Default::default)
.insert(name.into(), param.into());
self
}
/// Set a query parameters from the given collection of pairs
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// use serde_json::json;
/// use std::collections::HashMap;
///
/// let client = Client::new("http://localhost:8181")?;
/// let response_bytes = client
/// .api_v3_query_sql("db_name", "SELECT * FROM foo WHERE bar = $bar AND foo > $foo")
/// .with_params_from([
/// ("bar", json!(false)),
/// ("foo", json!(10)),
/// ])?
/// .send()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn with_params_from<S, P, C>(mut self, params: C) -> Result<Self>
where
S: Into<String>,
P: TryInto<StatementParam, Error = iox_query_params::Error>,
C: IntoIterator<Item = (S, P)>,
{
for (name, param) in params.into_iter() {
let name = name.into();
let param = param
.try_into()
.map_err(|source| Error::ConvertQueryParam {
name: name.clone(),
source,
})?;
self.request
.params
.get_or_insert_with(Default::default)
.insert(name, param);
}
Ok(self)
}
/// Try to set a query parameter value with the given `name`
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// use serde_json::json;
///
/// let client = Client::new("http://localhost:8181")?;
/// let response_bytes = client
/// .api_v3_query_sql("db_name", "SELECT * FROM foo WHERE bar = $bar AND baz > $baz")
/// .with_try_param("bar", json!("baz"))?
/// .send()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn with_try_param<S, P>(mut self, name: S, param: P) -> Result<Self>
where
S: Into<String>,
P: TryInto<StatementParam, Error = iox_query_params::Error>,
{
let name = name.into();
let param = param
.try_into()
.map_err(|source| Error::ConvertQueryParam {
name: name.clone(),
source,
})?;
self.request
.params
.get_or_insert_with(Default::default)
.insert(name, param);
Ok(self)
}
/// Send the request to `/api/v3/query_sql` or `/api/v3/query_influxql`
pub async fn send(self) -> Result<Bytes> {
let url = match self.kind {
QueryKind::Sql => "/api/v3/query_sql",
QueryKind::InfluxQl => "/api/v3/query_influxql",
};
self.client
.send_json_get_bytes(Method::POST, url, Some(self.request), None::<()>, None)
.await
}
}
/// The type of query, SQL or InfluxQL
#[derive(Debug, Copy, Clone)]
pub enum QueryKind {
    /// A SQL query
    Sql,
    /// An InfluxQL query
    InfluxQl,
}

impl Display for QueryKind {
    /// Writes the lowercase name of the query language: `sql` or `influxql`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Sql => "sql",
            Self::InfluxQl => "influxql",
        };
        f.write_str(label)
    }
}
/// Builder for a `GET /api/v3/configure/database` request
#[derive(Debug)]
pub struct ShowDatabasesRequestBuilder<'c> {
    /// Client used to send the request
    client: &'c Client,
    /// Parameters sent in the query string
    request: ShowDatabasesRequest,
}

impl ShowDatabasesRequestBuilder<'_> {
    /// Specify whether or not to show deleted databases in the output
    pub fn with_show_deleted(self, show_deleted: bool) -> Self {
        let mut builder = self;
        builder.request.show_deleted = show_deleted;
        builder
    }

    /// Specify the [`QueryFormat`] of the returned `Bytes`
    pub fn with_format(self, format: QueryFormat) -> Self {
        let mut builder = self;
        builder.request.format = format;
        builder
    }

    /// Send the request, returning the raw [`Bytes`] in the response from the server
    pub async fn send(self) -> Result<Bytes> {
        let Self { client, request } = self;
        client
            .send_json_get_bytes(
                Method::GET,
                "/api/v3/configure/database",
                None::<()>,
                Some(request),
                None,
            )
            .await
    }
}
#[derive(Debug, Serialize)]
pub struct CreateLastCacheRequestBuilder<'c> {
#[serde(skip_serializing)]
client: &'c Client,
request: LastCacheCreateRequest,
}
impl<'c> CreateLastCacheRequestBuilder<'c> {
/// Create a new [`CreateLastCacheRequestBuilder`]
fn new(client: &'c Client, db: impl Into<String>, table: impl Into<String>) -> Self {
Self {
client,
request: LastCacheCreateRequest {
db: db.into(),
table: table.into(),
name: None,
key_columns: None,
value_columns: None,
count: Default::default(),
ttl: Default::default(),
},
}
}
/// Specify a cache name
pub fn name(mut self, name: impl Into<String>) -> Self {
self.request.name = Some(name.into());
self
}
/// Speciffy the key columns for the cache
pub fn key_columns(mut self, column_names: impl IntoIterator<Item: Into<String>>) -> Self {
self.request.key_columns = Some(column_names.into_iter().map(Into::into).collect());
self
}
/// Specify the value columns for the cache
pub fn value_columns(mut self, column_names: impl IntoIterator<Item: Into<String>>) -> Self {
self.request.value_columns = Some(column_names.into_iter().map(Into::into).collect());
self
}
/// Specify the size, or number of new entries a cache will hold before evicting old ones
pub fn count(mut self, count: LastCacheSize) -> Self {
self.request.count = count;
self
}
/// Specify the time-to-live (TTL) in seconds for entries in the cache
pub fn ttl(mut self, ttl: LastCacheTtl) -> Self {
self.request.ttl = ttl;
self
}
/// Send the request to `POST /api/v3/configure/last_cache`
pub async fn send(self) -> Result<Option<OrderedCatalogBatch>> {
self.client
.send_create(
Method::POST,
"/api/v3/configure/last_cache",
Some(self.request),
None::<()>,
)
.await
}
}
/// Type for composing requests to the `POST /api/v3/configure/distinct_cache` API created by the
/// [`Client::api_v3_configure_distinct_cache_create`] method
#[derive(Debug)]
pub struct CreateDistinctCacheRequestBuilder<'c> {
    /// Client used to send the request
    client: &'c Client,
    /// The request that is sent as the JSON body
    request: DistinctCacheCreateRequest,
}

impl<'c> CreateDistinctCacheRequestBuilder<'c> {
    /// Create a builder for a cache over `columns` of `table` in `db`; the
    /// name starts unset and the limits start at their default values.
    fn new(
        client: &'c Client,
        db: impl Into<String>,
        table: impl Into<String>,
        columns: impl IntoIterator<Item: Into<String>>,
    ) -> Self {
        let request = DistinctCacheCreateRequest {
            db: db.into(),
            table: table.into(),
            columns: columns.into_iter().map(|column| column.into()).collect(),
            name: None,
            max_cardinality: Default::default(),
            max_age: Default::default(),
        };
        Self { client, request }
    }

    /// Specify the name of the cache to be created, `snake_case` names are encouraged
    pub fn name(mut self, name: impl Into<String>) -> Self {
        self.request.name = Some(name.into());
        self
    }

    /// Specify the maximum cardinality for the cache as a non-zero unsigned integer
    pub fn max_cardinality(mut self, max_cardinality: NonZeroUsize) -> Self {
        self.request.max_cardinality = max_cardinality.into();
        self
    }

    /// Specify the maximum age for entries in the cache
    pub fn max_age(mut self, max_age: Duration) -> Self {
        self.request.max_age = max_age.into();
        self
    }

    /// Send the create cache request
    pub async fn send(self) -> Result<Option<OrderedCatalogBatch>> {
        let Self { client, request } = self;
        client
            .send_create(
                Method::POST,
                "/api/v3/configure/distinct_cache",
                Some(request),
                None::<()>,
            )
            .await
    }
}
#[cfg(test)]
mod tests {
use influxdb3_types::http::{LastCacheSize, LastCacheTtl};
use mockito::{Matcher, Server};
use serde_json::json;
use crate::{Client, Precision, QueryFormat};
/// A write request carries the bearer token, the query parameters and the
/// line-protocol body unchanged.
#[tokio::test]
async fn api_v3_write_lp() {
    let auth = "super-secret-token";
    let database = "stats";
    let line_protocol = "\
cpu,host=s1 usage=0.5
cpu,host=s1,region=us-west usage=0.7";
    let mut server = Server::new_async().await;
    let expected_auth = format!("Bearer {auth}");
    let expected_query = Matcher::AllOf(vec![
        Matcher::UrlEncoded("precision".into(), "millisecond".into()),
        Matcher::UrlEncoded("db".into(), database.into()),
        Matcher::UrlEncoded("accept_partial".into(), "true".into()),
    ]);
    let mock = server
        .mock("POST", "/api/v3/write_lp")
        .match_header("Authorization", expected_auth.as_str())
        .match_query(expected_query)
        .match_body(line_protocol)
        .create_async()
        .await;

    let client = Client::new(server.url(), None)
        .expect("create client")
        .with_auth_token(auth);

    let write = client
        .api_v3_write_lp(database)
        .precision(Precision::Millisecond)
        .accept_partial(true)
        .body(line_protocol);
    write.send().await.expect("send write_lp request");

    mock.assert_async().await;
}
/// A SQL query request sends the expected JSON body and returns the
/// response body bytes untouched.
#[tokio::test]
async fn api_v3_query_sql() {
    let auth = "super-secret-token";
    let database = "stats";
    let sql = "SELECT * FROM foo";
    let expected_body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
    let mut server = Server::new_async().await;
    let expected_auth = format!("Bearer {auth}");
    let expected_request = Matcher::Json(serde_json::json!({
        "db": database,
        "q": sql,
        "format": "json",
        "params": null,
    }));
    let mock = server
        .mock("POST", "/api/v3/query_sql")
        .match_header("Authorization", expected_auth.as_str())
        .match_body(expected_request)
        .with_status(200)
        // TODO - could add content-type header but that may be too brittle
        //        at the moment
        //      - this will be JSON Lines at some point
        .with_body(expected_body)
        .create_async()
        .await;

    let client = Client::new(server.url(), None)
        .expect("create client")
        .with_auth_token(auth);

    let response = client
        .api_v3_query_sql(database, sql)
        .format(QueryFormat::Json)
        .send()
        .await
        .expect("send request to server");

    assert_eq!(&response, expected_body);

    mock.assert_async().await;
}
// Verifies that parameters added with `with_param` are serialized into the `params` object of
// the SQL query request, and that the mocked response payload is returned unchanged.
#[tokio::test]
async fn api_v3_query_sql_params() {
    let db = "stats";
    let query = "SELECT * FROM foo WHERE bar = $bar";
    let body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
    let mut mock_server = Server::new_async().await;
    let mock = mock_server
        .mock("POST", "/api/v3/query_sql")
        // No format is set on the builder below, so `format` is expected to be null.
        .match_body(Matcher::Json(serde_json::json!({
            "db": db,
            "q": query,
            "params": {
                "bar": "baz",
                "baz": false,
            },
            "format": null
        })))
        .with_status(200)
        .with_body(body)
        .create_async()
        .await;
    let client = Client::new(mock_server.url(), None).expect("create client");
    let r = client
        .api_v3_query_sql(db, query)
        .with_param("bar", "baz")
        .with_param("baz", false)
        .send()
        .await;
    // Assert on the mock before unwrapping the response, so that a request that did not match
    // the expectation is reported by the mock rather than by the `expect` below.
    mock.assert_async().await;
    let r = r.expect("sent request successfully");
    // As in `api_v3_query_sql`, the payload must come back exactly as the server sent it.
    assert_eq!(&r, body);
}
#[tokio::test]
async fn api_v3_query_influxql() {
let db = "stats";
let query = "SELECT * FROM foo";
let body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
let mut mock_server = Server::new_async().await;
let mock = mock_server
.mock("POST", "/api/v3/query_influxql")
.match_body(Matcher::Json(serde_json::json!({
"db": db,
"q": query,
"format": "json",
"params": null,
})))
.with_status(200)
.with_body(body)
.create_async()
.await;
let client = Client::new(mock_server.url(), None).expect("create client");
let r = client
.api_v3_query_influxql(db, query)
.format(QueryFormat::Json)
.send()
.await
.expect("send request to server");
assert_eq!(&r, body);
mock.assert_async().await;
}
// Verifies that parameters added one at a time with `with_try_param` (string, integer, float
// and boolean JSON values) are serialized into the `params` object of the InfluxQL query
// request, and that the mocked response payload is returned unchanged.
#[tokio::test]
async fn api_v3_query_influxql_params() {
    let db = "stats";
    let query = "SELECT * FROM foo WHERE a = $a AND b < $b AND c > $c AND d = $d";
    let body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
    let mut mock_server = Server::new_async().await;
    let mock = mock_server
        .mock("POST", "/api/v3/query_influxql")
        // No format is set on the builder below, so `format` is expected to be null.
        .match_body(Matcher::Json(serde_json::json!({
            "db": db,
            "q": query,
            "params": {
                "a": "bar",
                "b": 123,
                "c": 1.5,
                "d": false
            },
            "format": null
        })))
        .with_status(200)
        .with_body(body)
        .create_async()
        .await;
    let client = Client::new(mock_server.url(), None).expect("create client");
    let mut builder = client.api_v3_query_influxql(db, query);
    // `with_try_param` is fallible and consumes the builder, so rebind it on every iteration.
    for (name, value) in [
        ("a", json!("bar")),
        ("b", json!(123)),
        ("c", json!(1.5)),
        ("d", json!(false)),
    ] {
        builder = builder.with_try_param(name, value).unwrap();
    }
    let r = builder.send().await;
    // Assert on the mock before unwrapping the response, so that a request that did not match
    // the expectation is reported by the mock rather than by the `expect` below.
    mock.assert_async().await;
    let r = r.expect("sent request successfully");
    // As in `api_v3_query_influxql`, the payload must come back exactly as the server sent it.
    assert_eq!(&r, body);
}
// Verifies that parameters supplied in bulk through `with_params_from` are serialized into the
// `params` object of the InfluxQL query request, and that the mocked response payload is
// returned unchanged.
#[tokio::test]
async fn api_v3_query_influxql_with_params_from() {
    let db = "stats";
    let query = "SELECT * FROM foo WHERE a = $a AND b < $b AND c > $c AND d = $d";
    let body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
    let mut mock_server = Server::new_async().await;
    let mock = mock_server
        .mock("POST", "/api/v3/query_influxql")
        // No format is set on the builder below, so `format` is expected to be null.
        .match_body(Matcher::Json(serde_json::json!({
            "db": db,
            "q": query,
            "params": {
                "a": "bar",
                "b": 123,
                "c": 1.5,
                "d": false
            },
            "format": null
        })))
        .with_status(200)
        .with_body(body)
        .create_async()
        .await;
    let client = Client::new(mock_server.url(), None).expect("create client");
    let r = client
        .api_v3_query_influxql(db, query)
        .with_params_from([
            ("a", json!("bar")),
            ("b", json!(123)),
            ("c", json!(1.5)),
            ("d", json!(false)),
        ])
        .unwrap()
        .send()
        .await;
    // Assert on the mock before unwrapping the response, so that a request that did not match
    // the expectation is reported by the mock rather than by the `expect` below.
    mock.assert_async().await;
    let r = r.expect("sent request successfully");
    // As in `api_v3_query_influxql`, the payload must come back exactly as the server sent it.
    assert_eq!(&r, body);
}
// NOTE(trevor): these tests are flaky since we need to fabricate the mock response. We are
// considering removing them in favour of integration tests that use the actual APIs.
#[tokio::test]
#[ignore]
async fn api_v3_configure_last_cache_create_201() {
let db = "db";
let table = "table";
let name = "cache_name";
let key_columns = ["col1", "col2"];
let val_columns = vec!["col3", "col4"];
let ttl = LastCacheTtl::from_secs(120);
let count = LastCacheSize::new(5).unwrap();
let mut mock_server = Server::new_async().await;
let mock = mock_server
.mock("POST", "/api/v3/configure/last_cache")
.match_body(Matcher::Json(serde_json::json!({
"db": db,
"table": table,
"name": name,
"key_columns": key_columns,
"value_columns": val_columns,
"count": count,
"ttl": ttl,
})))
.with_status(201)
.with_body(
r#"{
"table": "table",
"name": "cache_name",
"key_columns": [0, 1],
"value_columns": {
"explicit": {
"columns": [2, 3]
}
},
"ttl": 120,
"count": 5
}"#,
)
.create_async()
.await;
let client = Client::new(mock_server.url(), None).unwrap();
client
.api_v3_configure_last_cache_create(db, table)
.name(name)
.key_columns(key_columns)
.value_columns(val_columns)
.ttl(ttl)
.count(count)
.send()
.await
.expect("creates last cache and parses response");
mock.assert_async().await;
}
#[tokio::test]
async fn api_v3_configure_last_cache_create_204() {
let db = "db";
let table = "table";
let mut mock_server = Server::new_async().await;
let mock = mock_server
.mock("POST", "/api/v3/configure/last_cache")
.match_body(Matcher::Json(serde_json::json!({
"db": db,
"table": table,
"ttl": 14400,
"count": 1,
})))
.with_status(204)
.create_async()
.await;
let client = Client::new(mock_server.url(), None).unwrap();
let resp = client
.api_v3_configure_last_cache_create(db, table)
.send()
.await
.unwrap();
mock.assert_async().await;
assert!(resp.is_none());
}
#[tokio::test]
async fn api_v3_configure_last_cache_delete() {
let db = "db";
let table = "table";
let name = "cache_name";
let mut mock_server = Server::new_async().await;
let mock = mock_server
.mock("DELETE", "/api/v3/configure/last_cache")
.match_body(Matcher::Json(serde_json::json!({
"db": db,
"table": table,
"name": name,
})))
.with_status(200)
.create_async()
.await;
let client = Client::new(mock_server.url(), None).unwrap();
client
.api_v3_configure_last_cache_delete(db, table, name)
.await
.unwrap();
mock.assert_async().await;
}
}