chore: Rename local vars for b to tbl
parent
80acc8d8c4
commit
7902652290
|
@ -664,11 +664,11 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error)
|
|||
var lastCols []colMeta
|
||||
var lastEmpty bool
|
||||
|
||||
err := result.Tables().Do(func(b query.Table) error {
|
||||
err := result.Tables().Do(func(tbl query.Table) error {
|
||||
e.written = true
|
||||
// Update cols with table cols
|
||||
cols := metaCols
|
||||
for _, c := range b.Cols() {
|
||||
for _, c := range tbl.Cols() {
|
||||
cm := colMeta{ColMeta: c}
|
||||
if c.Type == query.TTime {
|
||||
cm.fmt = time.RFC3339Nano
|
||||
|
@ -680,13 +680,13 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error)
|
|||
|
||||
schemaChanged := !equalCols(cols, lastCols)
|
||||
|
||||
if lastEmpty || schemaChanged || b.Empty() {
|
||||
if lastEmpty || schemaChanged || tbl.Empty() {
|
||||
if len(lastCols) > 0 {
|
||||
// Write out empty line if not first table
|
||||
writer.Write(nil)
|
||||
}
|
||||
|
||||
if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), result.Name(), tableIDStr); err != nil {
|
||||
if err := writeSchema(writer, &e.c, row, cols, tbl.Empty(), tbl.Key(), result.Name(), tableIDStr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -706,7 +706,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error)
|
|||
}
|
||||
}
|
||||
|
||||
err := b.Do(func(cr query.ColReader) error {
|
||||
err := tbl.Do(func(cr query.ColReader) error {
|
||||
record := row[recordStartIdx:]
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
|
@ -729,7 +729,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error)
|
|||
tableID++
|
||||
tableIDStr = strconv.Itoa(tableID)
|
||||
lastCols = cols
|
||||
lastEmpty = b.Empty()
|
||||
lastEmpty = tbl.Empty()
|
||||
writer.Flush()
|
||||
return writer.Error()
|
||||
})
|
||||
|
|
|
@ -483,8 +483,8 @@ func TestResultDecoder(t *testing.T) {
|
|||
got := &executetest.Result{
|
||||
Nm: result.Name(),
|
||||
}
|
||||
if err := result.Tables().Do(func(b query.Table) error {
|
||||
cb, err := executetest.ConvertTable(b)
|
||||
if err := result.Tables().Do(func(tbl query.Table) error {
|
||||
cb, err := executetest.ConvertTable(tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -129,13 +129,13 @@ func TablesFromCache(c execute.DataCache) (tables []*Table, err error) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
var b query.Table
|
||||
b, err = c.Table(key)
|
||||
var tbl query.Table
|
||||
tbl, err = c.Table(key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var cb *Table
|
||||
cb, err = ConvertTable(b)
|
||||
cb, err = ConvertTable(tbl)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -144,11 +144,11 @@ func TablesFromCache(c execute.DataCache) (tables []*Table, err error) {
|
|||
return tables, nil
|
||||
}
|
||||
|
||||
func ConvertTable(b query.Table) (*Table, error) {
|
||||
key := b.Key()
|
||||
func ConvertTable(tbl query.Table) (*Table, error) {
|
||||
key := tbl.Key()
|
||||
blk := &Table{
|
||||
GroupKey: key,
|
||||
ColMeta: b.Cols(),
|
||||
ColMeta: tbl.Cols(),
|
||||
}
|
||||
|
||||
keyCols := key.Cols()
|
||||
|
@ -178,7 +178,7 @@ func ConvertTable(b query.Table) (*Table, error) {
|
|||
}
|
||||
}
|
||||
|
||||
err := b.Do(func(cr query.ColReader) error {
|
||||
err := tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
row := make([]interface{}, len(blk.ColMeta))
|
|
@ -360,8 +360,8 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
}
|
||||
got := make(map[string][]*executetest.Table, len(results))
|
||||
for name, r := range results {
|
||||
if err := r.Tables().Do(func(b query.Table) error {
|
||||
cb, err := executetest.ConvertTable(b)
|
||||
if err := r.Tables().Do(func(tbl query.Table) error {
|
||||
cb, err := executetest.ConvertTable(tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z"
|
|||
|
||||
// Formatter writes a table to a Writer.
|
||||
type Formatter struct {
|
||||
b query.Table
|
||||
tbl query.Table
|
||||
widths []int
|
||||
maxWidth int
|
||||
newWidths []int
|
||||
|
@ -40,12 +40,12 @@ var eol = []byte{'\n'}
|
|||
|
||||
// NewFormatter creates a Formatter for a given table.
|
||||
// If opts is nil, the DefaultFormatOptions are used.
|
||||
func NewFormatter(b query.Table, opts *FormatOptions) *Formatter {
|
||||
func NewFormatter(tbl query.Table, opts *FormatOptions) *Formatter {
|
||||
if opts == nil {
|
||||
opts = DefaultFormatOptions()
|
||||
}
|
||||
return &Formatter{
|
||||
b: b,
|
||||
tbl: tbl,
|
||||
opts: *opts,
|
||||
}
|
||||
}
|
||||
|
@ -80,8 +80,8 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {
|
|||
w := &writeToHelper{w: out}
|
||||
|
||||
// Sort cols
|
||||
cols := f.b.Cols()
|
||||
f.cols = newOrderedCols(cols, f.b.Key())
|
||||
cols := f.tbl.Cols()
|
||||
f.cols = newOrderedCols(cols, f.tbl.Key())
|
||||
sort.Sort(f.cols)
|
||||
|
||||
// Compute header widths
|
||||
|
@ -102,8 +102,8 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {
|
|||
|
||||
// Write table header
|
||||
w.write([]byte("Table: keys: ["))
|
||||
labels := make([]string, len(f.b.Key().Cols()))
|
||||
for i, c := range f.b.Key().Cols() {
|
||||
labels := make([]string, len(f.tbl.Key().Cols()))
|
||||
for i, c := range f.tbl.Key().Cols() {
|
||||
labels[i] = c.Label
|
||||
}
|
||||
w.write([]byte(strings.Join(labels, ", ")))
|
||||
|
@ -117,7 +117,7 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {
|
|||
|
||||
// Write rows
|
||||
r := 0
|
||||
w.err = f.b.Do(func(cr query.ColReader) error {
|
||||
w.err = f.tbl.Do(func(cr query.ColReader) error {
|
||||
if r == 0 {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
|
|
|
@ -42,10 +42,10 @@ func (s *result) RetractTable(DatasetID, query.GroupKey) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *result) Process(id DatasetID, b query.Table) error {
|
||||
func (s *result) Process(id DatasetID, tbl query.Table) error {
|
||||
select {
|
||||
case s.tables <- resultMessage{
|
||||
table: b,
|
||||
table: tbl,
|
||||
}:
|
||||
case <-s.aborted:
|
||||
}
|
||||
|
|
|
@ -84,12 +84,12 @@ func (t *selectorTransformation) Finish(id DatasetID, err error) {
|
|||
t.d.Finish(err)
|
||||
}
|
||||
|
||||
func (t *selectorTransformation) setupBuilder(b query.Table) (TableBuilder, int, error) {
|
||||
builder, new := t.cache.TableBuilder(b.Key())
|
||||
func (t *selectorTransformation) setupBuilder(tbl query.Table) (TableBuilder, int, error) {
|
||||
builder, new := t.cache.TableBuilder(tbl.Key())
|
||||
if !new {
|
||||
return nil, 0, fmt.Errorf("found duplicate table with key: %v", b.Key())
|
||||
return nil, 0, fmt.Errorf("found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
AddTableCols(b, builder)
|
||||
AddTableCols(tbl, builder)
|
||||
|
||||
cols := builder.Cols()
|
||||
valueIdx := ColIdx(t.config.Column, cols)
|
||||
|
@ -99,8 +99,8 @@ func (t *selectorTransformation) setupBuilder(b query.Table) (TableBuilder, int,
|
|||
return builder, valueIdx, nil
|
||||
}
|
||||
|
||||
func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error {
|
||||
builder, valueIdx, err := t.setupBuilder(b)
|
||||
func (t *indexSelectorTransformation) Process(id DatasetID, tbl query.Table) error {
|
||||
builder, valueIdx, err := t.setupBuilder(tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error
|
|||
return fmt.Errorf("unsupported selector type %v", valueCol.Type)
|
||||
}
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
switch valueCol.Type {
|
||||
case query.TBool:
|
||||
selected := s.(DoBoolIndexSelector).DoBool(cr.Bools(valueIdx))
|
||||
|
@ -146,8 +146,8 @@ func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error
|
|||
})
|
||||
}
|
||||
|
||||
func (t *rowSelectorTransformation) Process(id DatasetID, b query.Table) error {
|
||||
builder, valueIdx, err := t.setupBuilder(b)
|
||||
func (t *rowSelectorTransformation) Process(id DatasetID, tbl query.Table) error {
|
||||
builder, valueIdx, err := t.setupBuilder(tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ func (t *rowSelectorTransformation) Process(id DatasetID, b query.Table) error {
|
|||
return fmt.Errorf("invalid use of function: %T has no implementation for type %v", t.selector, valueCol.Type)
|
||||
}
|
||||
|
||||
b.Do(func(cr query.ColReader) error {
|
||||
tbl.Do(func(cr query.ColReader) error {
|
||||
switch valueCol.Type {
|
||||
case query.TBool:
|
||||
rower.(DoBoolRowSelector).DoBool(cr.Bools(valueIdx), cr)
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
type Transformation interface {
|
||||
RetractTable(id DatasetID, key query.GroupKey) error
|
||||
Process(id DatasetID, b query.Table) error
|
||||
Process(id DatasetID, tbl query.Table) error
|
||||
UpdateWatermark(id DatasetID, t Time) error
|
||||
UpdateProcessingTime(id DatasetID, t Time) error
|
||||
Finish(id DatasetID, err error)
|
||||
|
|
|
@ -67,7 +67,7 @@ func (t *consecutiveTransport) RetractTable(id DatasetID, key query.GroupKey) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *consecutiveTransport) Process(id DatasetID, b query.Table) error {
|
||||
func (t *consecutiveTransport) Process(id DatasetID, tbl query.Table) error {
|
||||
select {
|
||||
case <-t.finished:
|
||||
return t.err()
|
||||
|
@ -75,7 +75,7 @@ func (t *consecutiveTransport) Process(id DatasetID, b query.Table) error {
|
|||
}
|
||||
t.pushMsg(&processMsg{
|
||||
srcMessage: srcMessage(id),
|
||||
table: b,
|
||||
table: tbl,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -155,13 +155,13 @@ func (t *CovarianceTransformation) RetractTable(id execute.DatasetID, key query.
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
cols := b.Cols()
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *CovarianceTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
cols := tbl.Cols()
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("covariance found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("covariance found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
builder.AddCol(query.ColMeta{
|
||||
Label: t.spec.TimeDst,
|
||||
Type: query.TTime,
|
||||
|
@ -176,12 +176,12 @@ func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
if cols[xIdx].Type != cols[yIdx].Type {
|
||||
return errors.New("cannot compute the covariance between different types")
|
||||
}
|
||||
if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, b.Key(), builder); err != nil {
|
||||
if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, tbl.Key(), builder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.reset()
|
||||
b.Do(func(cr query.ColReader) error {
|
||||
tbl.Do(func(cr query.ColReader) error {
|
||||
switch typ := cols[xIdx].Type; typ {
|
||||
case query.TFloat:
|
||||
t.DoFloat(cr.Floats(xIdx), cr.Floats(yIdx))
|
||||
|
@ -191,7 +191,7 @@ func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
return nil
|
||||
})
|
||||
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
builder.AppendFloat(valueIdx, t.value())
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -110,14 +110,14 @@ func (t *cumulativeSumTransformation) RetractTable(id execute.DatasetID, key que
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *cumulativeSumTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *cumulativeSumTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("cumulative sum found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("cumulative sum found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
sumers := make([]*cumulativeSum, len(cols))
|
||||
for j, c := range cols {
|
||||
for _, label := range t.spec.Columns {
|
||||
|
@ -127,7 +127,7 @@ func (t *cumulativeSumTransformation) Process(id execute.DatasetID, b query.Tabl
|
|||
}
|
||||
}
|
||||
}
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for j, c := range cols {
|
||||
switch c.Type {
|
||||
|
|
|
@ -156,12 +156,12 @@ func (t *derivativeTransformation) RetractTable(id execute.DatasetID, key query.
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *derivativeTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *derivativeTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("derivative found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("derivative found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
derivatives := make([]*derivative, len(cols))
|
||||
timeIdx := -1
|
||||
for j, c := range cols {
|
||||
|
@ -192,7 +192,7 @@ func (t *derivativeTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
|
||||
// We need to drop the first row since its derivative is undefined
|
||||
firstIdx := 1
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for j, c := range cols {
|
||||
d := derivatives[j]
|
||||
|
|
|
@ -133,12 +133,12 @@ func (t *differenceTransformation) RetractTable(id execute.DatasetID, key query.
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *differenceTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *differenceTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("difference found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("difference found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
differences := make([]*difference, len(cols))
|
||||
for j, c := range cols {
|
||||
found := false
|
||||
|
@ -169,7 +169,7 @@ func (t *differenceTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
|
||||
// We need to drop the first row since its derivative is undefined
|
||||
firstIdx := 1
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for j, c := range cols {
|
||||
d := differences[j]
|
||||
|
|
|
@ -144,56 +144,56 @@ func (t *distinctTransformation) RetractTable(id execute.DatasetID, key query.Gr
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *distinctTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("distinct found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("distinct found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
|
||||
colIdx := execute.ColIdx(t.column, b.Cols())
|
||||
colIdx := execute.ColIdx(t.column, tbl.Cols())
|
||||
if colIdx < 0 {
|
||||
// doesn't exist in this table, so add an empty value
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
colIdx = builder.AddCol(query.ColMeta{
|
||||
Label: execute.DefaultValueColLabel,
|
||||
Type: query.TString,
|
||||
})
|
||||
builder.AppendString(colIdx, "")
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
// TODO: hack required to ensure data flows downstream
|
||||
return b.Do(func(query.ColReader) error {
|
||||
return tbl.Do(func(query.ColReader) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
col := b.Cols()[colIdx]
|
||||
col := tbl.Cols()[colIdx]
|
||||
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
colIdx = builder.AddCol(query.ColMeta{
|
||||
Label: execute.DefaultValueColLabel,
|
||||
Type: col.Type,
|
||||
})
|
||||
|
||||
if b.Key().HasCol(t.column) {
|
||||
j := execute.ColIdx(t.column, b.Key().Cols())
|
||||
if tbl.Key().HasCol(t.column) {
|
||||
j := execute.ColIdx(t.column, tbl.Key().Cols())
|
||||
switch col.Type {
|
||||
case query.TBool:
|
||||
builder.AppendBool(colIdx, b.Key().ValueBool(j))
|
||||
builder.AppendBool(colIdx, tbl.Key().ValueBool(j))
|
||||
case query.TInt:
|
||||
builder.AppendInt(colIdx, b.Key().ValueInt(j))
|
||||
builder.AppendInt(colIdx, tbl.Key().ValueInt(j))
|
||||
case query.TUInt:
|
||||
builder.AppendUInt(colIdx, b.Key().ValueUInt(j))
|
||||
builder.AppendUInt(colIdx, tbl.Key().ValueUInt(j))
|
||||
case query.TFloat:
|
||||
builder.AppendFloat(colIdx, b.Key().ValueFloat(j))
|
||||
builder.AppendFloat(colIdx, tbl.Key().ValueFloat(j))
|
||||
case query.TString:
|
||||
builder.AppendString(colIdx, b.Key().ValueString(j))
|
||||
builder.AppendString(colIdx, tbl.Key().ValueString(j))
|
||||
case query.TTime:
|
||||
builder.AppendTime(colIdx, b.Key().ValueTime(j))
|
||||
builder.AppendTime(colIdx, tbl.Key().ValueTime(j))
|
||||
}
|
||||
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
// TODO: hack required to ensure data flows downstream
|
||||
return b.Do(func(query.ColReader) error {
|
||||
return tbl.Do(func(query.ColReader) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) er
|
|||
timeDistinct = make(map[execute.Time]bool)
|
||||
}
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
// Check distinct
|
||||
|
@ -270,7 +270,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) er
|
|||
builder.AppendTime(colIdx, v)
|
||||
}
|
||||
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -221,22 +221,22 @@ func (t *filterTransformation) RetractTable(id execute.DatasetID, key query.Grou
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *filterTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *filterTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("filter found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("filter found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
// Prepare the function for the column types.
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
if err := t.fn.Prepare(cols); err != nil {
|
||||
// TODO(nathanielc): Should we not fail the query for failed compilation?
|
||||
return err
|
||||
}
|
||||
|
||||
// Append only matching rows to table
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
if pass, err := t.fn.Eval(i, cr); err != nil {
|
||||
|
|
|
@ -147,14 +147,14 @@ func (c *CSVSource) Run(ctx context.Context) {
|
|||
var err error
|
||||
var max execute.Time
|
||||
maxSet := false
|
||||
err = c.data.Tables().Do(func(b query.Table) error {
|
||||
err = c.data.Tables().Do(func(tbl query.Table) error {
|
||||
for _, t := range c.ts {
|
||||
err := t.Process(c.id, b)
|
||||
err := t.Process(c.id, tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if idx := execute.ColIdx(execute.DefaultStopColLabel, b.Key().Cols()); idx >= 0 {
|
||||
if stop := b.Key().ValueTime(idx); !maxSet || stop > max {
|
||||
if idx := execute.ColIdx(execute.DefaultStopColLabel, tbl.Key().Cols()); idx >= 0 {
|
||||
if stop := tbl.Key().ValueTime(idx); !maxSet || stop > max {
|
||||
max = stop
|
||||
maxSet = true
|
||||
}
|
||||
|
|
|
@ -285,8 +285,8 @@ func (t *groupTransformation) RetractTable(id execute.DatasetID, key query.Group
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (t *groupTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
cols := b.Cols()
|
||||
func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
cols := tbl.Cols()
|
||||
on := make(map[string]bool, len(cols))
|
||||
if t.mode == GroupModeBy && len(t.keys) > 0 {
|
||||
for _, k := range t.keys {
|
||||
|
@ -303,13 +303,13 @@ func (t *groupTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
on[c.Label] = true
|
||||
}
|
||||
}
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
key := execute.GroupKeyForRowOn(i, cr, on)
|
||||
builder, created := t.cache.TableBuilder(key)
|
||||
if created {
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
}
|
||||
execute.AppendRecord(i, cr, builder)
|
||||
}
|
||||
|
|
|
@ -117,18 +117,18 @@ func (t *integralTransformation) RetractTable(id execute.DatasetID, key query.Gr
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *integralTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("integral found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("integral found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
builder.AddCol(query.ColMeta{
|
||||
Label: t.spec.TimeDst,
|
||||
Type: query.TTime,
|
||||
})
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
integrals := make([]*integral, len(cols))
|
||||
colMap := make([]int, len(cols))
|
||||
for j, c := range cols {
|
||||
|
@ -141,7 +141,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er
|
|||
}
|
||||
}
|
||||
|
||||
if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, b.Key(), builder); err != nil {
|
||||
if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, tbl.Key(), builder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er
|
|||
if timeIdx < 0 {
|
||||
return fmt.Errorf("no column %q exists", t.spec.TimeSrc)
|
||||
}
|
||||
err := b.Do(func(cr query.ColReader) error {
|
||||
err := tbl.Do(func(cr query.ColReader) error {
|
||||
for j, in := range integrals {
|
||||
if in == nil {
|
||||
continue
|
||||
|
@ -166,7 +166,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er
|
|||
return err
|
||||
}
|
||||
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
for j, in := range integrals {
|
||||
if in == nil {
|
||||
continue
|
||||
|
|
|
@ -278,11 +278,11 @@ func (t *mergeJoinTransformation) RetractTable(id execute.DatasetID, key query.G
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (t *mergeJoinTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
func (t *mergeJoinTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
tables := t.cache.Tables(b.Key())
|
||||
tables := t.cache.Tables(tbl.Key())
|
||||
|
||||
var references []string
|
||||
var table execute.TableBuilder
|
||||
|
@ -299,20 +299,20 @@ func (t *mergeJoinTransformation) Process(id execute.DatasetID, b query.Table) e
|
|||
labels := unionStrs(t.keys, references)
|
||||
colMap := make([]int, len(labels))
|
||||
for _, label := range labels {
|
||||
tableIdx := execute.ColIdx(label, b.Cols())
|
||||
tableIdx := execute.ColIdx(label, tbl.Cols())
|
||||
if tableIdx < 0 {
|
||||
return fmt.Errorf("no column %q exists", label)
|
||||
}
|
||||
// Only add the column if it does not already exist
|
||||
builderIdx := execute.ColIdx(label, table.Cols())
|
||||
if builderIdx < 0 {
|
||||
c := b.Cols()[tableIdx]
|
||||
c := tbl.Cols()[tableIdx]
|
||||
builderIdx = table.AddCol(c)
|
||||
}
|
||||
colMap[builderIdx] = tableIdx
|
||||
}
|
||||
|
||||
execute.AppendTable(b, table, colMap)
|
||||
execute.AppendTable(tbl, table, colMap)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -155,16 +155,16 @@ func (t *keysTransformation) RetractTable(id execute.DatasetID, key query.GroupK
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *keysTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("keys found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("keys found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
colIdx := builder.AddCol(query.ColMeta{Label: execute.DefaultValueColLabel, Type: query.TString})
|
||||
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
sort.Slice(cols, func(i, j int) bool {
|
||||
return cols[i].Label < cols[j].Label
|
||||
})
|
||||
|
@ -175,7 +175,7 @@ func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
for i < len(cols) && j < len(t.except) {
|
||||
c := strings.Compare(cols[i].Label, t.except[j])
|
||||
if c < 0 {
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
builder.AppendString(colIdx, cols[i].Label)
|
||||
i++
|
||||
} else if c > 0 {
|
||||
|
@ -189,12 +189,12 @@ func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
|
||||
// add remaining
|
||||
for ; i < len(cols); i++ {
|
||||
execute.AppendKeyValues(b.Key(), builder)
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
builder.AppendString(colIdx, cols[i].Label)
|
||||
}
|
||||
|
||||
// TODO: this is a hack
|
||||
return b.Do(func(query.ColReader) error {
|
||||
return tbl.Do(func(query.ColReader) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -144,12 +144,12 @@ func (t *limitTransformation) RetractTable(id execute.DatasetID, key query.Group
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *limitTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *limitTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("limit found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("limit found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
ncols := builder.NCols()
|
||||
if cap(t.colMap) < ncols {
|
||||
|
@ -164,7 +164,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
// AppendTable with limit
|
||||
n := t.n
|
||||
offset := t.offset
|
||||
b.Do(func(cr query.ColReader) error {
|
||||
tbl.Do(func(cr query.ColReader) error {
|
||||
if n <= 0 {
|
||||
// Returning an error terminates iteration
|
||||
return errors.New("finished")
|
||||
|
|
|
@ -132,9 +132,9 @@ func (t *mapTransformation) RetractTable(id execute.DatasetID, key query.GroupKe
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
func (t *mapTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
// Prepare the functions for the column types.
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
err := t.fn.Prepare(cols)
|
||||
if err != nil {
|
||||
// TODO(nathanielc): Should we not fail the query for failed compilation?
|
||||
|
@ -149,12 +149,12 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error {
|
|||
sort.Strings(keys)
|
||||
|
||||
// Determine on which cols to group
|
||||
on := make(map[string]bool, len(b.Key().Cols()))
|
||||
for _, c := range b.Key().Cols() {
|
||||
on := make(map[string]bool, len(tbl.Key().Cols()))
|
||||
for _, c := range tbl.Key().Cols() {
|
||||
on[c.Label] = t.mergeKey || execute.ContainsStr(keys, c.Label)
|
||||
}
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m, err := t.fn.Eval(i, cr)
|
||||
|
@ -166,11 +166,11 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error {
|
|||
builder, created := t.cache.TableBuilder(key)
|
||||
if created {
|
||||
if t.mergeKey {
|
||||
execute.AddTableKeyCols(b.Key(), builder)
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
}
|
||||
// Add columns from function in sorted order
|
||||
for _, k := range keys {
|
||||
if t.mergeKey && b.Key().HasCol(k) {
|
||||
if t.mergeKey && tbl.Key().HasCol(k) {
|
||||
continue
|
||||
}
|
||||
builder.AddCol(query.ColMeta{
|
||||
|
@ -182,8 +182,8 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error {
|
|||
for j, c := range builder.Cols() {
|
||||
v, ok := m.Get(c.Label)
|
||||
if !ok {
|
||||
if idx := execute.ColIdx(c.Label, b.Key().Cols()); t.mergeKey && idx >= 0 {
|
||||
v = b.Key().Value(idx)
|
||||
if idx := execute.ColIdx(c.Label, tbl.Key().Cols()); t.mergeKey && idx >= 0 {
|
||||
v = tbl.Key().Value(idx)
|
||||
} else {
|
||||
// This should be unreachable
|
||||
return fmt.Errorf("could not find value for column %q", c.Label)
|
||||
|
|
|
@ -149,17 +149,17 @@ func (t *rangeTransformation) RetractTable(id execute.DatasetID, key query.Group
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *rangeTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *rangeTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("range found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("range found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
cols := make([]int, len(b.Cols()))
|
||||
execute.AddTableCols(tbl, builder)
|
||||
cols := make([]int, len(tbl.Cols()))
|
||||
for i := range cols {
|
||||
cols[i] = i
|
||||
}
|
||||
execute.AppendTable(b, builder, cols)
|
||||
execute.AppendTable(tbl, builder, cols)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -120,8 +120,8 @@ func (t *setTransformation) RetractTable(id execute.DatasetID, key query.GroupKe
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
key := b.Key()
|
||||
func (t *setTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
key := tbl.Key()
|
||||
if idx := execute.ColIdx(t.key, key.Cols()); idx >= 0 {
|
||||
// Update key
|
||||
cols := make([]query.ColMeta, len(key.Cols()))
|
||||
|
@ -138,7 +138,7 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error {
|
|||
}
|
||||
builder, created := t.cache.TableBuilder(key)
|
||||
if created {
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
if !execute.HasCol(t.key, builder.Cols()) {
|
||||
builder.AddCol(query.ColMeta{
|
||||
Label: t.key,
|
||||
|
@ -147,7 +147,7 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error {
|
|||
}
|
||||
}
|
||||
idx := execute.ColIdx(t.key, builder.Cols())
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
for j := range cr.Cols() {
|
||||
if j == idx {
|
||||
continue
|
||||
|
|
|
@ -131,8 +131,8 @@ func (t *shiftTransformation) RetractTable(id execute.DatasetID, key query.Group
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *shiftTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
key := b.Key()
|
||||
func (t *shiftTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
key := tbl.Key()
|
||||
// Update key
|
||||
cols := make([]query.ColMeta, len(key.Cols()))
|
||||
vs := make([]values.Value, len(key.Cols()))
|
||||
|
@ -152,11 +152,11 @@ func (t *shiftTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
|
||||
builder, created := t.cache.TableBuilder(key)
|
||||
if !created {
|
||||
return fmt.Errorf("shift found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("shift found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
for j, c := range cr.Cols() {
|
||||
if execute.ContainsStr(t.columns, c.Label) {
|
||||
l := cr.Len()
|
||||
|
|
|
@ -129,8 +129,8 @@ func (t *sortTransformation) RetractTable(id execute.DatasetID, key query.GroupK
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
key := b.Key()
|
||||
func (t *sortTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
key := tbl.Key()
|
||||
for _, label := range t.cols {
|
||||
if key.HasCol(label) {
|
||||
key = t.sortedKey(key)
|
||||
|
@ -140,9 +140,9 @@ func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
|
||||
builder, created := t.cache.TableBuilder(key)
|
||||
if !created {
|
||||
return fmt.Errorf("sort found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("sort found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
ncols := builder.NCols()
|
||||
if cap(t.colMap) < ncols {
|
||||
|
@ -154,7 +154,7 @@ func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error
|
|||
t.colMap = t.colMap[:ncols]
|
||||
}
|
||||
|
||||
execute.AppendTable(b, builder, t.colMap)
|
||||
execute.AppendTable(tbl, builder, t.colMap)
|
||||
|
||||
builder.Sort(t.cols, t.desc)
|
||||
return nil
|
||||
|
|
|
@ -209,15 +209,15 @@ func (t *stateTrackingTransformation) RetractTable(id execute.DatasetID, key que
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *stateTrackingTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *stateTrackingTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
// Prepare the functions for the column types.
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
err := t.fn.Prepare(cols)
|
||||
if err != nil {
|
||||
// TODO(nathanielc): Should we not fail the query for failed compilation?
|
||||
|
@ -247,12 +247,12 @@ func (t *stateTrackingTransformation) Process(id execute.DatasetID, b query.Tabl
|
|||
inState bool
|
||||
)
|
||||
|
||||
timeIdx := execute.ColIdx(t.timeCol, b.Cols())
|
||||
timeIdx := execute.ColIdx(t.timeCol, tbl.Cols())
|
||||
if timeIdx < 0 {
|
||||
return fmt.Errorf("no column %q exists", t.timeCol)
|
||||
}
|
||||
// Append modified rows
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
tm := cr.Times(timeIdx)[i]
|
||||
|
|
|
@ -107,9 +107,9 @@ func (s *source) run(ctx context.Context) error {
|
|||
|
||||
//TODO(nathanielc): Pass through context to actual network I/O.
|
||||
for tables, mark, ok := s.next(ctx, trace); ok; tables, mark, ok = s.next(ctx, trace) {
|
||||
err := tables.Do(func(b query.Table) error {
|
||||
err := tables.Do(func(tbl query.Table) error {
|
||||
for _, t := range s.ts {
|
||||
if err := t.Process(s.id, b); err != nil {
|
||||
if err := t.Process(s.id, tbl); err != nil {
|
||||
return err
|
||||
}
|
||||
//TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving.
|
||||
|
|
|
@ -306,13 +306,13 @@ type idxType struct {
|
|||
Type query.DataType
|
||||
}
|
||||
|
||||
func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
func (t *ToHTTPTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
pr, pw := io.Pipe() // TODO: replce the pipe with something faster
|
||||
m := &toHttpMetric{}
|
||||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
|
@ -334,7 +334,7 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) erro
|
|||
}
|
||||
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := b.Cols()
|
||||
colMetadatas := tbl.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
|
||||
|
@ -347,7 +347,7 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) erro
|
|||
wg.Add(1)
|
||||
go func() {
|
||||
m.name = t.spec.Spec.Name
|
||||
b.Do(func(er query.ColReader) error {
|
||||
tbl.Do(func(er query.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
|
|
|
@ -256,7 +256,7 @@ func (m *toKafkaMetric) Time() time.Time {
|
|||
return m.t
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (err error) {
|
||||
func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl query.Table) (err error) {
|
||||
w := DefaultKafkaWriterFactory(kafka.WriterConfig{
|
||||
Brokers: t.spec.Spec.Brokers,
|
||||
Topic: t.spec.Spec.Topic,
|
||||
|
@ -283,7 +283,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er
|
|||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := b.Cols()
|
||||
cols := tbl.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
|
@ -302,7 +302,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er
|
|||
measurementNameCol = t.spec.Spec.NameColumn
|
||||
}
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := b.Cols()
|
||||
colMetadatas := tbl.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
for i, col := range colMetadatas {
|
||||
|
@ -313,7 +313,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
err = b.Do(func(er query.ColReader) error {
|
||||
err = tbl.Do(func(er query.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
|
|
|
@ -108,12 +108,12 @@ func (t *uniqueTransformation) RetractTable(id execute.DatasetID, key query.Grou
|
|||
return t.d.RetractTable(key)
|
||||
}
|
||||
|
||||
func (t *uniqueTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(b.Key())
|
||||
func (t *uniqueTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("unique found duplicate table with key: %v", b.Key())
|
||||
return fmt.Errorf("unique found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
execute.AddTableCols(b, builder)
|
||||
execute.AddTableCols(tbl, builder)
|
||||
|
||||
colIdx := execute.ColIdx(t.column, builder.Cols())
|
||||
if colIdx < 0 {
|
||||
|
@ -144,7 +144,7 @@ func (t *uniqueTransformation) Process(id execute.DatasetID, b query.Table) erro
|
|||
timeUnique = make(map[execute.Time]bool)
|
||||
}
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
// Check unique
|
||||
|
|
|
@ -245,16 +245,16 @@ func (t *fixedWindowTransformation) RetractTable(id execute.DatasetID, key query
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table) error {
|
||||
timeIdx := execute.ColIdx(t.timeCol, b.Cols())
|
||||
func (t *fixedWindowTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
timeIdx := execute.ColIdx(t.timeCol, tbl.Cols())
|
||||
|
||||
newCols := make([]query.ColMeta, 0, len(b.Cols())+2)
|
||||
keyCols := make([]query.ColMeta, 0, len(b.Cols())+2)
|
||||
keyColMap := make([]int, 0, len(b.Cols())+2)
|
||||
newCols := make([]query.ColMeta, 0, len(tbl.Cols())+2)
|
||||
keyCols := make([]query.ColMeta, 0, len(tbl.Cols())+2)
|
||||
keyColMap := make([]int, 0, len(tbl.Cols())+2)
|
||||
startColIdx := -1
|
||||
stopColIdx := -1
|
||||
for j, c := range b.Cols() {
|
||||
keyIdx := execute.ColIdx(c.Label, b.Key().Cols())
|
||||
for j, c := range tbl.Cols() {
|
||||
keyIdx := execute.ColIdx(c.Label, tbl.Key().Cols())
|
||||
keyed := keyIdx >= 0
|
||||
if c.Label == t.startColLabel {
|
||||
startColIdx = j
|
||||
|
@ -291,7 +291,7 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
keyColMap = append(keyColMap, len(keyColMap))
|
||||
}
|
||||
|
||||
return b.Do(func(cr query.ColReader) error {
|
||||
return tbl.Do(func(cr query.ColReader) error {
|
||||
l := cr.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
tm := cr.Times(timeIdx)[i]
|
||||
|
@ -308,7 +308,7 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table)
|
|||
case t.stopColLabel:
|
||||
vs[j] = values.NewTimeValue(bnds.Stop)
|
||||
default:
|
||||
vs[j] = b.Key().Value(keyColMap[j])
|
||||
vs[j] = tbl.Key().Value(keyColMap[j])
|
||||
}
|
||||
}
|
||||
key := execute.NewGroupKey(cols, vs)
|
||||
|
|
|
@ -44,17 +44,17 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) (
|
|||
tables := res.Tables()
|
||||
|
||||
result := Result{StatementID: id}
|
||||
if err := tables.Do(func(b query.Table) error {
|
||||
if err := tables.Do(func(tbl query.Table) error {
|
||||
var row Row
|
||||
|
||||
for j, c := range b.Key().Cols() {
|
||||
for j, c := range tbl.Key().Cols() {
|
||||
if c.Type != query.TString {
|
||||
// Skip any columns that aren't strings. They are extra ones that
|
||||
// flux includes by default like the start and end times that we do not
|
||||
// care about.
|
||||
continue
|
||||
}
|
||||
v := b.Key().Value(j).Str()
|
||||
v := tbl.Key().Value(j).Str()
|
||||
if c.Label == "_measurement" {
|
||||
row.Name = v
|
||||
} else if c.Label == "_field" {
|
||||
|
@ -74,10 +74,10 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) (
|
|||
// from the ordering given in the original query.
|
||||
resultColMap := map[string]int{}
|
||||
j := 1
|
||||
for _, c := range b.Cols() {
|
||||
for _, c := range tbl.Cols() {
|
||||
if c.Label == execute.DefaultTimeColLabel {
|
||||
resultColMap[c.Label] = 0
|
||||
} else if !b.Key().HasCol(c.Label) {
|
||||
} else if !tbl.Key().HasCol(c.Label) {
|
||||
resultColMap[c.Label] = j
|
||||
j++
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) (
|
|||
row.Columns[v] = k
|
||||
}
|
||||
|
||||
if err := b.Do(func(cr query.ColReader) error {
|
||||
if err := tbl.Do(func(cr query.ColReader) error {
|
||||
// Preallocate the number of rows for the response to make this section
|
||||
// of code easier to read. Find a time column which should exist
|
||||
// in the output.
|
||||
|
@ -101,7 +101,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) (
|
|||
}
|
||||
|
||||
j := 0
|
||||
for idx, c := range b.Cols() {
|
||||
for idx, c := range tbl.Cols() {
|
||||
if cr.Key().HasCol(c.Label) {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -222,10 +222,10 @@ func (r *REPL) doQuery(spec *query.Spec) error {
|
|||
|
||||
for _, name := range names {
|
||||
r := results[name]
|
||||
blocks := r.Tables()
|
||||
tables := r.Tables()
|
||||
fmt.Println("Result:", name)
|
||||
err := blocks.Do(func(b query.Table) error {
|
||||
_, err := execute.NewFormatter(b, nil).WriteTo(os.Stdout)
|
||||
err := tables.Do(func(tbl query.Table) error {
|
||||
_, err := execute.NewFormatter(tbl, nil).WriteTo(os.Stdout)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue