chore: Merge branch 'main' into ntran/refactor_delete_tests

pull/24376/head
Nga Tran 2021-09-27 14:52:38 -04:00
commit cbfa3e85af
23 changed files with 613 additions and 389 deletions

View File

@ -142,9 +142,6 @@ jobs:
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
RUST_BACKTRACE: "1"
# set min stack size as a workaround to avoid stack overflow bug in DF
# https://github.com/apache/arrow-datafusion/issues/419
RUST_MIN_STACK: "10485760"
steps:
- checkout
- rust_components
@ -337,10 +334,10 @@ jobs:
- cache_restore
- run:
name: Print rustc target CPU options
command: cargo run --release --no-default-features --features="aws,gcp,azure,heappy" --bin print_cpu
command: cargo run --release --no-default-features --features="aws,gcp,azure,jemalloc_replacing_malloc" --bin print_cpu
- run:
name: Cargo release build with target arch set for CRoaring
command: cargo build --release --no-default-features --features="aws,gcp,azure,heappy"
command: cargo build --release --no-default-features --features="aws,gcp,azure,jemalloc_replacing_malloc"
- run: |
echo sha256sum after build is
sha256sum target/release/influxdb_iox

39
Cargo.lock generated
View File

@ -828,7 +828,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "5.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=1c858ce7baab1929cfdba97051ef4e5e4d0a866b#1c858ce7baab1929cfdba97051ef4e5e4d0a866b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6da0b91041a462772219de6bed37f2054db114f6#6da0b91041a462772219de6bed37f2054db114f6"
dependencies = [
"ahash",
"arrow",
@ -1787,9 +1787,9 @@ dependencies = [
[[package]]
name = "instant"
version = "0.1.10"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d"
checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd"
dependencies = [
"cfg-if",
]
@ -2570,9 +2570,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
[[package]]
name = "openssl-sys"
version = "0.9.66"
version = "0.9.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1996d2d305e561b70d1ee0c53f1542833f4e1ac6ce9a6708b6ff2738ca67dc82"
checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058"
dependencies = [
"autocfg",
"cc",
@ -2601,9 +2601,9 @@ dependencies = [
[[package]]
name = "ouroboros"
version = "0.10.1"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84236d64f1718c387232287cf036eb6632a5ecff226f4ff9dccb8c2b79ba0bde"
checksum = "3518a68fc597f6a42f83a31e41c039c3cbaa10fa8bb239c936c235e81cce873f"
dependencies = [
"aliasable",
"ouroboros_macro",
@ -2612,9 +2612,9 @@ dependencies = [
[[package]]
name = "ouroboros_macro"
version = "0.10.1"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f463857a6eb96c0136b1d56e56c718350cef30412ec065b48294799a088bca68"
checksum = "4e23813b1bcb2d41a838849a2bbae40ae5c03c85ecabf04ba97086f438484714"
dependencies = [
"Inflector",
"proc-macro-error",
@ -2904,9 +2904,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.19"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "plotters"
@ -3168,7 +3168,6 @@ dependencies = [
"predicate",
"regex",
"snafu",
"sqlparser",
"test_helpers",
"tokio",
"tokio-stream",
@ -4040,9 +4039,9 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.10.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dbcb26aebf44a0993c3c95e41d13860131b6cbf52edb2c53230056baa4d733f"
checksum = "10e1ce16b71375ad72d28d111131069ce0d5f8603f4f86d8acd3456b41b57a51"
dependencies = [
"log",
]
@ -4132,9 +4131,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.76"
version = "1.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84"
checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0"
dependencies = [
"proc-macro2",
"quote",
@ -4302,9 +4301,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5241dd6f21443a3606b432718b166d3cedc962fd4b8bea54a8bc7f514ebda986"
checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7"
dependencies = [
"tinyvec_macros",
]
@ -5014,9 +5013,9 @@ checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "zeroize"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377db0846015f7ae377174787dd452e1c5f5a9050bc6f954911d01f116daa0cd"
checksum = "bf68b08513768deaa790264a7fac27a58cbf2705cfcdc9448362229217d7e970"
[[package]]
name = "zstd"

View File

@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. various crypto functions)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="1c858ce7baab1929cfdba97051ef4e5e4d0a866b", default-features = false, package = "datafusion" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="6da0b91041a462772219de6bed37f2054db114f6", default-features = false, package = "datafusion" }

View File

@ -209,7 +209,7 @@ mod tests {
let ts_predicate_expr = make_range_expr(101, 202, "time");
let expected_string =
"TimestampNanosecond(101) LtEq #time And #time Lt TimestampNanosecond(202)";
"TimestampNanosecond(101) <= #time AND #time < TimestampNanosecond(202)";
let actual_string = format!("{:?}", ts_predicate_expr);
assert_eq!(actual_string, expected_string);

View File

@ -13,6 +13,6 @@ data_types = { path = "../data_types" }
flatbuffers = "2"
snafu = "0.6"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.10.1"
ouroboros = "0.11.1"
internal_types = { path = "../internal_types" }
generated_types = { path = "../generated_types" }

View File

@ -14,7 +14,7 @@ internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
regex = "1"
snafu = "0.6.9"
sqlparser = "0.10.0"
sqlparser = "0.11.0"
[dev-dependencies]
test_helpers = { path = "../test_helpers" }

View File

@ -296,7 +296,7 @@ impl fmt::Display for Predicate {
///
/// assert_eq!(
/// p.to_string(),
/// "Predicate range: [1 - 100] exprs: [#foo Eq Int32(42)]"
/// "Predicate range: [1 - 100] exprs: [#foo = Int32(42)]"
/// );
/// ```
pub struct PredicateBuilder {
@ -837,7 +837,7 @@ mod tests {
assert_eq!(
p.to_string(),
"Predicate range: [1 - 100] exprs: [#foo Eq Int32(42) And #bar Lt Int32(11)]"
"Predicate range: [1 - 100] exprs: [#foo = Int32(42) AND #bar < Int32(11)]"
);
}
@ -851,7 +851,7 @@ mod tests {
.partition_key("the_key")
.build();
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]");
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]");
}
// The delete predicate

View File

@ -29,7 +29,6 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"
regex = "1"
snafu = "0.6.9"
sqlparser = "0.10.0"
tokio = { version = "1.11", features = ["macros"] }
tokio-stream = "0.1.2"
trace = { path = "../trace" }

View File

@ -10,158 +10,158 @@
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
+---------------+---------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200;
+---------------+--------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Int64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Int64(200)] |
+---------------+--------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count > Int64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Int64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Int64(200)] |
+---------------+-------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200.0;
+---------------+----------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Float64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Float64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Float64(200)] |
+---------------+----------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
+---------------+---------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system Gt Float64(4)] |
| | Filter: #restaurant.count > Float64(200) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Float64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system Gt Float64(4)] |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Float64(200)] |
+---------------+---------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
+---------------+--------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system > Float64(4) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system > Float64(4)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system > Float64(4)] |
+---------------+--------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury';
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Int64(200), #restaurant.town NotEq Utf8("tewsbury")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Int64(200), #town NotEq Utf8("tewsbury")] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count > Int64(200) AND #restaurant.town != Utf8("tewsbury") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Int64(200), #restaurant.town != Utf8("tewsbury")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Int64(200), #town != Utf8("tewsbury")] |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Int64(200), #restaurant.town NotEq Utf8("tewsbury"), #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Int64(200), #town NotEq Utf8("tewsbury")] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count > Int64(200) AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system = Int64(5) OR #restaurant.town = Utf8("lawrence") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Int64(200), #restaurant.town != Utf8("tewsbury"), #restaurant.system = Int64(5) OR #restaurant.town = Utf8("lawrence")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Int64(200), #town != Utf8("tewsbury")] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence") And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Int64(200), #restaurant.town NotEq Utf8("tewsbury"), #restaurant.system Eq Int64(5) Or #restaurant.town Eq Utf8("lawrence"), #restaurant.count Lt Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Int64(200), #town NotEq Utf8("tewsbury"), #count Lt Int64(40000)] |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count > Int64(200) AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system = Int64(5) OR #restaurant.town = Utf8("lawrence") AND #restaurant.count < Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Int64(200), #restaurant.town != Utf8("tewsbury"), #restaurant.system = Int64(5) OR #restaurant.town = Utf8("lawrence"), #restaurant.count < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 = CAST(5 AS Float64) OR CAST(town@3 AS Utf8) = lawrence AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Int64(200), #town != Utf8("tewsbury"), #count < Int64(40000)] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and count < 40000;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count Gt Int64(200) And #restaurant.count Lt Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count Gt Int64(200), #restaurant.count Lt Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count Gt Int64(200), #count Lt Int64(40000)] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.count > Int64(200) AND #restaurant.count < Int64(40000) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.count > Int64(200), #restaurant.count < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#count > Int64(200), #count < Int64(40000)] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0 and system < 7.0;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(4) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system Gt Float64(4), #restaurant.system Lt Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system Gt Float64(4), #system Lt Float64(7)] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system > Float64(4) AND #restaurant.system < Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system > Float64(4), #restaurant.system < Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system > Float64(4), #system < Float64(7)] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and system < 7.0;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.system Lt Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system Gt Float64(5), #restaurant.system Lt Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system Gt Float64(5), #system Lt Float64(7)] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system > Float64(5) AND #restaurant.system < Float64(7) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system > Float64(5), #restaurant.system < Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system > Float64(5), #system < Float64(7)] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And #restaurant.town NotEq Utf8("tewsbury") And Float64(7) Gt #restaurant.system |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system Gt Float64(5), #restaurant.town NotEq Utf8("tewsbury"), Float64(7) Gt #restaurant.system] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND CAST(town@3 AS Utf8) != tewsbury AND 7 > system@1 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system Gt Float64(5), #town NotEq Utf8("tewsbury"), Float64(7) Gt #system] |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system > Float64(5) AND #restaurant.town != Utf8("tewsbury") AND Float64(7) > #restaurant.system |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system > Float64(5), #restaurant.town != Utf8("tewsbury"), Float64(7) > #restaurant.system] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND CAST(town@3 AS Utf8) != tewsbury AND 7 > system@1 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system > Float64(5), #town != Utf8("tewsbury"), Float64(7) > #system] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system Gt Float64(5) And Utf8("tewsbury") NotEq #restaurant.town And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system Gt Float64(5), Utf8("tewsbury") NotEq #restaurant.town, #restaurant.system Lt Float64(7), #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND tewsbury != CAST(town@3 AS Utf8) AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system Gt Float64(5), Utf8("tewsbury") NotEq #town, #system Lt Float64(7)] |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: #restaurant.system > Float64(5) AND Utf8("tewsbury") != #restaurant.town AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[#restaurant.system > Float64(5), Utf8("tewsbury") != #restaurant.town, #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading")] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: system@1 > 5 AND tewsbury != CAST(town@3 AS Utf8) AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [#system > Float64(5), Utf8("tewsbury") != #town, #system < Float64(7)] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) Lt #restaurant.system And #restaurant.town NotEq Utf8("tewsbury") And #restaurant.system Lt Float64(7) And #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading") And #restaurant.time Gt TimestampNanosecond(130) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) Lt #restaurant.system, #restaurant.town NotEq Utf8("tewsbury"), #restaurant.system Lt Float64(7), #restaurant.count Eq Int64(632) Or #restaurant.town Eq Utf8("reading"), #restaurant.time Gt TimestampNanosecond(130)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) Lt #system, #town NotEq Utf8("tewsbury"), #system Lt Float64(7), #time Gt TimestampNanosecond(130)] |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #restaurant.count, #restaurant.system, #restaurant.time, #restaurant.town |
| | Filter: Float64(5) < #restaurant.system AND #restaurant.town != Utf8("tewsbury") AND #restaurant.system < Float64(7) AND #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading") AND #restaurant.time > TimestampNanosecond(130) |
| | TableScan: restaurant projection=Some([0, 1, 2, 3]), filters=[Float64(5) < #restaurant.system, #restaurant.town != Utf8("tewsbury"), #restaurant.system < Float64(7), #restaurant.count = Int64(632) OR #restaurant.town = Utf8("reading"), #restaurant.time > TimestampNanosecond(130)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: 5 < system@1 AND CAST(town@3 AS Utf8) != tewsbury AND system@1 < 7 AND CAST(count@0 AS Int64) = 632 OR CAST(town@3 AS Utf8) = reading AND time@2 > 130 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=0 predicate=Predicate exprs: [Float64(5) < #system, #town != Utf8("tewsbury"), #system < Float64(7), #time > TimestampNanosecond(130)] |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -12,22 +12,22 @@
| | EmptyExec: produce_one_row=true |
+---------------+-------------------------------------------------------------+
-- SQL: EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | Filter: #h2o.temp Gt Float64(70) And #h2o.temp Lt Float64(72) |
| | TableScan: h2o projection=Some([3]), filters=[#h2o.temp Gt Float64(70), #h2o.temp Lt Float64(72)] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalescePartitionsExec |
| | HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: temp@0 > 70 AND temp@0 < 72 |
| | ProjectionExec: expr=[temp@2 as temp] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [city@0 ASC,state@1 ASC,time@3 ASC] |
| | SortExec: [city@0 ASC,state@1 ASC,time@3 ASC] |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#temp Gt Float64(70), #temp Lt Float64(72)] |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | Filter: #h2o.temp > Float64(70) AND #h2o.temp < Float64(72) |
| | TableScan: h2o projection=Some([3]), filters=[#h2o.temp > Float64(70), #h2o.temp < Float64(72)] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalescePartitionsExec |
| | HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: temp@0 > 70 AND temp@0 < 72 |
| | ProjectionExec: expr=[temp@2 as temp] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [city@0 ASC,state@1 ASC,time@3 ASC] |
| | SortExec: [city@0 ASC,state@1 ASC,time@3 ASC] |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#temp > Float64(70), #temp < Float64(72)] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -741,22 +741,21 @@ async fn sql_predicate_pushdown_correctness_11() {
#[tokio::test]
async fn sql_predicate_pushdown_correctness_12() {
// TODO: Hit stackoverflow in DF. Ticket https://github.com/apache/arrow-datafusion/issues/419
// // Test 12: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and town = 'reading'
// //
// // Check correctness
// let expected = vec![
// "+-------+--------+-------------------------------+---------+",
// "| count | system | time | town |",
// "+-------+--------+-------------------------------+---------+",
// "| 632 | 6 | 1970-01-01 00:00:00.000000130 | reading |",
// "+-------+--------+-------------------------------+---------+",
// ];
// run_sql_test_case!(
// TwoMeasurementsPredicatePushDown {},
// "SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and town = 'reading'",
// &expected
// );
// Test 12: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and town = 'reading'
//
// Check correctness
let expected = vec![
"+-------+--------+--------------------------------+---------+",
"| count | system | time | town |",
"+-------+--------+--------------------------------+---------+",
"| 632 | 6 | 1970-01-01T00:00:00.000000130Z | reading |",
"+-------+--------+--------------------------------+---------+",
];
run_sql_test_case(
TwoMeasurementsPredicatePushDown {},
"SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and town = 'reading'",
&expected
).await;
}
#[tokio::test]

View File

@ -47,7 +47,9 @@ fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
&exp_card,
|b, _| {
b.iter(|| {
let result = chunk.read_filter(Predicate::default(), projection, vec![]);
let result = chunk
.read_filter(Predicate::default(), projection, vec![])
.unwrap();
let rbs = result.collect::<Vec<_>>();
assert_eq!(rbs.len(), 1);
assert_eq!(rbs[0].num_rows(), 200_000);
@ -82,7 +84,9 @@ fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
&exp_rows,
|b, _| {
b.iter(|| {
let result = chunk.read_filter(predicate.clone(), Selection::All, vec![]);
let result = chunk
.read_filter(predicate.clone(), Selection::All, vec![])
.unwrap();
let rbs = result.collect::<Vec<_>>();
assert_eq!(rbs.len(), 1);
assert!(rbs[0].num_rows() > 0); // data randomly generated so row numbers not exact

View File

@ -183,9 +183,10 @@ impl Chunk {
predicate: Predicate,
select_columns: Selection<'_>,
negated_predicates: Vec<Predicate>,
) -> table::ReadFilterResults {
) -> Result<table::ReadFilterResults> {
self.table
.read_filter(&select_columns, &predicate, negated_predicates.as_slice())
.context(TableError)
}
/// Returns an iterable collection of data in group columns and aggregate
@ -213,7 +214,8 @@ impl Chunk {
/// match the provided predicate.
///
/// If the provided table does not exist then `could_pass_predicate` returns
/// `false`.
/// `false`. If the predicate is incompatible with chunk's schema
/// `could_pass_predicate` returns false.
pub fn could_pass_predicate(&self, predicate: Predicate) -> bool {
self.table.could_pass_predicate(&predicate)
}
@ -279,7 +281,9 @@ impl Chunk {
only_columns: Selection<'_>,
dst: BTreeSet<String>,
) -> Result<BTreeSet<String>> {
Ok(self.table.column_names(&predicate, only_columns, dst))
self.table
.column_names(&predicate, only_columns, dst)
.context(TableError)
}
/// Returns the distinct set of column values for each provided column,
@ -1024,7 +1028,9 @@ mod test {
let predicate =
Predicate::with_time_range(&[BinaryExpr::from(("env", "=", "us-west"))], 100, 205); // filter on time
let mut itr = chunk.read_filter(predicate, Selection::All, vec![]);
let mut itr = chunk
.read_filter(predicate, Selection::All, vec![])
.unwrap();
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("west")]);
@ -1058,6 +1064,13 @@ mod test {
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(&second_row_group, "time", &Values::I64(vec![200])); // first row from second record batch
// Error when predicate is invalid
let predicate =
Predicate::with_time_range(&[BinaryExpr::from(("env", "=", 22.3))], 100, 205);
assert!(chunk
.read_filter(predicate, Selection::All, vec![])
.is_err());
// No more data
assert!(itr.next().is_none());
}
@ -1079,7 +1092,9 @@ mod test {
let delete_predicates = vec![Predicate::new(vec![BinaryExpr::from((
"region", "=", "west",
))])];
let mut itr = chunk.read_filter(predicate, Selection::All, delete_predicates);
let mut itr = chunk
.read_filter(predicate, Selection::All, delete_predicates)
.unwrap();
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("east")]);
@ -1127,6 +1142,16 @@ mod test {
// No more data
assert!(itr.next().is_none());
// Error when one of the negated predicates is invalid
let predicate = Predicate::new(vec![BinaryExpr::from(("env", "=", "us-west"))]);
let delete_predicates = vec![
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
Predicate::new(vec![BinaryExpr::from(("time", "=", "not a number"))]),
];
assert!(chunk
.read_filter(predicate, Selection::All, delete_predicates)
.is_err());
}
#[test]
@ -1172,6 +1197,13 @@ mod test {
"region", ">", "west"
))]),)
);
// invalid predicate so no rows can match
assert!(
!chunk.satisfies_predicate(&Predicate::new(vec![BinaryExpr::from((
"region", "=", 33.2
))]),)
);
}
fn to_set(v: &[&str]) -> BTreeSet<String> {
@ -1231,6 +1263,16 @@ mod test {
// sketchy_sensor won't be returned because it has a NULL value for the
// only matching row.
assert_eq!(result, to_set(&["counter", "region", "time"]));
// Error when invalid predicate provided.
assert!(matches!(
chunk.column_names(
Predicate::new(vec![BinaryExpr::from(("time", "=", "not a number"))]),
Selection::Some(&["region", "env"]),
BTreeSet::new()
),
Err(Error::TableError { .. })
));
}
fn to_map(arr: Vec<(&str, &[&str])>) -> BTreeMap<String, BTreeSet<String>> {
@ -1322,5 +1364,15 @@ mod test {
chunk.column_values(Predicate::default(), Selection::All, BTreeMap::new()),
Err(Error::UnsupportedOperation { .. })
));
// Error when invalid predicate provided.
assert!(matches!(
chunk.column_values(
Predicate::new(vec![BinaryExpr::from(("time", "=", "not a number"))]),
Selection::Some(&["region", "env"]),
BTreeMap::new()
),
Err(Error::TableError { .. })
));
}
}

View File

@ -1,8 +1,9 @@
use crate::{
column,
row_group::{self, ColumnName, Predicate, RowGroup},
row_group::{self, ColumnName, Literal, Predicate, RowGroup},
schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema},
value::{OwnedValue, Scalar, Value},
BinaryExpr,
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
@ -238,6 +239,10 @@ impl Table {
pub fn could_pass_predicate(&self, predicate: &Predicate) -> bool {
let table_data = self.table_data.read();
if table_data.meta.validate_exprs(predicate.iter()).is_err() {
return false;
}
table_data.data.iter().any(|row_group| {
row_group.could_satisfy_conjunctive_binary_expressions(predicate.iter())
})
@ -246,12 +251,14 @@ impl Table {
// Identify set of row groups that might satisfy the predicate.
//
// Produce a set of these row groups along with a snapshot of the table meta
// data associated with them.
// data associated with them. Returns an error if the provided predicate
// cannot be applied to the row groups with respect to the schema.
//
// N.B the table read lock is only held as long as it takes to determine
// with meta data whether each row group may satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> (Arc<MetaData>, Vec<Arc<RowGroup>>) {
let table_data = self.table_data.read();
let mut row_groups = Vec::with_capacity(table_data.data.len());
'rowgroup: for rg in table_data.data.iter() {
@ -281,10 +288,15 @@ impl Table {
columns: &Selection<'_>,
predicate: &Predicate,
negated_predicates: &[Predicate],
) -> ReadFilterResults {
) -> Result<ReadFilterResults> {
// identify row groups where time range and predicates match could match
// the predicate. Get a snapshot of those and the meta-data.
// the predicate. Get a snapshot of those and the meta-data. Finally,
// validate that the predicate can be applied to the row groups.
let (meta, row_groups) = self.filter_row_groups(predicate);
meta.validate_exprs(predicate.iter())?;
for pred in negated_predicates {
meta.validate_exprs(pred.iter())?;
}
let schema = ResultSchema {
select_columns: match columns {
@ -295,12 +307,12 @@ impl Table {
};
// TODO(edd): I think I can remove `predicates` from the results
ReadFilterResults {
Ok(ReadFilterResults {
predicate: predicate.clone(),
negated_predicates: negated_predicates.to_vec(),
schema,
row_groups,
}
})
}
/// Returns an iterable collection of data in group columns and aggregate
@ -319,6 +331,7 @@ impl Table {
aggregates: &'input [(ColumnName<'input>, AggregateType)],
) -> Result<ReadAggregateResults> {
let (meta, row_groups) = self.filter_row_groups(&predicate);
meta.validate_exprs(predicate.iter())?;
// Filter out any column names that we do not have data for.
let schema = ResultSchema {
@ -443,7 +456,7 @@ impl Table {
predicate: &Predicate,
columns: Selection<'_>,
mut dst: BTreeSet<String>,
) -> BTreeSet<String> {
) -> Result<BTreeSet<String>> {
let table_data = self.table_data.read();
// Short circuit execution if we have already got all of this table's
@ -454,7 +467,7 @@ impl Table {
.keys()
.all(|name| dst.contains(name))
{
return dst;
return Ok(dst);
}
// Identify row groups where time range and predicates match could match
@ -464,12 +477,14 @@ impl Table {
// ok, but if it turns out it's not then we can move the
// `filter_row_groups` logic into here and not take the second read
// lock.
let (_, row_groups) = self.filter_row_groups(predicate);
let (meta, row_groups) = self.filter_row_groups(predicate);
meta.validate_exprs(predicate.iter())?;
for row_group in row_groups {
row_group.column_names(predicate, columns, &mut dst);
}
dst
Ok(dst)
}
/// Returns the distinct set of column values for each provided column,
@ -484,6 +499,7 @@ impl Table {
mut dst: BTreeMap<String, BTreeSet<String>>,
) -> Result<BTreeMap<String, BTreeSet<String>>> {
let (meta, row_groups) = self.filter_row_groups(predicate);
meta.validate_exprs(predicate.iter())?;
// Validate that only supported columns present in `columns`.
for (name, (ct, _)) in columns.iter().zip(meta.schema_for_column_names(columns)) {
@ -512,6 +528,10 @@ impl Table {
(Arc::clone(&table_data.meta), table_data.data.to_vec())
};
if meta.validate_exprs(predicate.iter()).is_err() {
return false;
}
// if the table doesn't have a column for one of the predicate's
// expressions then the table cannot satisfy the predicate.
if !predicate
@ -684,6 +704,46 @@ impl MetaData {
self.column_names.iter().map(|name| name.as_str()).collect()
}
/// Determine, based on the table meta data, whether all of the provided
/// expressions can be applied, returning an error if any can't.
pub fn validate_exprs<'a>(&self, iter: impl IntoIterator<Item = &'a BinaryExpr>) -> Result<()> {
iter.into_iter()
.try_for_each(|expr| match self.columns.get(expr.column()) {
Some(col_meta) => match (col_meta.logical_data_type, expr.literal()) {
(LogicalDataType::Integer, Literal::Integer(_))
| (LogicalDataType::Integer, Literal::Unsigned(_))
| (LogicalDataType::Integer, Literal::Float(_))
| (LogicalDataType::Unsigned, Literal::Integer(_))
| (LogicalDataType::Unsigned, Literal::Unsigned(_))
| (LogicalDataType::Unsigned, Literal::Float(_))
| (LogicalDataType::Float, Literal::Integer(_))
| (LogicalDataType::Float, Literal::Unsigned(_))
| (LogicalDataType::Float, Literal::Float(_))
| (LogicalDataType::String, Literal::String(_))
| (LogicalDataType::Binary, Literal::String(_))
| (LogicalDataType::Boolean, Literal::Boolean(_)) => Ok(()),
_ => {
return UnsupportedColumnOperation {
column_name: expr.column().to_owned(),
msg: format!(
"cannot compare column type {} to expression literal {:?}",
col_meta.logical_data_type,
expr.literal()
),
}
.fail()
}
},
None => {
return UnsupportedColumnOperation {
column_name: expr.column().to_owned(),
msg: "column does not exist",
}
.fail()
}
})
}
pub fn to_summary(&self, table_name: impl Into<String>) -> TableSummary {
use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
let columns = self
@ -1056,6 +1116,7 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)]
mod test {
use arrow::array::BooleanArray;
use data_types::partition_metadata::{StatValues, Statistics};
use super::*;
@ -1113,6 +1174,80 @@ mod test {
);
}
#[test]
fn meta_validate_expressions() {
let time = ColumnType::Time(Column::from(&[1_i64][..]));
let col_a = ColumnType::Field(Column::from(&[1_i64][..]));
let col_b = ColumnType::Field(Column::from(&[1_u64][..]));
let col_c = ColumnType::Field(Column::from(&[1_f64][..]));
let col_d = ColumnType::Field(Column::from(&["south"][..]));
let col_e = ColumnType::Field(Column::from(BooleanArray::from(vec![true])));
let columns = vec![
("time".to_string(), time),
("i64_col".to_string(), col_a),
("u64_col".to_string(), col_b),
("f64_col".to_string(), col_c),
("str_col".to_string(), col_d),
("bool_col".to_string(), col_e),
];
let row_group = RowGroup::new(1, columns);
let table = Table::with_row_group("cpu", row_group);
let predicate = Predicate::default();
assert!(table.meta().validate_exprs(predicate.iter()).is_ok());
// valid predicates
let predicates = vec![
// exact logical types
BinaryExpr::from(("time", "=", 100_i64)),
BinaryExpr::from(("i64_col", "=", 100_i64)),
BinaryExpr::from(("u64_col", "=", 100_u64)),
BinaryExpr::from(("f64_col", "=", 100.0)),
BinaryExpr::from(("str_col", "=", "hello")),
BinaryExpr::from(("bool_col", "=", true)),
// compatible logical types
BinaryExpr::from(("time", "=", 100_u64)),
BinaryExpr::from(("time", "=", 100.0)),
BinaryExpr::from(("i64_col", "=", 100_u64)),
BinaryExpr::from(("i64_col", "=", 100.0)),
BinaryExpr::from(("u64_col", "=", 100_i64)),
BinaryExpr::from(("u64_col", "=", 100.0)),
BinaryExpr::from(("f64_col", "=", 100_i64)),
BinaryExpr::from(("f64_col", "=", 100_u64)),
];
for exprs in predicates {
let predicate = Predicate::new(vec![exprs]);
assert!(table.meta().validate_exprs(predicate.iter()).is_ok());
}
// invalid predicates
let predicates = vec![
vec![BinaryExpr::from(("time", "=", "hello"))],
vec![BinaryExpr::from(("time", "=", true))],
vec![BinaryExpr::from(("i64_col", "=", "hello"))],
vec![BinaryExpr::from(("i64_col", "=", false))],
vec![BinaryExpr::from(("u64_col", "=", "hello"))],
vec![BinaryExpr::from(("u64_col", "=", false))],
vec![BinaryExpr::from(("f64_col", "=", "hello"))],
vec![BinaryExpr::from(("f64_col", "=", false))],
vec![BinaryExpr::from(("str_col", "=", 10_i64))],
vec![BinaryExpr::from(("bool_col", "=", "true"))],
// mixture valid/invalid
vec![
BinaryExpr::from(("time", "=", 100_u64)),
BinaryExpr::from(("i64_col", "=", "not good")),
],
];
for exprs in predicates {
let predicate = Predicate::new(exprs);
assert!(table.meta().validate_exprs(predicate.iter()).is_err());
}
}
#[test]
fn meta_data_update_with_null() {
let columns = vec![
@ -1295,6 +1430,10 @@ mod test {
BinaryExpr::from(("count", "<=", 0_u64)),
]);
assert!(!table.could_pass_predicate(&predicate));
// the predicate is invalid
let predicate = Predicate::new(vec![BinaryExpr::from(("region", ">", 32.3))]);
assert!(!table.could_pass_predicate(&predicate));
}
#[test]
@ -1345,11 +1484,13 @@ mod test {
// Get all the results
let predicate = Predicate::with_time_range(&[], 1, 31);
let results = table.read_filter(
&Selection::Some(&["time", "count", "region"]),
&predicate,
&[],
);
let results = table
.read_filter(
&Selection::Some(&["time", "count", "region"]),
&predicate,
&[],
)
.unwrap();
// check the column types
let exp_schema = ResultSchema {
@ -1395,7 +1536,9 @@ mod test {
Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25);
// Apply a predicate `WHERE "region" != "south"`
let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]);
let results = table
.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[])
.unwrap();
let exp_schema = ResultSchema {
select_columns: vec![
@ -1608,7 +1751,9 @@ west,host-b,100
// NULL, 400
let mut dst: BTreeSet<String> = BTreeSet::new();
dst = table.column_names(&Predicate::default(), Selection::All, dst);
dst = table
.column_names(&Predicate::default(), Selection::All, dst)
.unwrap();
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
@ -1616,7 +1761,9 @@ west,host-b,100
);
// re-run and get the same answer
dst = table.column_names(&Predicate::default(), Selection::All, dst);
dst = table
.column_names(&Predicate::default(), Selection::All, dst)
.unwrap();
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["region".to_owned(), "time".to_owned()],
@ -1624,26 +1771,39 @@ west,host-b,100
// include a predicate that doesn't match any region rows and still get
// region from previous results.
dst = table.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
dst,
);
dst = table
.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
dst,
)
.unwrap();
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["region".to_owned(), "time".to_owned()],
);
// wipe the destination buffer and region won't show up
dst = table.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
BTreeSet::new(),
);
dst = table
.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
BTreeSet::new(),
)
.unwrap();
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["time".to_owned()],
);
// invalid predicate
assert!(table
.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", "not a number"))]),
Selection::All,
dst,
)
.is_err());
}
#[test]

View File

@ -223,17 +223,14 @@ impl Database {
let db_name = &self.shared.config.name;
info!(%db_name, "marking database deleted");
let handle = {
let state = self.shared.state.read();
let handle = self.shared.state.read().freeze();
let handle = handle.await;
{
let state = self.shared.state.read();
// Can't delete an already deleted database.
ensure!(state.is_active(), NoActiveDatabaseToDelete { db_name });
state.try_freeze().context(TransitionInProgress {
db_name,
state: state.state_code(),
})?
};
}
// If there is an object store for this database, write out a tombstone file.
// If there isn't an object store, something is wrong and we shouldn't switch the
@ -250,16 +247,13 @@ impl Database {
.await
.context(CannotMarkDatabaseDeleted { db_name })?;
let shared = Arc::clone(&self.shared);
{
let mut state = shared.state.write();
*state.unfreeze(handle) = DatabaseState::NoActiveDatabase(
DatabaseStateKnown {},
Arc::new(InitError::NoActiveDatabase),
);
shared.state_notify.notify_waiters();
}
let mut state = self.shared.state.write();
let mut state = state.unfreeze(handle);
*state = DatabaseState::NoActiveDatabase(
DatabaseStateKnown {},
Arc::new(InitError::NoActiveDatabase),
);
self.shared.state_notify.notify_waiters();
Ok(())
}
@ -269,17 +263,14 @@ impl Database {
let db_name = &self.shared.config.name;
info!(%db_name, %generation_id, "restoring database");
let handle = {
let state = self.shared.state.read();
let handle = self.shared.state.read().freeze();
let handle = handle.await;
{
let state = self.shared.state.read();
// Can't restore an already active database.
ensure!(!state.is_active(), CannotRestoreActiveDatabase { db_name });
state.try_freeze().context(TransitionInProgress {
db_name,
state: state.state_code(),
})?
};
}
IoxObjectStore::restore_database(
Arc::clone(self.shared.application.object_store()),
@ -292,15 +283,12 @@ impl Database {
.await
.context(CannotRestoreDatabaseInObjectStorage)?;
let shared = Arc::clone(&self.shared);
{
// Reset the state
let mut state = shared.state.write();
*state.unfreeze(handle) = DatabaseState::Known(DatabaseStateKnown {});
shared.state_notify.notify_waiters();
info!(%db_name, "set database state to object store found");
}
// Reset the state
let mut state = self.shared.state.write();
let mut state = state.unfreeze(handle);
*state = DatabaseState::Known(DatabaseStateKnown {});
self.shared.state_notify.notify_waiters();
info!(%db_name, "set database state to object store found");
Ok(())
}
@ -504,11 +492,15 @@ impl Database {
}
/// Recover from a ReplayError by skipping replay
pub fn skip_replay(&self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
pub async fn skip_replay(&self) -> Result<(), Error> {
let db_name = &self.shared.config.name;
let (mut current_state, handle) = {
let handle = self.shared.state.read().freeze();
let handle = handle.await;
let mut current_state = {
let state = self.shared.state.read();
let current_state = match &**state {
match &**state {
DatabaseState::ReplayError(rules_loaded, _) => rules_loaded.clone(),
_ => {
return InvalidState {
@ -518,34 +510,20 @@ impl Database {
}
.fail()
}
};
let handle = state.try_freeze().context(TransitionInProgress {
db_name,
state: state.state_code(),
})?;
(current_state, handle)
}
};
let shared = Arc::clone(&self.shared);
current_state.replay_plan = Arc::new(None);
let current_state = current_state
.advance(self.shared.as_ref())
.await
.map_err(Box::new)
.context(SkipReplay { db_name })?;
Ok(async move {
let db_name = &shared.config.name;
current_state.replay_plan = Arc::new(None);
let current_state = current_state
.advance(shared.as_ref())
.await
.map_err(Box::new)
.context(SkipReplay { db_name })?;
let mut state = self.shared.state.write();
*state.unfreeze(handle) = DatabaseState::Initialized(current_state);
{
let mut state = shared.state.write();
*state.unfreeze(handle) = DatabaseState::Initialized(current_state);
}
Ok(())
})
Ok(())
}
/// Writes an entry to this `Database` this will either:
@ -1486,7 +1464,7 @@ mod tests {
assert!(matches!(err.as_ref(), InitError::Replay { .. }));
// skip replay
database.skip_replay().unwrap().await.unwrap();
database.skip_replay().await.unwrap();
database.wait_for_init().await.unwrap();
// wait for ingest
@ -1515,7 +1493,7 @@ mod tests {
assert!(db.partition_summary("table_1", "partition_by_c").is_none());
// cannot skip when database is initialized
let res = database.skip_replay();
let res = database.skip_replay().await;
assert!(matches!(res, Err(Error::InvalidState { .. })));
// clean up

View File

@ -7,6 +7,7 @@ use data_types::{
chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
partition_metadata::{PartitionAddr, PartitionSummary},
};
use hashbrown::HashMap;
use internal_types::schema::Schema;
use observability_deps::tracing::info;
use persistence_windows::{
@ -14,11 +15,7 @@ use persistence_windows::{
};
use predicate::predicate::Predicate;
use snafu::{OptionExt, Snafu};
use std::{
collections::{btree_map::Entry, BTreeMap},
fmt::Display,
sync::Arc,
};
use std::{collections::BTreeMap, fmt::Display, sync::Arc};
use tracker::RwLock;
#[derive(Debug, Snafu)]
@ -42,6 +39,76 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Provides ordered iteration of a collection of chunks.
///
/// Maintains two indexes over the same set of chunks: a `BTreeMap` keyed by
/// `(ChunkOrder, ChunkId)` that defines the canonical iteration (and lock
/// acquisition) order, and a `HashMap` from `ChunkId` to its `ChunkOrder` so
/// chunks can also be located by id alone.
#[derive(Debug, Default)]
struct ChunkCollection {
    /// The chunks that make up this partition, indexed by order and id.
    ///
    /// This is the order that chunks should be iterated and locks acquired
    chunks: BTreeMap<(ChunkOrder, ChunkId), Arc<RwLock<CatalogChunk>>>,

    /// Provides a lookup from `ChunkId` to the corresponding `ChunkOrder`
    ///
    /// Invariant: contains exactly the set of ids present in `chunks`.
    chunk_orders: HashMap<ChunkId, ChunkOrder>,
}
impl ChunkCollection {
/// Returns an iterator over the chunks in this collection
/// ordered by `ChunkOrder` and then `ChunkId`
fn iter(&self) -> impl Iterator<Item = (ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> + '_ {
self.chunks
.iter()
.map(|((order, id), chunk)| (*id, *order, chunk))
}
/// Returns an iterator over the chunks in this collection
/// ordered by `ChunkOrder` and then `ChunkId`
fn values(&self) -> impl Iterator<Item = &Arc<RwLock<CatalogChunk>>> + '_ {
self.chunks.values()
}
/// Gets a chunk by `ChunkId`
fn get(&self, id: ChunkId) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
let order = *self.chunk_orders.get(&id)?;
let chunk = self.chunks.get(&(order, id)).unwrap();
Some((chunk, order))
}
/// Inserts a new chunk
///
/// # Panics
///
/// Panics if a chunk already exists with the given id
fn insert(
&mut self,
id: ChunkId,
order: ChunkOrder,
chunk: Arc<RwLock<CatalogChunk>>,
) -> &Arc<RwLock<CatalogChunk>> {
match self.chunk_orders.entry(id) {
hashbrown::hash_map::Entry::Occupied(_) => {
panic!("chunk already found with id: {}", id)
}
hashbrown::hash_map::Entry::Vacant(v) => v.insert(order),
};
match self.chunks.entry((order, id)) {
std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
std::collections::btree_map::Entry::Vacant(v) => v.insert(chunk),
}
}
/// Remove a chunk with the given ID, returns None if the chunk doesn't exist
fn remove(&mut self, id: ChunkId) -> Option<Arc<RwLock<CatalogChunk>>> {
let order = self.chunk_orders.remove(&id)?;
Some(self.chunks.remove(&(order, id)).unwrap())
}
/// Returns `true` if the collection contains no chunks
fn is_empty(&self) -> bool {
self.chunk_orders.is_empty()
}
}
/// IOx Catalog Partition
///
/// A partition contains multiple Chunks for a given table
@ -49,10 +116,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Partition {
addr: PartitionAddr,
/// The chunks that make up this partition, indexed by id.
//
// Alongside the chunk we also store its order.
chunks: BTreeMap<ChunkId, (ChunkOrder, Arc<RwLock<CatalogChunk>>)>,
/// The chunks that make up this partition
chunks: ChunkCollection,
/// When this partition was created
created_at: DateTime<Utc>,
@ -138,7 +203,7 @@ impl Partition {
&mut self,
chunk: mutable_buffer::chunk::MBChunk,
time_of_write: DateTime<Utc>,
) -> Arc<RwLock<CatalogChunk>> {
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name().as_ref(), self.table_name());
let chunk_id = self.next_chunk_id();
@ -154,17 +219,7 @@ impl Partition {
chunk_order,
);
let chunk = Arc::new(self.metrics.new_chunk_lock(chunk));
if self
.chunks
.insert(chunk_id, (chunk_order, Arc::clone(&chunk)))
.is_some()
{
// A fundamental invariant has been violated - abort
panic!("chunk already existed with id {}", chunk_id)
}
chunk
self.chunks.insert(chunk_id, chunk_order, chunk)
}
/// Create a new read buffer chunk.
@ -178,7 +233,7 @@ impl Partition {
schema: Arc<Schema>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> (ChunkId, Arc<RwLock<CatalogChunk>>) {
) -> (ChunkId, &Arc<RwLock<CatalogChunk>>) {
let chunk_id = self.next_chunk_id();
assert!(
chunk_order < self.next_chunk_order,
@ -201,14 +256,7 @@ impl Partition {
chunk_order,
)));
if self
.chunks
.insert(chunk_id, (chunk_order, Arc::clone(&chunk)))
.is_some()
{
// A fundamental invariant has been violated - abort
panic!("chunk already existed with id {}", chunk_id)
}
let chunk = self.chunks.insert(chunk_id, chunk_order, chunk);
(chunk_id, chunk)
}
@ -227,7 +275,7 @@ impl Partition {
time_of_last_write: DateTime<Utc>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> Arc<RwLock<CatalogChunk>> {
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
let addr = ChunkAddr::new(&self.addr, chunk_id);
@ -245,28 +293,23 @@ impl Partition {
)),
);
match self.chunks.entry(chunk_id) {
Entry::Vacant(vacant) => {
// only update internal state when we know that insertion is OK
self.next_chunk_id = self.next_chunk_id.max(chunk_id.next());
self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
let chunk = self.chunks.insert(chunk_id, chunk_order, chunk);
vacant.insert((chunk_order, Arc::clone(&chunk)));
chunk
}
Entry::Occupied(_) => panic!("chunk with id {} already exists", chunk_id),
}
// only update internal state when we know that insertion is OK
self.next_chunk_id = self.next_chunk_id.max(chunk_id.next());
self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
chunk
}
/// Drop the specified chunk
pub fn drop_chunk(&mut self, chunk_id: ChunkId) -> Result<Arc<RwLock<CatalogChunk>>> {
match self.chunks.entry(chunk_id) {
Entry::Vacant(_) => Err(Error::ChunkNotFound {
match self.chunks.get(chunk_id) {
None => Err(Error::ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
}),
Entry::Occupied(occupied) => {
Some((chunk, _)) => {
{
let (_order, chunk) = occupied.get();
let chunk = chunk.read();
if let Some(action) = chunk.lifecycle_action() {
if action.metadata() != &ChunkLifecycleAction::Dropping {
@ -277,8 +320,7 @@ impl Partition {
}
}
}
let (_order, chunk) = occupied.remove();
Ok(chunk)
Ok(self.chunks.remove(chunk_id).unwrap())
}
}
}
@ -286,54 +328,41 @@ impl Partition {
/// Drop the specified chunk even if it has an in-progress lifecycle action
/// returning the dropped chunk
pub fn force_drop_chunk(&mut self, chunk_id: ChunkId) -> Result<Arc<RwLock<CatalogChunk>>> {
self.chunks
.remove(&chunk_id)
.map(|(_order, chunk)| chunk)
.context(ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
})
self.chunks.remove(chunk_id).context(ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
})
}
/// Return the first currently open chunk, if any
pub fn open_chunk(&self) -> Option<Arc<RwLock<CatalogChunk>>> {
self.chunks
.values()
.find(|(_order, chunk)| {
.find(|chunk| {
let chunk = chunk.read();
matches!(chunk.stage(), ChunkStage::Open { .. })
})
.cloned()
.map(|(_order, chunk)| chunk)
}
/// Return an immutable chunk and its order reference by chunk id.
pub fn chunk(&self, chunk_id: ChunkId) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
self.chunks
.get(&chunk_id)
.map(|(order, chunk)| (chunk, *order))
self.chunks.get(chunk_id)
}
/// Return chunks in this partition.
///
/// Note that chunks are guaranteed ordered by chunk order and ID.
pub fn chunks(&self) -> Vec<&Arc<RwLock<CatalogChunk>>> {
self.keyed_chunks()
.into_iter()
.map(|(_id, _order, chunk)| chunk)
.collect()
/// Note that chunks are guaranteed ordered by chunk order and then ID.
pub fn chunks(&self) -> impl Iterator<Item = &Arc<RwLock<CatalogChunk>>> {
self.chunks.values()
}
/// Return chunks in this partition with their order and ids.
///
/// Note that chunks are guaranteed ordered by chunk order and ID.
pub fn keyed_chunks(&self) -> Vec<(ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> {
let mut chunks: Vec<_> = self
.chunks
.iter()
.map(|(id, (order, chunk))| (*id, *order, chunk))
.collect();
chunks.sort_by_key(|(id, order, _chunk)| (*order, *id));
chunks
pub fn keyed_chunks(
&self,
) -> impl Iterator<Item = (ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> + '_ {
self.chunks.iter()
}
/// Return a PartitionSummary for this partition. If the partition
@ -346,14 +375,14 @@ impl Partition {
self.addr.partition_key.to_string(),
self.chunks
.values()
.map(|(_order, chunk)| chunk.read().table_summary().as_ref().clone()),
.map(|chunk| chunk.read().table_summary().as_ref().clone()),
))
}
}
/// Return chunk summaries for all chunks in this partition
pub fn chunk_summaries(&self) -> impl Iterator<Item = ChunkSummary> + '_ {
self.chunks().into_iter().map(|x| x.read().summary())
self.chunks.values().map(|x| x.read().summary())
}
/// Return reference to partition-specific metrics.

View File

@ -374,7 +374,11 @@ impl QueryChunk for DbChunk {
"Negated Predicate pushed down to RUB"
);
let read_results = chunk.read_filter(rb_predicate, selection, negated_delete_exprs);
let read_results = chunk
.read_filter(rb_predicate, selection, negated_delete_exprs)
.context(ReadBufferChunkError {
chunk_id: self.id(),
})?;
let schema =
chunk
.read_filter_table_schema(selection)

View File

@ -108,21 +108,22 @@ pub(crate) fn compact_chunks(
.expect("chunk has zero rows");
let rb_row_groups = rb_chunk.row_groups();
let (_id, new_chunk) = {
let new_chunk = {
let mut partition = partition.write();
for id in chunk_ids {
partition.force_drop_chunk(id).expect(
"There was a lifecycle action attached to this chunk, who deleted it?!",
);
}
partition.create_rub_chunk(
let (_, chunk) = partition.create_rub_chunk(
rb_chunk,
time_of_first_write,
time_of_last_write,
schema,
delete_predicates,
min_order,
)
);
Arc::clone(chunk)
};
let guard = new_chunk.read();

View File

@ -164,7 +164,7 @@ where
);
let to_persist = LockableCatalogChunk {
db,
chunk: new_chunk,
chunk: Arc::clone(new_chunk),
id: new_chunk_id,
order: min_order,
};

View File

@ -38,7 +38,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage long-running IOx operations
/// Interrogate internal database data
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(subcommand)]

View File

@ -550,7 +550,6 @@ where
database
.skip_replay()
.map_err(default_database_error_handler)?
.await
.map_err(default_database_error_handler)?;

View File

@ -13,7 +13,10 @@ async fn setup() -> (UdpCapture, ServerFixture) {
.with_env("TRACES_EXPORTER", "jaeger")
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
.with_env("JAEGER_TRACE_CONTEXT_HEADER_NAME", "custom-trace-header")
.with_env(
"TRACES_EXPORTER_JAEGER_TRACE_CONTEXT_HEADER_NAME",
"custom-trace-header",
)
.with_client_header("custom-trace-header", "4:3:2:1");
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
@ -114,7 +117,7 @@ pub async fn test_tracing_create_trace() {
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
// setup a custom debug name (to ensure it gets plumbed through)
.with_env("JAEGER_DEBUG_NAME", "force-trace")
.with_env("TRACES_EXPORTER_JAEGER_DEBUG_NAME", "force-trace")
.with_client_header("force-trace", "some-debug-id");
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;

View File

@ -88,7 +88,7 @@ pub struct TracingConfig {
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-trace-context-header-name",
env = "JAEGER_TRACE_CONTEXT_HEADER_NAME",
env = "TRACES_EXPORTER_JAEGER_TRACE_CONTEXT_HEADER_NAME",
default_value = "uber-trace-id"
)]
pub traces_jaeger_trace_context_header_name: String,
@ -98,7 +98,7 @@ pub struct TracingConfig {
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-jaeger-debug-name",
env = "JAEGER_DEBUG_NAME",
env = "TRACES_EXPORTER_JAEGER_DEBUG_NAME",
default_value = "jaeger-debug-id"
)]
pub traces_jaeger_debug_name: String,