feat(query/functions/map): Add mergeKey argument to map

In order to facilitate this change the query.PartitionKey
interface was changed to use the values.Value interface.

Additionally map was previously broken when it needed to repartition.
Tests have been added.
pull/10616/head
Nathaniel Cook 2018-05-24 09:08:32 -06:00
parent 2a6885ba46
commit 44ea17cab3
15 changed files with 587 additions and 143 deletions

View File

@ -232,7 +232,7 @@ type tableMetadata struct {
TableID string
Cols []colMeta
Partitions []bool
Defaults []interface{}
Defaults []values.Value
NumFields int
}
@ -319,7 +319,7 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
}
cols := make([]colMeta, len(labels))
defaultValues := make([]interface{}, len(labels))
defaultValues := make([]values.Value, len(labels))
partitionValues := make([]bool, len(labels))
for j, label := range labels {
@ -502,10 +502,10 @@ func (b *blockDecoder) init(line []string) error {
record = line[recordStartIdx:]
}
keyCols := make([]query.ColMeta, 0, len(b.meta.Cols))
keyValues := make([]interface{}, 0, len(b.meta.Cols))
keyValues := make([]values.Value, 0, len(b.meta.Cols))
for j, c := range b.meta.Cols {
if b.meta.Partitions[j] {
var value interface{}
var value values.Value
if b.meta.Defaults[j] != nil {
value = b.meta.Defaults[j]
} else if record != nil {
@ -537,22 +537,22 @@ func (b *blockDecoder) appendRecord(record []string) error {
if record[j] == "" && b.meta.Defaults[j] != nil {
switch c.Type {
case query.TBool:
v := b.meta.Defaults[j].(bool)
v := b.meta.Defaults[j].Bool()
b.builder.AppendBool(j, v)
case query.TInt:
v := b.meta.Defaults[j].(int64)
v := b.meta.Defaults[j].Int()
b.builder.AppendInt(j, v)
case query.TUInt:
v := b.meta.Defaults[j].(uint64)
v := b.meta.Defaults[j].UInt()
b.builder.AppendUInt(j, v)
case query.TFloat:
v := b.meta.Defaults[j].(float64)
v := b.meta.Defaults[j].Float()
b.builder.AppendFloat(j, v)
case query.TString:
v := b.meta.Defaults[j].(string)
v := b.meta.Defaults[j].Str()
b.builder.AppendString(j, v)
case query.TTime:
v := b.meta.Defaults[j].(execute.Time)
v := b.meta.Defaults[j].Time()
b.builder.AppendTime(j, v)
default:
return fmt.Errorf("unsupported column type %v", c.Type)
@ -843,24 +843,45 @@ func writeDefaults(writer *csv.Writer, row, defaults []string) error {
return writer.Write(row)
}
func decodeValue(value string, c colMeta) (v interface{}, err error) {
func decodeValue(value string, c colMeta) (values.Value, error) {
var val values.Value
switch c.Type {
case query.TBool:
v, err = strconv.ParseBool(value)
v, err := strconv.ParseBool(value)
if err != nil {
return nil, err
}
val = values.NewBoolValue(v)
case query.TInt:
v, err = strconv.ParseInt(value, 10, 64)
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, err
}
val = values.NewIntValue(v)
case query.TUInt:
v, err = strconv.ParseUint(value, 10, 64)
v, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, err
}
val = values.NewUIntValue(v)
case query.TFloat:
v, err = strconv.ParseFloat(value, 64)
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, err
}
val = values.NewFloatValue(v)
case query.TString:
v = value
val = values.NewStringValue(value)
case query.TTime:
v, err = decodeTime(value, c.fmt)
v, err := decodeTime(value, c.fmt)
if err != nil {
return nil, err
}
val = values.NewTimeValue(v)
default:
return nil, fmt.Errorf("unsupported type %v", c.Type)
}
return
return val, nil
}
func decodeValueInto(j int, c colMeta, value string, builder execute.BlockBuilder) error {
@ -903,20 +924,20 @@ func decodeValueInto(j int, c colMeta, value string, builder execute.BlockBuilde
return nil
}
func encodeValue(value interface{}, c colMeta) (string, error) {
func encodeValue(value values.Value, c colMeta) (string, error) {
switch c.Type {
case query.TBool:
return strconv.FormatBool(value.(bool)), nil
return strconv.FormatBool(value.Bool()), nil
case query.TInt:
return strconv.FormatInt(value.(int64), 10), nil
return strconv.FormatInt(value.Int(), 10), nil
case query.TUInt:
return strconv.FormatUint(value.(uint64), 10), nil
return strconv.FormatUint(value.UInt(), 10), nil
case query.TFloat:
return strconv.FormatFloat(value.(float64), 'f', -1, 64), nil
return strconv.FormatFloat(value.Float(), 'f', -1, 64), nil
case query.TString:
return value.(string), nil
return value.Str(), nil
case query.TTime:
return encodeTime(value.(execute.Time), c.fmt), nil
return encodeTime(value.Time(), c.fmt), nil
default:
return "", fmt.Errorf("unknown type %v", c.Type)
}

View File

@ -999,12 +999,19 @@ Map applies a function to each record of the input tables.
The modified records are assigned to new tables based on the partition key of the input table.
The output tables are the result of applying the map function to each record on the input tables.
When the output record contains a different value for the partition key the record is repartitioned into the appropriate table.
When the output record drops a column that was part of the partition key that column is removed from the partition key.
Map has the following properties:
* `fn` function
Function to apply to each record.
The return value must be an object.
Only properties defined on the return object will be present on the output records.
* `mergeKey` bool
MergeKey indicates if the record returned from fn should be merged with the partition key.
When merging, all columns on the partition key will be added to the record giving precedence to any columns already present on the record.
When not merging, only columns defined on the returned record will be present on the output records.
Defaults to true.
#### Range

View File

@ -7,6 +7,7 @@ import (
"sync/atomic"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/values"
"github.com/pkg/errors"
)
@ -19,12 +20,12 @@ const (
type partitionKey struct {
cols []query.ColMeta
values []interface{}
values []values.Value
hasHash bool
hash uint64
}
func NewPartitionKey(cols []query.ColMeta, values []interface{}) query.PartitionKey {
func NewPartitionKey(cols []query.ColMeta, values []values.Value) query.PartitionKey {
return &partitionKey{
cols: cols,
values: values,
@ -37,60 +38,29 @@ func (k *partitionKey) Cols() []query.ColMeta {
func (k *partitionKey) HasCol(label string) bool {
return ColIdx(label, k.cols) >= 0
}
func (k *partitionKey) Value(j int) interface{} {
func (k *partitionKey) Value(j int) values.Value {
return k.values[j]
}
func (k *partitionKey) ValueBool(j int) bool {
return k.values[j].(bool)
return k.values[j].Bool()
}
func (k *partitionKey) ValueUInt(j int) uint64 {
return k.values[j].(uint64)
return k.values[j].UInt()
}
func (k *partitionKey) ValueInt(j int) int64 {
return k.values[j].(int64)
return k.values[j].Int()
}
func (k *partitionKey) ValueFloat(j int) float64 {
return k.values[j].(float64)
return k.values[j].Float()
}
func (k *partitionKey) ValueString(j int) string {
return k.values[j].(string)
return k.values[j].Str()
}
func (k *partitionKey) ValueDuration(j int) Duration {
return k.values[j].(Duration)
return k.values[j].Duration()
}
func (k *partitionKey) ValueTime(j int) Time {
return k.values[j].(Time)
}
func (k *partitionKey) Intersect(keys []string) query.PartitionKey {
nk := &partitionKey{
cols: make([]query.ColMeta, 0, len(k.cols)),
values: make([]interface{}, 0, len(k.values)),
}
for i, c := range k.cols {
found := false
for _, label := range keys {
if c.Label == label {
found = true
break
}
}
if found {
nk.cols = append(nk.cols, c)
nk.values = append(nk.values, k.values[i])
}
}
return nk
}
func (k *partitionKey) Diff(labels []string) []string {
diff := make([]string, 0, len(labels))
for _, label := range labels {
if ColIdx(label, k.cols) < 0 {
diff = append(diff, label)
}
}
return diff
return k.values[j].Time()
}
func (k *partitionKey) Hash() uint64 {
@ -209,7 +179,7 @@ func PartitionKeyForRow(i int, cr query.ColReader) query.PartitionKey {
key := cr.Key()
cols := cr.Cols()
colsCpy := make([]query.ColMeta, 0, len(cols))
values := make([]interface{}, 0, len(cols))
vs := make([]values.Value, 0, len(cols))
for j, c := range cols {
if !key.HasCol(c.Label) {
continue
@ -217,28 +187,28 @@ func PartitionKeyForRow(i int, cr query.ColReader) query.PartitionKey {
colsCpy = append(colsCpy, c)
switch c.Type {
case query.TBool:
values = append(values, cr.Bools(j)[i])
vs = append(vs, values.NewBoolValue(cr.Bools(j)[i]))
case query.TInt:
values = append(values, cr.Ints(j)[i])
vs = append(vs, values.NewIntValue(cr.Ints(j)[i]))
case query.TUInt:
values = append(values, cr.UInts(j)[i])
vs = append(vs, values.NewUIntValue(cr.UInts(j)[i]))
case query.TFloat:
values = append(values, cr.Floats(j)[i])
vs = append(vs, values.NewFloatValue(cr.Floats(j)[i]))
case query.TString:
values = append(values, cr.Strings(j)[i])
vs = append(vs, values.NewStringValue(cr.Strings(j)[i]))
case query.TTime:
values = append(values, cr.Times(j)[i])
vs = append(vs, values.NewTimeValue(cr.Times(j)[i]))
}
}
return &partitionKey{
cols: colsCpy,
values: values,
values: vs,
}
}
func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.PartitionKey {
cols := make([]query.ColMeta, 0, len(on))
values := make([]interface{}, 0, len(on))
vs := make([]values.Value, 0, len(on))
for j, c := range cr.Cols() {
if !on[c.Label] {
continue
@ -246,20 +216,20 @@ func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.P
cols = append(cols, c)
switch c.Type {
case query.TBool:
values = append(values, cr.Bools(j)[i])
vs = append(vs, values.NewBoolValue(cr.Bools(j)[i]))
case query.TInt:
values = append(values, cr.Ints(j)[i])
vs = append(vs, values.NewIntValue(cr.Ints(j)[i]))
case query.TUInt:
values = append(values, cr.UInts(j)[i])
vs = append(vs, values.NewUIntValue(cr.UInts(j)[i]))
case query.TFloat:
values = append(values, cr.Floats(j)[i])
vs = append(vs, values.NewFloatValue(cr.Floats(j)[i]))
case query.TString:
values = append(values, cr.Strings(j)[i])
vs = append(vs, values.NewStringValue(cr.Strings(j)[i]))
case query.TTime:
values = append(values, cr.Times(j)[i])
vs = append(vs, values.NewTimeValue(cr.Times(j)[i]))
}
}
return NewPartitionKey(cols, values)
return NewPartitionKey(cols, vs)
}
// OneTimeBlock is a Block that permits reading data only once.

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/values"
)
// Block is an implementation of execute.Block
@ -31,6 +32,7 @@ type Block struct {
func (b *Block) Normalize() {
if b.PartitionKey == nil {
cols := make([]query.ColMeta, len(b.KeyCols))
vs := make([]values.Value, len(b.KeyCols))
if len(b.KeyValues) != len(b.KeyCols) {
b.KeyValues = make([]interface{}, len(b.KeyCols))
}
@ -43,8 +45,13 @@ func (b *Block) Normalize() {
if len(b.Data) > 0 {
b.KeyValues[j] = b.Data[0][idx]
}
v, err := values.NewValue(b.KeyValues[j], execute.ConvertToKind(cols[j].Type))
if err != nil {
panic(err)
}
vs[j] = v
}
b.PartitionKey = execute.NewPartitionKey(cols, b.KeyValues)
b.PartitionKey = execute.NewPartitionKey(cols, vs)
}
}
@ -150,7 +157,24 @@ func ConvertBlock(b query.Block) (*Block, error) {
blk.KeyValues = make([]interface{}, len(keyCols))
for j, c := range keyCols {
blk.KeyCols[j] = c.Label
blk.KeyValues[j] = key.Value(j)
var v interface{}
switch c.Type {
case query.TBool:
v = key.ValueBool(j)
case query.TUInt:
v = key.ValueUInt(j)
case query.TInt:
v = key.ValueInt(j)
case query.TFloat:
v = key.ValueFloat(j)
case query.TString:
v = key.ValueString(j)
case query.TTime:
v = key.ValueTime(j)
default:
return nil, fmt.Errorf("unsupported column type %v", c.Type)
}
blk.KeyValues[j] = v
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/values"
)
const (
@ -47,10 +48,10 @@ func init() {
{Label: execute.DefaultStopColLabel, Type: query.TTime},
{Label: "t1", Type: query.TString},
},
[]interface{}{
start,
stop,
t1Value,
[]values.Value{
values.NewTimeValue(start),
values.NewTimeValue(stop),
values.NewStringValue(t1Value),
},
)
normalBlockBuilder := execute.NewColListBlockBuilder(key, executetest.UnlimitedAllocator)

View File

@ -10,12 +10,14 @@ import (
"github.com/influxdata/platform/query/interpreter"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
const MapKind = "map"
type MapOpSpec struct {
Fn *semantic.FunctionExpression `json:"fn"`
Fn *semantic.FunctionExpression `json:"fn"`
MergeKey bool `json:"mergeKey"`
}
var mapSignature = query.DefaultFunctionSignature()
@ -34,17 +36,26 @@ func createMapOpSpec(args query.Arguments, a *query.Administration) (query.Opera
return nil, err
}
f, err := args.GetRequiredFunction("fn")
if err != nil {
spec := new(MapOpSpec)
if f, err := args.GetRequiredFunction("fn"); err != nil {
return nil, err
} else {
fn, err := interpreter.ResolveFunction(f)
if err != nil {
return nil, err
}
spec.Fn = fn
}
fn, err := interpreter.ResolveFunction(f)
if err != nil {
if m, ok, err := args.GetBool("mergeKey"); err != nil {
return nil, err
} else if ok {
spec.MergeKey = m
} else {
spec.MergeKey = true
}
return &MapOpSpec{
Fn: fn,
}, nil
return spec, nil
}
func newMapOp() query.OperationSpec {
@ -56,7 +67,8 @@ func (s *MapOpSpec) Kind() query.OperationKind {
}
type MapProcedureSpec struct {
Fn *semantic.FunctionExpression
Fn *semantic.FunctionExpression
MergeKey bool
}
func newMapProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
@ -66,7 +78,8 @@ func newMapProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Proce
}
return &MapProcedureSpec{
Fn: spec.Fn,
Fn: spec.Fn,
MergeKey: spec.MergeKey,
}, nil
}
@ -75,6 +88,7 @@ func (s *MapProcedureSpec) Kind() plan.ProcedureKind {
}
func (s *MapProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(MapProcedureSpec)
*ns = *s
ns.Fn = s.Fn.Copy().(*semantic.FunctionExpression)
return ns
}
@ -97,7 +111,8 @@ type mapTransformation struct {
d execute.Dataset
cache execute.BlockBuilderCache
fn *execute.RowMapFn
fn *execute.RowMapFn
mergeKey bool
}
func NewMapTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *MapProcedureSpec) (*mapTransformation, error) {
@ -106,9 +121,10 @@ func NewMapTransformation(d execute.Dataset, cache execute.BlockBuilderCache, sp
return nil, err
}
return &mapTransformation{
d: d,
cache: cache,
fn: fn,
d: d,
cache: cache,
fn: fn,
mergeKey: spec.MergeKey,
}, nil
}
@ -124,6 +140,19 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
// TODO(nathanielc): Should we not fail the query for failed compilation?
return err
}
// Determine the keys returned from the function
properties := t.fn.Type().Properties()
keys := make([]string, 0, len(properties))
for k := range properties {
keys = append(keys, k)
}
sort.Strings(keys)
// Determine on which cols to partition
on := make(map[string]bool, len(b.Key().Cols()))
for _, c := range b.Key().Cols() {
on[c.Label] = t.mergeKey || execute.ContainsStr(keys, c.Label)
}
return b.Do(func(cr query.ColReader) error {
l := cr.Len()
@ -133,17 +162,17 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
log.Printf("failed to evaluate map expression: %v", err)
continue
}
key := execute.PartitionKeyForRow(i, cr)
key := partitionKeyForObject(i, cr, m, on)
builder, created := t.cache.BlockBuilder(key)
if created {
// Add columns from function in sorted order
properties := t.fn.Type().Properties()
keys := make([]string, 0, len(properties))
for k := range properties {
keys = append(keys, k)
if t.mergeKey {
execute.AddBlockKeyCols(b.Key(), builder)
}
sort.Strings(keys)
// Add columns from function in sorted order
for _, k := range keys {
if t.mergeKey && b.Key().HasCol(k) {
continue
}
builder.AddCol(query.ColMeta{
Label: k,
Type: execute.ConvertFromKind(properties[k].Kind()),
@ -151,13 +180,51 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
}
}
for j, c := range builder.Cols() {
v, _ := m.Get(c.Label)
v, ok := m.Get(c.Label)
if !ok {
if idx := execute.ColIdx(c.Label, b.Key().Cols()); t.mergeKey && idx >= 0 {
v = b.Key().Value(idx)
} else {
// This should be unreachable
return fmt.Errorf("could not find value for column %q", c.Label)
}
}
execute.AppendValue(builder, j, v)
}
}
return nil
})
}
// partitionKeyForObject builds the partition key for row i of cr, restricted
// to the columns enabled in the on set. Values present on the mapped object
// obj take precedence; for any remaining key column the value is read from
// the input row itself.
func partitionKeyForObject(i int, cr query.ColReader, obj values.Object, on map[string]bool) query.PartitionKey {
	keyCols := make([]query.ColMeta, 0, len(on))
	keyValues := make([]values.Value, 0, len(on))
	for idx, col := range cr.Cols() {
		if !on[col.Label] {
			continue
		}
		keyCols = append(keyCols, col)
		if v, ok := obj.Get(col.Label); ok {
			// The record returned from the map fn supplies this column.
			keyValues = append(keyValues, v)
			continue
		}
		// Fall back to the corresponding value from the input row.
		switch col.Type {
		case query.TBool:
			keyValues = append(keyValues, values.NewBoolValue(cr.Bools(idx)[i]))
		case query.TInt:
			keyValues = append(keyValues, values.NewIntValue(cr.Ints(idx)[i]))
		case query.TUInt:
			keyValues = append(keyValues, values.NewUIntValue(cr.UInts(idx)[i]))
		case query.TFloat:
			keyValues = append(keyValues, values.NewFloatValue(cr.Floats(idx)[i]))
		case query.TString:
			keyValues = append(keyValues, values.NewStringValue(cr.Strings(idx)[i]))
		case query.TTime:
			keyValues = append(keyValues, values.NewTimeValue(cr.Times(idx)[i]))
		}
	}
	return execute.NewPartitionKey(keyCols, keyValues)
}
func (t *mapTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {

View File

@ -28,6 +28,7 @@ func TestMap_NewQuery(t *testing.T) {
{
ID: "map1",
Spec: &functions.MapOpSpec{
MergeKey: true,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.BinaryExpression{
@ -63,6 +64,7 @@ func TestMap_NewQuery(t *testing.T) {
{
ID: "map1",
Spec: &functions.MapOpSpec{
MergeKey: true,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.BinaryExpression{
@ -151,6 +153,7 @@ func TestMap_Process(t *testing.T) {
{
name: `_value+5`,
spec: &functions.MapProcedureSpec{
MergeKey: false,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
@ -204,9 +207,308 @@ func TestMap_Process(t *testing.T) {
},
}},
},
{
name: `_value+5 mergeKey=true`,
spec: &functions.MapProcedureSpec{
MergeKey: true,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
Properties: []*semantic.Property{
{
Key: &semantic.Identifier{Name: "_time"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_time",
},
},
{
Key: &semantic.Identifier{Name: "_value"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_value",
},
Right: &semantic.FloatLiteral{
Value: 5,
},
},
},
},
},
},
},
data: []query.Block{&executetest.Block{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A", execute.Time(1), 1.0},
{"m", "A", execute.Time(2), 6.0},
},
}},
want: []*executetest.Block{{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A", execute.Time(1), 6.0},
{"m", "A", execute.Time(2), 11.0},
},
}},
},
{
name: `_value+5 mergeKey=false drop cols`,
spec: &functions.MapProcedureSpec{
MergeKey: false,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
Properties: []*semantic.Property{
{
Key: &semantic.Identifier{Name: "_time"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_time",
},
},
{
Key: &semantic.Identifier{Name: "_value"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_value",
},
Right: &semantic.FloatLiteral{
Value: 5,
},
},
},
},
},
},
},
data: []query.Block{&executetest.Block{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A", execute.Time(1), 1.0},
{"m", "A", execute.Time(2), 6.0},
},
}},
want: []*executetest.Block{{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(1), 6.0},
{execute.Time(2), 11.0},
},
}},
},
{
name: `_value+5 mergeKey=true repartition`,
spec: &functions.MapProcedureSpec{
MergeKey: true,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
Properties: []*semantic.Property{
{
Key: &semantic.Identifier{Name: "_time"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_time",
},
},
{
Key: &semantic.Identifier{Name: "host"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "host",
},
Right: &semantic.StringLiteral{Value: ".local"},
},
},
{
Key: &semantic.Identifier{Name: "_value"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_value",
},
Right: &semantic.FloatLiteral{
Value: 5,
},
},
},
},
},
},
},
data: []query.Block{&executetest.Block{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A", execute.Time(1), 1.0},
{"m", "A", execute.Time(2), 6.0},
},
}},
want: []*executetest.Block{{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A.local", execute.Time(1), 6.0},
{"m", "A.local", execute.Time(2), 11.0},
},
}},
},
{
name: `_value+5 mergeKey=true repartition fan out`,
spec: &functions.MapProcedureSpec{
MergeKey: true,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
Properties: []*semantic.Property{
{
Key: &semantic.Identifier{Name: "_time"},
Value: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_time",
},
},
{
Key: &semantic.Identifier{Name: "host"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "host",
},
Right: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.StringLiteral{Value: "."},
Right: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "domain",
},
},
},
},
{
Key: &semantic.Identifier{Name: "_value"},
Value: &semantic.BinaryExpression{
Operator: ast.AdditionOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{
Name: "r",
},
Property: "_value",
},
Right: &semantic.FloatLiteral{
Value: 5,
},
},
},
},
},
},
},
data: []query.Block{&executetest.Block{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "domain", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A", "example.com", execute.Time(1), 1.0},
{"m", "A", "www.example.com", execute.Time(2), 6.0},
},
}},
want: []*executetest.Block{
{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A.example.com", execute.Time(1), 6.0},
},
},
{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{"m", "A.www.example.com", execute.Time(2), 11.0},
},
},
},
},
{
name: `_value*_value`,
spec: &functions.MapProcedureSpec{
MergeKey: false,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
@ -266,6 +568,7 @@ func TestMap_Process(t *testing.T) {
{
name: "float(r._value) int",
spec: &functions.MapProcedureSpec{
MergeKey: false,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{
@ -324,6 +627,7 @@ func TestMap_Process(t *testing.T) {
{
name: "float(r._value) uint",
spec: &functions.MapProcedureSpec{
MergeKey: false,
Fn: &semantic.FunctionExpression{
Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
Body: &semantic.ObjectExpression{

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
const SetKind = "set"
@ -124,16 +125,16 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Block) error {
if idx := execute.ColIdx(t.key, key.Cols()); idx >= 0 {
// Update key
cols := make([]query.ColMeta, len(key.Cols()))
values := make([]interface{}, len(key.Cols()))
vs := make([]values.Value, len(key.Cols()))
for j, c := range key.Cols() {
cols[j] = c
if j == idx {
values[j] = t.value
vs[j] = values.NewStringValue(t.value)
} else {
values[j] = key.Value(j)
vs[j] = key.Value(j)
}
}
key = execute.NewPartitionKey(cols, values)
key = execute.NewPartitionKey(cols, vs)
}
builder, created := t.cache.BlockBuilder(key)
if created {

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/platform/query/interpreter"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
const ShiftKind = "shift"
@ -134,20 +135,20 @@ func (t *shiftTransformation) Process(id execute.DatasetID, b query.Block) error
key := b.Key()
// Update key
cols := make([]query.ColMeta, len(key.Cols()))
values := make([]interface{}, len(key.Cols()))
vs := make([]values.Value, len(key.Cols()))
for j, c := range key.Cols() {
if execute.ContainsStr(t.columns, c.Label) {
if c.Type != query.TTime {
return fmt.Errorf("column %q is not of type time", c.Label)
}
cols[j] = c
values[j] = key.ValueTime(j).Add(t.shift)
vs[j] = values.NewTimeValue(key.ValueTime(j).Add(t.shift))
} else {
cols[j] = c
values[j] = key.Value(j)
vs[j] = key.Value(j)
}
}
key = execute.NewPartitionKey(cols, values)
key = execute.NewPartitionKey(cols, vs)
builder, created := t.cache.BlockBuilder(key)
if !created {

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/functions/storage"
"github.com/influxdata/platform/query/values"
"github.com/influxdata/yarpc"
"github.com/pkg/errors"
)
@ -279,17 +280,17 @@ func determineBlockColsForSeries(s *ReadResponse_SeriesFrame, typ query.DataType
func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
cols := make([]query.ColMeta, 2, len(s.Tags))
values := make([]interface{}, 2, len(s.Tags))
vs := make([]values.Value, 2, len(s.Tags))
cols[0] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
values[0] = bnds.Start
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
values[1] = bnds.Stop
vs[1] = values.NewTimeValue(bnds.Stop)
switch readSpec.GroupMode {
case storage.GroupModeBy:
// partition key in GroupKeys order, including tags in the GroupKeys slice
@ -299,7 +300,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(s.Tags[i].Value))
vs = append(vs, values.NewStringValue(string(s.Tags[i].Value)))
}
}
case storage.GroupModeExcept:
@ -310,7 +311,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(s.Tags[i].Value))
vs = append(vs, values.NewStringValue(string(s.Tags[i].Value)))
}
}
case storage.GroupModeAll:
@ -319,10 +320,10 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(s.Tags[i].Value))
vs = append(vs, values.NewStringValue(string(s.Tags[i].Value)))
}
}
return execute.NewPartitionKey(cols, values)
return execute.NewPartitionKey(cols, vs)
}
func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {
@ -357,25 +358,25 @@ func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType)
func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
cols := make([]query.ColMeta, 2, len(readSpec.GroupKeys)+2)
values := make([]interface{}, 2, len(readSpec.GroupKeys)+2)
vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2)
cols[0] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
values[0] = bnds.Start
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
values[1] = bnds.Stop
vs[1] = values.NewTimeValue(bnds.Stop)
for i := range readSpec.GroupKeys {
cols = append(cols, query.ColMeta{
Label: readSpec.GroupKeys[i],
Type: query.TString,
})
values = append(values, string(g.PartitionKeyVals[i]))
vs = append(vs, values.NewStringValue(string(g.PartitionKeyVals[i])))
}
return execute.NewPartitionKey(cols, values)
return execute.NewPartitionKey(cols, vs)
}
// block implements OneTimeBlock as it can only be read once.

View File

@ -294,19 +294,19 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Block)
for _, bnds := range bounds {
// Update key
cols := make([]query.ColMeta, len(keyCols))
values := make([]interface{}, len(keyCols))
vs := make([]values.Value, len(keyCols))
for j, c := range keyCols {
cols[j] = c
switch c.Label {
case t.startColLabel:
values[j] = bnds.Start
vs[j] = values.NewTimeValue(bnds.Start)
case t.stopColLabel:
values[j] = bnds.Stop
vs[j] = values.NewTimeValue(bnds.Stop)
default:
values[j] = b.Key().Value(keyColMap[j])
vs[j] = b.Key().Value(keyColMap[j])
}
}
key := execute.NewPartitionKey(cols, values)
key := execute.NewPartitionKey(cols, vs)
builder, created := t.cache.BlockBuilder(key)
if created {
for _, c := range newCols {

View File

@ -49,7 +49,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e
if c.Type != query.TString {
return fmt.Errorf("partition column %q is not a string type", c.Label)
}
v := b.Key().Value(j).(string)
v := b.Key().Value(j).Str()
if c.Label == "_measurement" {
r.Name = v
} else {

View File

@ -2,5 +2,5 @@ from(db:"test")
|> range(start:-5m)
|> group(by:["_measurement"])
|> max()
|> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value})
|> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value}, mergeKey:false)
|> yield(name:"0")

View File

@ -101,12 +101,8 @@ type PartitionKey interface {
ValueString(j int) string
ValueDuration(j int) values.Duration
ValueTime(j int) values.Time
Value(j int) interface{}
Value(j int) values.Value
// Intersect returns a new PartitionKey with only columns in the list of labels.
Intersect(labels []string) PartitionKey
// Diff returns the labels that exist in list of labels but not in the key's columns.
Diff(labels []string) []string
Hash() uint64
Equal(o PartitionKey) bool
Less(o PartitionKey) bool

View File

@ -88,6 +88,57 @@ func (v value) Function() Function {
// InvalidValue is a non-nil value whose type is semantic.Invalid
var InvalidValue = value{t: semantic.Invalid}
// NewValue constructs a Value of kind k wrapping the native Go value v.
// It returns an error when the dynamic type of v does not match the
// representation required by k (e.g. semantic.Int requires an int64),
// or when k is not a supported value kind.
func NewValue(v interface{}, k semantic.Kind) (Value, error) {
	switch k {
	case semantic.String:
		if _, ok := v.(string); !ok {
			return nil, fmt.Errorf("string value must have type string, got %T", v)
		}
	case semantic.Int:
		if _, ok := v.(int64); !ok {
			return nil, fmt.Errorf("int value must have type int64, got %T", v)
		}
	case semantic.UInt:
		if _, ok := v.(uint64); !ok {
			return nil, fmt.Errorf("uint value must have type uint64, got %T", v)
		}
	case semantic.Float:
		if _, ok := v.(float64); !ok {
			return nil, fmt.Errorf("float value must have type float64, got %T", v)
		}
	case semantic.Bool:
		if _, ok := v.(bool); !ok {
			return nil, fmt.Errorf("bool value must have type bool, got %T", v)
		}
	case semantic.Time:
		if _, ok := v.(Time); !ok {
			return nil, fmt.Errorf("time value must have type Time, got %T", v)
		}
	case semantic.Duration:
		if _, ok := v.(Duration); !ok {
			return nil, fmt.Errorf("duration value must have type Duration, got %T", v)
		}
	case semantic.Regexp:
		if _, ok := v.(*regexp.Regexp); !ok {
			return nil, fmt.Errorf("regexp value must have type *regexp.Regexp, got %T", v)
		}
	default:
		return nil, fmt.Errorf("unsupported value kind %v", k)
	}
	return value{
		t: k,
		v: v,
	}, nil
}
func NewStringValue(v string) Value {
return value{
t: semantic.String,