Merged pull request #737 from influxdata/nc-group-missing
fix: Fix group by panic when missing a columnpull/10616/head
commit
745376a5fe
|
@ -179,7 +179,7 @@ func AppendCol(bj, cj int, cr query.ColReader, builder TableBuilder) {
|
|||
}
|
||||
}
|
||||
|
||||
// AppendMappedRecord appends the record from cr onto builder assuming matching columns.
|
||||
// AppendRecord appends the record from cr onto builder assuming matching columns.
|
||||
func AppendRecord(i int, cr query.ColReader, builder TableBuilder) {
|
||||
for j, c := range builder.Cols() {
|
||||
switch c.Type {
|
||||
|
@ -223,6 +223,23 @@ func AppendMappedRecord(i int, cr query.ColReader, builder TableBuilder, colMap
|
|||
}
|
||||
}
|
||||
|
||||
// ColMap writes a mapping of builder index to column reader index into colMap.
|
||||
// When colMap does not have enough capacity a new colMap is allocated.
|
||||
// The colMap is always returned
|
||||
func ColMap(colMap []int, builder TableBuilder, cr query.ColReader) []int {
|
||||
l := len(builder.Cols())
|
||||
if cap(colMap) < l {
|
||||
colMap = make([]int, len(builder.Cols()))
|
||||
} else {
|
||||
colMap = colMap[:l]
|
||||
}
|
||||
cols := cr.Cols()
|
||||
for j, c := range builder.Cols() {
|
||||
colMap[j] = ColIdx(c.Label, cols)
|
||||
}
|
||||
return colMap
|
||||
}
|
||||
|
||||
// AppendRecordForCols appends the only the columns provided from cr onto builder.
|
||||
func AppendRecordForCols(i int, cr query.ColReader, builder TableBuilder, cols []query.ColMeta) {
|
||||
for j, c := range cols {
|
||||
|
|
|
@ -303,6 +303,7 @@ func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) err
|
|||
on[c.Label] = true
|
||||
}
|
||||
}
|
||||
colMap := make([]int, 0, len(tbl.Cols()))
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
|
@ -311,7 +312,8 @@ func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) err
|
|||
if created {
|
||||
execute.AddTableCols(tbl, builder)
|
||||
}
|
||||
execute.AppendRecord(i, cr, builder)
|
||||
colMap = execute.ColMap(colMap, builder, cr)
|
||||
execute.AppendMappedRecord(i, cr, builder, colMap)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -206,6 +206,96 @@ func TestGroup_Process(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// TODO(nathanielc): When we have support for null, the missing column
|
||||
// needs to be added with null values for any missing values.
|
||||
// Right now the order of input tables determines whether the column is included.
|
||||
name: "fan in missing columns",
|
||||
spec: &functions.GroupProcedureSpec{
|
||||
GroupMode: functions.GroupModeBy,
|
||||
GroupKeys: []string{"t1"},
|
||||
},
|
||||
data: []query.Table{
|
||||
&executetest.Table{
|
||||
KeyCols: []string{"t1", "t2"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(2), 1.0, "a", "y"},
|
||||
},
|
||||
},
|
||||
&executetest.Table{
|
||||
KeyCols: []string{"t1", "t3", "t2"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t3", Type: query.TInt},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), 2.0, "a", int64(5), "x"},
|
||||
},
|
||||
},
|
||||
&executetest.Table{
|
||||
KeyCols: []string{"t1", "t2"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(2), 7.0, "b", "y"},
|
||||
},
|
||||
},
|
||||
&executetest.Table{
|
||||
KeyCols: []string{"t1", "t3", "t2"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t3", Type: query.TInt},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), 4.0, "b", int64(7), "x"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: []*executetest.Table{
|
||||
{
|
||||
KeyCols: []string{"t1"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(2), 1.0, "a", "y"},
|
||||
{execute.Time(1), 2.0, "a", "x"},
|
||||
},
|
||||
},
|
||||
{
|
||||
KeyCols: []string{"t1"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "t1", Type: query.TString},
|
||||
{Label: "t2", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(2), 7.0, "b", "y"},
|
||||
{execute.Time(1), 4.0, "b", "x"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "fan out",
|
||||
spec: &functions.GroupProcedureSpec{
|
||||
|
|
Loading…
Reference in New Issue