refactor: v1 recordbatch to json ()

* refactor: refactor serde json to use recordbatch

* fix: cargo audit with cargo update

* fix: add timestamp datatype

* fix: add timestamp datatype

* fix: apply feedbacks

* fix: cargo audit with cargo update

* fix: add timestamp datatype

* fix: apply feedbacks

* refactor: test data conversion
pull/25123/head^2
Jean Arhancet 2024-07-05 15:21:40 +02:00 committed by GitHub
parent 8f01e9c62a
commit 1fd355ed83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 286 additions and 94 deletions
influxdb3/tests/server
influxdb3_server

67
Cargo.lock generated
View File

@ -850,9 +850,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.101"
version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d"
checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490"
dependencies = [
"jobserver",
"libc",
@ -932,9 +932,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.7"
version = "4.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f"
checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d"
dependencies = [
"clap_builder",
"clap_derive",
@ -974,9 +974,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.7"
version = "4.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f"
checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708"
dependencies = [
"anstream",
"anstyle",
@ -986,9 +986,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.5"
version = "4.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6"
checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@ -1352,7 +1352,7 @@ dependencies = [
"murmur3",
"observability_deps",
"once_cell",
"ordered-float 4.2.0",
"ordered-float 4.2.1",
"percent-encoding",
"prost 0.12.6",
"schema",
@ -2357,9 +2357,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.3.1"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc"
dependencies = [
"bytes",
"futures-channel",
@ -2396,7 +2396,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.3.1",
"hyper 1.4.0",
"hyper-util",
"rustls 0.23.10",
"rustls-native-certs 0.7.0",
@ -2420,16 +2420,16 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56"
checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.3.1",
"hyper 1.4.0",
"pin-project-lite",
"socket2",
"tokio",
@ -2631,6 +2631,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arrow",
"arrow-array",
"arrow-csv",
"arrow-flight",
"arrow-json",
@ -3243,9 +3244,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.21"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
dependencies = [
"value-bag",
]
@ -3348,9 +3349,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
@ -3529,9 +3530,9 @@ dependencies = [
[[package]]
name = "num-bigint"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
@ -3622,9 +3623,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.36.0"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce"
dependencies = [
"memchr",
]
@ -3695,9 +3696,9 @@ dependencies = [
[[package]]
name = "ordered-float"
version = "4.2.0"
version = "4.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e"
checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be"
dependencies = [
"num-traits",
]
@ -4466,7 +4467,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.3.1",
"hyper 1.4.0",
"hyper-rustls 0.27.2",
"hyper-util",
"ipnet",
@ -4815,9 +4816,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.118"
version = "1.0.120"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4"
checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5"
dependencies = [
"itoa",
"ryu",
@ -4838,9 +4839,9 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.8.1"
version = "3.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20"
checksum = "079f3a42cd87588d924ed95b533f8d30a483388c4e400ab736a7058e34f16169"
dependencies = [
"base64 0.22.1",
"chrono",
@ -4856,9 +4857,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.8.1"
version = "3.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2"
checksum = "bc03aad67c1d26b7de277d51c86892e7d9a0110a2fe44bf6b26cc569fba302d6"
dependencies = [
"darling",
"proc-macro2",
@ -5472,7 +5473,7 @@ dependencies = [
"async-trait",
"dotenvy",
"observability_deps",
"ordered-float 4.2.0",
"ordered-float 4.2.1",
"parking_lot",
"prometheus-parse",
"reqwest 0.12.5",

View File

@ -717,9 +717,9 @@ async fn api_v1_query_json_format() {
],
"name": "cpu",
"values": [
["1970-01-01T00:00:01", "a", 0.9],
["1970-01-01T00:00:02", "a", 0.89],
["1970-01-01T00:00:03", "a", 0.85]
["1970-01-01T00:00:01Z", "a", 0.9],
["1970-01-01T00:00:02Z", "a", 0.89],
["1970-01-01T00:00:03Z", "a", 0.85]
]
}
],
@ -745,9 +745,9 @@ async fn api_v1_query_json_format() {
],
"name": "mem",
"values": [
["1970-01-01T00:00:04", "a", 0.5],
["1970-01-01T00:00:05", "a", 0.6],
["1970-01-01T00:00:06", "a", 0.7]
["1970-01-01T00:00:04Z", "a", 0.5],
["1970-01-01T00:00:05Z", "a", 0.6],
["1970-01-01T00:00:06Z", "a", 0.7]
]
},
{
@ -758,9 +758,9 @@ async fn api_v1_query_json_format() {
],
"name": "cpu",
"values": [
["1970-01-01T00:00:01", "a", 0.9],
["1970-01-01T00:00:02", "a", 0.89],
["1970-01-01T00:00:03", "a", 0.85]
["1970-01-01T00:00:01Z", "a", 0.9],
["1970-01-01T00:00:02Z", "a", 0.89],
["1970-01-01T00:00:03Z", "a", 0.85]
]
}
],
@ -786,9 +786,9 @@ async fn api_v1_query_json_format() {
],
"name": "cpu",
"values": [
["1970-01-01T00:00:01", "a", 0.9],
["1970-01-01T00:00:02", "a", 0.89],
["1970-01-01T00:00:03", "a", 0.85]
["1970-01-01T00:00:01Z", "a", 0.9],
["1970-01-01T00:00:02Z", "a", 0.89],
["1970-01-01T00:00:03Z", "a", 0.85]
]
}
],
@ -879,9 +879,9 @@ async fn api_v1_query_csv_format() {
epoch: None,
query: "SELECT time, host, usage FROM cpu",
expected: "name,tags,time,host,usage\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
},
// Basic Query with multiple measurements:
TestCase {
@ -889,12 +889,12 @@ async fn api_v1_query_csv_format() {
epoch: None,
query: "SELECT time, host, usage FROM cpu, mem",
expected: "name,tags,time,host,usage\n\
mem,,1970-01-01T00:00:04,a,0.5\n\
mem,,1970-01-01T00:00:05,a,0.6\n\
mem,,1970-01-01T00:00:06,a,0.7\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
mem,,1970-01-01T00:00:04Z,a,0.5\n\
mem,,1970-01-01T00:00:05Z,a,0.6\n\
mem,,1970-01-01T00:00:06Z,a,0.7\n\
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
},
// Basic Query with db in query string:
TestCase {
@ -902,9 +902,9 @@ async fn api_v1_query_csv_format() {
epoch: None,
query: "SELECT time, host, usage FROM foo.autogen.cpu",
expected: "name,tags,time,host,usage\n\
cpu,,1970-01-01T00:00:01,a,0.9\n\
cpu,,1970-01-01T00:00:02,a,0.89\n\
cpu,,1970-01-01T00:00:03,a,0.85\n\r\n",
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
},
// Basic Query epoch parameter set:
TestCase {
@ -1168,3 +1168,79 @@ async fn api_v1_query_chunked() {
assert_eq!(t.expected, values, "query failed: {q}", q = t.query);
}
}
// Integration test: verifies that the v1 `/query` API converts each line-protocol
// field type (integer, float, string, boolean) to the matching native JSON type,
// and that nanosecond timestamps are rendered as RFC 3339 strings with a `Z` suffix.
#[tokio::test]
async fn api_v1_query_data_conversion() {
let server = TestServer::spawn().await;
// Seed one point per field type, all sharing the same nanosecond timestamp
// (2016-06-13T17:43:50.100400200Z) so they collapse into a single output row.
server
.write_lp_to_db(
"foo",
"weather,location=us-midwest temperature_integer=82i 1465839830100400200\n\
weather,location=us-midwest temperature_float=82 1465839830100400200\n\
weather,location=us-midwest temperature_str=\"too warm\" 1465839830100400200\n\
weather,location=us-midwest too_hot=true 1465839830100400200",
Precision::Nanosecond,
)
.await
.unwrap();
// One query scenario: `database`/`epoch` are optional request parameters;
// `expected` is the full JSON body the server must return.
struct TestCase<'a> {
database: Option<&'a str>,
epoch: Option<&'a str>,
query: &'a str,
expected: Value,
}
let test_cases = [
// Basic Query:
TestCase {
database: Some("foo"),
epoch: None,
query: "SELECT time, location, temperature_integer, temperature_float, temperature_str, too_hot FROM weather",
expected: json!({
"results": [
{
"series": [
{
"columns": [
"time",
"location",
"temperature_integer",
"temperature_float",
"temperature_str",
"too_hot"
],
"name": "weather",
"values": [
// 82 (JSON integer), 82.0 (JSON float), string, and bool
// must each keep their native JSON representation.
["2016-06-13T17:43:50.100400200Z", "us-midwest", 82, 82.0, "too warm", true],
]
}
],
"statement_id": 0
}
]
}),
},
];
for t in test_cases {
// Build the query string; `db` and `epoch` are only sent when the case sets them.
let mut params = vec![("q", t.query)];
if let Some(db) = t.database {
params.push(("db", db));
}
if let Some(epoch) = t.epoch {
params.push(("epoch", epoch));
}
let resp = server
.api_v1_query(&params, None)
.await
.json::<Value>()
.await
.unwrap();
// Printed output aids debugging when the assertion below fails.
println!("\n{q}", q = t.query);
println!("{resp:#}");
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}

View File

@ -37,6 +37,7 @@ iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
# crates.io Dependencies
anyhow.workspace = true
arrow.workspace = true
arrow-array.workspace = true
arrow-csv.workspace = true
arrow-flight.workspace = true
arrow-json.workspace = true

View File

@ -5,17 +5,20 @@ use std::{
task::{Context, Poll},
};
use anyhow::Context as AnyhowContext;
use anyhow::{bail, Context as AnyhowContext};
use arrow::{
array::{as_string_array, ArrayRef, AsArray},
compute::{cast_with_options, CastOptions},
datatypes::{
DataType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
record_batch::RecordBatch,
};
// Note: see https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
use arrow_json::writer::record_batches_to_json_rows;
use arrow_schema::DataType;
use bytes::Bytes;
use chrono::{format::SecondsFormat, DateTime};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{ready, stream::Fuse, Stream, StreamExt};
use hyper::http::HeaderValue;
@ -472,41 +475,49 @@ impl QueryResponseStream {
}))
.context("failed to cast batch time column with `epoch` parameter specified")?;
}
// See https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
let json_rows = record_batches_to_json_rows(&[&batch])
.context("failed to convert RecordBatch to JSON rows")?;
for json_row in json_rows {
let mut row = vec![Value::Null; self.column_map.len()];
for (k, v) in json_row {
if k == INFLUXQL_MEASUREMENT_COLUMN_NAME
&& (self.buffer.current_measurement_name().is_none()
|| self
.buffer
.current_measurement_name()
.is_some_and(|n| *n != v))
{
// we are on the "iox::measurement" column, which gives the name of the time series
// if we are on the first row, or if the measurement changes, we push into the
// buffer queue
self.buffer
.push_next_measurement(v.as_str().with_context(|| {
format!("{INFLUXQL_MEASUREMENT_COLUMN_NAME} value was not a string")
})?);
} else if k == INFLUXQL_MEASUREMENT_COLUMN_NAME {
// we are still working on the current measurement in the buffer, so ignore
let column_map = &self.column_map;
let columns = batch.columns();
let schema = batch.schema();
for row_index in 0..batch.num_rows() {
let mut row = vec![Value::Null; column_map.len()];
for (col_index, column) in columns.iter().enumerate() {
let field = schema.field(col_index);
let column_name = field.name();
let mut cell_value = if !column.is_valid(row_index) {
continue;
} else {
// this is a column value that is part of the time series, add it to the row
let j = self.column_map.get(&k).unwrap();
row[*j] = if let (Some(precision), TIME_COLUMN_NAME) = (self.epoch, k.as_str())
{
// specially handle the time column if `epoch` parameter provided
convert_ns_epoch(v, precision)?
} else {
v
};
cast_column_value(column, row_index)?
};
// Handle the special case for the measurement column
if column_name == INFLUXQL_MEASUREMENT_COLUMN_NAME {
if let Value::String(ref measurement_name) = cell_value {
if self.buffer.current_measurement_name().is_none()
|| self
.buffer
.current_measurement_name()
.is_some_and(|n| n != measurement_name)
{
// we are on the "iox::measurement" column, which gives the name of the time series
// if we are on the first row, or if the measurement changes, we push into the
// buffer queue
self.buffer.push_next_measurement(measurement_name);
}
}
continue;
}
if column_name == TIME_COLUMN_NAME {
if let Some(precision) = self.epoch {
cell_value = convert_ns_epoch(cell_value, precision)?
}
}
let col_position = column_map
.get(column_name)
.context("failed to retrieve column position")?;
row[*col_position] = cell_value;
}
self.buffer.push_row(Row(row))?;
}
@ -580,6 +591,109 @@ fn convert_ns_epoch(value: Value, precision: Precision) -> Result<Value, anyhow:
.into())
}
/// Converts a value from an Arrow `ArrayRef` at a given row index into a `serde_json::Value`.
///
/// This function handles various Arrow data types, converting them into their corresponding
/// JSON representations. For unsupported data types, it returns an error using the `anyhow` crate.
fn cast_column_value(column: &ArrayRef, row_index: usize) -> Result<Value, anyhow::Error> {
let value = match column.data_type() {
DataType::Boolean => Value::Bool(column.as_boolean().value(row_index)),
DataType::Null => Value::Null,
DataType::Int8 => Value::Number(column.as_primitive::<Int8Type>().value(row_index).into()),
DataType::Int16 => {
Value::Number(column.as_primitive::<Int16Type>().value(row_index).into())
}
DataType::Int32 => {
Value::Number(column.as_primitive::<Int32Type>().value(row_index).into())
}
DataType::Int64 => {
Value::Number(column.as_primitive::<Int64Type>().value(row_index).into())
}
DataType::UInt8 => {
Value::Number(column.as_primitive::<UInt8Type>().value(row_index).into())
}
DataType::UInt16 => {
Value::Number(column.as_primitive::<UInt16Type>().value(row_index).into())
}
DataType::UInt32 => {
Value::Number(column.as_primitive::<UInt32Type>().value(row_index).into())
}
DataType::UInt64 => {
Value::Number(column.as_primitive::<UInt64Type>().value(row_index).into())
}
DataType::Float16 => Value::Number(
serde_json::Number::from_f64(
column
.as_primitive::<Float16Type>()
.value(row_index)
.to_f64(),
)
.context("failed to downcast Float16 column")?,
),
DataType::Float32 => Value::Number(
serde_json::Number::from_f64(
column.as_primitive::<Float32Type>().value(row_index).into(),
)
.context("failed to downcast Float32 column")?,
),
DataType::Float64 => Value::Number(
serde_json::Number::from_f64(column.as_primitive::<Float64Type>().value(row_index))
.context("failed to downcast Float64 column")?,
),
DataType::Utf8 => Value::String(column.as_string::<i32>().value(row_index).to_string()),
DataType::LargeUtf8 => {
Value::String(column.as_string::<i64>().value(row_index).to_string())
}
DataType::Dictionary(key, value) => match (key.as_ref(), value.as_ref()) {
(DataType::Int32, DataType::Utf8) => {
let dict_array = column.as_dictionary::<Int32Type>();
let keys = dict_array.keys();
let values = as_string_array(dict_array.values());
Value::String(values.value(keys.value(row_index) as usize).to_string())
}
_ => Value::Null,
},
DataType::Timestamp(TimeUnit::Nanosecond, None) => Value::String(
DateTime::from_timestamp_nanos(
column
.as_primitive::<TimestampNanosecondType>()
.value(row_index),
)
.to_rfc3339_opts(SecondsFormat::AutoSi, true),
),
DataType::Timestamp(TimeUnit::Microsecond, None) => Value::String(
DateTime::from_timestamp_micros(
column
.as_primitive::<TimestampMicrosecondType>()
.value(row_index),
)
.context("failed to downcast TimestampMicrosecondType column")?
.to_rfc3339_opts(SecondsFormat::AutoSi, true),
),
DataType::Timestamp(TimeUnit::Millisecond, None) => Value::String(
DateTime::from_timestamp_millis(
column
.as_primitive::<TimestampMillisecondType>()
.value(row_index),
)
.context("failed to downcast TimestampNillisecondType column")?
.to_rfc3339_opts(SecondsFormat::AutoSi, true),
),
DataType::Timestamp(TimeUnit::Second, None) => Value::String(
DateTime::from_timestamp(
column
.as_primitive::<TimestampSecondType>()
.value(row_index),
0,
)
.context("failed to downcast TimestampSecondType column")?
.to_rfc3339_opts(SecondsFormat::AutoSi, true),
),
t => bail!("Unsupported data type: {:?}", t),
};
Ok(value)
}
impl Stream for QueryResponseStream {
type Item = Result<QueryResponse, anyhow::Error>;