diff --git a/query/csv/result.go b/query/csv/result.go index ad391f42b7..f2304fd322 100644 --- a/query/csv/result.go +++ b/query/csv/result.go @@ -232,7 +232,7 @@ type tableMetadata struct { TableID string Cols []colMeta Partitions []bool - Defaults []interface{} + Defaults []values.Value NumFields int } @@ -319,7 +319,7 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab } cols := make([]colMeta, len(labels)) - defaultValues := make([]interface{}, len(labels)) + defaultValues := make([]values.Value, len(labels)) partitionValues := make([]bool, len(labels)) for j, label := range labels { @@ -502,10 +502,10 @@ func (b *blockDecoder) init(line []string) error { record = line[recordStartIdx:] } keyCols := make([]query.ColMeta, 0, len(b.meta.Cols)) - keyValues := make([]interface{}, 0, len(b.meta.Cols)) + keyValues := make([]values.Value, 0, len(b.meta.Cols)) for j, c := range b.meta.Cols { if b.meta.Partitions[j] { - var value interface{} + var value values.Value if b.meta.Defaults[j] != nil { value = b.meta.Defaults[j] } else if record != nil { @@ -537,22 +537,22 @@ func (b *blockDecoder) appendRecord(record []string) error { if record[j] == "" && b.meta.Defaults[j] != nil { switch c.Type { case query.TBool: - v := b.meta.Defaults[j].(bool) + v := b.meta.Defaults[j].Bool() b.builder.AppendBool(j, v) case query.TInt: - v := b.meta.Defaults[j].(int64) + v := b.meta.Defaults[j].Int() b.builder.AppendInt(j, v) case query.TUInt: - v := b.meta.Defaults[j].(uint64) + v := b.meta.Defaults[j].UInt() b.builder.AppendUInt(j, v) case query.TFloat: - v := b.meta.Defaults[j].(float64) + v := b.meta.Defaults[j].Float() b.builder.AppendFloat(j, v) case query.TString: - v := b.meta.Defaults[j].(string) + v := b.meta.Defaults[j].Str() b.builder.AppendString(j, v) case query.TTime: - v := b.meta.Defaults[j].(execute.Time) + v := b.meta.Defaults[j].Time() b.builder.AppendTime(j, v) default: return fmt.Errorf("unsupported column type %v", c.Type) @@ -843,24 +843,45 @@ func writeDefaults(writer *csv.Writer, row, defaults []string) error { return writer.Write(row) } -func decodeValue(value string, c colMeta) (v interface{}, err error) { +func decodeValue(value string, c colMeta) (values.Value, error) { + var val values.Value switch c.Type { case query.TBool: - v, err = strconv.ParseBool(value) + v, err := strconv.ParseBool(value) + if err != nil { + return nil, err + } + val = values.NewBoolValue(v) case query.TInt: - v, err = strconv.ParseInt(value, 10, 64) + v, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return nil, err + } + val = values.NewIntValue(v) case query.TUInt: - v, err = strconv.ParseUint(value, 10, 64) + v, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return nil, err + } + val = values.NewUIntValue(v) case query.TFloat: - v, err = strconv.ParseFloat(value, 64) + v, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, err + } + val = values.NewFloatValue(v) case query.TString: - v = value + val = values.NewStringValue(value) case query.TTime: - v, err = decodeTime(value, c.fmt) + v, err := decodeTime(value, c.fmt) + if err != nil { + return nil, err + } + val = values.NewTimeValue(v) default: return nil, fmt.Errorf("unsupported type %v", c.Type) } - return + return val, nil } func decodeValueInto(j int, c colMeta, value string, builder execute.BlockBuilder) error { @@ -903,20 +924,20 @@ func decodeValueInto(j int, c colMeta, value string, builder execute.BlockBuilde return nil } -func encodeValue(value interface{}, c colMeta) (string, error) { +func encodeValue(value values.Value, c colMeta) (string, error) { switch c.Type { case query.TBool: - return strconv.FormatBool(value.(bool)), nil + return strconv.FormatBool(value.Bool()), nil case query.TInt: - return strconv.FormatInt(value.(int64), 10), nil + return strconv.FormatInt(value.Int(), 10), nil case query.TUInt: - return strconv.FormatUint(value.(uint64), 10), nil + return strconv.FormatUint(value.UInt(), 10), nil case query.TFloat: - return strconv.FormatFloat(value.(float64), 'f', -1, 64), nil + return strconv.FormatFloat(value.Float(), 'f', -1, 64), nil case query.TString: - return value.(string), nil + return value.Str(), nil case query.TTime: - return encodeTime(value.(execute.Time), c.fmt), nil + return encodeTime(value.Time(), c.fmt), nil default: return "", fmt.Errorf("unknown type %v", c.Type) } diff --git a/query/docs/SPEC.md b/query/docs/SPEC.md index cd994d8875..049518741a 100644 --- a/query/docs/SPEC.md +++ b/query/docs/SPEC.md @@ -999,12 +999,19 @@ Map applies a function to each record of the input tables. The modified records are assigned to new tables based on the partition key of the input table. The output tables are the result of applying the map function to each record on the input tables. +When the output record contains a different value for the partition key the record is repartitioned into the appropriate table. +When the output record drops a column that was part of the partition key that column is removed from the partition key. + Map has the following properties: * `fn` function Function to apply to each record. The return value must be an object. - Only properties defined on the return object will be present on the output records. +* `mergeKey` bool + MergeKey indicates if the record returned from fn should be merged with the partition key. + When merging, all columns on the partition key will be added to the record giving precedence to any columns already present on the record. + When not merging, only columns defined on the returned record will be present on the output records. + Defaults to true. #### Range diff --git a/query/execute/block.go b/query/execute/block.go index 94e7da572c..da2a39fdb3 100644 --- a/query/execute/block.go +++ b/query/execute/block.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/values" "github.com/pkg/errors" ) @@ -19,12 +20,12 @@ const ( type partitionKey struct { cols []query.ColMeta - values []interface{} + values []values.Value hasHash bool hash uint64 } -func NewPartitionKey(cols []query.ColMeta, values []interface{}) query.PartitionKey { +func NewPartitionKey(cols []query.ColMeta, values []values.Value) query.PartitionKey { return &partitionKey{ cols: cols, values: values, @@ -37,60 +38,29 @@ func (k *partitionKey) Cols() []query.ColMeta { func (k *partitionKey) HasCol(label string) bool { return ColIdx(label, k.cols) >= 0 } -func (k *partitionKey) Value(j int) interface{} { +func (k *partitionKey) Value(j int) values.Value { return k.values[j] } func (k *partitionKey) ValueBool(j int) bool { - return k.values[j].(bool) + return k.values[j].Bool() } func (k *partitionKey) ValueUInt(j int) uint64 { - return k.values[j].(uint64) + return k.values[j].UInt() } func (k *partitionKey) ValueInt(j int) int64 { - return k.values[j].(int64) + return k.values[j].Int() } func (k *partitionKey) ValueFloat(j int) float64 { - return k.values[j].(float64) + return k.values[j].Float() } func (k *partitionKey) ValueString(j int) string { - return k.values[j].(string) + return k.values[j].Str() } func (k *partitionKey) ValueDuration(j int) Duration { - return k.values[j].(Duration) + return k.values[j].Duration() } func (k *partitionKey) ValueTime(j int) Time { - return k.values[j].(Time) -} - -func (k *partitionKey) Intersect(keys []string) query.PartitionKey { - nk := &partitionKey{ - cols: make([]query.ColMeta, 0, len(k.cols)), - values: make([]interface{}, 0, len(k.values)), - } - for i, c := range k.cols { - found := false - for _, label := range keys { - if c.Label == label { - found = true - break - } - } - - if found { - nk.cols = append(nk.cols, c) - nk.values = append(nk.values, k.values[i]) - } - } - return nk -} -func (k *partitionKey) Diff(labels []string) []string { - diff := make([]string, 0, len(labels)) - for _, label := range labels { - if ColIdx(label, k.cols) < 0 { - diff = append(diff, label) - } - } - return diff + return k.values[j].Time() } func (k *partitionKey) Hash() uint64 { @@ -209,7 +179,7 @@ func PartitionKeyForRow(i int, cr query.ColReader) query.PartitionKey { key := cr.Key() cols := cr.Cols() colsCpy := make([]query.ColMeta, 0, len(cols)) - values := make([]interface{}, 0, len(cols)) + vs := make([]values.Value, 0, len(cols)) for j, c := range cols { if !key.HasCol(c.Label) { continue @@ -217,28 +187,28 @@ func PartitionKeyForRow(i int, cr query.ColReader) query.PartitionKey { colsCpy = append(colsCpy, c) switch c.Type { case query.TBool: - values = append(values, cr.Bools(j)[i]) + vs = append(vs, values.NewBoolValue(cr.Bools(j)[i])) case query.TInt: - values = append(values, cr.Ints(j)[i]) + vs = append(vs, values.NewIntValue(cr.Ints(j)[i])) case query.TUInt: - values = append(values, cr.UInts(j)[i]) + vs = append(vs, values.NewUIntValue(cr.UInts(j)[i])) case query.TFloat: - values = append(values, cr.Floats(j)[i]) + vs = append(vs, values.NewFloatValue(cr.Floats(j)[i])) case query.TString: - values = append(values, cr.Strings(j)[i]) + vs = append(vs, values.NewStringValue(cr.Strings(j)[i])) case query.TTime: - values = append(values, cr.Times(j)[i]) + vs = append(vs, values.NewTimeValue(cr.Times(j)[i])) } } return &partitionKey{ cols: colsCpy, - values: values, + values: vs, } } func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.PartitionKey { cols := make([]query.ColMeta, 0, len(on)) - values := make([]interface{}, 0, len(on)) + vs := make([]values.Value, 0, len(on)) for j, c := range cr.Cols() { if !on[c.Label] { continue @@ -246,20 +216,20 @@ func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.P cols = append(cols, c) switch c.Type { case query.TBool: - values = append(values, cr.Bools(j)[i]) + vs = append(vs, values.NewBoolValue(cr.Bools(j)[i])) case query.TInt: - values = append(values, cr.Ints(j)[i]) + vs = append(vs, values.NewIntValue(cr.Ints(j)[i])) case query.TUInt: - values = append(values, cr.UInts(j)[i]) + vs = append(vs, values.NewUIntValue(cr.UInts(j)[i])) case query.TFloat: - values = append(values, cr.Floats(j)[i]) + vs = append(vs, values.NewFloatValue(cr.Floats(j)[i])) case query.TString: - values = append(values, cr.Strings(j)[i]) + vs = append(vs, values.NewStringValue(cr.Strings(j)[i])) case query.TTime: - values = append(values, cr.Times(j)[i]) + vs = append(vs, values.NewTimeValue(cr.Times(j)[i])) } } - return NewPartitionKey(cols, values) + return NewPartitionKey(cols, vs) } // OneTimeBlock is a Block that permits reading data only once. diff --git a/query/execute/executetest/block.go b/query/execute/executetest/block.go index d96c8e230e..8799c80eb9 100644 --- a/query/execute/executetest/block.go +++ b/query/execute/executetest/block.go @@ -5,6 +5,7 @@ import ( "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/values" ) // Block is an implementation of execute.Block @@ -31,6 +32,7 @@ type Block struct { func (b *Block) Normalize() { if b.PartitionKey == nil { cols := make([]query.ColMeta, len(b.KeyCols)) + vs := make([]values.Value, len(b.KeyCols)) if len(b.KeyValues) != len(b.KeyCols) { b.KeyValues = make([]interface{}, len(b.KeyCols)) } @@ -43,8 +45,13 @@ func (b *Block) Normalize() { if len(b.Data) > 0 { b.KeyValues[j] = b.Data[0][idx] } + v, err := values.NewValue(b.KeyValues[j], execute.ConvertToKind(cols[j].Type)) + if err != nil { + panic(err) + } + vs[j] = v } - b.PartitionKey = execute.NewPartitionKey(cols, b.KeyValues) + b.PartitionKey = execute.NewPartitionKey(cols, vs) } } @@ -150,7 +157,24 @@ func ConvertBlock(b query.Block) (*Block, error) { blk.KeyValues = make([]interface{}, len(keyCols)) for j, c := range keyCols { blk.KeyCols[j] = c.Label - blk.KeyValues[j] = key.Value(j) + var v interface{} + switch c.Type { + case query.TBool: + v = key.ValueBool(j) + case query.TUInt: + v = key.ValueUInt(j) + case query.TInt: + v = key.ValueInt(j) + case query.TFloat: + v = key.ValueFloat(j) + case query.TString: + v = key.ValueString(j) + case query.TTime: + v = key.ValueTime(j) + default: + return nil, fmt.Errorf("unsupported column type %v", c.Type) + } + blk.KeyValues[j] = v } } diff --git a/query/functions/data_test.go b/query/functions/data_test.go index 0cb9352eb9..88c3c9c63d 100644 --- a/query/functions/data_test.go +++ b/query/functions/data_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/values" ) const ( @@ -47,10 +48,10 @@ func init() { {Label: execute.DefaultStopColLabel, Type: query.TTime}, {Label: "t1", Type: query.TString}, }, - []interface{}{ - start, - stop, - t1Value, + []values.Value{ + values.NewTimeValue(start), + values.NewTimeValue(stop), + values.NewStringValue(t1Value), }, ) normalBlockBuilder := execute.NewColListBlockBuilder(key, executetest.UnlimitedAllocator) diff --git a/query/functions/map.go b/query/functions/map.go index 1c1898b52c..741e4c2b62 100644 --- a/query/functions/map.go +++ b/query/functions/map.go @@ -10,12 +10,14 @@ import ( "github.com/influxdata/platform/query/interpreter" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/semantic" + "github.com/influxdata/platform/query/values" ) const MapKind = "map" type MapOpSpec struct { - Fn *semantic.FunctionExpression `json:"fn"` + Fn *semantic.FunctionExpression `json:"fn"` + MergeKey bool `json:"mergeKey"` } var mapSignature = query.DefaultFunctionSignature() @@ -34,17 +36,26 @@ func createMapOpSpec(args query.Arguments, a *query.Administration) (query.Opera return nil, err } - f, err := args.GetRequiredFunction("fn") - if err != nil { + spec := new(MapOpSpec) + + if f, err := args.GetRequiredFunction("fn"); err != nil { return nil, err + } else { + fn, err := interpreter.ResolveFunction(f) + if err != nil { + return nil, err + } + spec.Fn = fn } - fn, err := interpreter.ResolveFunction(f) - if err != nil { + + if m, ok, err := args.GetBool("mergeKey"); err != nil { return nil, err + } else if ok { + spec.MergeKey = m + } else { + spec.MergeKey = true } - return &MapOpSpec{ - Fn: fn, - }, nil + return spec, nil } func newMapOp() query.OperationSpec { @@ -56,7 +67,8 @@ func (s *MapOpSpec) Kind() query.OperationKind { } type MapProcedureSpec struct { - Fn *semantic.FunctionExpression + Fn *semantic.FunctionExpression + MergeKey bool } func newMapProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) { @@ -66,7 +78,8 @@ func newMapProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Proce } return &MapProcedureSpec{ - Fn: spec.Fn, + Fn: spec.Fn, + MergeKey: spec.MergeKey, }, nil } @@ -75,6 +88,7 @@ func (s *MapProcedureSpec) Kind() plan.ProcedureKind { } func (s *MapProcedureSpec) Copy() plan.ProcedureSpec { ns := new(MapProcedureSpec) + *ns = *s ns.Fn = s.Fn.Copy().(*semantic.FunctionExpression) return ns } @@ -97,7 +111,8 @@ type mapTransformation struct { d execute.Dataset cache execute.BlockBuilderCache - fn *execute.RowMapFn + fn *execute.RowMapFn + mergeKey bool } func NewMapTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *MapProcedureSpec) (*mapTransformation, error) { @@ -106,9 +121,10 @@ func NewMapTransformation(d execute.Dataset, cache execute.BlockBuilderCache, sp return nil, err } return &mapTransformation{ - d: d, - cache: cache, - fn: fn, + d: d, + cache: cache, + fn: fn, + mergeKey: spec.MergeKey, }, nil } @@ -124,6 +140,19 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error { // TODO(nathanielc): Should we not fail the query for failed compilation? return err } + // Determine keys return from function + properties := t.fn.Type().Properties() + keys := make([]string, 0, len(properties)) + for k := range properties { + keys = append(keys, k) + } + sort.Strings(keys) + + // Determine on which cols to partition + on := make(map[string]bool, len(b.Key().Cols())) + for _, c := range b.Key().Cols() { + on[c.Label] = t.mergeKey || execute.ContainsStr(keys, c.Label) + } return b.Do(func(cr query.ColReader) error { l := cr.Len() @@ -133,17 +162,17 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error { log.Printf("failed to evaluate map expression: %v", err) continue } - key := execute.PartitionKeyForRow(i, cr) + key := partitionKeyForObject(i, cr, m, on) builder, created := t.cache.BlockBuilder(key) if created { - // Add columns from function in sorted order - properties := t.fn.Type().Properties() - keys := make([]string, 0, len(properties)) - for k := range properties { - keys = append(keys, k) + if t.mergeKey { + execute.AddBlockKeyCols(b.Key(), builder) } - sort.Strings(keys) + // Add columns from function in sorted order for _, k := range keys { + if t.mergeKey && b.Key().HasCol(k) { + continue + } builder.AddCol(query.ColMeta{ Label: k, Type: execute.ConvertFromKind(properties[k].Kind()), @@ -151,13 +180,51 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error { } } for j, c := range builder.Cols() { - v, _ := m.Get(c.Label) + v, ok := m.Get(c.Label) + if !ok { + if idx := execute.ColIdx(c.Label, b.Key().Cols()); t.mergeKey && idx >= 0 { + v = b.Key().Value(idx) + } else { + // This should be unreachable + return fmt.Errorf("could not find value for column %q", c.Label) + } + } execute.AppendValue(builder, j, v) } } return nil }) +} +func partitionKeyForObject(i int, cr query.ColReader, obj values.Object, on map[string]bool) query.PartitionKey { + cols := make([]query.ColMeta, 0, len(on)) + vs := make([]values.Value, 0, len(on)) + for j, c := range cr.Cols() { + if !on[c.Label] { + continue + } + cols = append(cols, c) + v, ok := obj.Get(c.Label) + if ok { + vs = append(vs, v) + } else { + switch c.Type { + case query.TBool: + vs = append(vs, values.NewBoolValue(cr.Bools(j)[i])) + case query.TInt: + vs = append(vs, values.NewIntValue(cr.Ints(j)[i])) + case query.TUInt: + vs = append(vs, values.NewUIntValue(cr.UInts(j)[i])) + case query.TFloat: + vs = append(vs, values.NewFloatValue(cr.Floats(j)[i])) + case query.TString: + vs = append(vs, values.NewStringValue(cr.Strings(j)[i])) + case query.TTime: + vs = append(vs, values.NewTimeValue(cr.Times(j)[i])) + } + } + } + return execute.NewPartitionKey(cols, vs) } func (t *mapTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error { diff --git a/query/functions/map_test.go b/query/functions/map_test.go index efe3d9b9ae..f9ac6a87d2 100644 --- a/query/functions/map_test.go +++ b/query/functions/map_test.go @@ -28,6 +28,7 @@ func TestMap_NewQuery(t *testing.T) { { ID: "map1", Spec: &functions.MapOpSpec{ + MergeKey: true, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.BinaryExpression{ @@ -63,6 +64,7 @@ func TestMap_NewQuery(t *testing.T) { { ID: "map1", Spec: &functions.MapOpSpec{ + MergeKey: true, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.BinaryExpression{ @@ -151,6 +153,7 @@ func TestMap_Process(t *testing.T) { { name: `_value+5`, spec: &functions.MapProcedureSpec{ + MergeKey: false, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.ObjectExpression{ @@ -204,9 +207,308 @@ func TestMap_Process(t *testing.T) { }, }}, }, + { + name: `_value+5 mergeKey=true`, + spec: &functions.MapProcedureSpec{ + MergeKey: true, + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "_time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "_value"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + Right: &semantic.FloatLiteral{ + Value: 5, + }, + }, + }, + }, + }, + }, + }, + data: []query.Block{&executetest.Block{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A", execute.Time(1), 1.0}, + {"m", "A", execute.Time(2), 6.0}, + }, + }}, + want: []*executetest.Block{{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A", execute.Time(1), 6.0}, + {"m", "A", execute.Time(2), 11.0}, + }, + }}, + }, + { + name: `_value+5 mergeKey=false drop cols`, + spec: &functions.MapProcedureSpec{ + MergeKey: false, + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "_time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "_value"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + Right: &semantic.FloatLiteral{ + Value: 5, + }, + }, + }, + }, + }, + }, + }, + data: []query.Block{&executetest.Block{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A", execute.Time(1), 1.0}, + {"m", "A", execute.Time(2), 6.0}, + }, + }}, + want: []*executetest.Block{{ + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(1), 6.0}, + {execute.Time(2), 11.0}, + }, + }}, + }, + { + name: `_value+5 mergeKey=true repartition`, + spec: &functions.MapProcedureSpec{ + MergeKey: true, + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "_time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "host"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "host", + }, + Right: &semantic.StringLiteral{Value: ".local"}, + }, + }, + { + Key: &semantic.Identifier{Name: "_value"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + Right: &semantic.FloatLiteral{ + Value: 5, + }, + }, + }, + }, + }, + }, + }, + data: []query.Block{&executetest.Block{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A", execute.Time(1), 1.0}, + {"m", "A", execute.Time(2), 6.0}, + }, + }}, + want: []*executetest.Block{{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A.local", execute.Time(1), 6.0}, + {"m", "A.local", execute.Time(2), 11.0}, + }, + }}, + }, + { + name: `_value+5 mergeKey=true repartition fan out`, + spec: &functions.MapProcedureSpec{ + MergeKey: true, + Fn: &semantic.FunctionExpression{ + Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, + Body: &semantic.ObjectExpression{ + Properties: []*semantic.Property{ + { + Key: &semantic.Identifier{Name: "_time"}, + Value: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_time", + }, + }, + { + Key: &semantic.Identifier{Name: "host"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "host", + }, + Right: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.StringLiteral{Value: "."}, + Right: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "domain", + }, + }, + }, + }, + { + Key: &semantic.Identifier{Name: "_value"}, + Value: &semantic.BinaryExpression{ + Operator: ast.AdditionOperator, + Left: &semantic.MemberExpression{ + Object: &semantic.IdentifierExpression{ + Name: "r", + }, + Property: "_value", + }, + Right: &semantic.FloatLiteral{ + Value: 5, + }, + }, + }, + }, + }, + }, + }, + data: []query.Block{&executetest.Block{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "domain", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A", "example.com", execute.Time(1), 1.0}, + {"m", "A", "www.example.com", execute.Time(2), 6.0}, + }, + }}, + want: []*executetest.Block{ + { + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A.example.com", execute.Time(1), 6.0}, + }, + }, + { + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {"m", "A.www.example.com", execute.Time(2), 11.0}, + }, + }, + }, + }, { name: `_value*_value`, spec: &functions.MapProcedureSpec{ + MergeKey: false, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.ObjectExpression{ @@ -266,6 +568,7 @@ func TestMap_Process(t *testing.T) { { name: "float(r._value) int", spec: &functions.MapProcedureSpec{ + MergeKey: false, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.ObjectExpression{ @@ -324,6 +627,7 @@ func TestMap_Process(t *testing.T) { { name: "float(r._value) uint", spec: &functions.MapProcedureSpec{ + MergeKey: false, Fn: &semantic.FunctionExpression{ Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}}, Body: &semantic.ObjectExpression{ diff --git a/query/functions/set.go b/query/functions/set.go index 525ca24825..b7c7b7a80e 100644 --- a/query/functions/set.go +++ b/query/functions/set.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/platform/query/execute" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/semantic" + "github.com/influxdata/platform/query/values" ) const SetKind = "set" @@ -124,16 +125,16 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Block) error { if idx := execute.ColIdx(t.key, key.Cols()); idx >= 0 { // Update key cols := make([]query.ColMeta, len(key.Cols())) - values := make([]interface{}, len(key.Cols())) + vs := make([]values.Value, len(key.Cols())) for j, c := range key.Cols() { cols[j] = c if j == idx { - values[j] = t.value + vs[j] = values.NewStringValue(t.value) } else { - values[j] = key.Value(j) + vs[j] = key.Value(j) } } - key = execute.NewPartitionKey(cols, values) + key = execute.NewPartitionKey(cols, vs) } builder, created := t.cache.BlockBuilder(key) if created { diff --git a/query/functions/shift.go b/query/functions/shift.go index 0d25493aeb..bd0d7dd611 100644 --- a/query/functions/shift.go +++ b/query/functions/shift.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/platform/query/interpreter" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/semantic" + "github.com/influxdata/platform/query/values" ) const ShiftKind = "shift" @@ -134,20 +135,20 @@ func (t *shiftTransformation) Process(id execute.DatasetID, b query.Block) error key := b.Key() // Update key cols := make([]query.ColMeta, len(key.Cols())) - values := make([]interface{}, len(key.Cols())) + vs := make([]values.Value, len(key.Cols())) for j, c := range key.Cols() { if execute.ContainsStr(t.columns, c.Label) { if c.Type != query.TTime { return fmt.Errorf("column %q is not of type time", c.Label) } cols[j] = c - values[j] = key.ValueTime(j).Add(t.shift) + vs[j] = values.NewTimeValue(key.ValueTime(j).Add(t.shift)) } else { cols[j] = c - values[j] = key.Value(j) + vs[j] = key.Value(j) } } - key = execute.NewPartitionKey(cols, values) + key = execute.NewPartitionKey(cols, vs) builder, created := t.cache.BlockBuilder(key) if !created { diff --git a/query/functions/storage/pb/reader.go b/query/functions/storage/pb/reader.go index 70bed1a3af..c2349f6329 100644 --- a/query/functions/storage/pb/reader.go +++ b/query/functions/storage/pb/reader.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" "github.com/influxdata/platform/query/functions/storage" + "github.com/influxdata/platform/query/values" "github.com/influxdata/yarpc" "github.com/pkg/errors" ) @@ -279,17 +280,17 @@ func determineBlockColsForSeries(s *ReadResponse_SeriesFrame, typ query.DataType func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey { cols := make([]query.ColMeta, 2, len(s.Tags)) - values := make([]interface{}, 2, len(s.Tags)) + vs := make([]values.Value, 2, len(s.Tags)) cols[0] = query.ColMeta{ Label: execute.DefaultStartColLabel, Type: query.TTime, } - values[0] = bnds.Start + vs[0] = values.NewTimeValue(bnds.Start) cols[1] = query.ColMeta{ Label: execute.DefaultStopColLabel, Type: query.TTime, } - values[1] = bnds.Stop + vs[1] = values.NewTimeValue(bnds.Stop) switch readSpec.GroupMode { case storage.GroupModeBy: // partition key in GroupKeys order, including tags in the GroupKeys slice @@ -299,7 +300,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp Label: string(s.Tags[i].Key), Type: query.TString, }) - values = append(values, string(s.Tags[i].Value)) + vs = append(vs, values.NewStringValue(string(s.Tags[i].Value))) } } case storage.GroupModeExcept: @@ -310,7 +311,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp Label: string(s.Tags[i].Key), Type: query.TString, }) - values = append(values, string(s.Tags[i].Value)) + vs = append(vs, values.NewStringValue(string(s.Tags[i].Value))) } } case storage.GroupModeAll: @@ -319,10 +320,10 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp Label: string(s.Tags[i].Key), Type: query.TString, }) - values = append(values, string(s.Tags[i].Value)) + vs = append(vs, values.NewStringValue(string(s.Tags[i].Value))) } } - return execute.NewPartitionKey(cols, values) + return execute.NewPartitionKey(cols, vs) } func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) ([]query.ColMeta, [][]byte) { @@ -357,25 +358,25 @@ func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey { cols := make([]query.ColMeta, 2, len(readSpec.GroupKeys)+2) - values := make([]interface{}, 2, len(readSpec.GroupKeys)+2) + vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2) cols[0] = query.ColMeta{ Label: execute.DefaultStartColLabel, Type: query.TTime, } - values[0] = bnds.Start + vs[0] = values.NewTimeValue(bnds.Start) cols[1] = query.ColMeta{ Label: execute.DefaultStopColLabel, Type: query.TTime, } - values[1] = bnds.Stop + vs[1] = values.NewTimeValue(bnds.Stop) for i := range readSpec.GroupKeys { cols = append(cols, query.ColMeta{ Label: readSpec.GroupKeys[i], Type: query.TString, }) - values = append(values, string(g.PartitionKeyVals[i])) + vs = append(vs, values.NewStringValue(string(g.PartitionKeyVals[i]))) } - return execute.NewPartitionKey(cols, values) + return execute.NewPartitionKey(cols, vs) } // block implement OneTimeBlock as it can only be read once. diff --git a/query/functions/window.go b/query/functions/window.go index b9bd24c859..8d7c43a130 100644 --- a/query/functions/window.go +++ b/query/functions/window.go @@ -294,19 +294,19 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Block) for _, bnds := range bounds { // Update key cols := make([]query.ColMeta, len(keyCols)) - values := make([]interface{}, len(keyCols)) + vs := make([]values.Value, len(keyCols)) for j, c := range keyCols { cols[j] = c switch c.Label { case t.startColLabel: - values[j] = bnds.Start + vs[j] = values.NewTimeValue(bnds.Start) case t.stopColLabel: - values[j] = bnds.Stop + vs[j] = values.NewTimeValue(bnds.Stop) default: - values[j] = b.Key().Value(keyColMap[j]) + vs[j] = b.Key().Value(keyColMap[j]) } } - key := execute.NewPartitionKey(cols, values) + key := execute.NewPartitionKey(cols, vs) builder, created := t.cache.BlockBuilder(key) if created { for _, c := range newCols { diff --git a/query/influxql/result.go b/query/influxql/result.go index 8e23a2c3bc..8681efa677 100644 --- a/query/influxql/result.go +++ b/query/influxql/result.go @@ -49,7 +49,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e if c.Type != query.TString { return fmt.Errorf("partition column %q is not a string type", c.Label) } - v := b.Key().Value(j).(string) + v := b.Key().Value(j).Str() if c.Label == "_measurement" { r.Name = v } else { diff --git a/query/query_test/test_cases/simple_max.ifql b/query/query_test/test_cases/simple_max.ifql index c518cf4b1c..539e0ccc70 100644 --- a/query/query_test/test_cases/simple_max.ifql +++ b/query/query_test/test_cases/simple_max.ifql @@ -2,5 +2,5 @@ from(db:"test") |> range(start:-5m) |> group(by:["_measurement"]) |> max() - |> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value}) + |> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value}, mergeKey:false) |> yield(name:"0") diff --git a/query/result.go b/query/result.go index 9ccdd0422f..69d45d1d9e 100644 --- a/query/result.go +++ b/query/result.go @@ -101,12 +101,8 @@ type PartitionKey interface { ValueString(j int) string ValueDuration(j int) values.Duration ValueTime(j int) values.Time - Value(j int) interface{} + Value(j int) values.Value - // Intersect returns a new PartitionKey with only columns in the list of labels. - Intersect(labels []string) PartitionKey - // Diff returns the labels that exist in list of labels but not in the key's columns. - Diff(labels []string) []string Hash() uint64 Equal(o PartitionKey) bool Less(o PartitionKey) bool diff --git a/query/values/values.go b/query/values/values.go index 37a995b3b6..362978cafb 100644 --- a/query/values/values.go +++ b/query/values/values.go @@ -88,6 +88,57 @@ func (v value) Function() Function { // InvalidValue is a non nil value who's type is semantic.Invalid var InvalidValue = value{t: semantic.Invalid} +func NewValue(v interface{}, k semantic.Kind) (Value, error) { + switch k { + case semantic.String: + _, ok := v.(string) + if !ok { + return nil, fmt.Errorf("string value must have type string, got %T", v) + } + case semantic.Int: + _, ok := v.(int64) + if !ok { + return nil, fmt.Errorf("int value must have type int64, got %T", v) + } + case semantic.UInt: + _, ok := v.(uint64) + if !ok { + return nil, fmt.Errorf("uint value must have type uint64, got %T", v) + } + case semantic.Float: + _, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("float value must have type float64, got %T", v) + } + case semantic.Bool: + _, ok := v.(bool) + if !ok { + return nil, fmt.Errorf("bool value must have type bool, got %T", v) + } + case semantic.Time: + _, ok := v.(Time) + if !ok { + return nil, fmt.Errorf("time value must have type Time, got %T", v) + } + case semantic.Duration: + _, ok := v.(Duration) + if !ok { + return nil, fmt.Errorf("duration value must have type Duration, got %T", v) + } + case semantic.Regexp: + _, ok := v.(*regexp.Regexp) + if !ok { + return nil, fmt.Errorf("regexp value must have type *regexp.Regexp, got %T", v) + } + default: + return nil, fmt.Errorf("unsupported value kind %v", k) + } + return value{ + t: k, + v: v, + }, nil +} + func NewStringValue(v string) Value { return value{ t: semantic.String,