diff --git a/query/docs/SPEC.md b/query/docs/SPEC.md index 6ba5204d52..1c2332698f 100644 --- a/query/docs/SPEC.md +++ b/query/docs/SPEC.md @@ -1145,6 +1145,26 @@ Additionally exactly two columns must be provided to the `columns` property. Count is an aggregate operation. For each aggregated column, it outputs the number of non null records as an integer. +#### Duplicate +Duplicate will duplicate a specified column in a table + +Duplicate has the following properties: + +* `column` string + The column to duplicate +* `as` string + The name that should be assigned to the duplicate column + +Example usage: + +Duplicate column `server` under the name `host`: +``` +from(db: "telegraf") + |> range(start:-5m) + |> filter(fn: (r) => r._measurement == "cpu") + |> duplicate(column: "host", as: "server") +``` + ##### Integral Integral is an aggregate operation. diff --git a/query/functions/drop_rename_keep.go b/query/functions/schema_functions.go similarity index 90% rename from query/functions/drop_rename_keep.go rename to query/functions/schema_functions.go index 676f33df4e..d6de8017c4 100644 --- a/query/functions/drop_rename_keep.go +++ b/query/functions/schema_functions.go @@ -15,6 +15,7 @@ import ( const RenameKind = "rename" const DropKind = "drop" const KeepKind = "keep" +const DuplicateKind = "duplicate" type RenameOpSpec struct { Cols map[string]string `json:"columns"` @@ -31,6 +32,11 @@ type KeepOpSpec struct { Predicate *semantic.FunctionExpression `json:"fn"` } +type DuplicateOpSpec struct { + Col string `json:"columns"` + As string `json:"as"` +} + // The base kind for SchemaMutations const SchemaMutationKind = "SchemaMutation" @@ -95,6 +101,15 @@ var Registrars = []MutationRegistrar{ Create: createKeepOpSpec, New: newKeepOp, }, + { + Kind: DuplicateKind, + Args: map[string]semantic.Type{ + "column": semantic.String, + "as": semantic.String, + }, + Create: createDuplicateOpSpec, + New: newDuplicateOp, + }, } func init() { @@ -257,6 +272,27 @@ func createKeepOpSpec(args query.Arguments, a *query.Administration) (query.Oper }, nil } +func createDuplicateOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) { + if err := a.AddParentFromArgs(args); err != nil { + return nil, err + } + + col, err := args.GetRequiredString("column") + if err != nil { + return nil, err + } + + newName, err := args.GetRequiredString("as") + if err != nil { + return nil, err + } + + return &DuplicateOpSpec{ + Col: col, + As: newName, + }, nil +} + func newRenameOp() query.OperationSpec { return new(RenameOpSpec) } @@ -281,6 +317,14 @@ func (s *KeepOpSpec) Kind() query.OperationKind { return KeepKind } +func newDuplicateOp() query.OperationSpec { + return new(DuplicateOpSpec) +} + +func (s *DuplicateOpSpec) Kind() query.OperationKind { + return DuplicateKind +} + func (s *RenameOpSpec) Copy() SchemaMutation { newCols := make(map[string]string, len(s.Cols)) for k, v := range s.Cols { @@ -317,6 +361,13 @@ func (s *KeepOpSpec) Copy() SchemaMutation { } } +func (s *DuplicateOpSpec) Copy() SchemaMutation { + return &DuplicateOpSpec{ + Col: s.Col, + As: s.As, + } +} + func (s *RenameOpSpec) Mutator() (SchemaMutator, error) { m, err := NewRenameMutator(s) if err != nil { @@ -341,6 +392,14 @@ func (s *KeepOpSpec) Mutator() (SchemaMutator, error) { return m, nil } +func (s *DuplicateOpSpec) Mutator() (SchemaMutator, error) { + m, err := NewDuplicateMutator(s) + if err != nil { + return nil, err + } + return m, nil +} + type SchemaMutationProcedureSpec struct { Mutations []SchemaMutation } diff --git a/query/functions/drop_rename_keep_test.go b/query/functions/schema_functions_test.go similarity index 88% rename from query/functions/drop_rename_keep_test.go rename to query/functions/schema_functions_test.go index 580c297fca..805afc4a3f 100644 --- a/query/functions/drop_rename_keep_test.go +++ b/query/functions/schema_functions_test.go @@ -17,7 +17,7 @@ import ( "github.com/influxdata/platform/query/querytest" ) -func TestDropRenameKeep_NewQueries(t *testing.T) { +func TestSchemaMutions_NewQueries(t *testing.T) { tests := []querytest.NewQueryTestCase{ { Name: "test rename query", @@ -111,6 +111,37 @@ func TestDropRenameKeep_NewQueries(t *testing.T) { }, }, }, + { + Name: "test duplicate query", + Raw: `from(db:"mydb") |> duplicate(column: "col1", as: "col1_new") |> sum()`, + Want: &query.Spec{ + Operations: []*query.Operation{ + { + ID: "from0", + Spec: &functions.FromOpSpec{ + Database: "mydb", + }, + }, + { + ID: "duplicate1", + Spec: &functions.DuplicateOpSpec{ + Col: "col1", + As: "col1_new", + }, + }, + { + ID: "sum2", + Spec: &functions.SumOpSpec{ + AggregateConfig: execute.DefaultAggregateConfig, + }, + }, + }, + Edges: []query.Edge{ + {Parent: "from0", Child: "duplicate1"}, + {Parent: "duplicate1", Child: "sum2"}, + }, + }, + }, { Name: "test drop query fn param", Raw: `from(db:"mydb") |> drop(fn: (col) => col =~ /reg*/) |> sum()`, @@ -246,7 +277,14 @@ func TestDropRenameKeep_NewQueries(t *testing.T) { Want: nil, WantErr: true, }, + { + Name: "test duplicate query invalid", + Raw: `from(db:"mydb") |> duplicate(columns: ["a", "b"], n: -1) |> sum()`, + Want: nil, + WantErr: true, + }, } + for _, tc := range tests { tc := tc t.Run(tc.Name, func(t *testing.T) { @@ -367,6 +405,42 @@ func TestDropRenameKeep_Process(t *testing.T) { }, }}, }, + { + name: "duplicate single col", + spec: &functions.SchemaMutationProcedureSpec{ + Mutations: []functions.SchemaMutation{ + &functions.DuplicateOpSpec{ + Col: "a", + As: "a_1", + }, + }, + }, + data: []query.Table{&executetest.Table{ + ColMeta: []query.ColMeta{ + {Label: "a", Type: query.TFloat}, + {Label: "b", Type: query.TFloat}, + {Label: "c", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {1.0, 2.0, 3.0}, + {11.0, 12.0, 13.0}, + {21.0, 22.0, 23.0}, + }, + }}, + want: []*executetest.Table{{ + ColMeta: []query.ColMeta{ + {Label: "a", Type: query.TFloat}, + {Label: "a_1", Type: query.TFloat}, + {Label: "b", Type: query.TFloat}, + {Label: "c", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {1.0, 1.0, 2.0, 3.0}, + {11.0, 11.0, 12.0, 13.0}, + {21.0, 21.0, 22.0, 23.0}, + }, + }}, + }, { name: "rename map fn (col) => name", spec: &functions.SchemaMutationProcedureSpec{ @@ -604,6 +678,31 @@ func TestDropRenameKeep_Process(t *testing.T) { want: []*executetest.Table(nil), wantErr: errors.New(`keep error: column "no_exist" doesn't exist`), }, + { + name: "duplicate no exist", + spec: &functions.SchemaMutationProcedureSpec{ + Mutations: []functions.SchemaMutation{ + &functions.DuplicateOpSpec{ + Col: "no_exist", + As: "no_exist_2", + }, + }, + }, + data: []query.Table{&executetest.Table{ + ColMeta: []query.ColMeta{ + {Label: "server1", Type: query.TFloat}, + {Label: "local", Type: query.TFloat}, + {Label: "server2", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {1.0, 2.0, 3.0}, + {11.0, 12.0, 13.0}, + {21.0, 22.0, 23.0}, + }, + }}, + want: []*executetest.Table(nil), + wantErr: errors.New(`duplicate error: column "no_exist" doesn't exist`), + }, { name: "rename group key", spec: &functions.SchemaMutationProcedureSpec{ diff --git a/query/functions/schema_mutators.go b/query/functions/schema_mutators.go index 0c1241209a..3503b9a43f 100644 --- a/query/functions/schema_mutators.go +++ b/query/functions/schema_mutators.go @@ -88,6 +88,13 @@ func toStringSet(arr []string) map[string]bool { return set } +func checkCol(label string, cols []query.ColMeta) error { + if execute.ColIdx(label, cols) < 0 { + return fmt.Errorf(`column "%s" doesn't exist`, label) + } + return nil +} + type RenameMutator struct { Cols map[string]string Fn compiler.Func @@ -120,17 +127,6 @@ func NewRenameMutator(qs query.OperationSpec) (*RenameMutator, error) { return m, nil } -func (m *RenameMutator) checkColumnReferences(cols []query.ColMeta) error { - if m.Cols != nil { - for c := range m.Cols { - if execute.ColIdx(c, cols) < 0 { - return fmt.Errorf(`rename error: column "%s" doesn't exist`, c) - } - } - } - return nil -} - func (m *RenameMutator) renameCol(col *query.ColMeta) error { if col == nil { return errors.New("rename error: cannot rename nil column") @@ -150,8 +146,17 @@ func (m *RenameMutator) renameCol(col *query.ColMeta) error { return nil } +func (m *RenameMutator) checkColumns(tableCols []query.ColMeta) error { + for c := range m.Cols { + if err := checkCol(c, tableCols); err != nil { + return errors.Wrap(err, "rename error") + } + } + return nil +} + func (m *RenameMutator) Mutate(ctx *BuilderContext) error { - if err := m.checkColumnReferences(ctx.Cols()); err != nil { + if err := m.checkColumns(ctx.Cols()); err != nil { return err } @@ -225,21 +230,23 @@ func NewDropKeepMutator(qs query.OperationSpec) (*DropKeepMutator, error) { return m, nil } -func (m *DropKeepMutator) checkColumnReferences(cols []query.ColMeta) error { +func (m *DropKeepMutator) checkColumns(tableCols []query.ColMeta) error { if m.DropCols != nil { for c := range m.DropCols { - if execute.ColIdx(c, cols) < 0 { - return fmt.Errorf(`drop error: column "%s" doesn't exist`, c) + if err := checkCol(c, tableCols); err != nil { + return errors.Wrap(err, "drop error") } } } + if m.KeepCols != nil { for c := range m.KeepCols { - if execute.ColIdx(c, cols) < 0 { - return fmt.Errorf(`keep error: column "%s" doesn't exist`, c) + if err := checkCol(c, tableCols); err != nil { + return errors.Wrap(err, "keep error") } } } + return nil } @@ -282,7 +289,7 @@ func (m *DropKeepMutator) keepToDropCols(cols []query.ColMeta) { } func (m *DropKeepMutator) Mutate(ctx *BuilderContext) error { - if err := m.checkColumnReferences(ctx.Cols()); err != nil { + if err := m.checkColumns(ctx.Cols()); err != nil { return err } @@ -318,6 +325,55 @@ func (m *DropKeepMutator) Mutate(ctx *BuilderContext) error { return nil } +type DuplicateMutator struct { + Col string + As string +} + +func NewDuplicateMutator(qs query.OperationSpec) (*DuplicateMutator, error) { + s, ok := qs.(*DuplicateOpSpec) + if !ok { + return nil, fmt.Errorf("invalid spec type %T", qs) + } + + return &DuplicateMutator{ + Col: s.Col, + As: s.As, + }, nil +} + +func (m *DuplicateMutator) Mutate(ctx *BuilderContext) error { + if err := checkCol(m.Col, ctx.Cols()); err != nil { + return errors.Wrap(err, "duplicate error") + } + + newCols := make([]query.ColMeta, 0, len(ctx.Cols())+1) + newColMap := make([]int, 0, len(ctx.Cols())+1) + oldColMap := ctx.ColMap() + + for i, c := range ctx.Cols() { + newCols = append(newCols, c) + newColMap = append(newColMap, oldColMap[i]) + + if c.Label == m.Col { + newCols = append(newCols, duplicate(c, m.As)) + newColMap = append(newColMap, oldColMap[i]) + } + } + + ctx.TableColumns = newCols + ctx.ColIdxMap = newColMap + + return nil +} + +func duplicate(col query.ColMeta, dupName string) query.ColMeta { + return query.ColMeta{ + Type: col.Type, + Label: dupName, + } +} + // TODO: determine pushdown rules /* func (s *SchemaMutationProcedureSpec) PushDownRules() []plan.PushDownRule { diff --git a/query/functions/testdata/duplicate.flux b/query/functions/testdata/duplicate.flux new file mode 100644 index 0000000000..9fb703ae2a --- /dev/null +++ b/query/functions/testdata/duplicate.flux @@ -0,0 +1,3 @@ +from(db: "test") + |> range(start:2018-05-22T19:53:26Z) + |> duplicate(column: "host", as: "host_new") diff --git a/query/functions/testdata/duplicate.in.csv b/query/functions/testdata/duplicate.in.csv new file mode 100644 index 0000000000..4aea70179f --- /dev/null +++ b/query/functions/testdata/duplicate.in.csv @@ -0,0 +1,16 @@ +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#group,false,false,false,false,false,false,true,true,true,true +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest,cpu,cpu-total,host.local +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest,cpu,cpu-total,host.local +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest,cpu,cpu-total,host.local +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest,cpu,cpu-total,host.local +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest,cpu,cpu-total,host.local +,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest_nice,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest_nice,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest_nice,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest_nice,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest_nice,cpu,cpu-total,host.local +,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest_nice,cpu,cpu-total,host.local \ No newline at end of file diff --git a/query/functions/testdata/duplicate.out.csv b/query/functions/testdata/duplicate.out.csv new file mode 100644 index 0000000000..a7870848ba --- /dev/null +++ b/query/functions/testdata/duplicate.out.csv @@ -0,0 +1,17 @@ +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string +#group,false,false,false,false,false,false,true,true,true,true,false +#default,_result,,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host,host_new +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local +