diff --git a/query/execute/table.go b/query/execute/table.go index 3f77d445f7..4e69b472fd 100644 --- a/query/execute/table.go +++ b/query/execute/table.go @@ -179,7 +179,7 @@ func AppendCol(bj, cj int, cr query.ColReader, builder TableBuilder) { } } -// AppendMappedRecord appends the record from cr onto builder assuming matching columns. +// AppendRecord appends the record from cr onto builder assuming matching columns. func AppendRecord(i int, cr query.ColReader, builder TableBuilder) { for j, c := range builder.Cols() { switch c.Type { @@ -223,6 +223,23 @@ func AppendMappedRecord(i int, cr query.ColReader, builder TableBuilder, colMap } } +// ColMap writes a mapping of builder index to column reader index into colMap. +// When colMap does not have enough capacity a new colMap is allocated. +// The colMap is always returned +func ColMap(colMap []int, builder TableBuilder, cr query.ColReader) []int { + l := len(builder.Cols()) + if cap(colMap) < l { + colMap = make([]int, len(builder.Cols())) + } else { + colMap = colMap[:l] + } + cols := cr.Cols() + for j, c := range builder.Cols() { + colMap[j] = ColIdx(c.Label, cols) + } + return colMap +} + // AppendRecordForCols appends the only the columns provided from cr onto builder. func AppendRecordForCols(i int, cr query.ColReader, builder TableBuilder, cols []query.ColMeta) { for j, c := range cols { diff --git a/query/functions/group.go b/query/functions/group.go index ee61aba5cc..07518745d4 100644 --- a/query/functions/group.go +++ b/query/functions/group.go @@ -303,6 +303,7 @@ func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) err on[c.Label] = true } } + colMap := make([]int, 0, len(tbl.Cols())) return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { @@ -311,7 +312,8 @@ func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) err if created { execute.AddTableCols(tbl, builder) } - execute.AppendRecord(i, cr, builder) + colMap = execute.ColMap(colMap, builder, cr) + execute.AppendMappedRecord(i, cr, builder, colMap) } return nil }) diff --git a/query/functions/group_test.go b/query/functions/group_test.go index ed2ab6d42d..6d0418af73 100644 --- a/query/functions/group_test.go +++ b/query/functions/group_test.go @@ -206,6 +206,96 @@ func TestGroup_Process(t *testing.T) { }, }, }, + { + // TODO(nathanielc): When we have support for null, the missing column + // needs to be added with null values for any missing values. + // Right now the order of input tables determines whether the column is included. + name: "fan in missing columns", + spec: &functions.GroupProcedureSpec{ + GroupMode: functions.GroupModeBy, + GroupKeys: []string{"t1"}, + }, + data: []query.Table{ + &executetest.Table{ + KeyCols: []string{"t1", "t2"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(2), 1.0, "a", "y"}, + }, + }, + &executetest.Table{ + KeyCols: []string{"t1", "t3", "t2"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t3", Type: query.TInt}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), 2.0, "a", int64(5), "x"}, + }, + }, + &executetest.Table{ + KeyCols: []string{"t1", "t2"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(2), 7.0, "b", "y"}, + }, + }, + &executetest.Table{ + KeyCols: []string{"t1", "t3", "t2"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t3", Type: query.TInt}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), 4.0, "b", int64(7), "x"}, + }, + }, + }, + want: []*executetest.Table{ + { + KeyCols: []string{"t1"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(2), 1.0, "a", "y"}, + {execute.Time(1), 2.0, "a", "x"}, + }, + }, + { + KeyCols: []string{"t1"}, + ColMeta: []query.ColMeta{ + {Label: "_time", Type: query.TTime}, + {Label: "_value", Type: query.TFloat}, + {Label: "t1", Type: query.TString}, + {Label: "t2", Type: query.TString}, + }, + Data: [][]interface{}{ + {execute.Time(2), 7.0, "b", "y"}, + {execute.Time(1), 4.0, "b", "x"}, + }, + }, + }, + }, { name: "fan out", spec: &functions.GroupProcedureSpec{