parent
ce6195150a
commit
96d11c3d9f
|
@ -1145,6 +1145,26 @@ Additionally exactly two columns must be provided to the `columns` property.
|
|||
Count is an aggregate operation.
|
||||
For each aggregated column, it outputs the number of non null records as an integer.
|
||||
|
||||
#### Duplicate
|
||||
Duplicate will duplicate a specified column in a table
|
||||
|
||||
Duplicate has the following properties:
|
||||
|
||||
* `column` string
|
||||
The column to duplicate
|
||||
* `as` string
|
||||
The name that should be assigned to the duplicate column
|
||||
|
||||
Example usage:
|
||||
|
||||
Duplicate column `server` under the name `host`:
|
||||
```
|
||||
from(db: "telegraf")
|
||||
|> range(start:-5m)
|
||||
|> filter(fn: (r) => r._measurement == "cpu")
|
||||
|> duplicate(column: "host", as: "server")
|
||||
```
|
||||
|
||||
##### Integral
|
||||
|
||||
Integral is an aggregate operation.
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
const RenameKind = "rename"
|
||||
const DropKind = "drop"
|
||||
const KeepKind = "keep"
|
||||
const DuplicateKind = "duplicate"
|
||||
|
||||
type RenameOpSpec struct {
|
||||
Cols map[string]string `json:"columns"`
|
||||
|
@ -31,6 +32,11 @@ type KeepOpSpec struct {
|
|||
Predicate *semantic.FunctionExpression `json:"fn"`
|
||||
}
|
||||
|
||||
type DuplicateOpSpec struct {
|
||||
Col string `json:"columns"`
|
||||
As string `json:"as"`
|
||||
}
|
||||
|
||||
// The base kind for SchemaMutations
|
||||
const SchemaMutationKind = "SchemaMutation"
|
||||
|
||||
|
@ -95,6 +101,15 @@ var Registrars = []MutationRegistrar{
|
|||
Create: createKeepOpSpec,
|
||||
New: newKeepOp,
|
||||
},
|
||||
{
|
||||
Kind: DuplicateKind,
|
||||
Args: map[string]semantic.Type{
|
||||
"column": semantic.String,
|
||||
"as": semantic.String,
|
||||
},
|
||||
Create: createDuplicateOpSpec,
|
||||
New: newDuplicateOp,
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -257,6 +272,27 @@ func createKeepOpSpec(args query.Arguments, a *query.Administration) (query.Oper
|
|||
}, nil
|
||||
}
|
||||
|
||||
func createDuplicateOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
col, err := args.GetRequiredString("column")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newName, err := args.GetRequiredString("as")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &DuplicateOpSpec{
|
||||
Col: col,
|
||||
As: newName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newRenameOp() query.OperationSpec {
|
||||
return new(RenameOpSpec)
|
||||
}
|
||||
|
@ -281,6 +317,14 @@ func (s *KeepOpSpec) Kind() query.OperationKind {
|
|||
return KeepKind
|
||||
}
|
||||
|
||||
func newDuplicateOp() query.OperationSpec {
|
||||
return new(DuplicateOpSpec)
|
||||
}
|
||||
|
||||
func (s *DuplicateOpSpec) Kind() query.OperationKind {
|
||||
return DuplicateKind
|
||||
}
|
||||
|
||||
func (s *RenameOpSpec) Copy() SchemaMutation {
|
||||
newCols := make(map[string]string, len(s.Cols))
|
||||
for k, v := range s.Cols {
|
||||
|
@ -317,6 +361,13 @@ func (s *KeepOpSpec) Copy() SchemaMutation {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *DuplicateOpSpec) Copy() SchemaMutation {
|
||||
return &DuplicateOpSpec{
|
||||
Col: s.Col,
|
||||
As: s.As,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RenameOpSpec) Mutator() (SchemaMutator, error) {
|
||||
m, err := NewRenameMutator(s)
|
||||
if err != nil {
|
||||
|
@ -341,6 +392,14 @@ func (s *KeepOpSpec) Mutator() (SchemaMutator, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (s *DuplicateOpSpec) Mutator() (SchemaMutator, error) {
|
||||
m, err := NewDuplicateMutator(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
type SchemaMutationProcedureSpec struct {
|
||||
Mutations []SchemaMutation
|
||||
}
|
|
@ -17,7 +17,7 @@ import (
|
|||
"github.com/influxdata/platform/query/querytest"
|
||||
)
|
||||
|
||||
func TestDropRenameKeep_NewQueries(t *testing.T) {
|
||||
func TestSchemaMutions_NewQueries(t *testing.T) {
|
||||
tests := []querytest.NewQueryTestCase{
|
||||
{
|
||||
Name: "test rename query",
|
||||
|
@ -111,6 +111,37 @@ func TestDropRenameKeep_NewQueries(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test duplicate query",
|
||||
Raw: `from(db:"mydb") |> duplicate(column: "col1", as: "col1_new") |> sum()`,
|
||||
Want: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Database: "mydb",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "duplicate1",
|
||||
Spec: &functions.DuplicateOpSpec{
|
||||
Col: "col1",
|
||||
As: "col1_new",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "sum2",
|
||||
Spec: &functions.SumOpSpec{
|
||||
AggregateConfig: execute.DefaultAggregateConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "duplicate1"},
|
||||
{Parent: "duplicate1", Child: "sum2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test drop query fn param",
|
||||
Raw: `from(db:"mydb") |> drop(fn: (col) => col =~ /reg*/) |> sum()`,
|
||||
|
@ -246,7 +277,14 @@ func TestDropRenameKeep_NewQueries(t *testing.T) {
|
|||
Want: nil,
|
||||
WantErr: true,
|
||||
},
|
||||
{
|
||||
Name: "test duplicate query invalid",
|
||||
Raw: `from(db:"mydb") |> duplicate(columns: ["a", "b"], n: -1) |> sum()`,
|
||||
Want: nil,
|
||||
WantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
|
@ -367,6 +405,42 @@ func TestDropRenameKeep_Process(t *testing.T) {
|
|||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "duplicate single col",
|
||||
spec: &functions.SchemaMutationProcedureSpec{
|
||||
Mutations: []functions.SchemaMutation{
|
||||
&functions.DuplicateOpSpec{
|
||||
Col: "a",
|
||||
As: "a_1",
|
||||
},
|
||||
},
|
||||
},
|
||||
data: []query.Table{&executetest.Table{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "a", Type: query.TFloat},
|
||||
{Label: "b", Type: query.TFloat},
|
||||
{Label: "c", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{1.0, 2.0, 3.0},
|
||||
{11.0, 12.0, 13.0},
|
||||
{21.0, 22.0, 23.0},
|
||||
},
|
||||
}},
|
||||
want: []*executetest.Table{{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "a", Type: query.TFloat},
|
||||
{Label: "a_1", Type: query.TFloat},
|
||||
{Label: "b", Type: query.TFloat},
|
||||
{Label: "c", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{1.0, 1.0, 2.0, 3.0},
|
||||
{11.0, 11.0, 12.0, 13.0},
|
||||
{21.0, 21.0, 22.0, 23.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "rename map fn (col) => name",
|
||||
spec: &functions.SchemaMutationProcedureSpec{
|
||||
|
@ -604,6 +678,31 @@ func TestDropRenameKeep_Process(t *testing.T) {
|
|||
want: []*executetest.Table(nil),
|
||||
wantErr: errors.New(`keep error: column "no_exist" doesn't exist`),
|
||||
},
|
||||
{
|
||||
name: "duplicate no exist",
|
||||
spec: &functions.SchemaMutationProcedureSpec{
|
||||
Mutations: []functions.SchemaMutation{
|
||||
&functions.DuplicateOpSpec{
|
||||
Col: "no_exist",
|
||||
As: "no_exist_2",
|
||||
},
|
||||
},
|
||||
},
|
||||
data: []query.Table{&executetest.Table{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "server1", Type: query.TFloat},
|
||||
{Label: "local", Type: query.TFloat},
|
||||
{Label: "server2", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{1.0, 2.0, 3.0},
|
||||
{11.0, 12.0, 13.0},
|
||||
{21.0, 22.0, 23.0},
|
||||
},
|
||||
}},
|
||||
want: []*executetest.Table(nil),
|
||||
wantErr: errors.New(`duplicate error: column "no_exist" doesn't exist`),
|
||||
},
|
||||
{
|
||||
name: "rename group key",
|
||||
spec: &functions.SchemaMutationProcedureSpec{
|
|
@ -88,6 +88,13 @@ func toStringSet(arr []string) map[string]bool {
|
|||
return set
|
||||
}
|
||||
|
||||
func checkCol(label string, cols []query.ColMeta) error {
|
||||
if execute.ColIdx(label, cols) < 0 {
|
||||
return fmt.Errorf(`column "%s" doesn't exist`, label)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type RenameMutator struct {
|
||||
Cols map[string]string
|
||||
Fn compiler.Func
|
||||
|
@ -120,17 +127,6 @@ func NewRenameMutator(qs query.OperationSpec) (*RenameMutator, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (m *RenameMutator) checkColumnReferences(cols []query.ColMeta) error {
|
||||
if m.Cols != nil {
|
||||
for c := range m.Cols {
|
||||
if execute.ColIdx(c, cols) < 0 {
|
||||
return fmt.Errorf(`rename error: column "%s" doesn't exist`, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *RenameMutator) renameCol(col *query.ColMeta) error {
|
||||
if col == nil {
|
||||
return errors.New("rename error: cannot rename nil column")
|
||||
|
@ -150,8 +146,17 @@ func (m *RenameMutator) renameCol(col *query.ColMeta) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *RenameMutator) checkColumns(tableCols []query.ColMeta) error {
|
||||
for c := range m.Cols {
|
||||
if err := checkCol(c, tableCols); err != nil {
|
||||
return errors.Wrap(err, "rename error")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *RenameMutator) Mutate(ctx *BuilderContext) error {
|
||||
if err := m.checkColumnReferences(ctx.Cols()); err != nil {
|
||||
if err := m.checkColumns(ctx.Cols()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -225,21 +230,23 @@ func NewDropKeepMutator(qs query.OperationSpec) (*DropKeepMutator, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (m *DropKeepMutator) checkColumnReferences(cols []query.ColMeta) error {
|
||||
func (m *DropKeepMutator) checkColumns(tableCols []query.ColMeta) error {
|
||||
if m.DropCols != nil {
|
||||
for c := range m.DropCols {
|
||||
if execute.ColIdx(c, cols) < 0 {
|
||||
return fmt.Errorf(`drop error: column "%s" doesn't exist`, c)
|
||||
if err := checkCol(c, tableCols); err != nil {
|
||||
return errors.Wrap(err, "drop error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if m.KeepCols != nil {
|
||||
for c := range m.KeepCols {
|
||||
if execute.ColIdx(c, cols) < 0 {
|
||||
return fmt.Errorf(`keep error: column "%s" doesn't exist`, c)
|
||||
if err := checkCol(c, tableCols); err != nil {
|
||||
return errors.Wrap(err, "keep error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -282,7 +289,7 @@ func (m *DropKeepMutator) keepToDropCols(cols []query.ColMeta) {
|
|||
}
|
||||
|
||||
func (m *DropKeepMutator) Mutate(ctx *BuilderContext) error {
|
||||
if err := m.checkColumnReferences(ctx.Cols()); err != nil {
|
||||
if err := m.checkColumns(ctx.Cols()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -318,6 +325,55 @@ func (m *DropKeepMutator) Mutate(ctx *BuilderContext) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type DuplicateMutator struct {
|
||||
Col string
|
||||
As string
|
||||
}
|
||||
|
||||
func NewDuplicateMutator(qs query.OperationSpec) (*DuplicateMutator, error) {
|
||||
s, ok := qs.(*DuplicateOpSpec)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
|
||||
return &DuplicateMutator{
|
||||
Col: s.Col,
|
||||
As: s.As,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *DuplicateMutator) Mutate(ctx *BuilderContext) error {
|
||||
if err := checkCol(m.Col, ctx.Cols()); err != nil {
|
||||
return errors.Wrap(err, "duplicate error")
|
||||
}
|
||||
|
||||
newCols := make([]query.ColMeta, 0, len(ctx.Cols())+1)
|
||||
newColMap := make([]int, 0, len(ctx.Cols())+1)
|
||||
oldColMap := ctx.ColMap()
|
||||
|
||||
for i, c := range ctx.Cols() {
|
||||
newCols = append(newCols, c)
|
||||
newColMap = append(newColMap, oldColMap[i])
|
||||
|
||||
if c.Label == m.Col {
|
||||
newCols = append(newCols, duplicate(c, m.As))
|
||||
newColMap = append(newColMap, oldColMap[i])
|
||||
}
|
||||
}
|
||||
|
||||
ctx.TableColumns = newCols
|
||||
ctx.ColIdxMap = newColMap
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func duplicate(col query.ColMeta, dupName string) query.ColMeta {
|
||||
return query.ColMeta{
|
||||
Type: col.Type,
|
||||
Label: dupName,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: determine pushdown rules
|
||||
/*
|
||||
func (s *SchemaMutationProcedureSpec) PushDownRules() []plan.PushDownRule {
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
from(db: "test")
|
||||
|> range(start:2018-05-22T19:53:26Z)
|
||||
|> duplicate(column: "host", as: "host_new")
|
|
@ -0,0 +1,16 @@
|
|||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
|
||||
#group,false,false,false,false,false,false,true,true,true,true
|
||||
#default,_result,,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest_nice,cpu,cpu-total,host.local
|
|
|
@ -0,0 +1,17 @@
|
|||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string
|
||||
#group,false,false,false,false,false,false,true,true,true,true,false
|
||||
#default,_result,,,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host,host_new
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,0,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
,,1,2018-05-22T19:53:26Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,usage_guest_nice,cpu,cpu-total,host.local,host.local
|
||||
|
|
Loading…
Reference in New Issue