fix: broken format for JSON queries and add tests (#25980)

In #25927 we missed that JSON queries were broken despite having some
tests that use the format. This fixes JSON queries so that the response
now properly contains a comma between rows coming from different
RecordBatches. This commit also adds tests for the formats that now
stream data back (CSV, JSON, and JSON Lines) so that we won't run into
this issue again.
Michael Gattozzi 2025-02-06 20:25:18 -05:00 committed by GitHub
parent 1ac67a3ca1
commit cf1dd5c831
1 changed file with 207 additions and 6 deletions

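For reference, the bodies the new tests assert for a three-row result (a single Int32 column `a` whose value is always 1): the JSON format is one comma-separated array, JSON Lines is newline-delimited objects, and CSV carries a header row. A minimal sketch mirroring those assertions (`expected_three_row_payloads` is just an illustrative name, not part of the change):

// Illustrative only: expected serializations for three rows of {"a": 1},
// matching the assertions in the tests added by this commit.
fn expected_three_row_payloads() -> (&'static str, &'static str, &'static str) {
    // JSON: one array, with a comma between every row -- including rows that
    // span RecordBatch boundaries (the case that was broken).
    let json = "[{\"a\":1},{\"a\":1},{\"a\":1}]";
    // JSON Lines: one object per line, each line newline-terminated.
    let json_lines = "{\"a\":1}\n{\"a\":1}\n{\"a\":1}\n";
    // CSV: a header row followed by one line per value.
    let csv = "a\n1\n1\n1\n";
    (json, json_lines, csv)
}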

@@ -1579,11 +1579,9 @@ async fn record_batch_stream_to_body(
 fn start_row<W: std::io::Write>(
     &self,
     writer: &mut W,
-    is_first_row: bool,
+    _is_first_row: bool,
 ) -> std::result::Result<(), arrow_schema::ArrowError> {
-    if !is_first_row {
-        writer.write_all(b",")?;
-    }
+    writer.write_all(b",")?;
     Ok(())
 }
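With this change `start_row` writes the separator unconditionally, so the code that assembles the final body presumably opens the JSON array and drops the single leading comma; that handling is outside this diff. A rough, self-contained sketch of such a framing pattern (hypothetical, `JsonArrayFramer` is an invented name, not code from this file):

// Hypothetical sketch: frame comma-prefixed row chunks into a valid JSON
// array as they stream out.
struct JsonArrayFramer {
    started: bool,
}

impl JsonArrayFramer {
    fn new() -> Self {
        Self { started: false }
    }

    // Wrap one chunk of ",{...},{...}" row data produced by start_row.
    fn frame(&mut self, chunk: &str) -> String {
        if self.started {
            chunk.to_string()
        } else {
            self.started = true;
            // First chunk: open the array and drop the one leading comma.
            format!("[{}", chunk.strip_prefix(',').unwrap_or(chunk))
        }
    }

    // Called once after the last chunk; also covers the empty result ("[]").
    fn finish(&mut self) -> &'static str {
        if self.started {
            "]"
        } else {
            self.started = true;
            "[]"
        }
    }
}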
@@ -1840,10 +1838,16 @@ fn legacy_write_error_to_response(e: WriteParseError) -> Response<Body> {
mod tests {
use http::{header::ACCEPT, HeaderMap, HeaderValue};
use super::QueryFormat;
use super::record_batch_stream_to_body;
use super::validate_db_name;
use super::QueryFormat;
use super::ValidateDbNameError;
use arrow_array::record_batch;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use hyper::body::to_bytes;
use pretty_assertions::assert_eq;
use std::str;
macro_rules! assert_validate_db_name {
($name:literal, $accept_rp:literal, $expected:pat) => {
@@ -1884,4 +1888,201 @@ mod tests {
assert_validate_db_name!("_foo", false, Err(ValidateDbNameError::InvalidStartChar));
assert_validate_db_name!("", false, Err(ValidateDbNameError::Empty));
}
#[tokio::test]
async fn test_json_output_empty() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(None), QueryFormat::Json)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "[]");
}
#[tokio::test]
async fn test_json_output_one_record() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(1)), QueryFormat::Json)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "[{\"a\":1}]");
}
#[tokio::test]
async fn test_json_output_three_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(3)), QueryFormat::Json)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
str::from_utf8(bytes.as_ref()).unwrap(),
"[{\"a\":1},{\"a\":1},{\"a\":1}]"
);
}
#[tokio::test]
async fn test_json_output_five_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(5)), QueryFormat::Json)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
str::from_utf8(bytes.as_ref()).unwrap(),
"[{\"a\":1},{\"a\":1},{\"a\":1},{\"a\":1},{\"a\":1}]"
);
}
#[tokio::test]
async fn test_jsonl_output_empty() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(None), QueryFormat::JsonLines)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "");
}
#[tokio::test]
async fn test_jsonl_output_one_record() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(1)), QueryFormat::JsonLines)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "{\"a\":1}\n");
}
#[tokio::test]
async fn test_jsonl_output_three_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(3)), QueryFormat::JsonLines)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
str::from_utf8(bytes.as_ref()).unwrap(),
"{\"a\":1}\n{\"a\":1}\n{\"a\":1}\n"
);
}
#[tokio::test]
async fn test_jsonl_output_five_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(5)), QueryFormat::JsonLines)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
str::from_utf8(bytes.as_ref()).unwrap(),
"{\"a\":1}\n{\"a\":1}\n{\"a\":1}\n{\"a\":1}\n{\"a\":1}\n"
);
}
#[tokio::test]
async fn test_csv_output_empty() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(None), QueryFormat::Csv)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "");
}
#[tokio::test]
async fn test_csv_output_one_record() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(1)), QueryFormat::Csv)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "a\n1\n");
}
#[tokio::test]
async fn test_csv_output_three_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(3)), QueryFormat::Csv)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(str::from_utf8(bytes.as_ref()).unwrap(), "a\n1\n1\n1\n");
}
#[tokio::test]
async fn test_csv_output_five_records() {
// Turn RecordBatches into a Body and then collect into Bytes to assert
// their validity
let bytes = to_bytes(
record_batch_stream_to_body(make_record_stream(Some(5)), QueryFormat::Csv)
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
str::from_utf8(bytes.as_ref()).unwrap(),
"a\n1\n1\n1\n1\n1\n"
);
}
fn make_record_stream(records: Option<usize>) -> SendableRecordBatchStream {
let mut batches = Vec::new();
let batch = record_batch!(("a", Int32, [1])).unwrap();
let schema = batch.schema();
let num = match records {
None => {
let stream = futures::stream::iter(Vec::new());
let adapter = RecordBatchStreamAdapter::new(schema, stream);
return Box::pin(adapter);
}
Some(num) => num,
};
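// Each record is its own single-row RecordBatch, so Some(n) with n > 1 yields a
// stream that crosses batch boundaries -- the case the JSON format was getting wrong.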
batches.push(Ok(batch));
for _ in 1..num {
batches.push(Ok(record_batch!(("a", Int32, [1])).unwrap()));
}
let stream = futures::stream::iter(batches);
// Convert the stream to a SendableRecordBatchStream
let adapter = RecordBatchStreamAdapter::new(schema, stream);
Box::pin(adapter)
}
}