From 527885f7f8eb410778a2aa47ace5bbe997a6266a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 23 Dec 2021 09:52:12 -0500 Subject: [PATCH] chore: Update datafusion (#3413) * chore: Update datafusion and update code to handle timezone aware timestamps * fix: cargo hakari Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 3 +- datafusion/Cargo.toml | 2 +- datafusion_util/src/lib.rs | 6 +-- query/src/frontend/reorg.rs | 2 +- query/src/func/selectors/internal.rs | 16 +++---- query/src/provider.rs | 10 +++- query/src/statistics.rs | 4 +- .../in/delete_multi_expr_one_chunk.expected | 46 +++++++++---------- .../cases/in/delete_multi_expr_one_chunk.sql | 22 ++++----- query_tests/cases/in/pushdown.expected | 26 +++++------ query_tests/src/cases.rs | 2 +- read_buffer/src/row_group.rs | 9 ++-- workspace-hack/Cargo.toml | 2 - 13 files changed, 77 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 616892cb4b..7a8cf8f1a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -781,7 +781,7 @@ dependencies = [ [[package]] name = "datafusion" version = "6.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=0052667afae33ba9e549256d0d5d47e2f45e6ffb#0052667afae33ba9e549256d0d5d47e2f45e6ffb" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=401271377cd84dc1546827f66bda1b242860a6a8#401271377cd84dc1546827f66bda1b242860a6a8" dependencies = [ "ahash", "arrow", @@ -4976,7 +4976,6 @@ dependencies = [ name = "workspace-hack" version = "0.1.0" dependencies = [ - "ahash", "bytes", "cc", "chrono", diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 9e67b750bc..43c27a813e 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0052667afae33ba9e549256d0d5d47e2f45e6ffb", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="401271377cd84dc1546827f66bda1b242860a6a8", default-features = false, package = "datafusion" } workspace-hack = { path = "../workspace-hack"} diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs index 07f755b4e5..f0c9dc3456 100644 --- a/datafusion_util/src/lib.rs +++ b/datafusion_util/src/lib.rs @@ -52,8 +52,8 @@ impl AsExpr for Expr { pub fn make_range_expr(start: i64, end: i64, time: impl AsRef) -> Expr { // We need to cast the start and end values to timestamps // the equivalent of: - let ts_start = ScalarValue::TimestampNanosecond(Some(start)); - let ts_end = ScalarValue::TimestampNanosecond(Some(end)); + let ts_start = ScalarValue::TimestampNanosecond(Some(start), None); + let ts_end = ScalarValue::TimestampNanosecond(Some(end), None); let ts_low = lit(ts_start).lt_eq(col(time.as_ref())); let ts_high = col(time.as_ref()).lt(lit(ts_end)); @@ -209,7 +209,7 @@ mod tests { let ts_predicate_expr = make_range_expr(101, 202, "time"); let expected_string = - "TimestampNanosecond(101) <= #time AND #time < TimestampNanosecond(202)"; + "TimestampNanosecond(101, None) <= #time AND #time < TimestampNanosecond(202, None)"; let actual_string = format!("{:?}", ts_predicate_expr); assert_eq!(actual_string, expected_string); diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 7fc4550f8c..52c5deca1e 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -204,7 +204,7 @@ impl ReorgPlanner { trace!(output_schema=?schema, "Setting sort key on schema for split plan"); // time <= split_time - let ts_literal = Expr::Literal(ScalarValue::TimestampNanosecond(Some(split_time))); + let ts_literal = Expr::Literal(ScalarValue::TimestampNanosecond(Some(split_time), None)); let split_expr = col(TIME_COLUMN_NAME).lt_eq(ts_literal); let plan = plan_builder.build().context(BuildingPlan)?; diff --git a/query/src/func/selectors/internal.rs b/query/src/func/selectors/internal.rs index 9afeb2fa5c..bc98ef43c5 100644 --- a/query/src/func/selectors/internal.rs +++ b/query/src/func/selectors/internal.rs @@ -119,14 +119,14 @@ macro_rules! make_first_selector { fn datafusion_state(&self) -> DataFusionResult> { Ok(vec![ $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time), + ScalarValue::TimestampNanosecond(self.time, None), ]) } fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { match output { SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), - SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)), + SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)), } } @@ -226,14 +226,14 @@ macro_rules! make_last_selector { fn datafusion_state(&self) -> DataFusionResult> { Ok(vec![ $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time), + ScalarValue::TimestampNanosecond(self.time, None), ]) } fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { match output { SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), - SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)), + SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)), } } @@ -357,14 +357,14 @@ macro_rules! make_min_selector { fn datafusion_state(&self) -> DataFusionResult> { Ok(vec![ $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time), + ScalarValue::TimestampNanosecond(self.time, None), ]) } fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { match output { SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), - SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)), + SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)), } } @@ -469,14 +469,14 @@ macro_rules! make_max_selector { fn datafusion_state(&self) -> DataFusionResult> { Ok(vec![ $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time), + ScalarValue::TimestampNanosecond(self.time, None), ]) } fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { match output { SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), - SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time)), + SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)), } } diff --git a/query/src/provider.rs b/query/src/provider.rs index da08e91326..b8905046c0 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -2451,6 +2451,7 @@ mod test { #[tokio::test] async fn test_sorted_metadata() { + test_helpers::maybe_start_logging(); let mut key = SortKey::default(); key.push("time", Default::default()); @@ -2483,7 +2484,14 @@ mod test { let schema: Schema = batch.schema().try_into().unwrap(); for field_idx in 0..schema.len() { - assert!(schema.field(field_idx).0.is_some()); + let (influx_column_type, field) = schema.field(field_idx); + assert!( + influx_column_type.is_some(), + "Schema for field {}: {:?}, {:?}", + field_idx, + influx_column_type, + field, + ); } } diff --git a/query/src/statistics.rs b/query/src/statistics.rs index aba206776f..3c1298aa17 100644 --- a/query/src/statistics.rs +++ b/query/src/statistics.rs @@ -17,7 +17,7 @@ pub(crate) fn min_to_scalar( match stats { IOxStatistics::I64(v) => { if let Some(InfluxDbType::Timestamp) = *influx_type { - Some(ScalarValue::TimestampNanosecond(v.min)) + Some(ScalarValue::TimestampNanosecond(v.min, None)) } else { v.min.map(ScalarValue::from) } @@ -37,7 +37,7 @@ pub(crate) fn max_to_scalar( match stats { IOxStatistics::I64(v) => { if let Some(InfluxDbType::Timestamp) = *influx_type { - Some(ScalarValue::TimestampNanosecond(v.max)) + Some(ScalarValue::TimestampNanosecond(v.max, None)) } else { v.max.map(ScalarValue::from) } diff --git a/query_tests/cases/in/delete_multi_expr_one_chunk.expected b/query_tests/cases/in/delete_multi_expr_one_chunk.expected index 250782ed8a..825266d78f 100644 --- a/query_tests/cases/in/delete_multi_expr_one_chunk.expected +++ b/query_tests/cases/in/delete_multi_expr_one_chunk.expected @@ -1,19 +1,19 @@ -- Test Setup: OneDeleteMultiExprsOneChunk --- SQL: SELECT * from cpu; +-- SQL: SELECT * from cpu order by bar, foo, time; +-----+-----+--------------------------------+ | bar | foo | time | +-----+-----+--------------------------------+ | 1 | me | 1970-01-01T00:00:00.000000040Z | | 2 | you | 1970-01-01T00:00:00.000000020Z | +-----+-----+--------------------------------+ --- SQL: SELECT time, bar from cpu; +-- SQL: SELECT time, bar from cpu order by time, bar; +--------------------------------+-----+ | time | bar | +--------------------------------+-----+ -| 1970-01-01T00:00:00.000000040Z | 1 | | 1970-01-01T00:00:00.000000020Z | 2 | +| 1970-01-01T00:00:00.000000040Z | 1 | +--------------------------------+-----+ --- SQL: SELECT bar from cpu; +-- SQL: SELECT bar from cpu order by bar; +-----+ | bar | +-----+ @@ -63,18 +63,18 @@ | me | | you | +-----+ --- SQL: SELECT min(foo) from cpu; -+--------------+ -| MIN(cpu.foo) | -+--------------+ -| me | -+--------------+ --- SQL: SELECT max(foo) from cpu; -+--------------+ -| MAX(cpu.foo) | -+--------------+ -| you | -+--------------+ +-- SQL: SELECT min(foo) as min_foo from cpu order by min_foo; ++---------+ +| min_foo | ++---------+ +| me | ++---------+ +-- SQL: SELECT max(foo) as max_foo from cpu order by max_foo; ++---------+ +| max_foo | ++---------+ +| you | ++---------+ -- SQL: SELECT min(foo) as min_foo from cpu group by time order by min_foo; +---------+ | min_foo | @@ -117,21 +117,21 @@ | me | | you | +---------+ --- SQL: SELECT min(time) from cpu; +-- SQL: SELECT min(time) as min_time from cpu order by min_time; +--------------------------------+ -| MIN(cpu.time) | +| min_time | +--------------------------------+ | 1970-01-01T00:00:00.000000020Z | +--------------------------------+ --- SQL: SELECT max(time) from cpu; +-- SQL: SELECT max(time) as max_time from cpu order by max_time; +--------------------------------+ -| MAX(cpu.time) | +| max_time | +--------------------------------+ | 1970-01-01T00:00:00.000000040Z | +--------------------------------+ --- SQL: SELECT min(time) from cpu group by bar; +-- SQL: SELECT min(time) as min_time from cpu group by bar order by min_time; +--------------------------------+ -| MIN(cpu.time) | +| min_time | +--------------------------------+ | 1970-01-01T00:00:00.000000020Z | | 1970-01-01T00:00:00.000000040Z | @@ -164,7 +164,7 @@ | 1970-01-01T00:00:00.000000040Z | | 1970-01-01T00:00:00.000000020Z | +--------------------------------+ --- SQL: SELECT max(bar) from cpu; +-- SQL: SELECT max(bar) from cpu order by 1; +--------------+ | MAX(cpu.bar) | +--------------+ diff --git a/query_tests/cases/in/delete_multi_expr_one_chunk.sql b/query_tests/cases/in/delete_multi_expr_one_chunk.sql index 4e73303117..0852980e01 100644 --- a/query_tests/cases/in/delete_multi_expr_one_chunk.sql +++ b/query_tests/cases/in/delete_multi_expr_one_chunk.sql @@ -2,11 +2,11 @@ -- IOX_SETUP: OneDeleteMultiExprsOneChunk -- select * -SELECT * from cpu; +SELECT * from cpu order by bar, foo, time; -SELECT time, bar from cpu; +SELECT time, bar from cpu order by time, bar; -SELECT bar from cpu; +SELECT bar from cpu order by bar; SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu; @@ -22,8 +22,8 @@ SELECT min(bar) from cpu; SELECT foo from cpu; -SELECT min(foo) from cpu; -SELECT max(foo) from cpu; +SELECT min(foo) as min_foo from cpu order by min_foo; +SELECT max(foo) as max_foo from cpu order by max_foo; SELECT min(foo) as min_foo from cpu group by time order by min_foo; SELECT max(foo) as max_foo from cpu group by time order by max_foo; @@ -31,12 +31,12 @@ SELECT time, max(foo) as max_foo from cpu group by time order by time, max_foo; SELECT min(foo) as min_foo from cpu group by bar order by min_foo; SELECT bar, max(foo) as max_foo from cpu group by bar order by bar, max_foo; -SELECT max(foo) as max_foo from cpu group by time order by max_foo; +SELECT max(foo) as max_foo from cpu group by time order by max_foo; -SELECT min(time) from cpu; -SELECT max(time) from cpu; +SELECT min(time) as min_time from cpu order by min_time; +SELECT max(time) as max_time from cpu order by max_time; -SELECT min(time) from cpu group by bar; +SELECT min(time) as min_time from cpu group by bar order by min_time; SELECT bar, min(time) as min_time from cpu group by bar order by bar, min_time; SELECT max(time) as max_time from cpu group by foo order by max_time; SELECT foo, max(time) as max_time from cpu group by foo order by foo, max_time; @@ -44,7 +44,7 @@ SELECT foo, max(time) as max_time from cpu group by foo order by foo, max_time; SELECT time from cpu; -SELECT max(bar) from cpu; +SELECT max(bar) from cpu order by 1; -------------------------------------------------------- -- With selection predicate @@ -58,5 +58,3 @@ SELECT time, bar from cpu where bar >= 1.0 order by bar, time; SELECT * from cpu where foo = 'you' order by bar, foo, time; SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma - - diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected index b43667fc16..48507005fd 100644 --- a/query_tests/cases/in/pushdown.expected +++ b/query_tests/cases/in/pushdown.expected @@ -165,16 +165,16 @@ | | | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00'); -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town | -| | Filter: Float64(5) < #restaurant.system AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") AND #restaurant.time > TimestampNanosecond(130) | -| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) < #restaurant.system, #restaurant.town != Utf8("tewsbury"), #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading"), #restaurant.time > TimestampNanosecond(130)] | -| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=500 | -| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) < #system, #town != Utf8("tewsbury"), #system < Float64(7), #time > TimestampNanosecond(130)] | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town | +| | Filter: Float64(5) < #restaurant.system AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") AND #restaurant.time > TimestampNanosecond(130, None) | +| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) < #restaurant.system, #restaurant.town != Utf8("tewsbury"), #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading"), #restaurant.time > TimestampNanosecond(130, None)] | +| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | CoalesceBatchesExec: target_batch_size=500 | +| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) < #system, #town != Utf8("tewsbury"), #system < Float64(7), #time > TimestampNanosecond(130, None)] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs index 4797071906..f2ce55727b 100644 --- a/query_tests/src/cases.rs +++ b/query_tests/src/cases.rs @@ -184,4 +184,4 @@ async fn test_cases_timestamps_sql() { runner .flush() .expect("flush worked"); -} \ No newline at end of file +} diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index d8c52ee84e..df8ad9f576 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -1505,7 +1505,7 @@ impl<'a> TryFrom<&DFScalarValue> for Literal { Some(v) => Ok(Self::String(v.clone())), None => Err("NULL literal not supported".to_owned()), }, - DFScalarValue::TimestampNanosecond(v) => match v { + DFScalarValue::TimestampNanosecond(v, None) => match v { Some(v) => Ok(Self::Integer(*v)), None => Err("NULL literal not supported".to_owned()), }, @@ -4087,9 +4087,10 @@ west,host-c,pro,10,6 ), ( // a = timestamp(100000) - col("a").eq(Expr::Literal(ScalarValue::TimestampNanosecond(Some( - 1000000, - )))), + col("a").eq(Expr::Literal(ScalarValue::TimestampNanosecond( + Some(1000000), + None, + ))), BinaryExpr::from(("a", "=", 1000000_i64)), ), ]; diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index e99f447163..100e9b8ecf 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -13,7 +13,6 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] -ahash = { version = "0.7", features = ["std"] } bytes = { version = "1", features = ["std"] } chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "std", "time", "winapi"] } either = { version = "1", features = ["use_std"] } @@ -49,7 +48,6 @@ tracing-core = { version = "0.1", features = ["lazy_static", "std"] } tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] } [build-dependencies] -ahash = { version = "0.7", features = ["std"] } bytes = { version = "1", features = ["std"] } cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] } either = { version = "1", features = ["use_std"] }