diff --git a/Cargo.lock b/Cargo.lock index 2262adf918..4b7fca6646 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2969,6 +2969,7 @@ dependencies = [ "humantime", "influxdb3_client", "influxdb3_process", + "influxdb3_types", "observability_deps", "parking_lot", "rand", @@ -3200,6 +3201,7 @@ dependencies = [ "hyper 0.14.32", "influxdb3_cache", "iox_http", + "iox_query_params", "serde", "thiserror 1.0.69", ] diff --git a/influxdb3/src/commands/common.rs b/influxdb3/src/commands/common.rs index 1937814080..b05c0f002d 100644 --- a/influxdb3/src/commands/common.rs +++ b/influxdb3/src/commands/common.rs @@ -42,7 +42,7 @@ impl Format { } } -impl From for influxdb3_client::Format { +impl From for influxdb3_types::http::QueryFormat { fn from(this: Format) -> Self { match this { Format::Pretty => Self::Pretty, diff --git a/influxdb3/tests/server/client.rs b/influxdb3/tests/server/client.rs index 68060995eb..822483bd50 100644 --- a/influxdb3/tests/server/client.rs +++ b/influxdb3/tests/server/client.rs @@ -2,8 +2,8 @@ //! //! This is useful for verifying that the client can parse API responses from the server -use influxdb3_client::{Format, Precision}; -use influxdb3_types::http::LastCacheCreatedResponse; +use influxdb3_client::Precision; +use influxdb3_types::http::{LastCacheCreatedResponse, QueryFormat as Format}; use crate::server::TestServer; diff --git a/influxdb3_cache/src/distinct_cache/cache.rs b/influxdb3_cache/src/distinct_cache/cache.rs index 62c8793aaa..e946d51a64 100644 --- a/influxdb3_cache/src/distinct_cache/cache.rs +++ b/influxdb3_cache/src/distinct_cache/cache.rs @@ -17,7 +17,7 @@ use influxdb3_id::{ColumnId, TableId}; use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row}; use iox_time::TimeProvider; use schema::{InfluxColumnType, InfluxFieldType}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Debug, thiserror::Error)] pub enum CacheError { @@ -68,7 +68,7 @@ pub struct CreateDistinctCacheArgs { pub column_ids: Vec, } -#[derive(Debug, Clone, Copy, Deserialize)] +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] pub struct MaxCardinality(NonZeroUsize); impl TryFrom for MaxCardinality { @@ -89,6 +89,12 @@ impl Default for MaxCardinality { } } +impl From for MaxCardinality { + fn from(v: NonZeroUsize) -> Self { + Self(v) + } +} + impl From for usize { fn from(value: MaxCardinality) -> Self { value.0.into() diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 10e1e07a09..07858b47a6 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -1,9 +1,12 @@ use bytes::Bytes; use hashbrown::HashMap; use iox_query_params::StatementParam; -use reqwest::{Body, IntoUrl, Method, StatusCode}; +use reqwest::{ + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, + Body, IntoUrl, Method, StatusCode, +}; use secrecy::{ExposeSecret, Secret}; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; use std::{fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration}; use url::Url; @@ -22,6 +25,9 @@ pub enum Error { #[error("failed to read the API response bytes: {0}")] Bytes(#[source] reqwest::Error), + #[error("failed to serialize the request body: {0}")] + RequestSerialization(#[source] serde_json::Error), + #[error( "provided parameter ('{name}') could not be converted \ to a statment parameter" @@ -130,9 +136,12 @@ impl Client { pub fn api_v3_write_lp>(&self, db: S) -> WriteRequestBuilder<'_, NoBody> { WriteRequestBuilder { client: self, - db: db.into(), - precision: None, - accept_partial: None, + params: WriteParams { + db: db.into(), + precision: None, + accept_partial: None, + no_sync: None, + }, body: NoBody, } } @@ -161,10 +170,12 @@ impl Client { QueryRequestBuilder { client: self, kind: QueryKind::Sql, - db: db.into(), - query: query.into(), - format: None, - params: None, + request: ClientQueryRequest { + database: db.into(), + query_str: query.into(), + format: None, + params: None, + }, } } @@ -192,10 +203,12 @@ impl Client { QueryRequestBuilder { client: self, kind: QueryKind::InfluxQl, - db: db.into(), - query: query.into(), - format: None, - params: None, + request: ClientQueryRequest { + database: db.into(), + query_str: query.into(), + format: None, + params: None, + }, } } @@ -234,26 +247,20 @@ impl Client { table: impl Into + Send, name: impl Into + Send, ) -> Result<()> { - let url = self.base_url.join("/api/v3/configure/last_cache")?; - let mut req = self.http_client.delete(url).json(&LastCacheDeleteRequest { - db: db.into(), - table: table.into(), - name: name.into(), - }); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req.send().await.map_err(|src| { - Error::request_send(Method::DELETE, "/api/v3/configure/last_cache", src) - })?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/last_cache", + Some(LastCacheDeleteRequest { + db: db.into(), + table: table.into(), + name: name.into(), + }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Compose a request to the `POST /api/v3/configure/distinct_cache` API @@ -293,90 +300,61 @@ impl Client { table: impl Into + Send, name: impl Into + Send, ) -> Result<()> { - let url = self.base_url.join("/api/v3/configure/distinct_cache")?; - let mut req = self - .http_client - .delete(url) - .json(&DistinctCacheDeleteRequest { - db: db.into(), - table: table.into(), - name: name.into(), - }); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req.send().await.map_err(|src| { - Error::request_send(Method::DELETE, "/api/v3/configure/distinct_cache", src) - })?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/distinct_cache", + Some(DistinctCacheDeleteRequest { + db: db.into(), + table: table.into(), + name: name.into(), + }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Compose a request to the `GET /api/v3/configure/database` API pub fn api_v3_configure_db_show(&self) -> ShowDatabasesRequestBuilder<'_> { ShowDatabasesRequestBuilder { client: self, - show_deleted: false, - format: Format::Json, + request: ShowDatabasesRequest { + show_deleted: false, + format: QueryFormat::Json, + }, } } /// Make a request to the `POST /api/v3/configure/database` API pub async fn api_v3_configure_db_create(&self, db: impl Into + Send) -> Result<()> { - let api_path = "/api/v3/configure/database"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .json(&CreateDatabaseRequest { db: db.into() }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/database", + Some(CreateDatabaseRequest { db: db.into() }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Make a request to the `DELETE /api/v3/configure/database?db=foo` API pub async fn api_v3_configure_db_delete(&self, db: impl AsRef + Send) -> Result<()> { - let api_path = "/api/v3/configure/database"; - - let url = self.base_url.join(api_path)?; - - let mut req = self.http_client.delete(url).query(&[("db", db.as_ref())]); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::DELETE, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/database", + None::<()>, + Some(DeleteDatabaseRequest { + db: db.as_ref().to_string(), + }), + None, + ) + .await?; + Ok(()) } /// Make a request to the `DELETE /api/v3/configure/table?db=foo&table=bar` API @@ -385,29 +363,19 @@ impl Client { db: T, table: T, ) -> Result<()> { - let api_path = "/api/v3/configure/table"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .delete(url) - .query(&[("db", db.as_ref()), ("table", table.as_ref())]); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::DELETE, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/table", + None::<()>, + Some(DeleteTableRequest { + db: db.as_ref().to_string(), + table: table.as_ref().to_string(), + }), + None, + ) + .await?; + Ok(()) } /// Make a request to the `POST /api/v3/configure/table` API @@ -418,38 +386,27 @@ impl Client { tags: Vec + Send>, fields: Vec<(impl Into + Send, impl Into + Send)>, ) -> Result<()> { - let api_path = "/api/v3/configure/table"; - - let url = self.base_url.join(api_path)?; - - let mut req = self.http_client.post(url).json(&CreateTableRequest { - db: db.into(), - table: table.into(), - tags: tags.into_iter().map(Into::into).collect(), - fields: fields - .into_iter() - .map(|(name, r#type)| CreateTableField { - name: name.into(), - r#type: r#type.into(), - }) - .collect(), - }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/table", + Some(CreateTableRequest { + db: db.into(), + table: table.into(), + tags: tags.into_iter().map(Into::into).collect(), + fields: fields + .into_iter() + .map(|(name, r#type)| CreateTableField { + name: name.into(), + r#type: r#type.into(), + }) + .collect(), + }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Make a request to the `POST /api/v3/configure/processing_engine_plugin` API @@ -460,70 +417,44 @@ impl Client { file_name: impl Into + Send, plugin_type: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_plugin"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .json(&ProcessingEnginePluginCreateRequest { - db: db.into(), - plugin_name: plugin_name.into(), - file_name: file_name.into(), - plugin_type: plugin_type.into(), - }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/processing_engine_plugin", + Some(ProcessingEnginePluginCreateRequest { + db: db.into(), + plugin_name: plugin_name.into(), + file_name: file_name.into(), + plugin_type: plugin_type.into(), + }), + None::<()>, + None, + ) + .await?; + Ok(()) } + /// Make a request to the `DELETE /api/v3/configure/processing_engine_plugin` API pub async fn api_v3_configure_processing_engine_plugin_delete( &self, db: impl Into + Send, plugin_name: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_plugin"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .delete(url) - .json(&ProcessingEnginePluginDeleteRequest { - db: db.into(), - plugin_name: plugin_name.into(), - }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::DELETE, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/processing_engine_plugin", + Some(ProcessingEnginePluginDeleteRequest { + db: db.into(), + plugin_name: plugin_name.into(), + }), + None::<()>, + None, + ) + .await?; + Ok(()) } + /// Make a request to `POST /api/v3/configure/processing_engine_trigger` pub async fn api_v3_configure_processing_engine_trigger_create( &self, @@ -534,38 +465,25 @@ impl Client { trigger_arguments: Option>, disabled: bool, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .json(&ProcessingEngineTriggerCreateRequest { - db: db.into(), - trigger_name: trigger_name.into(), - plugin_filename: plugin_filename.into(), - trigger_specification: trigger_spec.into(), - trigger_arguments, - disabled, - }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/processing_engine_trigger", + Some(ProcessingEngineTriggerCreateRequest { + db: db.into(), + trigger_name: trigger_name.into(), + plugin_filename: plugin_filename.into(), + trigger_specification: trigger_spec.into(), + trigger_arguments, + disabled, + }), + None::<()>, + None, + ) + .await?; + Ok(()) } + /// Make a request to `DELETE /api/v3/configure/processing_engine_trigger` pub async fn api_v3_configure_processing_engine_trigger_delete( &self, @@ -573,34 +491,20 @@ impl Client { trigger_name: impl Into + Send, force: bool, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger"; - - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .delete(url) - .json(&ProcessingEngineTriggerDeleteRequest { - db: db.into(), - trigger_name: trigger_name.into(), - force, - }); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::DELETE, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::DELETE, + "/api/v3/configure/processing_engine_trigger", + Some(ProcessingEngineTriggerDeleteRequest { + db: db.into(), + trigger_name: trigger_name.into(), + force, + }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Make a request to `POST /api/v3/configure/processing_engine_trigger/enable` @@ -609,29 +513,19 @@ impl Client { db: impl Into + Send, trigger_name: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger/enable"; - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .query(&[("db", db.into()), ("trigger_name", trigger_name.into())]); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - - match resp.status() { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/processing_engine_trigger/enable", + None::<()>, + Some(ProcessingEngineTriggerIdentifier { + db: db.into(), + trigger_name: trigger_name.into(), + }), + None, + ) + .await?; + Ok(()) } /// Make a request to `POST /api/v3/configure/plugin_environment/install_packages` @@ -639,27 +533,16 @@ impl Client { &self, packages: Vec, ) -> Result<()> { - let api_path = "/api/v3/configure/plugin_environment/install_packages"; - let url = self.base_url.join(api_path)?; - let mut req = self - .http_client - .post(url) - .json(&ProcessingEngineInstallPackagesRequest { packages }); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/plugin_environment/install_packages", + Some(ProcessingEngineInstallPackagesRequest { packages }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Make a request to `POST /api/v3/configure/plugin_environment/install_requirements` @@ -667,29 +550,18 @@ impl Client { &self, requirements_location: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/plugin_environment/install_requirements"; - let url = self.base_url.join(api_path)?; - let mut req = - self.http_client - .post(url) - .json(&ProcessingEngineInstallRequirementsRequest { + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/plugin_environment/install_requirements", + Some(ProcessingEngineInstallRequirementsRequest { requirements_location: requirements_location.into(), - }); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + }), + None::<()>, + None, + ) + .await?; + Ok(()) } /// Make a request to `POST /api/v3/configure/processing_engine_trigger/disable` @@ -698,29 +570,19 @@ impl Client { db: impl Into + Send, trigger_name: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger/disable"; - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .query(&[("db", db.into()), ("trigger_name", trigger_name.into())]); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - - match resp.status() { - StatusCode::OK => Ok(()), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + let _bytes = self + .send_json_get_bytes( + Method::POST, + "/api/v3/configure/processing_engine_trigger/disable", + None::<()>, + Some(ProcessingEngineTriggerIdentifier { + db: db.into(), + trigger_name: trigger_name.into(), + }), + None, + ) + .await?; + Ok(()) } /// Make a request to the `POST /api/v3/plugin_test/wal` API @@ -728,28 +590,13 @@ impl Client { &self, wal_plugin_test_request: WalPluginTestRequest, ) -> Result { - let api_path = "/api/v3/plugin_test/wal"; - - let url = self.base_url.join(api_path)?; - - let mut req = self.http_client.post(url).json(&wal_plugin_test_request); - - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - - if resp.status().is_success() { - resp.json().await.map_err(Error::Json) - } else { - Err(Error::ApiError { - code: resp.status(), - message: resp.text().await.map_err(Error::Text)?, - }) - } + self.send_json( + Method::POST, + "/api/v3/plugin_test/wal", + Some(wal_plugin_test_request), + None::<()>, + ) + .await } /// Make a request to the `POST /api/v3/plugin_test/schedule` API @@ -757,29 +604,13 @@ impl Client { &self, schedule_plugin_test_request: SchedulePluginTestRequest, ) -> Result { - let api_path = "/api/v3/plugin_test/schedule"; - let url = self.base_url.join(api_path)?; - - let mut req = self - .http_client - .post(url) - .json(&schedule_plugin_test_request); - if let Some(token) = &self.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() - .await - .map_err(|src| Error::request_send(Method::POST, api_path, src))?; - - if resp.status().is_success() { - resp.json().await.map_err(Error::Json) - } else { - Err(Error::ApiError { - code: resp.status(), - message: resp.text().await.map_err(Error::Text)?, - }) - } + self.send_json( + Method::POST, + "/api/v3/plugin_test/schedule", + Some(schedule_plugin_test_request), + None::<()>, + ) + .await } /// Send a `/ping` request to the target `influxdb3` server to check its @@ -803,24 +634,155 @@ impl Client { }) } } -} -/// The URL parameters of the request to the `/api/v3/write_lp` API -// TODO - this should re-use a type defined in the server code, or a separate crate, -// central to both. -#[derive(Debug, Serialize)] -struct WriteParams<'a> { - db: &'a str, - precision: Option, - accept_partial: Option, -} + /// Serialize the given `B` to json then send the request and return the resulting bytes. + async fn send_json_get_bytes( + &self, + method: Method, + url_path: &str, + body: Option, + query: Option, + mut headers: Option, + ) -> Result + where + B: Serialize + Send + Sync, + Q: Serialize + Send + Sync, + { + let b = body + .map(|body| serde_json::to_string(&body)) + .transpose() + .map_err(Error::RequestSerialization)? + .map(Into::into); + let hs = headers.get_or_insert_default(); + hs.insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json").unwrap(), + ); + self.send_get_bytes(method, url_path, b, query, headers) + .await + } -impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> { - fn from(builder: &'a WriteRequestBuilder<'a, B>) -> Self { - Self { - db: &builder.db, - precision: builder.precision, - accept_partial: builder.accept_partial, + /// Send an HTTP request with the specified parameters, return the bytes read from the response + /// body. + async fn send_get_bytes( + &self, + method: Method, + url_path: &str, + body: Option, + query: Option, + headers: Option, + ) -> Result + where + Q: Serialize + Send + Sync, + { + let url = self.base_url.join(url_path)?; + let mut req = self.http_client.request(method.clone(), url.clone()); + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + if let Some(body) = body { + req = req.body(body); + } + if let Some(query) = query { + req = req.query(&query); + } + if let Some(headers) = headers { + req = req.headers(headers); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(method, url, src))?; + let status = resp.status(); + let content = resp.bytes().await.map_err(Error::Bytes)?; + + match status { + s if s.is_success() => Ok(content), + code => Err(Error::ApiError { + code, + message: String::from_utf8(content.to_vec()).map_err(Error::InvalidUtf8)?, + }), + } + } + + /// Send an HTTP request and return `Some(O)` if the response status is HTTP 201 Created. + async fn send_create( + &self, + method: Method, + url_path: &str, + body: Option, + query: Option, + ) -> Result> + where + B: Serialize + Send + Sync, + Q: Serialize + Send + Sync, + O: DeserializeOwned + Send + Sync, + { + let url = self.base_url.join(url_path)?; + let mut req = self.http_client.request(method.clone(), url.clone()); + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + if let Some(body) = body { + req = req.json(&body); + } + if let Some(query) = query { + req = req.query(&query); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(method, url, src))?; + let status = resp.status(); + match status { + StatusCode::CREATED => { + let content = resp.json::().await.map_err(Error::Json)?; + Ok(Some(content)) + } + StatusCode::NO_CONTENT => Ok(None), + code => Err(Error::ApiError { + code, + message: resp.text().await.map_err(Error::Text)?, + }), + } + } + + /// Send an HTTP request and return `O` on success. + async fn send_json( + &self, + method: Method, + url_path: &str, + body: Option, + query: Option, + ) -> Result + where + B: Serialize + Send + Sync, + Q: Serialize + Send + Sync, + O: DeserializeOwned + Send + Sync, + { + let url = self.base_url.join(url_path)?; + let mut req = self.http_client.request(method.clone(), url.clone()); + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + if let Some(body) = body { + req = req.json(&body); + } + if let Some(query) = query { + req = req.query(&query); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(method, url, src))?; + let status = resp.status(); + if status.is_success() { + resp.json().await.map_err(Error::Json) + } else { + Err(Error::ApiError { + code: resp.status(), + message: resp.text().await.map_err(Error::Text)?, + }) } } } @@ -831,22 +793,20 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> { #[derive(Debug)] pub struct WriteRequestBuilder<'c, B> { client: &'c Client, - db: String, - precision: Option, - accept_partial: Option, + params: WriteParams, body: B, } impl WriteRequestBuilder<'_, B> { /// Set the precision pub fn precision(mut self, set_to: Precision) -> Self { - self.precision = Some(set_to); + self.params.precision = Some(set_to); self } /// Set the `accept_partial` parameter pub fn accept_partial(mut self, set_to: bool) -> Self { - self.accept_partial = Some(set_to); + self.params.accept_partial = Some(set_to); self } } @@ -859,9 +819,7 @@ impl<'c> WriteRequestBuilder<'c, NoBody> { pub fn body>(self, body: T) -> WriteRequestBuilder<'c, Body> { WriteRequestBuilder { client: self.client, - db: self.db, - precision: self.precision, - accept_partial: self.accept_partial, + params: self.params, body: body.into(), } } @@ -870,27 +828,19 @@ impl<'c> WriteRequestBuilder<'c, NoBody> { impl WriteRequestBuilder<'_, Body> { /// Send the request to the server pub async fn send(self) -> Result<()> { - let url = self.client.base_url.join("/api/v3/write_lp")?; - let params = WriteParams::from(&self); - let mut req = self.client.http_client.post(url).query(¶ms); - if let Some(token) = &self.client.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .body(self.body) - .send() - .await - .map_err(|src| Error::request_send(Method::POST, "/api/v3/write_lp", src))?; - let status = resp.status(); - let content = resp.bytes().await.map_err(Error::Bytes)?; - match status { - // TODO - handle the OK response content, return to caller, etc. - StatusCode::OK | StatusCode::NO_CONTENT => Ok(()), - code => Err(Error::ApiError { - code, - message: String::from_utf8(content.to_vec())?, - }), - } + // ignore the returned value since we don't expect a response body + let _bytes = self + .client + .send_get_bytes( + Method::POST, + "/api/v3/write_lp", + Some(self.body), + Some(self.params), + None, + ) + .await?; + + Ok(()) } } @@ -906,10 +856,7 @@ pub struct NoBody; pub struct QueryRequestBuilder<'c> { client: &'c Client, kind: QueryKind, - db: String, - query: String, - format: Option, - params: Option>, + request: ClientQueryRequest, } // TODO - for now the send method just returns the bytes from the response. @@ -917,8 +864,8 @@ pub struct QueryRequestBuilder<'c> { // send, e.g., using types more specific to the format selected. impl QueryRequestBuilder<'_> { /// Specify the format, `json`, `csv`, `pretty`, or `parquet` - pub fn format(mut self, format: Format) -> Self { - self.format = Some(format); + pub fn format(mut self, format: QueryFormat) -> Self { + self.request.format = Some(format); self } @@ -945,7 +892,8 @@ impl QueryRequestBuilder<'_> { name: S, param: P, ) -> Self { - self.params + self.request + .params .get_or_insert_with(Default::default) .insert(name.into(), param.into()); self @@ -988,7 +936,8 @@ impl QueryRequestBuilder<'_> { source, })?; - self.params + self.request + .params .get_or_insert_with(Default::default) .insert(name, param); } @@ -1025,7 +974,8 @@ impl QueryRequestBuilder<'_> { name: name.clone(), source, })?; - self.params + self.request + .params .get_or_insert_with(Default::default) .insert(name, param); Ok(self) @@ -1034,48 +984,12 @@ impl QueryRequestBuilder<'_> { /// Send the request to `/api/v3/query_sql` or `/api/v3/query_influxql` pub async fn send(self) -> Result { let url = match self.kind { - QueryKind::Sql => self.client.base_url.join("/api/v3/query_sql")?, - QueryKind::InfluxQl => self.client.base_url.join("/api/v3/query_influxql")?, + QueryKind::Sql => "/api/v3/query_sql", + QueryKind::InfluxQl => "/api/v3/query_influxql", }; - let params = QueryParams::from(&self); - let mut req = self.client.http_client.post(url).json(¶ms); - if let Some(token) = &self.client.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req.send().await.map_err(|src| { - Error::request_send(Method::POST, format!("/api/v3/query_{}", self.kind), src) - })?; - let status = resp.status(); - let content = resp.bytes().await.map_err(Error::Bytes)?; - - match status { - StatusCode::OK => Ok(content), - code => Err(Error::ApiError { - code, - message: String::from_utf8(content.to_vec()).map_err(Error::InvalidUtf8)?, - }), - } - } -} - -/// Query parameters for the `/api/v3/query_sql` API -#[derive(Debug, Serialize)] -pub struct QueryParams<'a> { - db: &'a str, - #[serde(rename = "q")] - query: &'a str, - format: Option, - params: Option<&'a HashMap>, -} - -impl<'a> From<&'a QueryRequestBuilder<'a>> for QueryParams<'a> { - fn from(builder: &'a QueryRequestBuilder<'a>) -> Self { - Self { - db: &builder.db, - query: &builder.query, - format: builder.format, - params: builder.params.as_ref(), - } + self.client + .send_json_get_bytes(Method::POST, url, Some(self.request), None::<()>, None) + .await } } @@ -1095,61 +1009,31 @@ impl Display for QueryKind { } } -/// Output format to request from the server when producing results from APIs that use the -/// query executor, e.g., `/api/v3/query_sql` and `GET /api/v3/configure/database` -#[derive(Debug, Serialize, Copy, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Format { - Json, - #[serde(rename = "jsonl")] - JsonLines, - Csv, - Parquet, - Pretty, -} - -#[derive(Debug, Serialize)] +#[derive(Debug)] pub struct ShowDatabasesRequestBuilder<'c> { - #[serde(skip_serializing)] client: &'c Client, - format: Format, - show_deleted: bool, + request: ShowDatabasesRequest, } impl ShowDatabasesRequestBuilder<'_> { /// Specify whether or not to show deleted databases in the output pub fn with_show_deleted(mut self, show_deleted: bool) -> Self { - self.show_deleted = show_deleted; + self.request.show_deleted = show_deleted; self } - /// Specify the [`Format`] of the returned `Bytes` - pub fn with_format(mut self, format: Format) -> Self { - self.format = format; + /// Specify the [`QueryFormat`] of the returned `Bytes` + pub fn with_format(mut self, format: QueryFormat) -> Self { + self.request.format = format; self } /// Send the request, returning the raw [`Bytes`] in the response from the server pub async fn send(self) -> Result { - let url = self.client.base_url.join("/api/v3/configure/database")?; - let mut req = self.client.http_client.get(url).query(&self); - if let Some(token) = &self.client.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req - .send() + let url = "/api/v3/configure/database"; + self.client + .send_json_get_bytes(Method::GET, url, None::<()>, Some(self.request), None) .await - .map_err(|src| Error::request_send(Method::GET, "/api/v3/configure/database", src))?; - let status = resp.status(); - let content = resp.bytes().await.map_err(Error::Bytes)?; - - match status { - StatusCode::OK => Ok(content), - code => Err(Error::ApiError { - code, - message: String::from_utf8(content.to_vec()).map_err(Error::InvalidUtf8)?, - }), - } } } @@ -1157,18 +1041,7 @@ impl ShowDatabasesRequestBuilder<'_> { pub struct CreateLastCacheRequestBuilder<'c> { #[serde(skip_serializing)] client: &'c Client, - db: String, - table: String, - #[serde(skip_serializing_if = "Option::is_none")] - name: Option, - #[serde(skip_serializing_if = "Option::is_none")] - key_columns: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - value_columns: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - ttl: Option, + request: LastCacheCreateRequest, } impl<'c> CreateLastCacheRequestBuilder<'c> { @@ -1176,89 +1049,67 @@ impl<'c> CreateLastCacheRequestBuilder<'c> { fn new(client: &'c Client, db: impl Into, table: impl Into) -> Self { Self { client, - db: db.into(), - table: table.into(), - name: None, - key_columns: None, - value_columns: None, - count: None, - ttl: None, + request: LastCacheCreateRequest { + db: db.into(), + table: table.into(), + name: None, + key_columns: None, + value_columns: None, + count: None, + ttl: None, + }, } } /// Specify a cache name pub fn name(mut self, name: impl Into) -> Self { - self.name = Some(name.into()); + self.request.name = Some(name.into()); self } /// Speciffy the key columns for the cache pub fn key_columns(mut self, column_names: impl IntoIterator>) -> Self { - self.key_columns = Some(column_names.into_iter().map(Into::into).collect()); + self.request.key_columns = Some(column_names.into_iter().map(Into::into).collect()); self } /// Specify the value columns for the cache pub fn value_columns(mut self, column_names: impl IntoIterator>) -> Self { - self.value_columns = Some(column_names.into_iter().map(Into::into).collect()); + self.request.value_columns = Some(column_names.into_iter().map(Into::into).collect()); self } /// Specify the size, or number of new entries a cache will hold before evicting old ones pub fn count(mut self, count: usize) -> Self { - self.count = Some(count); + self.request.count = Some(count); self } /// Specify the time-to-live (TTL) in seconds for entries in the cache pub fn ttl(mut self, ttl: u64) -> Self { - self.ttl = Some(ttl); + self.request.ttl = Some(ttl); self } /// Send the request to `POST /api/v3/configure/last_cache` pub async fn send(self) -> Result> { - let url = self.client.base_url.join("/api/v3/configure/last_cache")?; - let mut req = self.client.http_client.post(url).json(&self); - if let Some(token) = &self.client.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req.send().await.map_err(|src| { - Error::request_send(Method::POST, "/api/v3/configure/last_cache", src) - })?; - let status = resp.status(); - match status { - StatusCode::CREATED => { - let content = resp - .json::() - .await - .map_err(Error::Json)?; - Ok(Some(content)) - } - StatusCode::NO_CONTENT => Ok(None), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + self.client + .send_create( + Method::POST, + "/api/v3/configure/last_cache", + Some(self.request), + None::<()>, + ) + .await } } /// Type for composing requests to the `POST /api/v3/configure/distinct_cache` API created by the /// [`Client::api_v3_configure_distinct_cache_create`] method -#[derive(Debug, Serialize)] +#[derive(Debug)] pub struct CreateDistinctCacheRequestBuilder<'c> { - #[serde(skip_serializing)] client: &'c Client, - db: String, - table: String, - columns: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - name: Option, - #[serde(skip_serializing_if = "Option::is_none")] - max_cardinality: Option, - #[serde(skip_serializing_if = "Option::is_none")] - max_age: Option, + request: DistinctCacheCreateRequest, } impl<'c> CreateDistinctCacheRequestBuilder<'c> { @@ -1270,61 +1121,45 @@ impl<'c> CreateDistinctCacheRequestBuilder<'c> { ) -> Self { Self { client, - db: db.into(), - table: table.into(), - columns: columns.into_iter().map(Into::into).collect(), - name: None, - max_cardinality: None, - max_age: None, + request: DistinctCacheCreateRequest { + db: db.into(), + table: table.into(), + columns: columns.into_iter().map(Into::into).collect(), + name: None, + max_cardinality: None, + max_age: None, + }, } } /// Specify the name of the cache to be created, `snake_case` names are encouraged pub fn name(mut self, name: impl Into) -> Self { - self.name = Some(name.into()); + self.request.name = Some(name.into()); self } /// Specify the maximum cardinality for the cache as a non-zero unsigned integer pub fn max_cardinality(mut self, max_cardinality: NonZeroUsize) -> Self { - self.max_cardinality = Some(max_cardinality); + self.request.max_cardinality = Some(max_cardinality.into()); self } /// Specify the maximum age for entries in the cache pub fn max_age(mut self, max_age: Duration) -> Self { - self.max_age = Some(max_age.as_secs()); + self.request.max_age = Some(max_age.as_secs()); self } /// Send the create cache request pub async fn send(self) -> Result> { - let url = self - .client - .base_url - .join("/api/v3/configure/distinct_cache")?; - let mut req = self.client.http_client.post(url).json(&self); - if let Some(token) = &self.client.auth_token { - req = req.bearer_auth(token.expose_secret()); - } - let resp = req.send().await.map_err(|src| { - Error::request_send(Method::POST, "/api/v3/configure/distinct_cache", src) - })?; - let status = resp.status(); - match status { - StatusCode::CREATED => { - let content = resp - .json::() - .await - .map_err(Error::Json)?; - Ok(Some(content)) - } - StatusCode::NO_CONTENT => Ok(None), - code => Err(Error::ApiError { - code, - message: resp.text().await.map_err(Error::Text)?, - }), - } + self.client + .send_create( + Method::POST, + "/api/v3/configure/distinct_cache", + Some(self.request), + None::<()>, + ) + .await } } @@ -1333,7 +1168,7 @@ mod tests { use mockito::{Matcher, Server}; use serde_json::json; - use crate::{Client, Format, Precision}; + use crate::{Client, Precision, QueryFormat}; #[tokio::test] async fn api_v3_write_lp() { @@ -1403,7 +1238,7 @@ mod tests { let r = client .api_v3_query_sql(db, query) - .format(Format::Json) + .format(QueryFormat::Json) .send() .await .expect("send request to server"); @@ -1474,7 +1309,7 @@ mod tests { let r = client .api_v3_query_influxql(db, query) - .format(Format::Json) + .format(QueryFormat::Json) .send() .await .expect("send request to server"); diff --git a/influxdb3_load_generator/Cargo.toml b/influxdb3_load_generator/Cargo.toml index 7dba8ff24d..220e4661a5 100644 --- a/influxdb3_load_generator/Cargo.toml +++ b/influxdb3_load_generator/Cargo.toml @@ -13,6 +13,7 @@ trogging.workspace = true # Local Deps influxdb3_client = { path = "../influxdb3_client" } influxdb3_process = { path = "../influxdb3_process", default-features = false } +influxdb3_types = { path = "../influxdb3_types" } # crates.io Dependencies anyhow.workspace = true diff --git a/influxdb3_load_generator/src/query_generator.rs b/influxdb3_load_generator/src/query_generator.rs index 6b22ee409d..94f8acdec7 100644 --- a/influxdb3_load_generator/src/query_generator.rs +++ b/influxdb3_load_generator/src/query_generator.rs @@ -56,7 +56,7 @@ pub enum Format { Csv, } -impl From for influxdb3_client::Format { +impl From for influxdb3_types::http::QueryFormat { fn from(format: Format) -> Self { match format { Format::Json => Self::Json, diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 0cea2a80b8..59ec00ad74 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -529,9 +529,9 @@ where database, body, default_time, - params.accept_partial, - params.precision, - params.no_sync, + params.accept_partial.unwrap_or(true), + params.precision.unwrap_or(Precision::Auto), + params.no_sync.unwrap_or(false), ) .await?; @@ -1681,33 +1681,6 @@ async fn record_batch_stream_to_body( } } -// This is a hack around the fact that bool default is false not true -const fn true_fn() -> bool { - true -} -#[derive(Debug, Deserialize)] -pub(crate) struct WriteParams { - pub(crate) db: String, - #[serde(default = "true_fn")] - pub(crate) accept_partial: bool, - #[serde(default)] - pub(crate) precision: Precision, - #[serde(default)] - pub(crate) no_sync: bool, -} - -impl From for WriteParams { - fn from(legacy: iox_http::write::WriteParams) -> Self { - Self { - db: legacy.namespace.to_string(), - // legacy behaviour was to not accept partial: - accept_partial: false, - precision: legacy.precision.into(), - no_sync: false, - } - } -} - pub(crate) async fn route_request( http_server: Arc>, mut req: Request, diff --git a/influxdb3_types/Cargo.toml b/influxdb3_types/Cargo.toml index d99faef27d..30c705424c 100644 --- a/influxdb3_types/Cargo.toml +++ b/influxdb3_types/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] # Core Crates iox_http.workspace = true +iox_query_params.workspace = true # Local deps influxdb3_cache = { path = "../influxdb3_cache" } diff --git a/influxdb3_types/src/http.rs b/influxdb3_types/src/http.rs index 8703031700..c75d2f5e0e 100644 --- a/influxdb3_types/src/http.rs +++ b/influxdb3_types/src/http.rs @@ -1,11 +1,13 @@ use hashbrown::HashMap; - use hyper::header::ACCEPT; use hyper::http::HeaderValue; use hyper::HeaderMap; use influxdb3_cache::distinct_cache::MaxCardinality; +use iox_query_params::StatementParams; use serde::{Deserialize, Serialize}; +use crate::write::Precision; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("invalid mime type ({0})")] @@ -15,7 +17,7 @@ pub enum Error { NonUtf8MimeType(#[from] std::string::FromUtf8Error), } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct PingResponse { pub version: String, pub revision: String, @@ -44,7 +46,7 @@ pub enum LastCacheValueColumnsDef { AllNonKeyColumns, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct LastCacheCreatedResponse { /// The table name the cache is associated with pub table: String, @@ -61,7 +63,7 @@ pub struct LastCacheCreatedResponse { } /// Request definition for the `POST /api/v3/configure/distinct_cache` API -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct DistinctCacheCreateRequest { /// The name of the database associated with the cache pub db: String, @@ -69,6 +71,7 @@ pub struct DistinctCacheCreateRequest { pub table: String, /// The name of the cache. If not provided, the cache name will be generated from the table /// name and selected column names. + #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, /// The columns to create the cache on. // TODO: this should eventually be made optional, so that if not provided, the columns used will @@ -76,13 +79,15 @@ pub struct DistinctCacheCreateRequest { // https://github.com/influxdata/influxdb/issues/25585 pub columns: Vec, /// The maximumn number of distinct value combinations to hold in the cache + #[serde(skip_serializing_if = "Option::is_none")] pub max_cardinality: Option, /// The duration in seconds that entries will be kept in the cache before being evicted + #[serde(skip_serializing_if = "Option::is_none")] pub max_age: Option, } /// Resposne definition for the `POST /api/v3/configure/distinct_cache` API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct DistinctCacheCreatedResponse { /// The id of the table the cache was created on pub table_id: u32, @@ -107,14 +112,19 @@ pub struct DistinctCacheDeleteRequest { } /// Request definition for the `POST /api/v3/configure/last_cache` API -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct LastCacheCreateRequest { pub db: String, pub table: String, + #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub key_columns: Option>, + #[serde(skip_serializing_if = "Option::is_none")] pub value_columns: Option>, + #[serde(skip_serializing_if = "Option::is_none")] pub count: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub ttl: Option, } @@ -174,14 +184,14 @@ pub struct ProcessingEngineInstallRequirementsRequest { pub requirements_location: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct ProcessingEngineTriggerIdentifier { pub db: String, pub trigger_name: String, } /// Request definition for the `POST /api/v3/plugin_test/wal` API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct WalPluginTestRequest { pub filename: String, pub database: String, @@ -190,7 +200,7 @@ pub struct WalPluginTestRequest { } /// Response definition for the `POST /api/v3/plugin_test/wal` API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct WalPluginTestResponse { pub log_lines: Vec, pub database_writes: HashMap>, @@ -198,7 +208,7 @@ pub struct WalPluginTestResponse { } /// Request definition for the `POST /api/v3/plugin_test/schedule` API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct SchedulePluginTestRequest { pub filename: String, pub database: String, @@ -207,7 +217,7 @@ pub struct SchedulePluginTestRequest { } /// Response definition for the `POST /api/v3/plugin_test/schedule` API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct SchedulePluginTestResponse { pub trigger_time: Option, pub log_lines: Vec, @@ -216,7 +226,7 @@ pub struct SchedulePluginTestResponse { } /// Request definition for the `GET /api/v3/configure/database` API -#[derive(Clone, Copy, Debug, Deserialize)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct ShowDatabasesRequest { pub format: QueryFormat, #[serde(default)] @@ -230,7 +240,7 @@ pub struct CreateDatabaseRequest { } /// Request definition for the `DELETE /api/v3/configure/database` API -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct DeleteDatabaseRequest { pub db: String, } @@ -251,14 +261,16 @@ pub struct CreateTableField { } /// Request definition for the `DELETE /api/v3/configure/table` API -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct DeleteTableRequest { pub db: String, pub table: String, } +pub type ClientQueryRequest = QueryRequest, StatementParams>; + /// Request definition for the `POST /api/v3/query_sql` and `POST /api/v3/query_influxql` APIs -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct QueryRequest { #[serde(rename = "db")] pub database: D, @@ -268,7 +280,7 @@ pub struct QueryRequest { pub params: Option

, } -#[derive(Copy, Clone, Debug, Deserialize)] +#[derive(Copy, Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum QueryFormat { Parquet, @@ -313,3 +325,24 @@ impl QueryFormat { } } } + +/// The URL parameters of the request to the `/api/v3/write_lp` API +#[derive(Debug, Deserialize, Serialize)] +pub struct WriteParams { + pub db: String, + pub precision: Option, + pub accept_partial: Option, + pub no_sync: Option, +} + +impl From for WriteParams { + fn from(legacy: iox_http::write::WriteParams) -> Self { + Self { + db: legacy.namespace.to_string(), + // legacy behaviour was to not accept partial: + accept_partial: Some(false), + precision: Some(legacy.precision.into()), + no_sync: Some(false), + } + } +}