feat: Use datafusion serialization code rather than our own copy of it (#4421)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-04-28 09:03:34 -04:00 committed by GitHub
parent e41785ac3b
commit e13d3433ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 80 additions and 1989 deletions

19
Cargo.lock generated
View File

@ -1187,6 +1187,7 @@ name = "datafusion"
version = "0.1.0"
dependencies = [
"datafusion 7.0.0",
"datafusion-proto",
"workspace-hack",
]
@ -1280,6 +1281,16 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "datafusion-proto"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=0d125356ba482e9302cd52963980157965bce9e1#0d125356ba482e9302cd52963980157965bce9e1"
dependencies = [
"datafusion 7.0.0",
"prost",
"tonic-build",
]
[[package]]
name = "datafusion_util"
version = "0.1.0"
@ -3594,9 +3605,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.24.1"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f17df307904acd05aa8e32e97bb20f2a0df1728bbc2d771ae8f9a90463441e9"
checksum = "8f6d99b651ff9697d6710f9613a07c5b4e0d655040faf56b3b471af108d55597"
dependencies = [
"bitflags",
"cfg-if",
@ -6057,7 +6068,7 @@ dependencies = [
"http",
"hyper",
"influxdb_iox_client",
"nix 0.24.1",
"nix 0.24.0",
"observability_deps",
"once_cell",
"parking_lot 0.12.0",
@ -7075,6 +7086,8 @@ dependencies = [
"parquet",
"predicates",
"proptest",
"prost",
"prost-types",
"rand",
"regex",
"regex-automata",

View File

@ -924,7 +924,7 @@ pub struct ProcessedTombstone {
}
/// Request from the querier service to the ingester service
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct IngesterQueryRequest {
/// namespace to search
pub namespace: String,

View File

@ -10,4 +10,5 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0d125356ba482e9302cd52963980157965bce9e1", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="0d125356ba482e9302cd52963980157965bce9e1" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -1 +1,4 @@
pub use upstream::*;
// Publically re-export datafusion-proto crate as well
pub use datafusion_proto;

View File

@ -90,7 +90,9 @@ message Predicate {
// Optional arbitrary predicates, represented as list of DataFusion expressions applied a logical
// conjunction (aka they are 'AND'ed together). Only rows that evaluate to TRUE for all these
// expressions should be returned. Other rows are excluded from the results.
repeated LogicalExprNode exprs = 4;
//
// Encoded using DataFusion's Expr serialization code
repeated bytes exprs = 4;
// Optional arbitrary predicates on the special `_value` column. These expressions are applied to
// `field_columns` projections in the form of `CASE` statement conditions.
@ -106,429 +108,9 @@ message TimestampRange {
int64 end = 2;
}
// DataFusion logical expressions
message LogicalExprNode {
oneof expr_type {
// column references
Column column = 1;
// alias
AliasNode alias = 2;
ScalarValue literal = 3;
// binary expressions
BinaryExprNode binary_expr = 4;
// aggregate expressions
AggregateExprNode aggregate_expr = 5;
// null checks
IsNull is_null_expr = 6;
IsNotNull is_not_null_expr = 7;
Not not_expr = 8;
BetweenNode between = 9;
CaseNode case_node = 10;
CastNode cast = 11;
SortExprNode sort = 12;
NegativeNode negative = 13;
InListNode in_list = 14;
bool wildcard = 15;
ScalarFunctionNode scalar_function = 16;
TryCastNode try_cast = 17;
// window expressions
WindowExprNode window_expr = 18;
}
}
// A wrapper around a DataFusion expression against `_value` columns
message ValueExpr {
LogicalExprNode expr = 1;
// Encoded using DataFusion's Expr serialization code
bytes expr = 1;
}
message BinaryExprNode {
LogicalExprNode l = 1;
LogicalExprNode r = 2;
string op = 3;
}
message ColumnRelation {
string relation = 1;
}
message Column {
string name = 1;
ColumnRelation relation = 2;
}
message AliasNode {
LogicalExprNode expr = 1;
string alias = 2;
}
message ScalarListValue{
ScalarType datatype = 1;
repeated ScalarValue values = 2;
}
message ScalarValue {
oneof value {
bool bool_value = 1;
string utf8_value = 2;
string large_utf8_value = 3;
int32 int8_value = 4;
int32 int16_value = 5;
int32 int32_value = 6;
int64 int64_value = 7;
uint32 uint8_value = 8;
uint32 uint16_value = 9;
uint32 uint32_value = 10;
uint64 uint64_value = 11;
float float32_value = 12;
double float64_value = 13;
int32 date_32_value = 14;
int64 time_microsecond_value = 15;
int64 time_nanosecond_value = 16;
ScalarListValue list_value = 17;
ScalarType null_list_value = 18;
PrimitiveScalarType null_value = 19;
Decimal128 decimal128_value = 20;
int64 date_64_value = 21;
int64 time_second_value = 22;
int64 time_millisecond_value = 23;
int32 interval_yearmonth_value = 24;
int64 interval_daytime_value = 25;
}
}
enum AggregateFunction {
AGGREGATE_FUNCTION_UNSPECIFIED = 0;
AGGREGATE_FUNCTION_MIN = 1;
AGGREGATE_FUNCTION_MAX = 2;
AGGREGATE_FUNCTION_SUM = 3;
AGGREGATE_FUNCTION_AVG = 4;
AGGREGATE_FUNCTION_COUNT = 5;
AGGREGATE_FUNCTION_APPROX_DISTINCT = 6;
AGGREGATE_FUNCTION_ARRAY_AGG = 7;
AGGREGATE_FUNCTION_VARIANCE = 8;
AGGREGATE_FUNCTION_VARIANCE_POP = 9;
AGGREGATE_FUNCTION_COVARIANCE = 10;
AGGREGATE_FUNCTION_COVARIANCE_POP = 11;
AGGREGATE_FUNCTION_STDDEV = 12;
AGGREGATE_FUNCTION_STDDEV_POP = 13;
AGGREGATE_FUNCTION_CORRELATION = 14;
AGGREGATE_FUNCTION_APPROX_PERCENTILE_CONT = 15;
AGGREGATE_FUNCTION_APPROX_MEDIAN = 16;
}
message AggregateExprNode {
AggregateFunction aggr_function = 1;
repeated LogicalExprNode expr = 2;
}
message IsNull {
LogicalExprNode expr = 1;
}
message IsNotNull {
LogicalExprNode expr = 1;
}
message Not {
LogicalExprNode expr = 1;
}
message WindowExprNode {
oneof window_function {
AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
}
LogicalExprNode expr = 3;
repeated LogicalExprNode partition_by = 4;
repeated LogicalExprNode order_by = 5;
optional WindowFrame window_frame = 6;
}
message BetweenNode {
LogicalExprNode expr = 1;
bool negated = 2;
LogicalExprNode low = 3;
LogicalExprNode high = 4;
}
message CaseNode {
LogicalExprNode expr = 1;
repeated WhenThen when_then_expr = 2;
LogicalExprNode else_expr = 3;
}
message WhenThen {
LogicalExprNode when_expr = 1;
LogicalExprNode then_expr = 2;
}
message CastNode {
LogicalExprNode expr = 1;
ArrowType arrow_type = 2;
}
message TryCastNode {
LogicalExprNode expr = 1;
ArrowType arrow_type = 2;
}
message SortExprNode {
LogicalExprNode expr = 1;
bool asc = 2;
bool nulls_first = 3;
}
message NegativeNode {
LogicalExprNode expr = 1;
}
message InListNode {
LogicalExprNode expr = 1;
repeated LogicalExprNode list = 2;
bool negated = 3;
}
enum ScalarFunction {
SCALAR_FUNCTION_UNSPECIFIED = 0;
SCALAR_FUNCTION_SQRT = 1;
SCALAR_FUNCTION_SIN = 2;
SCALAR_FUNCTION_COS = 3;
SCALAR_FUNCTION_TAN = 4;
SCALAR_FUNCTION_ASIN = 5;
SCALAR_FUNCTION_ACOS = 6;
SCALAR_FUNCTION_ATAN = 7;
SCALAR_FUNCTION_EXP = 8;
SCALAR_FUNCTION_LOG = 9;
SCALAR_FUNCTION_LOG2 = 10;
SCALAR_FUNCTION_LOG10 = 11;
SCALAR_FUNCTION_FLOOR = 12;
SCALAR_FUNCTION_CEIL = 13;
SCALAR_FUNCTION_ROUND = 14;
SCALAR_FUNCTION_TRUNC = 15;
SCALAR_FUNCTION_ABS = 16;
SCALAR_FUNCTION_SIGNUM = 17;
SCALAR_FUNCTION_OCTETLENGTH = 18;
SCALAR_FUNCTION_CONCAT = 19;
SCALAR_FUNCTION_LOWER = 20;
SCALAR_FUNCTION_UPPER = 21;
SCALAR_FUNCTION_TRIM = 22;
SCALAR_FUNCTION_LTRIM = 23;
SCALAR_FUNCTION_RTRIM = 24;
SCALAR_FUNCTION_TOTIMESTAMP = 25;
SCALAR_FUNCTION_ARRAY = 26;
SCALAR_FUNCTION_NULLIF = 27;
SCALAR_FUNCTION_DATEPART = 28;
SCALAR_FUNCTION_DATETRUNC = 29;
SCALAR_FUNCTION_MD5 = 30;
SCALAR_FUNCTION_SHA224 = 31;
SCALAR_FUNCTION_SHA256 = 32;
SCALAR_FUNCTION_SHA384 = 33;
SCALAR_FUNCTION_SHA512 = 34;
SCALAR_FUNCTION_LN = 35;
SCALAR_FUNCTION_TOTIMESTAMPMILLIS = 36;
SCALAR_FUNCTION_DIGEST = 37;
}
message ScalarFunctionNode {
ScalarFunction fun = 1;
repeated LogicalExprNode args = 2;
}
message ScalarType {
oneof datatype {
PrimitiveScalarType scalar = 1;
ScalarListType list = 2;
}
}
// Contains all valid datafusion scalar types except for List
enum PrimitiveScalarType {
PRIMITIVE_SCALAR_TYPE_UNSPECIFIED = 0;
PRIMITIVE_SCALAR_TYPE_BOOL = 1;
PRIMITIVE_SCALAR_TYPE_UINT8 = 2;
PRIMITIVE_SCALAR_TYPE_INT8 = 3;
PRIMITIVE_SCALAR_TYPE_UINT16 = 4;
PRIMITIVE_SCALAR_TYPE_INT16 = 5;
PRIMITIVE_SCALAR_TYPE_UINT32 = 6;
PRIMITIVE_SCALAR_TYPE_INT32 = 7;
PRIMITIVE_SCALAR_TYPE_UINT64 = 8;
PRIMITIVE_SCALAR_TYPE_INT64 = 9;
PRIMITIVE_SCALAR_TYPE_FLOAT32 = 10;
PRIMITIVE_SCALAR_TYPE_FLOAT64 = 11;
PRIMITIVE_SCALAR_TYPE_UTF8 = 12;
PRIMITIVE_SCALAR_TYPE_LARGE_UTF8 = 13;
PRIMITIVE_SCALAR_TYPE_DATE32 = 14;
PRIMITIVE_SCALAR_TYPE_TIME_MICROSECOND = 15;
PRIMITIVE_SCALAR_TYPE_TIME_NANOSECOND = 16;
PRIMITIVE_SCALAR_TYPE_NULL = 17;
PRIMITIVE_SCALAR_TYPE_DECIMAL128 = 18;
PRIMITIVE_SCALAR_TYPE_DATE64 = 19;
PRIMITIVE_SCALAR_TYPE_TIME_SECOND = 20;
PRIMITIVE_SCALAR_TYPE_TIME_MILLISECOND = 21;
PRIMITIVE_SCALAR_TYPE_INTERVAL_YEARMONTH = 22;
PRIMITIVE_SCALAR_TYPE_INTERVAL_DAYTIME = 23;
}
message ScalarListType {
repeated string field_names = 1;
PrimitiveScalarType deepest_type = 2;
}
message Decimal128 {
bytes value = 1;
int64 p = 2;
int64 s = 3;
}
enum BuiltInWindowFunction {
BUILT_IN_WINDOW_FUNCTION_UNSPECIFIED = 0;
BUILT_IN_WINDOW_FUNCTION_ROW_NUMBER = 1;
BUILT_IN_WINDOW_FUNCTION_RANK = 2;
BUILT_IN_WINDOW_FUNCTION_DENSE_RANK = 3;
BUILT_IN_WINDOW_FUNCTION_PERCENT_RANK = 4;
BUILT_IN_WINDOW_FUNCTION_CUME_DIST = 5;
BUILT_IN_WINDOW_FUNCTION_NTILE = 6;
BUILT_IN_WINDOW_FUNCTION_LAG = 7;
BUILT_IN_WINDOW_FUNCTION_LEAD = 8;
BUILT_IN_WINDOW_FUNCTION_FIRST_VALUE = 9;
BUILT_IN_WINDOW_FUNCTION_LAST_VALUE = 10;
BUILT_IN_WINDOW_FUNCTION_NTH_VALUE = 11;
}
message WindowFrame {
WindowFrameUnits window_frame_units = 1;
WindowFrameBound start_bound = 2;
optional WindowFrameBound end_bound = 3;
}
enum WindowFrameUnits {
WINDOW_FRAME_UNITS_UNSPECIFIED = 0;
WINDOW_FRAME_UNITS_ROWS = 1;
WINDOW_FRAME_UNITS_RANGE = 2;
WINDOW_FRAME_UNITS_GROUPS = 3;
}
message WindowFrameBound {
WindowFrameBoundType window_frame_bound_type = 1;
optional uint64 bound_value = 2;
}
enum WindowFrameBoundType {
WINDOW_FRAME_BOUND_TYPE_UNSPECIFIED = 0;
WINDOW_FRAME_BOUND_TYPE_CURRENT_ROW = 1;
WINDOW_FRAME_BOUND_TYPE_PRECEDING = 2;
WINDOW_FRAME_BOUND_TYPE_FOLLOWING = 3;
}
// Broken out into multiple message types so that type metadata did not need to be in separate
// messages. All types that are of the empty message types contain no additional metadata about the
// type.
message ArrowType {
oneof arrow_type_enum {
EmptyMessage none = 1; // arrow::Type::NA
EmptyMessage bool = 2; // arrow::Type::BOOL
EmptyMessage uint8 = 3; // arrow::Type::UINT8
EmptyMessage int8 = 4; // arrow::Type::INT8
EmptyMessage uint16 = 5; // represents arrow::Type fields in src/arrow/type.h
EmptyMessage int16 = 6;
EmptyMessage uint32 = 7;
EmptyMessage int32 = 8;
EmptyMessage uint64 = 9;
EmptyMessage int64 = 10;
EmptyMessage float16 = 11;
EmptyMessage float32 = 12;
EmptyMessage float64 = 13;
EmptyMessage utf8 = 14;
EmptyMessage binary = 15;
int32 fixed_size_binary = 16;
EmptyMessage date32 = 17;
EmptyMessage date64 = 18;
TimeUnit duration = 19;
Timestamp timestamp = 20;
TimeUnit time32 = 21;
TimeUnit time64 = 22;
IntervalUnit interval = 23;
Decimal decimal = 24;
List list = 25;
List large_list = 26;
FixedSizeList fixed_size_list = 27;
Struct struct = 28;
Union union = 29;
Dictionary dictionary = 30;
EmptyMessage large_binary = 31;
EmptyMessage large_utf8 = 32;
}
}
enum TimeUnit {
TIME_UNIT_UNSPECIFIED = 0;
TIME_UNIT_SECOND = 1;
TIME_UNIT_TIME_MILLISECOND = 2;
TIME_UNIT_MICROSECOND = 3;
TIME_UNIT_NANOSECOND = 4;
}
enum IntervalUnit {
INTERVAL_UNIT_UNSPECIFIED = 0;
INTERVAL_UNIT_YEAR_MONTH = 1;
INTERVAL_UNIT_DAY_TIME = 2;
INTERVAL_UNIT_MONTH_DAY_NANO = 3;
}
message Decimal {
uint64 whole = 1;
uint64 fractional = 2;
}
message Timestamp {
TimeUnit time_unit = 1;
string timezone = 2;
}
message List {
Field field_type = 1;
}
message FixedSizeList {
Field field_type = 1;
int32 list_size = 2;
}
message Struct {
repeated Field sub_field_types = 1;
}
enum UnionMode {
UNION_MODE_UNSPECIFIED = 0;
UNION_MODE_SPARSE = 1;
UNION_MODE_DENSE = 2;
}
message Union {
repeated Field union_types = 1;
UnionMode union_mode = 2;
}
message Dictionary {
ArrowType key = 1;
ArrowType value = 2;
}
message Field {
// name of the field
string name = 1;
ArrowType arrow_type = 2;
bool nullable = 3;
// for complex data types like structs, unions
repeated Field children = 4;
}
// Useful for representing an empty Rust enum variant
message EmptyMessage {}

File diff suppressed because it is too large Load Diff

View File

@ -326,6 +326,12 @@ impl PredicateBuilder {
self
}
/// Adds a ValueExpr to the list of value expressons
pub fn add_value_expr(mut self, value_expr: ValueExpr) -> Self {
self.inner.value_expr.push(value_expr);
self
}
/// Builds a regex matching expression from the provided column name and
/// pattern. Values not matching the regex will be filtered out.
pub fn build_regex_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self {

View File

@ -47,6 +47,8 @@ once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"]
parquet = { version = "12", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }
proptest = { version = "1", features = ["bit-set", "break-dead-code", "fork", "lazy_static", "quick-error", "regex-syntax", "rusty-fork", "std", "tempfile", "timeout"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
prost-types = { version = "0.10", features = ["std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }
@ -101,6 +103,8 @@ memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
prost-types = { version = "0.10", features = ["std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }