feat: add csv influx v1 (#25030)

* feat: add csv influx v1

* fix: clippy error

* fix: cargo.lock

* fix: apply feedbacks

* test: add csv integration test

* fix: cargo audit
pull/25102/head
Jean Arhancet 2024-06-25 14:45:55 +02:00 committed by GitHub
parent 5cb7874b2c
commit b6718e59e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 291 additions and 314 deletions

287
Cargo.lock generated
View File

@ -1263,16 +1263,15 @@ dependencies = [
[[package]]
name = "curve25519-dalek"
version = "4.1.2"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
"platforms",
"rustc_version",
"subtle",
"zeroize",
@ -1757,17 +1756,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "dml"
version = "0.1.0"
@ -2472,124 +2460,6 @@ dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526"
dependencies = [
"displaydoc",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_locid"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_locid_transform"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e"
dependencies = [
"displaydoc",
"icu_locid",
"icu_locid_transform_data",
"icu_provider",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_locid_transform_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e"
[[package]]
name = "icu_normalizer"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f"
dependencies = [
"displaydoc",
"icu_collections",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"smallvec",
"utf16_iter",
"utf8_iter",
"write16",
"zerovec",
]
[[package]]
name = "icu_normalizer_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516"
[[package]]
name = "icu_properties"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036"
dependencies = [
"displaydoc",
"icu_collections",
"icu_locid_transform",
"icu_properties_data",
"icu_provider",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_properties_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569"
[[package]]
name = "icu_provider"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9"
dependencies = [
"displaydoc",
"icu_locid",
"icu_provider_macros",
"stable_deref_trait",
"tinystr",
"writeable",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_provider_macros"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@ -2598,14 +2468,12 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "1.0.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"icu_normalizer",
"icu_properties",
"smallvec",
"utf8_iter",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
@ -2770,6 +2638,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"chrono",
"csv",
"data_types",
"datafusion",
"datafusion_util",
@ -3359,12 +3228,6 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "litemap"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
[[package]]
name = "lock_api"
version = "0.4.12"
@ -3498,9 +3361,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.7.3"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
@ -4112,12 +3975,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -5471,12 +5328,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -5524,9 +5375,9 @@ dependencies = [
[[package]]
name = "subtle"
version = "2.5.0"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
checksum = "0d0208408ba0c3df17ed26eb06992cb1a1268d41b2c0e12e65203fbe3972cee5"
[[package]]
name = "syn"
@ -5562,17 +5413,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
[[package]]
name = "synstructure"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "sysinfo"
version = "0.30.12"
@ -5817,16 +5657,6 @@ dependencies = [
"crunchy",
]
[[package]]
name = "tinystr"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f"
dependencies = [
"displaydoc",
"zerovec",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -6376,9 +6206,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.1"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56"
checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
dependencies = [
"form_urlencoded",
"idna",
@ -6397,18 +6227,6 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "utf16_iter"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246"
[[package]]
name = "utf8_iter"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
@ -6927,18 +6745,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "write16"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936"
[[package]]
name = "writeable"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "xz2"
version = "0.1.7"
@ -6954,30 +6760,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "yoke"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.7.34"
@ -6998,27 +6780,6 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "zerofrom"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
"synstructure",
]
[[package]]
name = "zeroize"
version = "1.8.1"
@ -7039,28 +6800,6 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "zerovec"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c"
dependencies = [
"yoke",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "zstd"
version = "0.13.1"

View File

@ -10,6 +10,7 @@ use assert_cmd::cargo::CommandCargoExt;
use futures::TryStreamExt;
use influxdb3_client::Precision;
use influxdb_iox_client::flightsql::FlightSqlClient;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
mod auth;
@ -203,9 +204,25 @@ impl TestServer {
.expect("send /api/v3/query_influxql request to server")
}
pub async fn api_v1_query(&self, params: &[(&str, &str)]) -> Response {
pub async fn api_v1_query(
&self,
params: &[(&str, &str)],
headers: Option<&[(&str, &str)]>,
) -> Response {
let default_headers = [("Accept", "application/json")];
let headers = headers.unwrap_or(&default_headers);
let mut header_map = HeaderMap::new();
for (key, value) in headers {
header_map.insert(
HeaderName::from_bytes(key.as_bytes()).expect("Invalid header key"),
HeaderValue::from_bytes(value.as_bytes()).expect("Invalid header value"),
);
}
self.http_client
.get(format!("{base}/query", base = self.client_addr(),))
.headers(header_map)
.query(params)
.send()
.await

View File

@ -675,7 +675,7 @@ async fn api_v3_query_json_format() {
}
#[tokio::test]
async fn api_v1_query() {
async fn api_v1_query_json_format() {
let server = TestServer::spawn().await;
server
@ -836,7 +836,7 @@ async fn api_v1_query() {
params.push(("epoch", epoch));
}
let resp = server
.api_v1_query(&params)
.api_v1_query(&params, None)
.await
.json::<Value>()
.await
@ -847,6 +847,99 @@ async fn api_v1_query() {
}
}
#[tokio::test]
async fn api_v1_query_csv_format() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=a usage=0.9 1\n\
cpu,host=a usage=0.89 2\n\
cpu,host=a usage=0.85 3\n\
mem,host=a usage=0.5 4\n\
mem,host=a usage=0.6 5\n\
mem,host=a usage=0.7 6",
Precision::Second,
)
.await
.unwrap();
struct TestCase<'a> {
database: Option<&'a str>,
epoch: Option<&'a str>,
query: &'a str,
expected: &'a str,
}
let test_cases = [
// Basic Query:
TestCase {
database: Some("foo"),
epoch: None,
query: "SELECT time, host, usage FROM cpu",
expected: "name,tags,time,host,usage\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
},
// Basic Query with multiple measurements:
TestCase {
database: Some("foo"),
epoch: None,
query: "SELECT time, host, usage FROM cpu, mem",
expected: "name,tags,time,host,usage\n\
mem,,1970-01-01T00:00:04,a,0.5\n\
mem,,1970-01-01T00:00:05,a,0.6\n\
mem,,1970-01-01T00:00:06,a,0.7\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
},
// Basic Query with db in query string:
TestCase {
database: None,
epoch: None,
query: "SELECT time, host, usage FROM foo.autogen.cpu",
expected: "name,tags,time,host,usage\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
},
// Basic Query epoch parameter set:
TestCase {
database: Some("foo"),
epoch: Some("s"),
query: "SELECT time, host, usage FROM cpu",
expected: "name,tags,time,host,usage\n\
cpu,,1,a,0.9\n\
cpu,,2,a,0.89\n\
cpu,,3,a,0.85\n\r\n",
},
];
for t in test_cases {
let mut params = vec![("q", t.query)];
if let Some(db) = t.database {
params.push(("db", db));
}
if let Some(epoch) = t.epoch {
params.push(("epoch", epoch));
}
let headers = vec![("Accept", "application/csv")];
let resp = server
.api_v1_query(&params, Some(&headers))
.await
.text()
.await
.unwrap();
println!("\n{q}", q = t.query);
println!("{resp:#}");
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}
#[tokio::test]
async fn api_v1_query_chunked() {
let server = TestServer::spawn().await;
@ -1063,7 +1156,7 @@ async fn api_v1_query_chunked() {
if let Some(chunk_size) = t.chunk_size {
params.push(("chunk_size", chunk_size));
}
let stream = server.api_v1_query(&params).await.bytes_stream();
let stream = server.api_v1_query(&params, None).await.bytes_stream();
let values = stream
.map(|chunk| {
println!("{chunk:?}");

View File

@ -45,6 +45,7 @@ async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
csv.workspace = true
datafusion.workspace = true
flate2.workspace = true
futures.workspace = true

View File

@ -78,6 +78,9 @@ pub enum Error {
#[error("error decoding gzip stream: {0}")]
InvalidGzip(std::io::Error),
#[error("invalid mime type ({0})")]
InvalidMimeType(String),
/// NamespaceName validation error.
#[error("error validating namespace name: {0}")]
InvalidNamespaceName(#[from] data_types::NamespaceNameError),
@ -115,13 +118,13 @@ pub enum Error {
#[error("missing query parameter 'db'")]
MissingWriteParams,
#[error("the mime type specified was not valid UTF8: {0}")]
NonUtf8MimeType(#[from] FromUtf8Error),
/// Serde decode error
#[error("serde error: {0}")]
Serde(#[from] serde_urlencoded::de::Error),
#[error("error in query parameters: {0}")]
QueryParams(#[from] QueryParamsError),
/// Arrow error
#[error("arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
@ -767,17 +770,6 @@ pub(crate) struct QueryRequest<D, F, P> {
pub(crate) params: Option<P>,
}
#[derive(Debug, thiserror::Error)]
pub enum QueryParamsError {
#[error(
"invalid mime type ({0}), available types are \
application/vnd.apache.parquet, text/csv, text/plain, and application/json"
)]
InvalidMimeType(String),
#[error("the mime type specified was not valid UTF8: {0}")]
NonUtf8MimeType(#[from] FromUtf8Error),
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum QueryFormat {
@ -809,8 +801,8 @@ impl QueryFormat {
Some(b"text/plain") => Ok(Self::Pretty),
Some(b"application/json" | b"*/*") | None => Ok(Self::Json),
Some(mime_type) => match String::from_utf8(mime_type.to_vec()) {
Ok(s) => Err(QueryParamsError::InvalidMimeType(s).into()),
Err(e) => Err(QueryParamsError::NonUtf8MimeType(e).into()),
Ok(s) => Err(Error::InvalidMimeType(s)),
Err(e) => Err(Error::NonUtf8MimeType(e)),
},
}
}

View File

@ -18,7 +18,8 @@ use arrow_schema::DataType;
use bytes::Bytes;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{ready, stream::Fuse, Stream, StreamExt};
use hyper::{Body, Request, Response};
use hyper::http::HeaderValue;
use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::info;
@ -58,16 +59,23 @@ where
query,
} = params;
let format = QueryFormat::from_request(&req, pretty)?;
info!(?format, "handle v1 format API");
let chunk_size = chunked.then(|| chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE));
// TODO - Currently not supporting parameterized queries, see
// https://github.com/influxdata/influxdb/issues/24805
let stream = self.query_influxql_inner(database, &query, None).await?;
let stream =
QueryResponseStream::new(0, stream, chunk_size, pretty, epoch).map_err(QueryError)?;
QueryResponseStream::new(0, stream, chunk_size, format, epoch).map_err(QueryError)?;
let body = Body::wrap_stream(stream);
Ok(Response::builder().status(200).body(body).unwrap())
Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, format.as_content_type())
.body(body)
.unwrap())
}
}
@ -106,6 +114,71 @@ impl QueryParams {
}
}
/// Enum representing the query format for the v1/query API.
///
/// The original API supports CSV, JSON, and "pretty" JSON formats.
#[derive(Debug, Deserialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub(crate) enum QueryFormat {
Csv,
Json,
JsonPretty,
}
impl QueryFormat {
/// Returns the content type as a string slice for the query format.
///
/// Maps the `QueryFormat` variants to their corresponding MIME types as strings.
/// This is useful for setting the `Content-Type` header in HTTP responses.
fn as_content_type(&self) -> &str {
match self {
Self::Csv => "application/csv",
Self::Json | Self::JsonPretty => "application/json",
}
}
/// Checks if the query format is 'JsonPretty'.
///
/// Determines if the `QueryFormat` is `JsonPretty`, which indicates that the JSON
/// output should be formatted in a human-readable way. Returns `true` if the
/// format is `JsonPretty`, otherwise returns `false`.
fn is_pretty(&self) -> bool {
match self {
Self::Csv | Self::Json => false,
Self::JsonPretty => true,
}
}
/// Extracts the [`QueryFormat`] from an HTTP [`Request`].
///
/// Parses the HTTP request to determine the desired query format. The `pretty`
/// parameter indicates if the pretty format is requested via a query parameter.
/// The function inspects the `Accept` header of the request to determine the
/// format, defaulting to JSON if no specific format is requested. If the format
/// is invalid or non-UTF8, an error is returned.
fn from_request(req: &Request<Body>, pretty: bool) -> Result<Self> {
let mime_type = req.headers().get(ACCEPT).map(HeaderValue::as_bytes);
match mime_type {
Some(b"application/csv" | b"text/csv") => Ok(Self::Csv),
Some(b"application/json" | b"*/*") | None => {
// If no specific format is requested via the Accept header,
// and the 'pretty' parameter is true, use the pretty JSON format.
// Otherwise, default to the regular JSON format.
if pretty {
Ok(Self::JsonPretty)
} else {
Ok(Self::Json)
}
}
Some(mime_type) => match String::from_utf8(mime_type.to_vec()) {
Ok(s) => Err(Error::InvalidMimeType(s)),
Err(e) => Err(Error::NonUtf8MimeType(e)),
},
}
}
}
/// UNIX epoch precision
#[derive(Debug, Deserialize, Clone, Copy)]
enum Precision {
@ -140,23 +213,85 @@ pub struct QueryError(#[from] anyhow::Error);
struct QueryResponse {
results: Vec<StatementResponse>,
#[serde(skip_serializing)]
pretty: bool,
format: QueryFormat,
}
/// Convert [`QueryResponse`] to [`Bytes`] for `hyper`'s [`Body::wrap_stream`] method
impl From<QueryResponse> for Bytes {
fn from(s: QueryResponse) -> Self {
if s.pretty {
serde_json::to_vec_pretty(&s)
} else {
serde_json::to_vec(&s)
/// Convert a [`QueryResponse`] to a JSON byte vector.
///
/// This function serializes the `QueryResponse` to JSON. If the format is
/// `JsonPretty`, it will produce human-readable JSON, otherwise it produces
/// compact JSON.
fn to_json(s: QueryResponse) -> Vec<u8> {
if s.format.is_pretty() {
serde_json::to_vec_pretty(&s)
.expect("Failed to serialize QueryResponse to pretty JSON")
} else {
serde_json::to_vec(&s).expect("Failed to serialize QueryResponse to JSON")
}
}
/// Convert a [`QueryResponse`] to a CSV byte vector.
///
/// This function serializes the `QueryResponse` to CSV format. It dynamically
/// extracts column names from the first series and writes the header and data
/// rows to the CSV writer.
fn to_csv(s: QueryResponse) -> Vec<u8> {
let mut wtr = csv::WriterBuilder::new()
.quote_style(csv::QuoteStyle::Never)
.from_writer(vec![]);
// Extract column names dynamically from the first series
let mut headers = vec!["name", "tags"];
if let Some(first_statement) = s.results.first() {
if let Some(first_series) = first_statement.series.first() {
headers.extend(first_series.columns.iter().map(|s| s.as_str()));
}
}
// Write the header
wtr.write_record(&headers)
.expect("Failed to write CSV header");
// Iterate through the hierarchical structure of QueryResponse to write data
// to the CSV writer. The loop processes each statement, series, and row to
// build and write CSV records. Each record is initialized with the series name
// and an empty tag field, followed by the string representations of the row's values.
// Finally, the record is written to the CSV writer
for statement in s.results {
for series in statement.series {
for row in series.values {
let mut record = vec![series.name.clone(), "".to_string()];
for v in row.0 {
record.push(match v {
Value::String(s) => s.clone(),
_ => v.to_string(),
});
}
wtr.write_record(&record)
.expect("Failed to write CSV record");
}
}
}
// Flush the CSV writer to ensure all data is written
wtr.flush().expect("flush csv writer");
wtr.into_inner().expect("into_inner from csv writer")
}
/// Extend a byte vector with CRLF and convert it to [`Bytes`].
///
/// This function appends a CRLF (`\r\n`) sequence to the given byte vector
/// and converts it to a `Bytes` object.
fn extend_with_crlf(mut bytes: Vec<u8>) -> Bytes {
bytes.extend_from_slice(b"\r\n");
Bytes::from(bytes)
}
match s.format {
QueryFormat::Json | QueryFormat::JsonPretty => extend_with_crlf(to_json(s)),
QueryFormat::Csv => extend_with_crlf(to_csv(s)),
}
.map(|mut b| {
b.extend_from_slice(b"\r\n");
b
})
.expect("valid bytes in statement result")
.into()
}
}
@ -264,7 +399,7 @@ impl ChunkBuffer {
/// `chunked` mode, the entire input stream of [`RecordBatch`]es will be buffered
/// into memory before being emitted.
///
/// `pretty` will emit pretty formatted JSON.
/// `format` will emit CSV, JSON or pretty formatted JSON.
///
/// Providing an `epoch` [`Precision`] will have the `time` column values emitted
/// as UNIX epoch times with the given precision.
@ -276,7 +411,7 @@ struct QueryResponseStream {
input: Fuse<SendableRecordBatchStream>,
column_map: HashMap<String, usize>,
statement_id: usize,
pretty: bool,
format: QueryFormat,
epoch: Option<Precision>,
}
@ -288,7 +423,7 @@ impl QueryResponseStream {
statement_id: usize,
input: SendableRecordBatchStream,
chunk_size: Option<usize>,
pretty: bool,
format: QueryFormat,
epoch: Option<Precision>,
) -> Result<Self, anyhow::Error> {
let buffer = ChunkBuffer::new(chunk_size);
@ -310,7 +445,7 @@ impl QueryResponseStream {
buffer,
column_map,
input: input.fuse(),
pretty,
format,
statement_id,
epoch,
})
@ -402,7 +537,7 @@ impl QueryResponseStream {
statement_id: self.statement_id,
series,
}],
pretty: self.pretty,
format: self.format,
}
}
@ -424,7 +559,7 @@ impl QueryResponseStream {
statement_id: self.statement_id,
series,
}],
pretty: self.pretty,
format: self.format,
})
}
}