refactor: DRY up influxdb3_client code (#25962)

* deduplicate QueryParams->QueryRequest and Format->QueryFormat
* move WriteParams into influxdb3_types crate
* DRY up client HTTP request handling code in *RequestBuilder.send
  methods (see the sketch after the file summary below).
* DRY up a bunch of other non-Builder HTTP request handling
wayne 2025-02-04 08:54:37 -07:00 committed by GitHub
parent 27653f5a76
commit fa18b6d8da
10 changed files with 539 additions and 688 deletions
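
The client diff itself is suppressed below as too large, so the following is a rough, illustrative sketch only of the kind of shared helper the commit message describes. The `send_json` name, the `Client` fields, and the error variants are assumptions for illustration, not the actual influxdb3_client implementation.

```rust
// Illustrative sketch only -- not code from this commit. It shows the general
// shape of collapsing per-builder HTTP handling into a single helper.
use reqwest::{Method, StatusCode};
use serde::Serialize;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("request failed: {0}")]
    Request(#[from] reqwest::Error),
    #[error("server returned {code}: {message}")]
    Api { code: StatusCode, message: String },
}

pub struct Client {
    base_url: String,
    http: reqwest::Client,
    auth_token: Option<String>,
}

impl Client {
    /// Hypothetical shared helper: each *RequestBuilder::send funnels its
    /// JSON body through here instead of repeating the same
    /// build-request / attach-auth / check-status / map-error sequence.
    async fn send_json<B: Serialize>(
        &self,
        method: Method,
        path: &str,
        body: &B,
    ) -> Result<reqwest::Response, Error> {
        let mut req = self
            .http
            .request(method, format!("{}{}", self.base_url, path))
            .json(body);
        if let Some(token) = &self.auth_token {
            req = req.bearer_auth(token);
        }
        let resp = req.send().await?;
        if !resp.status().is_success() {
            return Err(Error::Api {
                code: resp.status(),
                message: resp.text().await.unwrap_or_default(),
            });
        }
        Ok(resp)
    }
}
```

Each *RequestBuilder::send method would then reduce to building its path and body and delegating to the shared helper.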

Cargo.lock (generated)

@@ -2969,6 +2969,7 @@ dependencies = [
"humantime",
"influxdb3_client",
"influxdb3_process",
"influxdb3_types",
"observability_deps",
"parking_lot",
"rand",
@@ -3200,6 +3201,7 @@ dependencies = [
"hyper 0.14.32",
"influxdb3_cache",
"iox_http",
"iox_query_params",
"serde",
"thiserror 1.0.69",
]


@@ -42,7 +42,7 @@ impl Format {
}
}
impl From<Format> for influxdb3_client::Format {
impl From<Format> for influxdb3_types::http::QueryFormat {
fn from(this: Format) -> Self {
match this {
Format::Pretty => Self::Pretty,


@@ -2,8 +2,8 @@
//!
//! This is useful for verifying that the client can parse API responses from the server
use influxdb3_client::{Format, Precision};
use influxdb3_types::http::LastCacheCreatedResponse;
use influxdb3_client::Precision;
use influxdb3_types::http::{LastCacheCreatedResponse, QueryFormat as Format};
use crate::server::TestServer;


@@ -17,7 +17,7 @@ use influxdb3_id::{ColumnId, TableId};
use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row};
use iox_time::TimeProvider;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
@@ -68,7 +68,7 @@ pub struct CreateDistinctCacheArgs {
pub column_ids: Vec<ColumnId>,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct MaxCardinality(NonZeroUsize);
impl TryFrom<usize> for MaxCardinality {
@@ -89,6 +89,12 @@ impl Default for MaxCardinality {
}
}
impl From<NonZeroUsize> for MaxCardinality {
fn from(v: NonZeroUsize) -> Self {
Self(v)
}
}
impl From<MaxCardinality> for usize {
fn from(value: MaxCardinality) -> Self {
value.0.into()

(File diff suppressed because it is too large.)


@@ -13,6 +13,7 @@ trogging.workspace = true
# Local Deps
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_types = { path = "../influxdb3_types" }
# crates.io Dependencies
anyhow.workspace = true


@@ -56,7 +56,7 @@ pub enum Format {
Csv,
}
impl From<Format> for influxdb3_client::Format {
impl From<Format> for influxdb3_types::http::QueryFormat {
fn from(format: Format) -> Self {
match format {
Format::Json => Self::Json,


@@ -529,9 +529,9 @@ where
database,
body,
default_time,
params.accept_partial,
params.precision,
params.no_sync,
params.accept_partial.unwrap_or(true),
params.precision.unwrap_or(Precision::Auto),
params.no_sync.unwrap_or(false),
)
.await?;
@@ -1681,33 +1681,6 @@ async fn record_batch_stream_to_body(
}
}
// This is a hack around the fact that bool default is false not true
const fn true_fn() -> bool {
true
}
#[derive(Debug, Deserialize)]
pub(crate) struct WriteParams {
pub(crate) db: String,
#[serde(default = "true_fn")]
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) no_sync: bool,
}
impl From<iox_http::write::WriteParams> for WriteParams {
fn from(legacy: iox_http::write::WriteParams) -> Self {
Self {
db: legacy.namespace.to_string(),
// legacy behaviour was to not accept partial:
accept_partial: false,
precision: legacy.precision.into(),
no_sync: false,
}
}
}
pub(crate) async fn route_request<T: TimeProvider>(
http_server: Arc<HttpApi<T>>,
mut req: Request<Body>,


@@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
# Core Crates
iox_http.workspace = true
iox_query_params.workspace = true
# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }


@@ -1,11 +1,13 @@
use hashbrown::HashMap;
use hyper::header::ACCEPT;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use influxdb3_cache::distinct_cache::MaxCardinality;
use iox_query_params::StatementParams;
use serde::{Deserialize, Serialize};
use crate::write::Precision;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("invalid mime type ({0})")]
@@ -15,7 +17,7 @@ pub enum Error {
NonUtf8MimeType(#[from] std::string::FromUtf8Error),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PingResponse {
pub version: String,
pub revision: String,
@@ -44,7 +46,7 @@ pub enum LastCacheValueColumnsDef {
AllNonKeyColumns,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct LastCacheCreatedResponse {
/// The table name the cache is associated with
pub table: String,
@@ -61,7 +63,7 @@ pub struct LastCacheCreatedResponse {
}
/// Request definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct DistinctCacheCreateRequest {
/// The name of the database associated with the cache
pub db: String,
@@ -69,6 +71,7 @@ pub struct DistinctCacheCreateRequest {
pub table: String,
/// The name of the cache. If not provided, the cache name will be generated from the table
/// name and selected column names.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// The columns to create the cache on.
// TODO: this should eventually be made optional, so that if not provided, the columns used will
@@ -76,13 +79,15 @@
// https://github.com/influxdata/influxdb/issues/25585
pub columns: Vec<String>,
/// The maximum number of distinct value combinations to hold in the cache
#[serde(skip_serializing_if = "Option::is_none")]
pub max_cardinality: Option<MaxCardinality>,
/// The duration in seconds that entries will be kept in the cache before being evicted
#[serde(skip_serializing_if = "Option::is_none")]
pub max_age: Option<u64>,
}
/// Response definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct DistinctCacheCreatedResponse {
/// The id of the table the cache was created on
pub table_id: u32,
@@ -107,14 +112,19 @@ pub struct DistinctCacheDeleteRequest {
}
/// Request definition for the `POST /api/v3/configure/last_cache` API
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct LastCacheCreateRequest {
pub db: String,
pub table: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub key_columns: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value_columns: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ttl: Option<u64>,
}
@@ -174,14 +184,14 @@ pub struct ProcessingEngineInstallRequirementsRequest {
pub requirements_location: String,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ProcessingEngineTriggerIdentifier {
pub db: String,
pub trigger_name: String,
}
/// Request definition for the `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct WalPluginTestRequest {
pub filename: String,
pub database: String,
@@ -190,7 +200,7 @@ pub struct WalPluginTestRequest {
}
/// Response definition for the `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct WalPluginTestResponse {
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
@@ -198,7 +208,7 @@ pub struct WalPluginTestResponse {
}
/// Request definition for the `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct SchedulePluginTestRequest {
pub filename: String,
pub database: String,
@@ -207,7 +217,7 @@ pub struct SchedulePluginTestRequest {
}
/// Response definition for the `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct SchedulePluginTestResponse {
pub trigger_time: Option<String>,
pub log_lines: Vec<String>,
@@ -216,7 +226,7 @@ pub struct SchedulePluginTestResponse {
}
/// Request definition for the `GET /api/v3/configure/database` API
#[derive(Clone, Copy, Debug, Deserialize)]
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct ShowDatabasesRequest {
pub format: QueryFormat,
#[serde(default)]
@@ -230,7 +240,7 @@ pub struct CreateDatabaseRequest {
}
/// Request definition for the `DELETE /api/v3/configure/database` API
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct DeleteDatabaseRequest {
pub db: String,
}
@@ -251,14 +261,16 @@ pub struct CreateTableField {
}
/// Request definition for the `DELETE /api/v3/configure/table` API
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct DeleteTableRequest {
pub db: String,
pub table: String,
}
pub type ClientQueryRequest = QueryRequest<String, Option<QueryFormat>, StatementParams>;
/// Request definition for the `POST /api/v3/query_sql` and `POST /api/v3/query_influxql` APIs
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct QueryRequest<D, F, P> {
#[serde(rename = "db")]
pub database: D,
@@ -268,7 +280,7 @@ pub struct QueryRequest<D, F, P> {
pub params: Option<P>,
}
#[derive(Copy, Clone, Debug, Deserialize)]
#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryFormat {
Parquet,
@@ -313,3 +325,24 @@ impl QueryFormat {
}
}
}
/// The URL parameters of the request to the `/api/v3/write_lp` API
#[derive(Debug, Deserialize, Serialize)]
pub struct WriteParams {
pub db: String,
pub precision: Option<Precision>,
pub accept_partial: Option<bool>,
pub no_sync: Option<bool>,
}
impl From<iox_http::write::WriteParams> for WriteParams {
fn from(legacy: iox_http::write::WriteParams) -> Self {
Self {
db: legacy.namespace.to_string(),
// legacy behaviour was to not accept partial:
accept_partial: Some(false),
precision: Some(legacy.precision.into()),
no_sync: Some(false),
}
}
}
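
Taken together with the write_lp handler hunk above, this shared `WriteParams` replaces the deleted server-side struct and its `true_fn()` serde-default hack: missing URL parameters now deserialize to `None`, and the handler chooses the defaults (`accept_partial = true`, `precision = Auto`, `no_sync = false`) with `unwrap_or` at the call site. Below is a minimal, self-contained sketch of that pattern, using a stand-in `Precision` enum and `serde_urlencoded` rather than the real influxdb3 types.

```rust
// Minimal sketch of the Option-plus-unwrap_or defaulting pattern; the
// Precision enum here is a stand-in, not the influxdb3 type.
use serde::Deserialize;

#[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Precision {
    Auto,
    Second,
    Millisecond,
}

#[derive(Debug, Deserialize)]
struct WriteParams {
    db: String,
    precision: Option<Precision>,
    accept_partial: Option<bool>,
    no_sync: Option<bool>,
}

fn main() {
    // Only `db` is supplied, so every optional field deserializes to None...
    let params: WriteParams = serde_urlencoded::from_str("db=mydb").unwrap();
    assert_eq!(params.db, "mydb");

    // ...and the handler applies the defaults at the call site.
    assert!(params.accept_partial.unwrap_or(true));
    assert_eq!(params.precision.unwrap_or(Precision::Auto), Precision::Auto);
    assert!(!params.no_sync.unwrap_or(false));
}
```

This is also why the handler hunk changed `params.accept_partial` to `params.accept_partial.unwrap_or(true)`: the defaults now live with the server handler rather than in the shared type.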