refactor: introduce influxdb3_types crate (#25946)

Partially fixes https://github.com/influxdata/influxdb/issues/24672

* move most HTTP req/resp types into `influxdb3_types` crate
* removes the use of locally-scoped request type structs from the `influxdb3_client` crate
* fix plugin dependency/package install bug
  * it looks like the `DELETE` http method was being used where `POST` was expected for `/api/v3/configure/plugin_environment/install_packages` and `/api/v3/configure/plugin_environment/install_requirements`
bugfix/25933
wayne 2025-02-03 11:28:47 -07:00 committed by GitHub
parent 1dab1707ce
commit 0fffcc8c37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 508 additions and 489 deletions

18
Cargo.lock generated
View File

@ -2779,6 +2779,7 @@ dependencies = [
"influxdb3_server",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
"influxdb_iox_client",
@ -2917,6 +2918,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"hashbrown 0.15.2",
"influxdb3_types",
"iox_query_params",
"mockito",
"reqwest 0.11.27",
@ -3012,6 +3014,7 @@ dependencies = [
"influxdb3_client",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
"iox_query",
@ -3088,6 +3091,7 @@ dependencies = [
"influxdb3_processing_engine",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
"influxdb_influxql_parser",
@ -3188,6 +3192,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "influxdb3_types"
version = "0.1.0"
dependencies = [
"hashbrown 0.15.2",
"hyper 0.14.32",
"influxdb3_cache",
"iox_http",
"serde",
"thiserror 1.0.69",
]
[[package]]
name = "influxdb3_wal"
version = "0.1.0"
@ -3244,13 +3260,13 @@ dependencies = [
"influxdb-line-protocol",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_client",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_test_helpers",
"influxdb3_types",
"influxdb3_wal",
"insta",
"iox_catalog",

View File

@ -14,7 +14,7 @@ members = [
"influxdb3_py_api",
"influxdb3_server",
"influxdb3_telemetry",
"influxdb3_test_helpers",
"influxdb3_test_helpers", "influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
"iox_query_influxql_rewrite",

View File

@ -33,6 +33,7 @@ influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
# Crates.io dependencies

View File

@ -1,8 +1,8 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use anyhow::Context;
use hashbrown::HashMap;
use influxdb3_client::plugin_development::{SchedulePluginTestRequest, WalPluginTestRequest};
use influxdb3_client::Client;
use influxdb3_types::http::{SchedulePluginTestRequest, WalPluginTestRequest};
use secrecy::ExposeSecret;
use std::error::Error;

View File

@ -2,7 +2,8 @@
//!
//! This is useful for verifying that the client can parse API responses from the server
use influxdb3_client::{Format, LastCacheCreatedResponse, Precision};
use influxdb3_client::{Format, Precision};
use influxdb3_types::http::LastCacheCreatedResponse;
use crate::server::TestServer;

View File

@ -9,6 +9,9 @@ license.workspace = true
# core dependencies
iox_query_params.workspace = true
# Local deps
influxdb3_types = { path = "../influxdb3_types" }
# crates.io dependencies
bytes.workspace = true
hashbrown.workspace = true

View File

@ -1,18 +1,15 @@
pub mod plugin_development;
use crate::plugin_development::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use bytes::Bytes;
use hashbrown::HashMap;
use iox_query_params::StatementParam;
use reqwest::{Body, IntoUrl, Method, StatusCode};
use secrecy::{ExposeSecret, Secret};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::{fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration};
use url::Url;
use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
pub enum Error {
@ -54,9 +51,6 @@ pub enum Error {
#[source]
source: reqwest::Error,
},
#[error("unrecognized precision unit: {0}")]
UnrecognizedUnit(String),
}
impl Error {
@ -241,13 +235,7 @@ impl Client {
name: impl Into<String> + Send,
) -> Result<()> {
let url = self.base_url.join("/api/v3/configure/last_cache")?;
#[derive(Serialize)]
struct Req {
db: String,
table: String,
name: String,
}
let mut req = self.http_client.delete(url).json(&Req {
let mut req = self.http_client.delete(url).json(&LastCacheDeleteRequest {
db: db.into(),
table: table.into(),
name: name.into(),
@ -306,17 +294,14 @@ impl Client {
name: impl Into<String> + Send,
) -> Result<()> {
let url = self.base_url.join("/api/v3/configure/distinct_cache")?;
#[derive(Serialize)]
struct Req {
db: String,
table: String,
name: String,
}
let mut req = self.http_client.delete(url).json(&Req {
db: db.into(),
table: table.into(),
name: name.into(),
});
let mut req = self
.http_client
.delete(url)
.json(&DistinctCacheDeleteRequest {
db: db.into(),
table: table.into(),
name: name.into(),
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
@ -348,12 +333,10 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
}
let mut req = self.http_client.post(url).json(&Req { db: db.into() });
let mut req = self
.http_client
.post(url)
.json(&CreateDatabaseRequest { db: db.into() });
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
@ -439,26 +422,13 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
table: String,
tags: Vec<String>,
fields: Vec<Field>,
}
#[derive(Serialize)]
struct Field {
name: String,
r#type: String,
}
let mut req = self.http_client.post(url).json(&Req {
let mut req = self.http_client.post(url).json(&CreateTableRequest {
db: db.into(),
table: table.into(),
tags: tags.into_iter().map(Into::into).collect(),
fields: fields
.into_iter()
.map(|(name, r#type)| Field {
.map(|(name, r#type)| CreateTableField {
name: name.into(),
r#type: r#type.into(),
})
@ -494,20 +464,15 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
plugin_name: String,
file_name: String,
plugin_type: String,
}
let mut req = self.http_client.post(url).json(&Req {
db: db.into(),
plugin_name: plugin_name.into(),
file_name: file_name.into(),
plugin_type: plugin_type.into(),
});
let mut req = self
.http_client
.post(url)
.json(&ProcessingEnginePluginCreateRequest {
db: db.into(),
plugin_name: plugin_name.into(),
file_name: file_name.into(),
plugin_type: plugin_type.into(),
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
@ -535,16 +500,13 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
plugin_name: String,
}
let mut req = self.http_client.delete(url).json(&Req {
db: db.into(),
plugin_name: plugin_name.into(),
});
let mut req = self
.http_client
.delete(url)
.json(&ProcessingEnginePluginDeleteRequest {
db: db.into(),
plugin_name: plugin_name.into(),
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
@ -576,23 +538,17 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
trigger_name: String,
plugin_filename: String,
trigger_specification: String,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
}
let mut req = self.http_client.post(url).json(&Req {
db: db.into(),
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
trigger_arguments,
disabled,
});
let mut req = self
.http_client
.post(url)
.json(&ProcessingEngineTriggerCreateRequest {
db: db.into(),
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
trigger_arguments,
disabled,
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
@ -621,18 +577,14 @@ impl Client {
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
db: String,
trigger_name: String,
force: bool,
}
let mut req = self.http_client.delete(url).json(&Req {
db: db.into(),
trigger_name: trigger_name.into(),
force,
});
let mut req = self
.http_client
.delete(url)
.json(&ProcessingEngineTriggerDeleteRequest {
db: db.into(),
trigger_name: trigger_name.into(),
force,
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
@ -682,25 +634,24 @@ impl Client {
}
}
/// Make a request to api/v3/configure/plugin_environment/install_packages
/// Make a request to `POST /api/v3/configure/plugin_environment/install_packages`
pub async fn api_v3_configure_plugin_environment_install_packages(
&self,
packages: Vec<String>,
) -> Result<()> {
let api_path = "/api/v3/configure/plugin_environment/install_packages";
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
packages: Vec<String>,
}
let mut req = self.http_client.post(url).json(&Req { packages });
let mut req = self
.http_client
.post(url)
.json(&ProcessingEngineInstallPackagesRequest { packages });
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::DELETE, api_path, src))?;
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
@ -711,27 +662,26 @@ impl Client {
}
}
/// Make a request to api/v3/configure/plugin_environment/install_requirements
/// Make a request to `POST /api/v3/configure/plugin_environment/install_requirements`
pub async fn api_v3_configure_processing_engine_trigger_install_requirements(
&self,
requirements_location: impl Into<String> + Send,
) -> Result<()> {
let api_path = "/api/v3/configure/plugin_environment/install_requirements";
let url = self.base_url.join(api_path)?;
#[derive(Serialize)]
struct Req {
requirements_location: String,
}
let mut req = self.http_client.post(url).json(&Req {
requirements_location: requirements_location.into(),
});
let mut req =
self.http_client
.post(url)
.json(&ProcessingEngineInstallRequirementsRequest {
requirements_location: requirements_location.into(),
});
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::DELETE, api_path, src))?;
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
@ -855,25 +805,6 @@ impl Client {
}
}
/// The response of the `/ping` API on `influxdb3`
#[derive(Debug, Serialize, Deserialize)]
pub struct PingResponse {
version: String,
revision: String,
}
impl PingResponse {
/// Get the `version` from the response
pub fn version(&self) -> &str {
&self.version
}
/// Get the `revision` from the response
pub fn revision(&self) -> &str {
&self.revision
}
}
/// The URL parameters of the request to the `/api/v3/write_lp` API
// TODO - this should re-use a type defined in the server code, or a separate crate,
// central to both.
@ -894,33 +825,6 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> {
}
}
/// Time series precision
// TODO - this should re-use a type defined in the server code, or a separate crate,
// central to both.
#[derive(Debug, Copy, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Second,
Millisecond,
Microsecond,
Nanosecond,
}
impl std::str::FromStr for Precision {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
let p = match s {
"s" => Self::Second,
"ms" => Self::Millisecond,
"us" => Self::Microsecond,
"ns" => Self::Nanosecond,
_ => return Err(Error::UnrecognizedUnit(s.into())),
};
Ok(p)
}
}
/// Builder type for composing a request to `/api/v3/write_lp`
///
/// Produced by [`Client::api_v3_write_lp`]
@ -1340,33 +1244,6 @@ impl<'c> CreateLastCacheRequestBuilder<'c> {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LastCacheCreatedResponse {
/// The table name the cache is associated with
pub table: String,
/// Given name of the cache
pub name: String,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<u32>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: usize,
/// The time-to-live (TTL) in seconds for entries in the cache
pub ttl: u64,
}
/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<u32> },
/// Stores all non-key columns
AllNonKeyColumns,
}
/// Type for composing requests to the `POST /api/v3/configure/distinct_cache` API created by the
/// [`Client::api_v3_configure_distinct_cache_create`] method
#[derive(Debug, Serialize)]
@ -1451,22 +1328,6 @@ impl<'c> CreateDistinctCacheRequestBuilder<'c> {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DistinctCacheCreatedResponse {
/// The id of the table the cache was created on
pub table_id: u32,
/// The name of the table the cache was created on
pub table_name: String,
/// The name of the created cache
pub cache_name: String,
/// The columns in the cache
pub column_ids: Vec<u32>,
/// The maximum number of unique value combinations the cache will hold
pub max_cardinality: usize,
/// The maximum age for entries in the cache
pub max_age_seconds: u64,
}
#[cfg(test)]
mod tests {
use mockito::{Matcher, Server};

View File

@ -1,39 +0,0 @@
//! Request structs for the /api/v3/plugin_test API
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
/// Request definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
pub filename: String,
pub database: String,
pub input_lp: String,
pub input_arguments: Option<HashMap<String, String>>,
}
/// Response definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestResponse {
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
/// Request definition for `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestRequest {
pub filename: String,
pub database: String,
pub schedule: Option<String>,
pub input_arguments: Option<HashMap<String, String>>,
}
/// Response definition for `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestResponse {
pub trigger_time: Option<String>,
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}

View File

@ -23,6 +23,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true

View File

@ -9,11 +9,11 @@ use hashbrown::HashMap;
use hyper::{Body, Response};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
use influxdb3_client::plugin_development::{
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
#[cfg(feature = "system-py")]
use influxdb3_wal::PluginType;
use influxdb3_wal::{

View File

@ -2,11 +2,11 @@ use crate::environment::{PluginEnvironmentError, PythonEnvironmentManager};
use bytes::Bytes;
use hashbrown::HashMap;
use hyper::{Body, Response};
use influxdb3_client::plugin_development::{
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;

View File

@ -7,9 +7,9 @@ use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_catalog::catalog::Catalog;
#[cfg(feature = "system-py")]
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
#[cfg(feature = "system-py")]
use influxdb3_internal_api::query_executor::QueryExecutor;
#[cfg(feature = "system-py")]
use influxdb3_types::http::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_wal::Gen1Duration;
#[cfg(feature = "system-py")]
use influxdb3_wal::TriggerDefinition;
@ -742,8 +742,8 @@ pub(crate) fn run_test_schedule_plugin(
catalog: Arc<Catalog>,
query_executor: Arc<dyn QueryExecutor>,
code: String,
request: influxdb3_client::plugin_development::SchedulePluginTestRequest,
) -> Result<influxdb3_client::plugin_development::SchedulePluginTestResponse, PluginError> {
request: influxdb3_types::http::SchedulePluginTestRequest,
) -> Result<influxdb3_types::http::SchedulePluginTestResponse, PluginError> {
let database = request.database;
let db = catalog.db_schema(&database).ok_or(PluginError::MissingDb)?;
@ -775,14 +775,12 @@ pub(crate) fn run_test_schedule_plugin(
let errors = test_write_handler.validate_all_writes(&database_writes);
let trigger_time = schedule_time.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true);
Ok(
influxdb3_client::plugin_development::SchedulePluginTestResponse {
trigger_time: Some(trigger_time),
log_lines,
database_writes,
errors,
},
)
Ok(influxdb3_types::http::SchedulePluginTestResponse {
trigger_time: Some(trigger_time),
log_lines,
database_writes,
errors,
})
}
#[cfg(feature = "system-py")]

View File

@ -38,6 +38,7 @@ influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = { path = "../influxdb3_processing_engine" }
influxdb3_types = { path = "../influxdb3_types"}
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }

View File

@ -13,20 +13,19 @@ use datafusion::execution::RecordBatchStream;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::FutureExt;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use hyper::header::ACCEPT;
use hyper::header::AUTHORIZATION;
use hyper::header::CONTENT_ENCODING;
use hyper::header::CONTENT_TYPE;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge, MaxCardinality};
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge};
use influxdb3_cache::last_cache;
use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_processing_engine::manager::{ProcessingEngineError, ProcessingEngineManager};
use influxdb3_types::http::*;
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::write_buffer::Error as WriteBufferError;
@ -226,6 +225,9 @@ pub enum Error {
#[error("Processing engine error: {0}")]
ProcessingEngine(#[from] influxdb3_processing_engine::manager::ProcessingEngineError),
#[error(transparent)]
Influxdb3TypesHttp(#[from] influxdb3_types::http::Error),
}
#[derive(Debug, Error)]
@ -598,15 +600,9 @@ where
}
fn ping(&self) -> Result<Response<Body>> {
#[derive(Debug, Serialize)]
struct PingResponse<'a> {
version: &'a str,
revision: &'a str,
}
let body = serde_json::to_string(&PingResponse {
version: &INFLUXDB3_VERSION,
revision: INFLUXDB3_GIT_HASH_SHORT,
version: INFLUXDB3_VERSION.to_string(),
revision: INFLUXDB3_GIT_HASH_SHORT.to_string(),
})?;
Ok(Response::new(Body::from(body)))
@ -1004,7 +1000,7 @@ where
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let ProcessEngineTriggerCreateRequest {
let ProcessingEngineTriggerCreateRequest {
db,
plugin_filename,
trigger_name,
@ -1052,7 +1048,7 @@ where
}
async fn delete_processing_engine_trigger(&self, req: Request<Body>) -> Result<Response<Body>> {
let ProcessEngineTriggerDeleteRequest {
let ProcessingEngineTriggerDeleteRequest {
db,
trigger_name,
force,
@ -1172,8 +1168,7 @@ where
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let request: influxdb3_client::plugin_development::WalPluginTestRequest =
self.read_body_json(req).await?;
let request: influxdb3_types::http::WalPluginTestRequest = self.read_body_json(req).await?;
let output = self
.processing_engine
@ -1200,7 +1195,7 @@ where
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let request: influxdb3_client::plugin_development::SchedulePluginTestRequest =
let request: influxdb3_types::http::SchedulePluginTestRequest =
self.read_body_json(req).await?;
let output = self
@ -1220,6 +1215,8 @@ where
trigger_path: &str,
req: Request<Body>,
) -> Result<Response<Body>> {
use hashbrown::HashMap;
// pull out the query params into a hashmap
let uri = req.uri();
let query_str = uri.query().unwrap_or("");
@ -1487,62 +1484,6 @@ pub enum ValidateDbNameError {
Empty,
}
#[derive(Debug, Deserialize)]
pub(crate) struct QueryRequest<D, F, P> {
#[serde(rename = "db")]
pub(crate) database: D,
#[serde(rename = "q")]
pub(crate) query_str: String,
pub(crate) format: F,
pub(crate) params: Option<P>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum QueryFormat {
Parquet,
Csv,
Pretty,
Json,
#[serde(alias = "jsonl")]
JsonLines,
}
impl QueryFormat {
fn as_content_type(&self) -> &str {
match self {
Self::Parquet => "application/vnd.apache.parquet",
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
Self::JsonLines => "application/jsonl",
}
}
fn try_from_headers(headers: &HeaderMap) -> Result<Self> {
match headers.get(ACCEPT).map(HeaderValue::as_bytes) {
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some(b"application/vnd.apache.parquet") => Ok(Self::Parquet),
Some(b"text/csv") => Ok(Self::Csv),
Some(b"text/plain") => Ok(Self::Pretty),
Some(b"application/json" | b"*/*") | None => Ok(Self::Json),
Some(mime_type) => match String::from_utf8(mime_type.to_vec()) {
Ok(s) => {
if s.contains("text/html") || s.contains("*/*") {
return Ok(Self::Json);
}
Err(Error::InvalidMimeType(s))
}
Err(e) => Err(Error::NonUtf8MimeType(e)),
},
}
}
}
async fn record_batch_stream_to_body(
mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
@ -1767,127 +1708,6 @@ impl From<iox_http::write::WriteParams> for WriteParams {
}
}
/// Request definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
struct DistinctCacheCreateRequest {
/// The name of the database associated with the cache
db: String,
/// The name of the table associated with the cache
table: String,
/// The name of the cache. If not provided, the cache name will be generated from the table
/// name and selected column names.
name: Option<String>,
/// The columns to create the cache on.
// TODO: this should eventually be made optional, so that if not provided, the columns used will
// correspond to the series key columns for the table, i.e., the tags. See:
// https://github.com/influxdata/influxdb/issues/25585
columns: Vec<String>,
/// The maximumn number of distinct value combinations to hold in the cache
max_cardinality: Option<MaxCardinality>,
/// The duration in seconds that entries will be kept in the cache before being evicted
max_age: Option<u64>,
}
/// Request definition for the `DELETE /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
struct DistinctCacheDeleteRequest {
db: String,
table: String,
name: String,
}
/// Request definition for the `POST /api/v3/configure/last_cache` API
#[derive(Debug, Deserialize)]
struct LastCacheCreateRequest {
db: String,
table: String,
name: Option<String>,
key_columns: Option<Vec<String>>,
value_columns: Option<Vec<String>>,
count: Option<usize>,
ttl: Option<u64>,
}
/// Request definition for the `DELETE /api/v3/configure/last_cache` API
#[derive(Debug, Deserialize)]
struct LastCacheDeleteRequest {
db: String,
table: String,
name: String,
}
/// Request definition for `POST /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize)]
struct ProcessEngineTriggerCreateRequest {
db: String,
plugin_filename: String,
trigger_name: String,
trigger_specification: String,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
}
#[derive(Debug, Deserialize)]
struct ProcessEngineTriggerDeleteRequest {
db: String,
trigger_name: String,
#[serde(default)]
force: bool,
}
#[derive(Debug, Deserialize)]
struct ProcessingEngineInstallPackagesRequest {
packages: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct ProcessingEngineInstallRequirementsRequest {
requirements_location: String,
}
#[derive(Debug, Deserialize)]
struct ProcessingEngineTriggerIdentifier {
db: String,
trigger_name: String,
}
#[derive(Debug, Deserialize)]
struct ShowDatabasesRequest {
format: QueryFormat,
#[serde(default)]
show_deleted: bool,
}
#[derive(Debug, Deserialize)]
struct CreateDatabaseRequest {
db: String,
}
#[derive(Debug, Deserialize)]
struct DeleteDatabaseRequest {
db: String,
}
#[derive(Debug, Deserialize)]
struct CreateTableRequest {
db: String,
table: String,
tags: Vec<String>,
fields: Vec<CreateTableField>,
}
#[derive(Debug, Deserialize)]
struct CreateTableField {
name: String,
r#type: String,
}
#[derive(Debug, Deserialize)]
struct DeleteTableRequest {
db: String,
table: String,
}
pub(crate) async fn route_request<T: TimeProvider>(
http_server: Arc<HttpApi<T>>,
mut req: Request<Body>,
@ -2049,7 +1869,7 @@ fn legacy_write_error_to_response(e: WriteParseError) -> Response<Body> {
mod tests {
use http::{header::ACCEPT, HeaderMap, HeaderValue};
use crate::http::QueryFormat;
use super::QueryFormat;
use super::validate_db_name;
use super::ValidateDbNameError;

View File

@ -0,0 +1,22 @@
[package]
name = "influxdb3_types"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
# Core Crates
iox_http.workspace = true
# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }
# crates.io dependencies
serde.workspace = true
hashbrown.workspace = true
hyper.workspace = true
thiserror.workspace = true
[lints]
workspace = true

315
influxdb3_types/src/http.rs Normal file
View File

@ -0,0 +1,315 @@
use hashbrown::HashMap;
use hyper::header::ACCEPT;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use influxdb3_cache::distinct_cache::MaxCardinality;
use serde::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("invalid mime type ({0})")]
InvalidMimeType(String),
#[error("the mime type specified was not valid UTF8: {0}")]
NonUtf8MimeType(#[from] std::string::FromUtf8Error),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PingResponse {
pub version: String,
pub revision: String,
}
impl PingResponse {
/// Get the `version` from the response
pub fn version(&self) -> &str {
&self.version
}
/// Get the `revision` from the response
pub fn revision(&self) -> &str {
&self.revision
}
}
/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<u32> },
/// Stores all non-key columns
AllNonKeyColumns,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LastCacheCreatedResponse {
/// The table name the cache is associated with
pub table: String,
/// Given name of the cache
pub name: String,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<u32>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: usize,
/// The time-to-live (TTL) in seconds for entries in the cache
pub ttl: u64,
}
/// Request definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
pub struct DistinctCacheCreateRequest {
/// The name of the database associated with the cache
pub db: String,
/// The name of the table associated with the cache
pub table: String,
/// The name of the cache. If not provided, the cache name will be generated from the table
/// name and selected column names.
pub name: Option<String>,
/// The columns to create the cache on.
// TODO: this should eventually be made optional, so that if not provided, the columns used will
// correspond to the series key columns for the table, i.e., the tags. See:
// https://github.com/influxdata/influxdb/issues/25585
pub columns: Vec<String>,
/// The maximumn number of distinct value combinations to hold in the cache
pub max_cardinality: Option<MaxCardinality>,
/// The duration in seconds that entries will be kept in the cache before being evicted
pub max_age: Option<u64>,
}
/// Resposne definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Serialize, Deserialize)]
pub struct DistinctCacheCreatedResponse {
/// The id of the table the cache was created on
pub table_id: u32,
/// The name of the table the cache was created on
pub table_name: String,
/// The name of the created cache
pub cache_name: String,
/// The columns in the cache
pub column_ids: Vec<u32>,
/// The maximum number of unique value combinations the cache will hold
pub max_cardinality: usize,
/// The maximum age for entries in the cache
pub max_age_seconds: u64,
}
/// Request definition for the `DELETE /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize, Serialize)]
pub struct DistinctCacheDeleteRequest {
pub db: String,
pub table: String,
pub name: String,
}
/// Request definition for the `POST /api/v3/configure/last_cache` API
#[derive(Debug, Deserialize)]
pub struct LastCacheCreateRequest {
pub db: String,
pub table: String,
pub name: Option<String>,
pub key_columns: Option<Vec<String>>,
pub value_columns: Option<Vec<String>>,
pub count: Option<usize>,
pub ttl: Option<u64>,
}
/// Request definition for the `DELETE /api/v3/configure/last_cache` API
#[derive(Debug, Deserialize, Serialize)]
pub struct LastCacheDeleteRequest {
pub db: String,
pub table: String,
pub name: String,
}
/// Request definition for the `POST /api/v3/configure/processing_engine_plugin` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEnginePluginCreateRequest {
pub db: String,
pub plugin_name: String,
pub file_name: String,
pub plugin_type: String,
}
/// Request definition for the `DELETE /api/v3/configure/processing_engine_plugin` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEnginePluginDeleteRequest {
pub db: String,
pub plugin_name: String,
}
/// Request definition for the `POST /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEngineTriggerCreateRequest {
pub db: String,
pub plugin_filename: String,
pub trigger_name: String,
pub trigger_specification: String,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
/// Request definition for the `DELETE /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEngineTriggerDeleteRequest {
pub db: String,
pub trigger_name: String,
#[serde(default)]
pub force: bool,
}
/// Request definition for the `POST /api/v3/configure/plugin_environment/install_packages` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEngineInstallPackagesRequest {
pub packages: Vec<String>,
}
/// Request definition for the `POST /api/v3/configure/plugin_environment/install_requirements` API
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEngineInstallRequirementsRequest {
pub requirements_location: String,
}
#[derive(Debug, Deserialize)]
pub struct ProcessingEngineTriggerIdentifier {
pub db: String,
pub trigger_name: String,
}
/// Request definition for the `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
pub filename: String,
pub database: String,
pub input_lp: String,
pub input_arguments: Option<HashMap<String, String>>,
}
/// Response definition for the `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestResponse {
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
/// Request definition for the `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestRequest {
pub filename: String,
pub database: String,
pub schedule: Option<String>,
pub input_arguments: Option<HashMap<String, String>>,
}
/// Response definition for the `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestResponse {
pub trigger_time: Option<String>,
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
/// Request definition for the `GET /api/v3/configure/database` API
#[derive(Clone, Copy, Debug, Deserialize)]
pub struct ShowDatabasesRequest {
pub format: QueryFormat,
#[serde(default)]
pub show_deleted: bool,
}
/// Request definition for the `POST /api/v3/configure/database` API
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateDatabaseRequest {
pub db: String,
}
/// Request definition for the `DELETE /api/v3/configure/database` API
#[derive(Debug, Deserialize)]
pub struct DeleteDatabaseRequest {
pub db: String,
}
/// Request definition for the `POST /api/v3/configure/table` API
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTableRequest {
pub db: String,
pub table: String,
pub tags: Vec<String>,
pub fields: Vec<CreateTableField>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTableField {
pub name: String,
pub r#type: String,
}
/// Request definition for the `DELETE /api/v3/configure/table` API
#[derive(Debug, Deserialize)]
pub struct DeleteTableRequest {
pub db: String,
pub table: String,
}
/// Request definition for the `POST /api/v3/query_sql` and `POST /api/v3/query_influxql` APIs
#[derive(Debug, Deserialize)]
pub struct QueryRequest<D, F, P> {
#[serde(rename = "db")]
pub database: D,
#[serde(rename = "q")]
pub query_str: String,
pub format: F,
pub params: Option<P>,
}
#[derive(Copy, Clone, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryFormat {
Parquet,
Csv,
Pretty,
Json,
#[serde(alias = "jsonl")]
JsonLines,
}
impl QueryFormat {
pub fn as_content_type(&self) -> &str {
match self {
Self::Parquet => "application/vnd.apache.parquet",
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
Self::JsonLines => "application/jsonl",
}
}
pub fn try_from_headers(headers: &HeaderMap) -> std::result::Result<Self, Error> {
match headers.get(ACCEPT).map(HeaderValue::as_bytes) {
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some(b"application/vnd.apache.parquet") => Ok(Self::Parquet),
Some(b"text/csv") => Ok(Self::Csv),
Some(b"text/plain") => Ok(Self::Pretty),
Some(b"application/json" | b"*/*") | None => Ok(Self::Json),
Some(mime_type) => match String::from_utf8(mime_type.to_vec()) {
Ok(s) => {
if s.contains("text/html") || s.contains("*/*") {
return Ok(Self::Json);
}
Err(Error::InvalidMimeType(s))
}
Err(e) => Err(Error::NonUtf8MimeType(e)),
},
}
}
}

View File

@ -0,0 +1,2 @@
pub mod http;
pub mod write;

View File

@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
/// The precision of the timestamp
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Auto,
Second,
Millisecond,
Microsecond,
Nanosecond,
}
impl Default for Precision {
fn default() -> Self {
Self::Auto
}
}
impl From<iox_http::write::Precision> for Precision {
fn from(legacy: iox_http::write::Precision) -> Self {
match legacy {
iox_http::write::Precision::Second => Precision::Second,
iox_http::write::Precision::Millisecond => Precision::Millisecond,
iox_http::write::Precision::Microsecond => Precision::Microsecond,
iox_http::write::Precision::Nanosecond => Precision::Nanosecond,
}
}
}
impl std::str::FromStr for Precision {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let p = match s {
"s" => Self::Second,
"ms" => Self::Millisecond,
"us" => Self::Microsecond,
"ns" => Self::Nanosecond,
_ => return Err(format!("unrecognized precision unit: {s}")),
};
Ok(p)
}
}

View File

@ -26,12 +26,12 @@ schema.workspace = true
# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_py_api = {path = "../influxdb3_py_api"}

View File

@ -28,6 +28,7 @@ use influxdb3_cache::{
};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
pub use influxdb3_types::write::Precision;
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
WalFileSequenceNumber,
@ -360,34 +361,6 @@ impl ParquetFile {
}
}
/// The precision of the timestamp
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Auto,
Second,
Millisecond,
Microsecond,
Nanosecond,
}
impl Default for Precision {
fn default() -> Self {
Self::Auto
}
}
impl From<iox_http::write::Precision> for Precision {
fn from(legacy: iox_http::write::Precision) -> Self {
match legacy {
iox_http::write::Precision::Second => Precision::Second,
iox_http::write::Precision::Millisecond => Precision::Millisecond,
iox_http::write::Precision::Microsecond => Precision::Microsecond,
iox_http::write::Precision::Nanosecond => Precision::Nanosecond,
}
}
}
/// Guess precision based off of a given timestamp.
// Note that this will fail in June 2128, but that's not our problem
pub(crate) fn guess_precision(timestamp: i64) -> Precision {