feat(query): Created mapped/unmapped versions of append table and updated functions that use it (#751)
parent
d4753e2ed6
commit
559ef60a78
|
@ -72,7 +72,7 @@ func CopyTable(t query.Table, a *Allocator) query.Table {
|
|||
builder.AddCol(c)
|
||||
}
|
||||
|
||||
AppendTable(t, builder, colMap)
|
||||
AppendMappedTable(t, builder, colMap)
|
||||
// ColListTableBuilders do not error
|
||||
nb, _ := builder.Table()
|
||||
return nb
|
||||
|
@ -115,27 +115,48 @@ func AddNewCols(t query.Table, builder TableBuilder) []int {
|
|||
return colMap
|
||||
}
|
||||
|
||||
// AppendTable append data from table t onto builder.
|
||||
// AppendMappedTable appends data from table t onto builder.
|
||||
// The colMap is a map of builder column index to table column index.
|
||||
func AppendTable(t query.Table, builder TableBuilder, colMap []int) {
|
||||
func AppendMappedTable(t query.Table, builder TableBuilder, colMap []int) {
|
||||
if len(t.Cols()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
t.Do(func(cr query.ColReader) error {
|
||||
AppendCols(cr, builder, colMap)
|
||||
AppendMappedCols(cr, builder, colMap)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// AppendCols appends all columns from cr onto builder.
|
||||
// AppendTable appends data from table t onto builder.
|
||||
// This function assumes builder and t have the same column schema.
|
||||
func AppendTable(t query.Table, builder TableBuilder) {
|
||||
if len(t.Cols()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
t.Do(func(cr query.ColReader) error {
|
||||
AppendCols(cr, builder)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// AppendMappedCols appends all columns from cr onto builder.
|
||||
// The colMap is a map of builder column index to cr column index.
|
||||
func AppendCols(cr query.ColReader, builder TableBuilder, colMap []int) {
|
||||
func AppendMappedCols(cr query.ColReader, builder TableBuilder, colMap []int) {
|
||||
for j := range builder.Cols() {
|
||||
AppendCol(j, colMap[j], cr, builder)
|
||||
}
|
||||
}
|
||||
|
||||
// AppendCols appends all columns from cr onto builder.
|
||||
// This function assumes that builder and cr have the same column schema.
|
||||
func AppendCols(cr query.ColReader, builder TableBuilder) {
|
||||
for j := range builder.Cols() {
|
||||
AppendCol(j, j, cr, builder)
|
||||
}
|
||||
}
|
||||
|
||||
// AppendCol append a column from cr onto builder
|
||||
// The indexes bj and cj are builder and col reader indexes respectively.
|
||||
func AppendCol(bj, cj int, cr query.ColReader, builder TableBuilder) {
|
||||
|
|
|
@ -418,13 +418,8 @@ func (buf *streamBuffer) insert(table query.Table) {
|
|||
builder := execute.NewColListTableBuilder(table.Key(), buf.alloc)
|
||||
execute.AddTableCols(table, builder)
|
||||
|
||||
builderColumnsToTableColumns := make([]int, len(builder.Cols()))
|
||||
for i := range builder.Cols() {
|
||||
builderColumnsToTableColumns[i] = i
|
||||
}
|
||||
|
||||
// Append the input table to this builder
|
||||
execute.AppendTable(table, builder, builderColumnsToTableColumns)
|
||||
execute.AppendTable(table, builder)
|
||||
|
||||
// Insert this table into the buffer
|
||||
buf.data[table.Key()] = builder
|
||||
|
|
|
@ -128,8 +128,6 @@ type limitTransformation struct {
|
|||
cache execute.TableBuilderCache
|
||||
|
||||
n, offset int
|
||||
|
||||
colMap []int
|
||||
}
|
||||
|
||||
func NewLimitTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *LimitProcedureSpec) *limitTransformation {
|
||||
|
@ -152,16 +150,6 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl query.Table) err
|
|||
}
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
ncols := builder.NCols()
|
||||
if cap(t.colMap) < ncols {
|
||||
t.colMap = make([]int, ncols)
|
||||
for j := range t.colMap {
|
||||
t.colMap[j] = j
|
||||
}
|
||||
} else {
|
||||
t.colMap = t.colMap[:ncols]
|
||||
}
|
||||
|
||||
// AppendTable with limit
|
||||
n := t.n
|
||||
offset := t.offset
|
||||
|
@ -189,7 +177,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl query.Table) err
|
|||
start: start,
|
||||
stop: stop,
|
||||
}
|
||||
execute.AppendCols(lcr, builder, t.colMap)
|
||||
execute.AppendCols(lcr, builder)
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
|
|
|
@ -346,18 +346,14 @@ func NewExactPercentileSelectorTransformation(d execute.Dataset, cache execute.T
|
|||
}
|
||||
|
||||
func (t *ExactPercentileSelectorTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
copyTable := execute.NewColListTableBuilder(tbl.Key(), t.a)
|
||||
cols := tbl.Cols()
|
||||
colMap := make([]int, len(cols))
|
||||
for j, c := range cols {
|
||||
colMap[j] = j
|
||||
copyTable.AddCol(c)
|
||||
}
|
||||
valueIdx := execute.ColIdx(t.spec.Column, cols)
|
||||
valueIdx := execute.ColIdx(t.spec.Column, tbl.Cols())
|
||||
if valueIdx < 0 {
|
||||
return fmt.Errorf("no column %q exists", t.spec.Column)
|
||||
}
|
||||
execute.AppendTable(tbl, copyTable, colMap)
|
||||
|
||||
copyTable := execute.NewColListTableBuilder(tbl.Key(), t.a)
|
||||
execute.AddTableCols(tbl, copyTable)
|
||||
execute.AppendTable(tbl, copyTable)
|
||||
copyTable.Sort([]string{t.spec.Column}, false)
|
||||
|
||||
n := copyTable.RawTable().NRows()
|
||||
|
|
|
@ -254,11 +254,7 @@ func (t *rangeTransformation) Process(id execute.DatasetID, tbl query.Table) err
|
|||
}
|
||||
|
||||
if forwardTable {
|
||||
cols := make([]int, len(tbl.Cols()))
|
||||
for i := range cols {
|
||||
cols[i] = i
|
||||
}
|
||||
execute.AppendTable(tbl, builder, cols)
|
||||
execute.AppendTable(tbl, builder)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -113,8 +113,6 @@ type sortTransformation struct {
|
|||
|
||||
cols []string
|
||||
desc bool
|
||||
|
||||
colMap []int
|
||||
}
|
||||
|
||||
func NewSortTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *SortProcedureSpec) *sortTransformation {
|
||||
|
@ -144,18 +142,7 @@ func (t *sortTransformation) Process(id execute.DatasetID, tbl query.Table) erro
|
|||
return fmt.Errorf("sort found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
ncols := builder.NCols()
|
||||
if cap(t.colMap) < ncols {
|
||||
t.colMap = make([]int, ncols)
|
||||
for j := range t.colMap {
|
||||
t.colMap[j] = j
|
||||
}
|
||||
} else {
|
||||
t.colMap = t.colMap[:ncols]
|
||||
}
|
||||
|
||||
execute.AppendTable(tbl, builder, t.colMap)
|
||||
execute.AppendTable(tbl, builder)
|
||||
|
||||
builder.Sort(t.cols, t.desc)
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue