diff --git a/query/csv/result.go b/query/csv/result.go index 5dbc83c149..df234c869c 100644 --- a/query/csv/result.go +++ b/query/csv/result.go @@ -664,11 +664,11 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error) var lastCols []colMeta var lastEmpty bool - err := result.Tables().Do(func(b query.Table) error { + err := result.Tables().Do(func(tbl query.Table) error { e.written = true // Update cols with table cols cols := metaCols - for _, c := range b.Cols() { + for _, c := range tbl.Cols() { cm := colMeta{ColMeta: c} if c.Type == query.TTime { cm.fmt = time.RFC3339Nano @@ -680,13 +680,13 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error) schemaChanged := !equalCols(cols, lastCols) - if lastEmpty || schemaChanged || b.Empty() { + if lastEmpty || schemaChanged || tbl.Empty() { if len(lastCols) > 0 { // Write out empty line if not first table writer.Write(nil) } - if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), result.Name(), tableIDStr); err != nil { + if err := writeSchema(writer, &e.c, row, cols, tbl.Empty(), tbl.Key(), result.Name(), tableIDStr); err != nil { return err } } @@ -706,7 +706,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error) } } - err := b.Do(func(cr query.ColReader) error { + err := tbl.Do(func(cr query.ColReader) error { record := row[recordStartIdx:] l := cr.Len() for i := 0; i < l; i++ { @@ -729,7 +729,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) (int64, error) tableID++ tableIDStr = strconv.Itoa(tableID) lastCols = cols - lastEmpty = b.Empty() + lastEmpty = tbl.Empty() writer.Flush() return writer.Error() }) diff --git a/query/csv/result_test.go b/query/csv/result_test.go index d99dbf0bac..798091c483 100644 --- a/query/csv/result_test.go +++ b/query/csv/result_test.go @@ -483,8 +483,8 @@ func TestResultDecoder(t *testing.T) { got := &executetest.Result{ Nm: result.Name(), } - if err := result.Tables().Do(func(b query.Table) error { - cb, err := executetest.ConvertTable(b) + if err := result.Tables().Do(func(tbl query.Table) error { + cb, err := executetest.ConvertTable(tbl) if err != nil { return err } diff --git a/query/execute/executetest/block.go b/query/execute/executetest/table.go similarity index 95% rename from query/execute/executetest/block.go rename to query/execute/executetest/table.go index fd31e229bf..e79d32c029 100644 --- a/query/execute/executetest/block.go +++ b/query/execute/executetest/table.go @@ -129,13 +129,13 @@ func TablesFromCache(c execute.DataCache) (tables []*Table, err error) { if err != nil { return } - var b query.Table - b, err = c.Table(key) + var tbl query.Table + tbl, err = c.Table(key) if err != nil { return } var cb *Table - cb, err = ConvertTable(b) + cb, err = ConvertTable(tbl) if err != nil { return } @@ -144,11 +144,11 @@ func TablesFromCache(c execute.DataCache) (tables []*Table, err error) { return tables, nil } -func ConvertTable(b query.Table) (*Table, error) { - key := b.Key() +func ConvertTable(tbl query.Table) (*Table, error) { + key := tbl.Key() blk := &Table{ GroupKey: key, - ColMeta: b.Cols(), + ColMeta: tbl.Cols(), } keyCols := key.Cols() @@ -178,7 +178,7 @@ func ConvertTable(b query.Table) (*Table, error) { } } - err := b.Do(func(cr query.ColReader) error { + err := tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { row := make([]interface{}, len(blk.ColMeta)) diff --git a/query/execute/executor_test.go b/query/execute/executor_test.go index e30cd32a99..89e27346eb 100644 --- a/query/execute/executor_test.go +++ b/query/execute/executor_test.go @@ -360,8 +360,8 @@ func TestExecutor_Execute(t *testing.T) { } got := make(map[string][]*executetest.Table, len(results)) for name, r := range results { - if err := r.Tables().Do(func(b query.Table) error { - cb, err := executetest.ConvertTable(b) + if err := r.Tables().Do(func(tbl query.Table) error { + cb, err := executetest.ConvertTable(tbl) if err != nil { return err } diff --git a/query/execute/format.go b/query/execute/format.go index b3be94df40..a52a126b06 100644 --- a/query/execute/format.go +++ b/query/execute/format.go @@ -13,7 +13,7 @@ const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z" // Formatter writes a table to a Writer. type Formatter struct { - b query.Table + tbl query.Table widths []int maxWidth int newWidths []int @@ -40,12 +40,12 @@ var eol = []byte{'\n'} // NewFormatter creates a Formatter for a given table. // If opts is nil, the DefaultFormatOptions are used. -func NewFormatter(b query.Table, opts *FormatOptions) *Formatter { +func NewFormatter(tbl query.Table, opts *FormatOptions) *Formatter { if opts == nil { opts = DefaultFormatOptions() } return &Formatter{ - b: b, + tbl: tbl, opts: *opts, } } @@ -80,8 +80,8 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) { w := &writeToHelper{w: out} // Sort cols - cols := f.b.Cols() - f.cols = newOrderedCols(cols, f.b.Key()) + cols := f.tbl.Cols() + f.cols = newOrderedCols(cols, f.tbl.Key()) sort.Sort(f.cols) // Compute header widths @@ -102,8 +102,8 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) { // Write table header w.write([]byte("Table: keys: [")) - labels := make([]string, len(f.b.Key().Cols())) - for i, c := range f.b.Key().Cols() { + labels := make([]string, len(f.tbl.Key().Cols())) + for i, c := range f.tbl.Key().Cols() { labels[i] = c.Label } w.write([]byte(strings.Join(labels, ", "))) @@ -117,7 +117,7 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) { // Write rows r := 0 - w.err = f.b.Do(func(cr query.ColReader) error { + w.err = f.tbl.Do(func(cr query.ColReader) error { if r == 0 { l := cr.Len() for i := 0; i < l; i++ { diff --git a/query/execute/result.go b/query/execute/result.go index 5d91b6c683..00cd9a89af 100644 --- a/query/execute/result.go +++ b/query/execute/result.go @@ -42,10 +42,10 @@ func (s *result) RetractTable(DatasetID, query.GroupKey) error { return nil } -func (s *result) Process(id DatasetID, b query.Table) error { +func (s *result) Process(id DatasetID, tbl query.Table) error { select { case s.tables <- resultMessage{ - table: b, + table: tbl, }: case <-s.aborted: } diff --git a/query/execute/selector.go b/query/execute/selector.go index 255b9131ef..d6c3e4a17e 100644 --- a/query/execute/selector.go +++ b/query/execute/selector.go @@ -84,12 +84,12 @@ func (t *selectorTransformation) Finish(id DatasetID, err error) { t.d.Finish(err) } -func (t *selectorTransformation) setupBuilder(b query.Table) (TableBuilder, int, error) { - builder, new := t.cache.TableBuilder(b.Key()) +func (t *selectorTransformation) setupBuilder(tbl query.Table) (TableBuilder, int, error) { + builder, new := t.cache.TableBuilder(tbl.Key()) if !new { - return nil, 0, fmt.Errorf("found duplicate table with key: %v", b.Key()) + return nil, 0, fmt.Errorf("found duplicate table with key: %v", tbl.Key()) } - AddTableCols(b, builder) + AddTableCols(tbl, builder) cols := builder.Cols() valueIdx := ColIdx(t.config.Column, cols) @@ -99,8 +99,8 @@ func (t *selectorTransformation) setupBuilder(b query.Table) (TableBuilder, int, return builder, valueIdx, nil } -func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error { - builder, valueIdx, err := t.setupBuilder(b) +func (t *indexSelectorTransformation) Process(id DatasetID, tbl query.Table) error { + builder, valueIdx, err := t.setupBuilder(tbl) if err != nil { return err } @@ -122,7 +122,7 @@ func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error return fmt.Errorf("unsupported selector type %v", valueCol.Type) } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { switch valueCol.Type { case query.TBool: selected := s.(DoBoolIndexSelector).DoBool(cr.Bools(valueIdx)) @@ -146,8 +146,8 @@ func (t *indexSelectorTransformation) Process(id DatasetID, b query.Table) error }) } -func (t *rowSelectorTransformation) Process(id DatasetID, b query.Table) error { - builder, valueIdx, err := t.setupBuilder(b) +func (t *rowSelectorTransformation) Process(id DatasetID, tbl query.Table) error { + builder, valueIdx, err := t.setupBuilder(tbl) if err != nil { return err } @@ -177,7 +177,7 @@ func (t *rowSelectorTransformation) Process(id DatasetID, b query.Table) error { return fmt.Errorf("invalid use of function: %T has no implementation for type %v", t.selector, valueCol.Type) } - b.Do(func(cr query.ColReader) error { + tbl.Do(func(cr query.ColReader) error { switch valueCol.Type { case query.TBool: rower.(DoBoolRowSelector).DoBool(cr.Bools(valueIdx), cr) diff --git a/query/execute/block.go b/query/execute/table.go similarity index 100% rename from query/execute/block.go rename to query/execute/table.go diff --git a/query/execute/transformation.go b/query/execute/transformation.go index 260f61a7cb..e4e78f74b5 100644 --- a/query/execute/transformation.go +++ b/query/execute/transformation.go @@ -10,7 +10,7 @@ import ( type Transformation interface { RetractTable(id DatasetID, key query.GroupKey) error - Process(id DatasetID, b query.Table) error + Process(id DatasetID, tbl query.Table) error UpdateWatermark(id DatasetID, t Time) error UpdateProcessingTime(id DatasetID, t Time) error Finish(id DatasetID, err error) diff --git a/query/execute/transport.go b/query/execute/transport.go index 82e2a9c45e..7af07be110 100644 --- a/query/execute/transport.go +++ b/query/execute/transport.go @@ -67,7 +67,7 @@ func (t *consecutiveTransport) RetractTable(id DatasetID, key query.GroupKey) er return nil } -func (t *consecutiveTransport) Process(id DatasetID, b query.Table) error { +func (t *consecutiveTransport) Process(id DatasetID, tbl query.Table) error { select { case <-t.finished: return t.err() @@ -75,7 +75,7 @@ func (t *consecutiveTransport) Process(id DatasetID, b query.Table) error { } t.pushMsg(&processMsg{ srcMessage: srcMessage(id), - table: b, + table: tbl, }) return nil } diff --git a/query/functions/covariance.go b/query/functions/covariance.go index 064636fa68..f1e9509532 100644 --- a/query/functions/covariance.go +++ b/query/functions/covariance.go @@ -155,13 +155,13 @@ func (t *CovarianceTransformation) RetractTable(id execute.DatasetID, key query. return t.d.RetractTable(key) } -func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table) error { - cols := b.Cols() - builder, created := t.cache.TableBuilder(b.Key()) +func (t *CovarianceTransformation) Process(id execute.DatasetID, tbl query.Table) error { + cols := tbl.Cols() + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("covariance found duplicate table with key: %v", b.Key()) + return fmt.Errorf("covariance found duplicate table with key: %v", tbl.Key()) } - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) builder.AddCol(query.ColMeta{ Label: t.spec.TimeDst, Type: query.TTime, @@ -176,12 +176,12 @@ func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table) if cols[xIdx].Type != cols[yIdx].Type { return errors.New("cannot compute the covariance between different types") } - if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, b.Key(), builder); err != nil { + if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, tbl.Key(), builder); err != nil { return err } t.reset() - b.Do(func(cr query.ColReader) error { + tbl.Do(func(cr query.ColReader) error { switch typ := cols[xIdx].Type; typ { case query.TFloat: t.DoFloat(cr.Floats(xIdx), cr.Floats(yIdx)) @@ -191,7 +191,7 @@ func (t *CovarianceTransformation) Process(id execute.DatasetID, b query.Table) return nil }) - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) builder.AppendFloat(valueIdx, t.value()) return nil } diff --git a/query/functions/cumulative_sum.go b/query/functions/cumulative_sum.go index 9aa2228f56..2641d49e37 100644 --- a/query/functions/cumulative_sum.go +++ b/query/functions/cumulative_sum.go @@ -110,14 +110,14 @@ func (t *cumulativeSumTransformation) RetractTable(id execute.DatasetID, key que return t.d.RetractTable(key) } -func (t *cumulativeSumTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *cumulativeSumTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("cumulative sum found duplicate table with key: %v", b.Key()) + return fmt.Errorf("cumulative sum found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) - cols := b.Cols() + cols := tbl.Cols() sumers := make([]*cumulativeSum, len(cols)) for j, c := range cols { for _, label := range t.spec.Columns { @@ -127,7 +127,7 @@ func (t *cumulativeSumTransformation) Process(id execute.DatasetID, b query.Tabl } } } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for j, c := range cols { switch c.Type { diff --git a/query/functions/derivative.go b/query/functions/derivative.go index 86bbc5091f..203e2c7df3 100644 --- a/query/functions/derivative.go +++ b/query/functions/derivative.go @@ -156,12 +156,12 @@ func (t *derivativeTransformation) RetractTable(id execute.DatasetID, key query. return t.d.RetractTable(key) } -func (t *derivativeTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *derivativeTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("derivative found duplicate table with key: %v", b.Key()) + return fmt.Errorf("derivative found duplicate table with key: %v", tbl.Key()) } - cols := b.Cols() + cols := tbl.Cols() derivatives := make([]*derivative, len(cols)) timeIdx := -1 for j, c := range cols { @@ -192,7 +192,7 @@ func (t *derivativeTransformation) Process(id execute.DatasetID, b query.Table) // We need to drop the first row since its derivative is undefined firstIdx := 1 - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for j, c := range cols { d := derivatives[j] diff --git a/query/functions/difference.go b/query/functions/difference.go index 37ac272c08..ba65d83bb9 100644 --- a/query/functions/difference.go +++ b/query/functions/difference.go @@ -133,12 +133,12 @@ func (t *differenceTransformation) RetractTable(id execute.DatasetID, key query. return t.d.RetractTable(key) } -func (t *differenceTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *differenceTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("difference found duplicate table with key: %v", b.Key()) + return fmt.Errorf("difference found duplicate table with key: %v", tbl.Key()) } - cols := b.Cols() + cols := tbl.Cols() differences := make([]*difference, len(cols)) for j, c := range cols { found := false @@ -169,7 +169,7 @@ func (t *differenceTransformation) Process(id execute.DatasetID, b query.Table) // We need to drop the first row since its derivative is undefined firstIdx := 1 - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for j, c := range cols { d := differences[j] diff --git a/query/functions/distinct.go b/query/functions/distinct.go index 8b1caca793..4bdb99a133 100644 --- a/query/functions/distinct.go +++ b/query/functions/distinct.go @@ -144,56 +144,56 @@ func (t *distinctTransformation) RetractTable(id execute.DatasetID, key query.Gr return t.d.RetractTable(key) } -func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *distinctTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("distinct found duplicate table with key: %v", b.Key()) + return fmt.Errorf("distinct found duplicate table with key: %v", tbl.Key()) } - colIdx := execute.ColIdx(t.column, b.Cols()) + colIdx := execute.ColIdx(t.column, tbl.Cols()) if colIdx < 0 { // doesn't exist in this table, so add an empty value - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) colIdx = builder.AddCol(query.ColMeta{ Label: execute.DefaultValueColLabel, Type: query.TString, }) builder.AppendString(colIdx, "") - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) // TODO: hack required to ensure data flows downstream - return b.Do(func(query.ColReader) error { + return tbl.Do(func(query.ColReader) error { return nil }) } - col := b.Cols()[colIdx] + col := tbl.Cols()[colIdx] - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) colIdx = builder.AddCol(query.ColMeta{ Label: execute.DefaultValueColLabel, Type: col.Type, }) - if b.Key().HasCol(t.column) { - j := execute.ColIdx(t.column, b.Key().Cols()) + if tbl.Key().HasCol(t.column) { + j := execute.ColIdx(t.column, tbl.Key().Cols()) switch col.Type { case query.TBool: - builder.AppendBool(colIdx, b.Key().ValueBool(j)) + builder.AppendBool(colIdx, tbl.Key().ValueBool(j)) case query.TInt: - builder.AppendInt(colIdx, b.Key().ValueInt(j)) + builder.AppendInt(colIdx, tbl.Key().ValueInt(j)) case query.TUInt: - builder.AppendUInt(colIdx, b.Key().ValueUInt(j)) + builder.AppendUInt(colIdx, tbl.Key().ValueUInt(j)) case query.TFloat: - builder.AppendFloat(colIdx, b.Key().ValueFloat(j)) + builder.AppendFloat(colIdx, tbl.Key().ValueFloat(j)) case query.TString: - builder.AppendString(colIdx, b.Key().ValueString(j)) + builder.AppendString(colIdx, tbl.Key().ValueString(j)) case query.TTime: - builder.AppendTime(colIdx, b.Key().ValueTime(j)) + builder.AppendTime(colIdx, tbl.Key().ValueTime(j)) } - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) // TODO: hack required to ensure data flows downstream - return b.Do(func(query.ColReader) error { + return tbl.Do(func(query.ColReader) error { return nil }) } @@ -221,7 +221,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) er timeDistinct = make(map[execute.Time]bool) } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { // Check distinct @@ -270,7 +270,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Table) er builder.AppendTime(colIdx, v) } - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) } return nil }) diff --git a/query/functions/filter.go b/query/functions/filter.go index 9928936b35..a9ed0712ea 100644 --- a/query/functions/filter.go +++ b/query/functions/filter.go @@ -221,22 +221,22 @@ func (t *filterTransformation) RetractTable(id execute.DatasetID, key query.Grou return t.d.RetractTable(key) } -func (t *filterTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *filterTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("filter found duplicate table with key: %v", b.Key()) + return fmt.Errorf("filter found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) // Prepare the function for the column types. - cols := b.Cols() + cols := tbl.Cols() if err := t.fn.Prepare(cols); err != nil { // TODO(nathanielc): Should we not fail the query for failed compilation? return err } // Append only matching rows to table - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { if pass, err := t.fn.Eval(i, cr); err != nil { diff --git a/query/functions/from_csv.go b/query/functions/from_csv.go index 1c12e0b455..c9084f83de 100644 --- a/query/functions/from_csv.go +++ b/query/functions/from_csv.go @@ -147,14 +147,14 @@ func (c *CSVSource) Run(ctx context.Context) { var err error var max execute.Time maxSet := false - err = c.data.Tables().Do(func(b query.Table) error { + err = c.data.Tables().Do(func(tbl query.Table) error { for _, t := range c.ts { - err := t.Process(c.id, b) + err := t.Process(c.id, tbl) if err != nil { return err } - if idx := execute.ColIdx(execute.DefaultStopColLabel, b.Key().Cols()); idx >= 0 { - if stop := b.Key().ValueTime(idx); !maxSet || stop > max { + if idx := execute.ColIdx(execute.DefaultStopColLabel, tbl.Key().Cols()); idx >= 0 { + if stop := tbl.Key().ValueTime(idx); !maxSet || stop > max { max = stop maxSet = true } diff --git a/query/functions/group.go b/query/functions/group.go index fd6fcd3fe6..ee61aba5cc 100644 --- a/query/functions/group.go +++ b/query/functions/group.go @@ -285,8 +285,8 @@ func (t *groupTransformation) RetractTable(id execute.DatasetID, key query.Group panic("not implemented") } -func (t *groupTransformation) Process(id execute.DatasetID, b query.Table) error { - cols := b.Cols() +func (t *groupTransformation) Process(id execute.DatasetID, tbl query.Table) error { + cols := tbl.Cols() on := make(map[string]bool, len(cols)) if t.mode == GroupModeBy && len(t.keys) > 0 { for _, k := range t.keys { @@ -303,13 +303,13 @@ func (t *groupTransformation) Process(id execute.DatasetID, b query.Table) error on[c.Label] = true } } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { key := execute.GroupKeyForRowOn(i, cr, on) builder, created := t.cache.TableBuilder(key) if created { - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) } execute.AppendRecord(i, cr, builder) } diff --git a/query/functions/integral.go b/query/functions/integral.go index f71c0227f7..d5ce0b69e0 100644 --- a/query/functions/integral.go +++ b/query/functions/integral.go @@ -117,18 +117,18 @@ func (t *integralTransformation) RetractTable(id execute.DatasetID, key query.Gr return t.d.RetractTable(key) } -func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *integralTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("integral found duplicate table with key: %v", b.Key()) + return fmt.Errorf("integral found duplicate table with key: %v", tbl.Key()) } - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) builder.AddCol(query.ColMeta{ Label: t.spec.TimeDst, Type: query.TTime, }) - cols := b.Cols() + cols := tbl.Cols() integrals := make([]*integral, len(cols)) colMap := make([]int, len(cols)) for j, c := range cols { @@ -141,7 +141,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er } } - if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, b.Key(), builder); err != nil { + if err := execute.AppendAggregateTime(t.spec.TimeSrc, t.spec.TimeDst, tbl.Key(), builder); err != nil { return err } @@ -149,7 +149,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er if timeIdx < 0 { return fmt.Errorf("no column %q exists", t.spec.TimeSrc) } - err := b.Do(func(cr query.ColReader) error { + err := tbl.Do(func(cr query.ColReader) error { for j, in := range integrals { if in == nil { continue @@ -166,7 +166,7 @@ func (t *integralTransformation) Process(id execute.DatasetID, b query.Table) er return err } - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) for j, in := range integrals { if in == nil { continue diff --git a/query/functions/join.go b/query/functions/join.go index 55092806f6..ec93c637ff 100644 --- a/query/functions/join.go +++ b/query/functions/join.go @@ -278,11 +278,11 @@ func (t *mergeJoinTransformation) RetractTable(id execute.DatasetID, key query.G panic("not implemented") } -func (t *mergeJoinTransformation) Process(id execute.DatasetID, b query.Table) error { +func (t *mergeJoinTransformation) Process(id execute.DatasetID, tbl query.Table) error { t.mu.Lock() defer t.mu.Unlock() - tables := t.cache.Tables(b.Key()) + tables := t.cache.Tables(tbl.Key()) var references []string var table execute.TableBuilder @@ -299,20 +299,20 @@ func (t *mergeJoinTransformation) Process(id execute.DatasetID, b query.Table) e labels := unionStrs(t.keys, references) colMap := make([]int, len(labels)) for _, label := range labels { - tableIdx := execute.ColIdx(label, b.Cols()) + tableIdx := execute.ColIdx(label, tbl.Cols()) if tableIdx < 0 { return fmt.Errorf("no column %q exists", label) } // Only add the column if it does not already exist builderIdx := execute.ColIdx(label, table.Cols()) if builderIdx < 0 { - c := b.Cols()[tableIdx] + c := tbl.Cols()[tableIdx] builderIdx = table.AddCol(c) } colMap[builderIdx] = tableIdx } - execute.AppendTable(b, table, colMap) + execute.AppendTable(tbl, table, colMap) return nil } diff --git a/query/functions/keys.go b/query/functions/keys.go index 5ac1bf66a0..b686e56458 100644 --- a/query/functions/keys.go +++ b/query/functions/keys.go @@ -155,16 +155,16 @@ func (t *keysTransformation) RetractTable(id execute.DatasetID, key query.GroupK return t.d.RetractTable(key) } -func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *keysTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("keys found duplicate table with key: %v", b.Key()) + return fmt.Errorf("keys found duplicate table with key: %v", tbl.Key()) } - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) colIdx := builder.AddCol(query.ColMeta{Label: execute.DefaultValueColLabel, Type: query.TString}) - cols := b.Cols() + cols := tbl.Cols() sort.Slice(cols, func(i, j int) bool { return cols[i].Label < cols[j].Label }) @@ -175,7 +175,7 @@ func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error for i < len(cols) && j < len(t.except) { c := strings.Compare(cols[i].Label, t.except[j]) if c < 0 { - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) builder.AppendString(colIdx, cols[i].Label) i++ } else if c > 0 { @@ -189,12 +189,12 @@ func (t *keysTransformation) Process(id execute.DatasetID, b query.Table) error // add remaining for ; i < len(cols); i++ { - execute.AppendKeyValues(b.Key(), builder) + execute.AppendKeyValues(tbl.Key(), builder) builder.AppendString(colIdx, cols[i].Label) } // TODO: this is a hack - return b.Do(func(query.ColReader) error { + return tbl.Do(func(query.ColReader) error { return nil }) } diff --git a/query/functions/limit.go b/query/functions/limit.go index 982c7b6bb9..59c8d23602 100644 --- a/query/functions/limit.go +++ b/query/functions/limit.go @@ -144,12 +144,12 @@ func (t *limitTransformation) RetractTable(id execute.DatasetID, key query.Group return t.d.RetractTable(key) } -func (t *limitTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *limitTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("limit found duplicate table with key: %v", b.Key()) + return fmt.Errorf("limit found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) ncols := builder.NCols() if cap(t.colMap) < ncols { @@ -164,7 +164,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, b query.Table) error // AppendTable with limit n := t.n offset := t.offset - b.Do(func(cr query.ColReader) error { + tbl.Do(func(cr query.ColReader) error { if n <= 0 { // Returning an error terminates iteration return errors.New("finished") diff --git a/query/functions/map.go b/query/functions/map.go index e33792d699..98364348e8 100644 --- a/query/functions/map.go +++ b/query/functions/map.go @@ -132,9 +132,9 @@ func (t *mapTransformation) RetractTable(id execute.DatasetID, key query.GroupKe return t.d.RetractTable(key) } -func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error { +func (t *mapTransformation) Process(id execute.DatasetID, tbl query.Table) error { // Prepare the functions for the column types. - cols := b.Cols() + cols := tbl.Cols() err := t.fn.Prepare(cols) if err != nil { // TODO(nathanielc): Should we not fail the query for failed compilation? @@ -149,12 +149,12 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error { sort.Strings(keys) // Determine on which cols to group - on := make(map[string]bool, len(b.Key().Cols())) - for _, c := range b.Key().Cols() { + on := make(map[string]bool, len(tbl.Key().Cols())) + for _, c := range tbl.Key().Cols() { on[c.Label] = t.mergeKey || execute.ContainsStr(keys, c.Label) } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { m, err := t.fn.Eval(i, cr) @@ -166,11 +166,11 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error { builder, created := t.cache.TableBuilder(key) if created { if t.mergeKey { - execute.AddTableKeyCols(b.Key(), builder) + execute.AddTableKeyCols(tbl.Key(), builder) } // Add columns from function in sorted order for _, k := range keys { - if t.mergeKey && b.Key().HasCol(k) { + if t.mergeKey && tbl.Key().HasCol(k) { continue } builder.AddCol(query.ColMeta{ @@ -182,8 +182,8 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Table) error { for j, c := range builder.Cols() { v, ok := m.Get(c.Label) if !ok { - if idx := execute.ColIdx(c.Label, b.Key().Cols()); t.mergeKey && idx >= 0 { - v = b.Key().Value(idx) + if idx := execute.ColIdx(c.Label, tbl.Key().Cols()); t.mergeKey && idx >= 0 { + v = tbl.Key().Value(idx) } else { // This should be unreachable return fmt.Errorf("could not find value for column %q", c.Label) diff --git a/query/functions/range.go b/query/functions/range.go index 812f2d28f8..8e81bc53aa 100644 --- a/query/functions/range.go +++ b/query/functions/range.go @@ -149,17 +149,17 @@ func (t *rangeTransformation) RetractTable(id execute.DatasetID, key query.Group return t.d.RetractTable(key) } -func (t *rangeTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *rangeTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("range found duplicate table with key: %v", b.Key()) + return fmt.Errorf("range found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) - cols := make([]int, len(b.Cols())) + execute.AddTableCols(tbl, builder) + cols := make([]int, len(tbl.Cols())) for i := range cols { cols[i] = i } - execute.AppendTable(b, builder, cols) + execute.AppendTable(tbl, builder, cols) return nil } diff --git a/query/functions/set.go b/query/functions/set.go index ecf7ff58b9..ab6e3ca035 100644 --- a/query/functions/set.go +++ b/query/functions/set.go @@ -120,8 +120,8 @@ func (t *setTransformation) RetractTable(id execute.DatasetID, key query.GroupKe return nil } -func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error { - key := b.Key() +func (t *setTransformation) Process(id execute.DatasetID, tbl query.Table) error { + key := tbl.Key() if idx := execute.ColIdx(t.key, key.Cols()); idx >= 0 { // Update key cols := make([]query.ColMeta, len(key.Cols())) @@ -138,7 +138,7 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error { } builder, created := t.cache.TableBuilder(key) if created { - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) if !execute.HasCol(t.key, builder.Cols()) { builder.AddCol(query.ColMeta{ Label: t.key, @@ -147,7 +147,7 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Table) error { } } idx := execute.ColIdx(t.key, builder.Cols()) - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { for j := range cr.Cols() { if j == idx { continue diff --git a/query/functions/shift.go b/query/functions/shift.go index 6e09f2cb99..7cdb548e47 100644 --- a/query/functions/shift.go +++ b/query/functions/shift.go @@ -131,8 +131,8 @@ func (t *shiftTransformation) RetractTable(id execute.DatasetID, key query.Group return t.d.RetractTable(key) } -func (t *shiftTransformation) Process(id execute.DatasetID, b query.Table) error { - key := b.Key() +func (t *shiftTransformation) Process(id execute.DatasetID, tbl query.Table) error { + key := tbl.Key() // Update key cols := make([]query.ColMeta, len(key.Cols())) vs := make([]values.Value, len(key.Cols())) @@ -152,11 +152,11 @@ func (t *shiftTransformation) Process(id execute.DatasetID, b query.Table) error builder, created := t.cache.TableBuilder(key) if !created { - return fmt.Errorf("shift found duplicate table with key: %v", b.Key()) + return fmt.Errorf("shift found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { for j, c := range cr.Cols() { if execute.ContainsStr(t.columns, c.Label) { l := cr.Len() diff --git a/query/functions/sort.go b/query/functions/sort.go index d906531e6c..4587983fbc 100644 --- a/query/functions/sort.go +++ b/query/functions/sort.go @@ -129,8 +129,8 @@ func (t *sortTransformation) RetractTable(id execute.DatasetID, key query.GroupK return t.d.RetractTable(key) } -func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error { - key := b.Key() +func (t *sortTransformation) Process(id execute.DatasetID, tbl query.Table) error { + key := tbl.Key() for _, label := range t.cols { if key.HasCol(label) { key = t.sortedKey(key) @@ -140,9 +140,9 @@ func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error builder, created := t.cache.TableBuilder(key) if !created { - return fmt.Errorf("sort found duplicate table with key: %v", b.Key()) + return fmt.Errorf("sort found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) ncols := builder.NCols() if cap(t.colMap) < ncols { @@ -154,7 +154,7 @@ func (t *sortTransformation) Process(id execute.DatasetID, b query.Table) error t.colMap = t.colMap[:ncols] } - execute.AppendTable(b, builder, t.colMap) + execute.AppendTable(tbl, builder, t.colMap) builder.Sort(t.cols, t.desc) return nil diff --git a/query/functions/state_tracking.go b/query/functions/state_tracking.go index 652c4c39a8..0587284634 100644 --- a/query/functions/state_tracking.go +++ b/query/functions/state_tracking.go @@ -209,15 +209,15 @@ func (t *stateTrackingTransformation) RetractTable(id execute.DatasetID, key que return t.d.RetractTable(key) } -func (t *stateTrackingTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *stateTrackingTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("found duplicate table with key: %v", b.Key()) + return fmt.Errorf("found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) // Prepare the functions for the column types. - cols := b.Cols() + cols := tbl.Cols() err := t.fn.Prepare(cols) if err != nil { // TODO(nathanielc): Should we not fail the query for failed compilation? @@ -247,12 +247,12 @@ func (t *stateTrackingTransformation) Process(id execute.DatasetID, b query.Tabl inState bool ) - timeIdx := execute.ColIdx(t.timeCol, b.Cols()) + timeIdx := execute.ColIdx(t.timeCol, tbl.Cols()) if timeIdx < 0 { return fmt.Errorf("no column %q exists", t.timeCol) } // Append modified rows - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { tm := cr.Times(timeIdx)[i] diff --git a/query/functions/storage/storage.go b/query/functions/storage/storage.go index 359ada89c6..3f3e1be9e4 100644 --- a/query/functions/storage/storage.go +++ b/query/functions/storage/storage.go @@ -107,9 +107,9 @@ func (s *source) run(ctx context.Context) error { //TODO(nathanielc): Pass through context to actual network I/O. for tables, mark, ok := s.next(ctx, trace); ok; tables, mark, ok = s.next(ctx, trace) { - err := tables.Do(func(b query.Table) error { + err := tables.Do(func(tbl query.Table) error { for _, t := range s.ts { - if err := t.Process(s.id, b); err != nil { + if err := t.Process(s.id, tbl); err != nil { return err } //TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving. diff --git a/query/functions/to_http.go b/query/functions/to_http.go index 98a14b5c5b..2eaa0919ba 100644 --- a/query/functions/to_http.go +++ b/query/functions/to_http.go @@ -306,13 +306,13 @@ type idxType struct { Type query.DataType } -func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) error { +func (t *ToHTTPTransformation) Process(id execute.DatasetID, tbl query.Table) error { pr, pw := io.Pipe() // TODO: replce the pipe with something faster m := &toHttpMetric{} e := protocol.NewEncoder(pw) e.FailOnFieldErr(true) e.SetFieldSortOrder(protocol.SortFields) - cols := b.Cols() + cols := tbl.Cols() labels := make(map[string]idxType, len(cols)) for i, col := range cols { labels[col.Label] = idxType{Idx: i, Type: col.Type} @@ -334,7 +334,7 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) erro } // check if each col is a tag or value and cache this value for the loop - colMetadatas := b.Cols() + colMetadatas := tbl.Cols() isTag := make([]bool, len(colMetadatas)) isValue := make([]bool, len(colMetadatas)) @@ -347,7 +347,7 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Table) erro wg.Add(1) go func() { m.name = t.spec.Spec.Name - b.Do(func(er query.ColReader) error { + tbl.Do(func(er query.ColReader) error { l := er.Len() for i := 0; i < l; i++ { m.truncateTagsAndFields() diff --git a/query/functions/to_kafka.go b/query/functions/to_kafka.go index 843fc71af3..de903e2853 100644 --- a/query/functions/to_kafka.go +++ b/query/functions/to_kafka.go @@ -256,7 +256,7 @@ func (m *toKafkaMetric) Time() time.Time { return m.t } -func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (err error) { +func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl query.Table) (err error) { w := DefaultKafkaWriterFactory(kafka.WriterConfig{ Brokers: t.spec.Spec.Brokers, Topic: t.spec.Spec.Topic, @@ -283,7 +283,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er e := protocol.NewEncoder(pw) e.FailOnFieldErr(true) e.SetFieldSortOrder(protocol.SortFields) - cols := b.Cols() + cols := tbl.Cols() labels := make(map[string]idxType, len(cols)) for i, col := range cols { labels[col.Label] = idxType{Idx: i, Type: col.Type} @@ -302,7 +302,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er measurementNameCol = t.spec.Spec.NameColumn } // check if each col is a tag or value and cache this value for the loop - colMetadatas := b.Cols() + colMetadatas := tbl.Cols() isTag := make([]bool, len(colMetadatas)) isValue := make([]bool, len(colMetadatas)) for i, col := range colMetadatas { @@ -313,7 +313,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Table) (er wg := sync.WaitGroup{} wg.Add(1) go func() { - err = b.Do(func(er query.ColReader) error { + err = tbl.Do(func(er query.ColReader) error { l := er.Len() for i := 0; i < l; i++ { m.truncateTagsAndFields() diff --git a/query/functions/unique.go b/query/functions/unique.go index 486bb4b194..73aca2adc3 100644 --- a/query/functions/unique.go +++ b/query/functions/unique.go @@ -108,12 +108,12 @@ func (t *uniqueTransformation) RetractTable(id execute.DatasetID, key query.Grou return t.d.RetractTable(key) } -func (t *uniqueTransformation) Process(id execute.DatasetID, b query.Table) error { - builder, created := t.cache.TableBuilder(b.Key()) +func (t *uniqueTransformation) Process(id execute.DatasetID, tbl query.Table) error { + builder, created := t.cache.TableBuilder(tbl.Key()) if !created { - return fmt.Errorf("unique found duplicate table with key: %v", b.Key()) + return fmt.Errorf("unique found duplicate table with key: %v", tbl.Key()) } - execute.AddTableCols(b, builder) + execute.AddTableCols(tbl, builder) colIdx := execute.ColIdx(t.column, builder.Cols()) if colIdx < 0 { @@ -144,7 +144,7 @@ func (t *uniqueTransformation) Process(id execute.DatasetID, b query.Table) erro timeUnique = make(map[execute.Time]bool) } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { // Check unique diff --git a/query/functions/window.go b/query/functions/window.go index 9d45d6830a..16f926d9f1 100644 --- a/query/functions/window.go +++ b/query/functions/window.go @@ -245,16 +245,16 @@ func (t *fixedWindowTransformation) RetractTable(id execute.DatasetID, key query panic("not implemented") } -func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table) error { - timeIdx := execute.ColIdx(t.timeCol, b.Cols()) +func (t *fixedWindowTransformation) Process(id execute.DatasetID, tbl query.Table) error { + timeIdx := execute.ColIdx(t.timeCol, tbl.Cols()) - newCols := make([]query.ColMeta, 0, len(b.Cols())+2) - keyCols := make([]query.ColMeta, 0, len(b.Cols())+2) - keyColMap := make([]int, 0, len(b.Cols())+2) + newCols := make([]query.ColMeta, 0, len(tbl.Cols())+2) + keyCols := make([]query.ColMeta, 0, len(tbl.Cols())+2) + keyColMap := make([]int, 0, len(tbl.Cols())+2) startColIdx := -1 stopColIdx := -1 - for j, c := range b.Cols() { - keyIdx := execute.ColIdx(c.Label, b.Key().Cols()) + for j, c := range tbl.Cols() { + keyIdx := execute.ColIdx(c.Label, tbl.Key().Cols()) keyed := keyIdx >= 0 if c.Label == t.startColLabel { startColIdx = j @@ -291,7 +291,7 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table) keyColMap = append(keyColMap, len(keyColMap)) } - return b.Do(func(cr query.ColReader) error { + return tbl.Do(func(cr query.ColReader) error { l := cr.Len() for i := 0; i < l; i++ { tm := cr.Times(timeIdx)[i] @@ -308,7 +308,7 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Table) case t.stopColLabel: vs[j] = values.NewTimeValue(bnds.Stop) default: - vs[j] = b.Key().Value(keyColMap[j]) + vs[j] = tbl.Key().Value(keyColMap[j]) } } key := execute.NewGroupKey(cols, vs) diff --git a/query/influxql/result.go b/query/influxql/result.go index 8c8265ee0e..cf7397c4aa 100644 --- a/query/influxql/result.go +++ b/query/influxql/result.go @@ -44,17 +44,17 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) ( tables := res.Tables() result := Result{StatementID: id} - if err := tables.Do(func(b query.Table) error { + if err := tables.Do(func(tbl query.Table) error { var row Row - for j, c := range b.Key().Cols() { + for j, c := range tbl.Key().Cols() { if c.Type != query.TString { // Skip any columns that aren't strings. They are extra ones that // flux includes by default like the start and end times that we do not // care about. continue } - v := b.Key().Value(j).Str() + v := tbl.Key().Value(j).Str() if c.Label == "_measurement" { row.Name = v } else if c.Label == "_field" { @@ -74,10 +74,10 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) ( // from the ordering given in the original query. resultColMap := map[string]int{} j := 1 - for _, c := range b.Cols() { + for _, c := range tbl.Cols() { if c.Label == execute.DefaultTimeColLabel { resultColMap[c.Label] = 0 - } else if !b.Key().HasCol(c.Label) { + } else if !tbl.Key().HasCol(c.Label) { resultColMap[c.Label] = j j++ } @@ -91,7 +91,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) ( row.Columns[v] = k } - if err := b.Do(func(cr query.ColReader) error { + if err := tbl.Do(func(cr query.ColReader) error { // Preallocate the number of rows for the response to make this section // of code easier to read. Find a time column which should exist // in the output. @@ -101,7 +101,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) ( } j := 0 - for idx, c := range b.Cols() { + for idx, c := range tbl.Cols() { if cr.Key().HasCol(c.Label) { continue } diff --git a/query/repl/repl.go b/query/repl/repl.go index 23f16314df..77826edb76 100644 --- a/query/repl/repl.go +++ b/query/repl/repl.go @@ -222,10 +222,10 @@ func (r *REPL) doQuery(spec *query.Spec) error { for _, name := range names { r := results[name] - blocks := r.Tables() + tables := r.Tables() fmt.Println("Result:", name) - err := blocks.Do(func(b query.Table) error { - _, err := execute.NewFormatter(b, nil).WriteTo(os.Stdout) + err := tables.Do(func(tbl query.Table) error { + _, err := execute.NewFormatter(tbl, nil).WriteTo(os.Stdout) return err }) if err != nil {