chore: Update datafusion (#3413)

* chore: Update datafusion and update code to handle timezone-aware timestamps

* fix: cargo hakari

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2021-12-23 09:52:12 -05:00 committed by GitHub
parent 2c3ca0c77c
commit 527885f7f8
13 changed files with 77 additions and 73 deletions
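
The mechanical change behind most of these hunks: in the new DataFusion revision, ScalarValue::TimestampNanosecond carries a second field, an optional timezone string, so every construction and pattern match gains one more argument (always None here, since IOx timestamps are timezone-less nanoseconds since the epoch). A minimal standalone sketch of the before/after shape, assuming the pinned datafusion crate is on the path (not IOx code):

    use datafusion::scalar::ScalarValue;

    fn main() {
        // Before this update the variant had a single field:
        //     ScalarValue::TimestampNanosecond(Some(1_000_000))
        // Afterwards a second Option<String> carries an optional timezone;
        // IOx passes None everywhere because its timestamps carry no zone.
        let ts = ScalarValue::TimestampNanosecond(Some(1_000_000), None);

        // The Debug format now includes the timezone field, which is why
        // the expected strings in the tests below change as well.
        assert_eq!(format!("{:?}", ts), "TimestampNanosecond(1000000, None)");
    }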

Cargo.lock

@@ -781,7 +781,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "6.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=0052667afae33ba9e549256d0d5d47e2f45e6ffb#0052667afae33ba9e549256d0d5d47e2f45e6ffb"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=401271377cd84dc1546827f66bda1b242860a6a8#401271377cd84dc1546827f66bda1b242860a6a8"
dependencies = [
"ahash",
"arrow",
@@ -4976,7 +4976,6 @@ dependencies = [
name = "workspace-hack"
version = "0.1.0"
dependencies = [
"ahash",
"bytes",
"cc",
"chrono",


@@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
-upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0052667afae33ba9e549256d0d5d47e2f45e6ffb", default-features = false, package = "datafusion" }
+upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="401271377cd84dc1546827f66bda1b242860a6a8", default-features = false, package = "datafusion" }
workspace-hack = { path = "../workspace-hack"}


@@ -52,8 +52,8 @@ impl AsExpr for Expr {
pub fn make_range_expr(start: i64, end: i64, time: impl AsRef<str>) -> Expr {
// We need to cast the start and end values to timestamps
// the equivalent of:
-let ts_start = ScalarValue::TimestampNanosecond(Some(start));
-let ts_end = ScalarValue::TimestampNanosecond(Some(end));
+let ts_start = ScalarValue::TimestampNanosecond(Some(start), None);
+let ts_end = ScalarValue::TimestampNanosecond(Some(end), None);
let ts_low = lit(ts_start).lt_eq(col(time.as_ref()));
let ts_high = col(time.as_ref()).lt(lit(ts_end));
@@ -209,7 +209,7 @@ mod tests {
let ts_predicate_expr = make_range_expr(101, 202, "time");
let expected_string =
"TimestampNanosecond(101) <= #time AND #time < TimestampNanosecond(202)";
"TimestampNanosecond(101, None) <= #time AND #time < TimestampNanosecond(202, None)";
let actual_string = format!("{:?}", ts_predicate_expr);
assert_eq!(actual_string, expected_string);


@@ -204,7 +204,7 @@ impl ReorgPlanner {
trace!(output_schema=?schema, "Setting sort key on schema for split plan");
// time <= split_time
-let ts_literal = Expr::Literal(ScalarValue::TimestampNanosecond(Some(split_time)));
+let ts_literal = Expr::Literal(ScalarValue::TimestampNanosecond(Some(split_time), None));
let split_expr = col(TIME_COLUMN_NAME).lt_eq(ts_literal);
let plan = plan_builder.build().context(BuildingPlan)?;


@@ -119,14 +119,14 @@ macro_rules! make_first_selector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
-ScalarValue::TimestampNanosecond(self.time),
+ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
match output {
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
-SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)),
+SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
}
}
@@ -226,14 +226,14 @@ macro_rules! make_last_selector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
-ScalarValue::TimestampNanosecond(self.time),
+ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
match output {
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
-SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)),
+SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
}
}
@@ -357,14 +357,14 @@ macro_rules! make_min_selector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
-ScalarValue::TimestampNanosecond(self.time),
+ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
match output {
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
-SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)),
+SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
}
}
@@ -469,14 +469,14 @@ macro_rules! make_max_selector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
-ScalarValue::TimestampNanosecond(self.time),
+ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
match output {
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
-SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)),
+SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
}
}


@@ -2451,6 +2451,7 @@ mod test {
#[tokio::test]
async fn test_sorted_metadata() {
+test_helpers::maybe_start_logging();
let mut key = SortKey::default();
key.push("time", Default::default());
@@ -2483,7 +2484,14 @@
let schema: Schema = batch.schema().try_into().unwrap();
for field_idx in 0..schema.len() {
-assert!(schema.field(field_idx).0.is_some());
+let (influx_column_type, field) = schema.field(field_idx);
+assert!(
+influx_column_type.is_some(),
+"Schema for field {}: {:?}, {:?}",
+field_idx,
+influx_column_type,
+field,
+);
}
}


@@ -17,7 +17,7 @@ pub(crate) fn min_to_scalar(
match stats {
IOxStatistics::I64(v) => {
if let Some(InfluxDbType::Timestamp) = *influx_type {
-Some(ScalarValue::TimestampNanosecond(v.min))
+Some(ScalarValue::TimestampNanosecond(v.min, None))
} else {
v.min.map(ScalarValue::from)
}
@@ -37,7 +37,7 @@ pub(crate) fn max_to_scalar(
match stats {
IOxStatistics::I64(v) => {
if let Some(InfluxDbType::Timestamp) = *influx_type {
-Some(ScalarValue::TimestampNanosecond(v.max))
+Some(ScalarValue::TimestampNanosecond(v.max, None))
} else {
v.max.map(ScalarValue::from)
}
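
For context on the two hunks above: the conversion keys off the InfluxDB column type, so an I64 statistic becomes a timezone-less timestamp scalar for the time column and a plain Int64 scalar for any other integer column. A self-contained sketch using hypothetical stand-in types (Stats and ColumnKind are illustrative, not the IOx definitions):

    use datafusion::scalar::ScalarValue;

    // Hypothetical stand-ins for IOx's statistics and column-type enums.
    struct Stats {
        min: Option<i64>,
    }
    enum ColumnKind {
        Timestamp,
        Field,
    }

    fn min_to_scalar(kind: &ColumnKind, stats: &Stats) -> Option<ScalarValue> {
        match kind {
            // The time column becomes a timestamp scalar, timezone-less
            // (None), even when the statistic itself is absent.
            ColumnKind::Timestamp => {
                Some(ScalarValue::TimestampNanosecond(stats.min, None))
            }
            // Any other i64 column stays a plain Int64 scalar.
            ColumnKind::Field => stats.min.map(ScalarValue::from),
        }
    }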


@@ -1,19 +1,19 @@
-- Test Setup: OneDeleteMultiExprsOneChunk
--- SQL: SELECT * from cpu;
+-- SQL: SELECT * from cpu order by bar, foo, time;
+-----+-----+--------------------------------+
| bar | foo | time |
+-----+-----+--------------------------------+
| 1 | me | 1970-01-01T00:00:00.000000040Z |
| 2 | you | 1970-01-01T00:00:00.000000020Z |
+-----+-----+--------------------------------+
--- SQL: SELECT time, bar from cpu;
+-- SQL: SELECT time, bar from cpu order by time, bar;
+--------------------------------+-----+
| time | bar |
+--------------------------------+-----+
-| 1970-01-01T00:00:00.000000040Z | 1 |
| 1970-01-01T00:00:00.000000020Z | 2 |
+| 1970-01-01T00:00:00.000000040Z | 1 |
+--------------------------------+-----+
--- SQL: SELECT bar from cpu;
+-- SQL: SELECT bar from cpu order by bar;
+-----+
| bar |
+-----+
@@ -63,18 +63,18 @@
| me |
| you |
+-----+
--- SQL: SELECT min(foo) from cpu;
-+--------------+
-| MIN(cpu.foo) |
-+--------------+
-| me |
-+--------------+
--- SQL: SELECT max(foo) from cpu;
-+--------------+
-| MAX(cpu.foo) |
-+--------------+
-| you |
-+--------------+
+-- SQL: SELECT min(foo) as min_foo from cpu order by min_foo;
++---------+
+| min_foo |
++---------+
+| me |
++---------+
+-- SQL: SELECT max(foo) as max_foo from cpu order by max_foo;
++---------+
+| max_foo |
++---------+
+| you |
++---------+
-- SQL: SELECT min(foo) as min_foo from cpu group by time order by min_foo;
+---------+
| min_foo |
@@ -117,21 +117,21 @@
| me |
| you |
+---------+
--- SQL: SELECT min(time) from cpu;
+-- SQL: SELECT min(time) as min_time from cpu order by min_time;
+--------------------------------+
-| MIN(cpu.time) |
+| min_time |
+--------------------------------+
| 1970-01-01T00:00:00.000000020Z |
+--------------------------------+
--- SQL: SELECT max(time) from cpu;
+-- SQL: SELECT max(time) as max_time from cpu order by max_time;
+--------------------------------+
-| MAX(cpu.time) |
+| max_time |
+--------------------------------+
| 1970-01-01T00:00:00.000000040Z |
+--------------------------------+
--- SQL: SELECT min(time) from cpu group by bar;
+-- SQL: SELECT min(time) as min_time from cpu group by bar order by min_time;
+--------------------------------+
-| MIN(cpu.time) |
+| min_time |
+--------------------------------+
| 1970-01-01T00:00:00.000000020Z |
| 1970-01-01T00:00:00.000000040Z |
@@ -164,7 +164,7 @@
| 1970-01-01T00:00:00.000000040Z |
| 1970-01-01T00:00:00.000000020Z |
+--------------------------------+
--- SQL: SELECT max(bar) from cpu;
+-- SQL: SELECT max(bar) from cpu order by 1;
+--------------+
| MAX(cpu.bar) |
+--------------+


@@ -2,11 +2,11 @@
-- IOX_SETUP: OneDeleteMultiExprsOneChunk
-- select *
-SELECT * from cpu;
+SELECT * from cpu order by bar, foo, time;
-SELECT time, bar from cpu;
+SELECT time, bar from cpu order by time, bar;
-SELECT bar from cpu;
+SELECT bar from cpu order by bar;
SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu;
@@ -22,8 +22,8 @@ SELECT min(bar) from cpu;
SELECT foo from cpu;
-SELECT min(foo) from cpu;
-SELECT max(foo) from cpu;
+SELECT min(foo) as min_foo from cpu order by min_foo;
+SELECT max(foo) as max_foo from cpu order by max_foo;
SELECT min(foo) as min_foo from cpu group by time order by min_foo;
SELECT max(foo) as max_foo from cpu group by time order by max_foo;
@@ -31,12 +31,12 @@ SELECT time, max(foo) as max_foo from cpu group by time order by time, max_foo;
SELECT min(foo) as min_foo from cpu group by bar order by min_foo;
SELECT bar, max(foo) as max_foo from cpu group by bar order by bar, max_foo;
SELECT max(foo) as max_foo from cpu group by time order by max_foo;
SELECT max(foo) as max_foo from cpu group by time order by max_foo;
-SELECT min(time) from cpu;
-SELECT max(time) from cpu;
+SELECT min(time) as min_time from cpu order by min_time;
+SELECT max(time) as max_time from cpu order by max_time;
-SELECT min(time) from cpu group by bar;
+SELECT min(time) as min_time from cpu group by bar order by min_time;
SELECT bar, min(time) as min_time from cpu group by bar order by bar, min_time;
SELECT max(time) as max_time from cpu group by foo order by max_time;
SELECT foo, max(time) as max_time from cpu group by foo order by foo, max_time;
@@ -44,7 +44,7 @@ SELECT foo, max(time) as max_time from cpu group by foo order by foo, max_time;
SELECT time from cpu;
-SELECT max(bar) from cpu;
+SELECT max(bar) from cpu order by 1;
--------------------------------------------------------
-- With selection predicate
@@ -58,5 +58,3 @@ SELECT time, bar from cpu where bar >= 1.0 order by bar, time;
SELECT * from cpu where foo = 'you' order by bar, foo, time;
SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma


@@ -165,16 +165,16 @@
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
-| | Filter: Float64(5) < #restaurant.system AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") AND #restaurant.time > TimestampNanosecond(130) |
-| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) < #restaurant.system, #restaurant.town != Utf8("tewsbury"), #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading"), #restaurant.time > TimestampNanosecond(130)] |
-| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-| | CoalesceBatchesExec: target_batch_size=500 |
-| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 |
-| | RepartitionExec: partitioning=RoundRobinBatch(4) |
-| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) < #system, #town != Utf8("tewsbury"), #system < Float64(7), #time > TimestampNanosecond(130)] |
-| | |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
+| | Filter: Float64(5) < #restaurant.system AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") AND #restaurant.time > TimestampNanosecond(130, None) |
+| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) < #restaurant.system, #restaurant.town != Utf8("tewsbury"), #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading"), #restaurant.time > TimestampNanosecond(130, None)] |
+| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+| | CoalesceBatchesExec: target_batch_size=500 |
+| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 |
+| | RepartitionExec: partitioning=RoundRobinBatch(4) |
+| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) < #system, #town != Utf8("tewsbury"), #system < Float64(7), #time > TimestampNanosecond(130, None)] |
+| | |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -184,4 +184,4 @@ async fn test_cases_timestamps_sql() {
runner
.flush()
.expect("flush worked");
-}
+}


@@ -1505,7 +1505,7 @@ impl<'a> TryFrom<&DFScalarValue> for Literal {
Some(v) => Ok(Self::String(v.clone())),
None => Err("NULL literal not supported".to_owned()),
},
-DFScalarValue::TimestampNanosecond(v) => match v {
+DFScalarValue::TimestampNanosecond(v, None) => match v {
Some(v) => Ok(Self::Integer(*v)),
None => Err("NULL literal not supported".to_owned()),
},
@@ -4087,9 +4087,10 @@ west,host-c,pro,10,6
),
(
// a = timestamp(100000)
col("a").eq(Expr::Literal(ScalarValue::TimestampNanosecond(Some(
1000000,
)))),
col("a").eq(Expr::Literal(ScalarValue::TimestampNanosecond(
Some(1000000),
None,
))),
BinaryExpr::from(("a", "=", 1000000_i64)),
),
];
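
One subtlety in the TryFrom arm above: the pattern DFScalarValue::TimestampNanosecond(v, None) binds only timezone-less values, so a timestamp that does carry a timezone falls through to the later match arms. An illustrative sketch of how the two-field variant splits under matching (not code from this repository):

    use datafusion::scalar::ScalarValue;

    fn describe(v: &ScalarValue) -> String {
        match v {
            // Timezone-less nanosecond timestamp (the only shape IOx builds).
            ScalarValue::TimestampNanosecond(Some(ns), None) => {
                format!("{} ns, no timezone", ns)
            }
            // Timezone-aware values need their own arm; they do not match above.
            ScalarValue::TimestampNanosecond(Some(ns), Some(tz)) => {
                format!("{} ns in {}", ns, tz)
            }
            ScalarValue::TimestampNanosecond(None, _) => "NULL timestamp".to_string(),
            _ => "not a nanosecond timestamp".to_string(),
        }
    }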


@@ -13,7 +13,6 @@ publish = false
### BEGIN HAKARI SECTION
[dependencies]
-ahash = { version = "0.7", features = ["std"] }
bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "std", "time", "winapi"] }
either = { version = "1", features = ["use_std"] }
@@ -49,7 +48,6 @@ tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] }
[build-dependencies]
-ahash = { version = "0.7", features = ["std"] }
bytes = { version = "1", features = ["std"] }
cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] }
either = { version = "1", features = ["use_std"] }