feat: Add different output support to queries (#24616)

This commit adds the ability to choose the output format of a query via
the v3 api so that a user can choose, whether by Accept headers or the
format url param, how the data will be returned to them.

Prior to this commit the default was a pretty printed text format, but
that instead has been changed to json as the default.

There are multiple formats one can choose:

1. json
2. csv
3. pretty printed text
4. parquet

I've tested each of these out and it works well. In particular the
parquet output is exciting as users will be able to perform a query and
receive back parquet data that they can then load into say a Python
script or something else to work on and operate it. As we extend what
data can be queried, as well as persisting it, what people will be able
to do with Edge will be really cool and I'm interested to see how users
will end up using this functionality in the future.
pull/24624/head
Michael Gattozzi 2024-02-12 12:04:05 -05:00 committed by GitHub
parent 8a68ae3f11
commit b555ddf18b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 214 additions and 46 deletions

81
Cargo.lock generated
View File

@ -456,12 +456,12 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]] [[package]]
name = "async-channel" name = "async-channel"
version = "2.1.1" version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3"
dependencies = [ dependencies = [
"concurrent-queue", "concurrent-queue",
"event-listener 4.0.3", "event-listener 5.0.0",
"event-listener-strategy", "event-listener-strategy",
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
@ -760,9 +760,9 @@ checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205"
[[package]] [[package]]
name = "bytemuck" name = "bytemuck"
version = "1.14.1" version = "1.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2490600f404f2b94c167e31d3ed1d5f3c225a0f3b80230053b3e0b7b962bd9" checksum = "ea31d69bda4949c1c1562c1e6f042a1caefac98cdc8a298260a2ff41c1e2d42b"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
@ -956,9 +956,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.4.18" version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -994,21 +994,21 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.4.18" version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
"clap_lex", "clap_lex",
"strsim", "strsim 0.11.0",
] ]
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.4.7" version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
@ -1018,9 +1018,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.6.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
[[package]] [[package]]
name = "client_util" name = "client_util"
@ -1339,9 +1339,9 @@ dependencies = [
[[package]] [[package]]
name = "curve25519-dalek" name = "curve25519-dalek"
version = "4.1.1" version = "4.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cpufeatures", "cpufeatures",
@ -1385,7 +1385,7 @@ dependencies = [
"ident_case", "ident_case",
"proc-macro2", "proc-macro2",
"quote", "quote",
"strsim", "strsim 0.10.0",
"syn 2.0.48", "syn 2.0.48",
] ]
@ -1781,9 +1781,9 @@ dependencies = [
[[package]] [[package]]
name = "ed25519-dalek" name = "ed25519-dalek"
version = "2.1.0" version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871"
dependencies = [ dependencies = [
"curve25519-dalek", "curve25519-dalek",
"ed25519", "ed25519",
@ -1861,9 +1861,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]] [[package]]
name = "event-listener" name = "event-listener"
version = "4.0.3" version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1"
dependencies = [ dependencies = [
"concurrent-queue", "concurrent-queue",
"parking", "parking",
@ -1872,11 +1872,11 @@ dependencies = [
[[package]] [[package]]
name = "event-listener-strategy" name = "event-listener-strategy"
version = "0.4.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291"
dependencies = [ dependencies = [
"event-listener 4.0.3", "event-listener 5.0.0",
"pin-project-lite", "pin-project-lite",
] ]
@ -2631,6 +2631,7 @@ name = "influxdb3_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"arrow", "arrow",
"arrow-csv",
"arrow-json", "arrow-json",
"arrow-schema", "arrow-schema",
"async-trait", "async-trait",
@ -2654,6 +2655,7 @@ dependencies = [
"object_store", "object_store",
"observability_deps", "observability_deps",
"parking_lot 0.11.2", "parking_lot 0.11.2",
"parquet",
"parquet_file", "parquet_file",
"schema", "schema",
"serde", "serde",
@ -3181,9 +3183,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]] [[package]]
name = "jobserver" name = "jobserver"
version = "0.1.27" version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -3886,9 +3888,9 @@ dependencies = [
[[package]] [[package]]
name = "num-complex" name = "num-complex"
version = "0.4.4" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" checksum = "23c6602fda94a57c990fe0df199a035d83576b496aa29f4e634a8ac6004e68a6"
dependencies = [ dependencies = [
"num-traits", "num-traits",
] ]
@ -3905,19 +3907,18 @@ dependencies = [
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.45" version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [ dependencies = [
"autocfg",
"num-traits", "num-traits",
] ]
[[package]] [[package]]
name = "num-iter" name = "num-iter"
version = "0.1.43" version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"num-integer", "num-integer",
@ -3938,9 +3939,9 @@ dependencies = [
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.17" version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"libm", "libm",
@ -5908,6 +5909,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strsim"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01"
[[package]] [[package]]
name = "strum" name = "strum"
version = "0.25.0" version = "0.25.0"
@ -6754,9 +6761,9 @@ dependencies = [
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.10.1" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"

View File

@ -48,8 +48,10 @@ flate2 = "1.0.27"
workspace-hack = { version = "0.1", path = "../workspace-hack" } workspace-hack = { version = "0.1", path = "../workspace-hack" }
arrow-json = "49.0.0" arrow-json = "49.0.0"
arrow-schema = "49.0.0" arrow-schema = "49.0.0"
arrow-csv = "49.0.0"
[dev-dependencies] [dev-dependencies]
parquet.workspace = true
parquet_file = { path = "../parquet_file" } parquet_file = { path = "../parquet_file" }
test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }

View File

@ -6,11 +6,14 @@ use arrow::util::pretty;
use authz::http::AuthorizationHeaderExtension; use authz::http::AuthorizationHeaderExtension;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use data_types::NamespaceName; use data_types::NamespaceName;
use datafusion::execution::memory_pool::UnboundedMemoryPool;
use futures::StreamExt; use futures::StreamExt;
use hyper::header::ACCEPT;
use hyper::header::CONTENT_ENCODING; use hyper::header::CONTENT_ENCODING;
use hyper::http::HeaderValue; use hyper::http::HeaderValue;
use hyper::server::conn::{AddrIncoming, AddrStream}; use hyper::server::conn::{AddrIncoming, AddrStream};
use hyper::{Body, Method, Request, Response, StatusCode}; use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::WriteBuffer; use influxdb3_write::WriteBuffer;
use iox_time::{SystemProvider, TimeProvider}; use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, error, info}; use observability_deps::tracing::{debug, error, info};
@ -117,6 +120,18 @@ pub enum Error {
/// WriteBuffer error /// WriteBuffer error
#[error("write buffer error: {0}")] #[error("write buffer error: {0}")]
WriteBuffer(#[from] influxdb3_write::write_buffer::Error), WriteBuffer(#[from] influxdb3_write::write_buffer::Error),
// ToStrError
#[error("to str error: {0}")]
ToStr(#[from] hyper::header::ToStrError),
// SerdeJsonError
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),
// Influxdb3 Write
#[error("serde json error: {0}")]
Influxdb3Write(#[from] influxdb3_write::Error),
} }
impl Error { impl Error {
@ -200,13 +215,102 @@ where
.into_iter() .into_iter()
.map(|b| b.unwrap()) .map(|b| b.unwrap())
.collect(); .collect();
let pretty_string = format!("{}", pretty::pretty_format_batches(&batches)?);
// Create a response with the pretty-printed string as the body. fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Response::builder() let batches: Vec<&RecordBatch> = batches.iter().collect();
.status(StatusCode::OK) Ok(Bytes::from(serde_json::to_string(
.header("Content-Type", "text/plain; charset=utf-8") &arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
.body(Body::from(pretty_string))?) // Handle this unwrap in production. )?))
}
fn to_csv(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_csv::writer::Writer::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}
Ok(Bytes::from(writer.into_inner()))
}
fn to_pretty(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
)))
}
fn to_parquet(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer =
TrackedMemoryArrowWriter::try_new(&mut bytes, batches[0].schema(), mem_pool)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(Bytes::from(bytes))
}
enum Format {
Parquet,
Csv,
Pretty,
Json,
Error,
}
let (body, format) = match params.format {
None => match req
.headers()
.get(ACCEPT)
.map(HeaderValue::to_str)
.transpose()?
{
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some("application/vnd.apache.parquet") => {
(to_parquet(batches)?, Format::Parquet)
}
Some("text/csv") => (to_csv(batches)?, Format::Csv),
Some("text/plain") => (to_pretty(batches)?, Format::Pretty),
Some("application/json") => (to_json(batches)?, Format::Json),
Some(_) => (Bytes::from("{ \"error\": \"Available mime types are: application/vnd.apache.parquet, text/csv, text/plain, and application/json\" }"), Format::Error),
None => (to_json(batches)?, Format::Json),
},
Some(format) => match format.as_str() {
"parquet" => (to_parquet(batches)?, Format::Parquet),
"csv" => (to_csv(batches)?, Format::Csv),
"pretty" => (to_pretty(batches)?, Format::Pretty),
"json" => (to_json(batches)?, Format::Json),
_ => (Bytes::from("{ \"error\": \"Available formats are: parquet, csv, pretty, and json\" }"), Format::Error),
},
};
match format {
Format::Parquet => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/vnd.apache.parquet")
.body(Body::from(body))?),
Format::Csv => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/csv")
.body(Body::from(body))?),
Format::Pretty => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Body::from(body))?),
Format::Json => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
Format::Error => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
}
} }
fn health(&self) -> Result<Response<Body>> { fn health(&self) -> Result<Response<Body>> {
@ -284,6 +388,7 @@ where
pub(crate) struct QuerySqlParams { pub(crate) struct QuerySqlParams {
pub(crate) db: String, pub(crate) db: String,
pub(crate) q: String, pub(crate) q: String,
pub(crate) format: Option<String>,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]

View File

@ -39,6 +39,9 @@ pub enum Error {
#[error("datafusion error: {0}")] #[error("datafusion error: {0}")]
DataFusion(#[from] datafusion::error::DataFusionError), DataFusion(#[from] datafusion::error::DataFusionError),
#[error("influxdb3_write error: {0}")]
InfluxDB3Write(#[from] influxdb3_write::Error),
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -225,8 +228,9 @@ mod tests {
let server = format!("http://{}", addr); let server = format!("http://{}", addr);
write_lp(&server, "foo", "cpu,host=a val=1i 123", None).await; write_lp(&server, "foo", "cpu,host=a val=1i 123", None).await;
let res = query(server, "foo", "select * from cpu", None).await;
// Test that we can query the output with a pretty output
let res = query(&server, "foo", "select * from cpu", "pretty", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap(); let body = body::to_bytes(res.into_body()).await.unwrap();
let body = String::from_utf8(body.as_bytes().to_vec()).unwrap(); let body = String::from_utf8(body.as_bytes().to_vec()).unwrap();
let expected = vec![ let expected = vec![
@ -242,6 +246,54 @@ mod tests {
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected, actual expected, actual
); );
// Test that we can query the output with a json output
let res = query(&server, "foo", "select * from cpu", "json", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap();
let actual = std::str::from_utf8(body.as_bytes()).unwrap();
let expected = r#"[{"host":"a","time":"1970-01-01T00:00:00.000000123","val":1}]"#;
assert_eq!(actual, expected);
// Test that we can query the output with a csv output
let res = query(&server, "foo", "select * from cpu", "csv", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap();
let actual = std::str::from_utf8(body.as_bytes()).unwrap();
let expected = "host,time,val\na,1970-01-01T00:00:00.000000123,1\n";
assert_eq!(actual, expected);
// Test that we can query the output with a parquet
use arrow::buffer::Buffer;
use parquet::arrow::arrow_reader;
let res = query(&server, "foo", "select * from cpu", "parquet", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap();
let batches = arrow_reader::ParquetRecordBatchReaderBuilder::try_new(body)
.unwrap()
.build()
.unwrap();
let batches = batches.into_iter().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 1);
// Check that we only have the columns we expect
assert_eq!(batches[0].num_columns(), 3);
assert!(batches[0].schema().column_with_name("host").is_some());
assert!(batches[0].schema().column_with_name("time").is_some());
assert!(batches[0].schema().column_with_name("val").is_some());
assert!(batches[0]
.schema()
.column_with_name("random_name")
.is_none());
assert_eq!(
batches[0]["host"].to_data().child_data()[0].buffers()[1],
Buffer::from([b'a'].to_vec())
);
assert_eq!(
batches[0]["time"].to_data().buffers(),
&[Buffer::from(vec![123, 0, 0, 0, 0, 0, 0, 0])]
);
assert_eq!(
batches[0]["val"].to_data().buffers(),
&[Buffer::from(1_u64.to_le_bytes().to_vec())]
);
shutdown.cancel(); shutdown.cancel();
} }
@ -275,16 +327,18 @@ mod tests {
server: impl Into<String> + Send, server: impl Into<String> + Send,
database: impl Into<String> + Send, database: impl Into<String> + Send,
query: impl Into<String> + Send, query: impl Into<String> + Send,
format: impl Into<String> + Send,
authorization: Option<&str>, authorization: Option<&str>,
) -> Response<Body> { ) -> Response<Body> {
let client = Client::new(); let client = Client::new();
// query escaped for uri // query escaped for uri
let query = urlencoding::encode(&query.into()); let query = urlencoding::encode(&query.into());
let url = format!( let url = format!(
"{}/api/v3/query_sql?db={}&q={}", "{}/api/v3/query_sql?db={}&q={}&format={}",
server.into(), server.into(),
database.into(), database.into(),
query query,
format.into()
); );
println!("query url: {}", url); println!("query url: {}", url);