fix(storage): convert the storage table interface to use arrow buffers

The table interface was modified to expose the arrow buffers. The
storage table has now been converted to use this interface with the same
fixes so that it exposes arrow buffers.

The influxql package has also been updated to use the `DoArrow` method
from the `flux.Table` interface.
pull/10616/head
Jonathan A. Sternberg 2018-12-17 17:25:49 -06:00 committed by Nathaniel Cook
parent c27d954016
commit cc93531400
6 changed files with 324 additions and 213 deletions

3
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/SAP/go-hdb v0.13.1 // indirect
github.com/SermoDigital/jose v0.9.1 // indirect
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20181217213538-e9ed591db9cb
github.com/apex/log v1.1.0 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/armon/go-radix v1.0.0 // indirect
@ -75,7 +76,7 @@ require (
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/influxdata/flux v0.10.0
github.com/influxdata/flux v0.10.1-0.20181217231646-291730263db1
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect

7
go.sum
View File

@ -36,6 +36,8 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/arrow/go/arrow v0.0.0-20181031164735-a56c009257a7 h1:+leX3wRmpUQHMlRdNw982F5GJ7ty683B4x9EdT7iPVs=
github.com/apache/arrow/go/arrow v0.0.0-20181031164735-a56c009257a7/go.mod h1:GjvccvtI06FGFvRU1In/maF7tKp3h7GBV9Sexo5rNPM=
github.com/apache/arrow/go/arrow v0.0.0-20181217213538-e9ed591db9cb h1:p6xQwsjxRtuIrUDjGAFuro04BO0GNJ9V2troYRY8kmQ=
github.com/apache/arrow/go/arrow v0.0.0-20181217213538-e9ed591db9cb/go.mod h1:GjvccvtI06FGFvRU1In/maF7tKp3h7GBV9Sexo5rNPM=
github.com/apex/log v1.1.0 h1:J5rld6WVFi6NxA6m8GJ1LJqu3+GiTFIt3mYv27gdQWI=
github.com/apex/log v1.1.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
@ -220,11 +222,12 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.10.0 h1:5zz3ZFaSNsJmEf7ldOED7ujWd7a16gSLYptD/Nz2vjQ=
github.com/influxdata/flux v0.10.0/go.mod h1:crguqnTMQHaGEKp93vZH+pIyTVlJYqkv8bNqSMfc22A=
github.com/influxdata/flux v0.10.1-0.20181217231646-291730263db1 h1:dYBDeAXzKklFxfHvOHOnwaeZiGgzBz7T+DQK5E3n2pw=
github.com/influxdata/flux v0.10.1-0.20181217231646-291730263db1/go.mod h1:crguqnTMQHaGEKp93vZH+pIyTVlJYqkv8bNqSMfc22A=
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/kin-openapi v0.1.1-0.20181212221347-ca3615a71f83 h1:z2dPj4rGtqzrwesRBb4yS0+d0a5wcIXiobrhBx1Jm5U=
github.com/influxdata/kin-openapi v0.1.1-0.20181212221347-ca3615a71f83/go.mod h1:pvoDDuHnoB0wCKQlkSgyhYpfBsn8vTy1ZZMXq8K38e0=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=

View File

@ -6,8 +6,11 @@ import (
"strconv"
"time"
"github.com/apache/arrow/go/arrow/array"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
)
@ -104,7 +107,7 @@ type queryTable struct {
row *Row
groupKey flux.GroupKey
colMeta []flux.ColMeta
cols []interface{}
cols []array.Interface
}
func newQueryTable(r *Row) (*queryTable, error) {
@ -124,88 +127,113 @@ func (t *queryTable) Statistics() flux.Statistics {
// Data in a column is laid out in the following way:
// [ r.row.Columns... , r.tagKeys()... , r.row.Name ]
func (t *queryTable) translateRowsToColumns() error {
cols := t.Cols()
t.cols = make([]interface{}, len(cols))
for i, col := range cols {
t.cols = make([]array.Interface, len(t.Cols()))
for i := range t.row.Columns {
col := t.Cols()[i]
switch col.Type {
case flux.TFloat:
t.cols[i] = make([]float64, 0, t.Len())
b := arrow.NewFloatBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
val, ok := row[i].(float64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
b.Append(val)
}
t.cols[i] = b.NewArray()
b.Release()
case flux.TInt:
t.cols[i] = make([]int64, 0, t.Len())
b := arrow.NewIntBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
val, ok := row[i].(int64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
b.Append(val)
}
t.cols[i] = b.NewArray()
b.Release()
case flux.TUInt:
t.cols[i] = make([]uint64, 0, t.Len())
b := arrow.NewUintBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
val, ok := row[i].(uint64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
b.Append(val)
}
t.cols[i] = b.NewArray()
b.Release()
case flux.TString:
t.cols[i] = make([]string, 0, t.Len())
b := arrow.NewStringBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
val, ok := row[i].(string)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
b.AppendString(val)
}
t.cols[i] = b.NewArray()
b.Release()
case flux.TBool:
t.cols[i] = make([]bool, 0, t.Len())
b := arrow.NewBoolBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
val, ok := row[i].(bool)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
b.Append(val)
}
t.cols[i] = b.NewArray()
b.Release()
case flux.TTime:
t.cols[i] = make([]values.Time, 0, t.Len())
}
}
for _, els := range t.row.Values {
for i, el := range els {
col := cols[i]
switch col.Type {
case flux.TFloat:
val, ok := el.(float64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
t.cols[i] = append(t.cols[i].([]float64), val)
case flux.TInt:
val, ok := el.(int64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
t.cols[i] = append(t.cols[i].([]int64), val)
case flux.TUInt:
val, ok := el.(uint64)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
t.cols[i] = append(t.cols[i].([]uint64), val)
case flux.TString:
val, ok := el.(string)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
t.cols[i] = append(t.cols[i].([]string), val)
case flux.TBool:
val, ok := el.(bool)
if !ok {
return fmt.Errorf("unsupported type %T found in column %s of type %s", val, col.Label, col.Type)
}
t.cols[i] = append(t.cols[i].([]bool), val)
case flux.TTime:
switch val := el.(type) {
b := arrow.NewIntBuilder(&memory.Allocator{})
b.Reserve(t.Len())
for _, row := range t.row.Values {
switch val := row[i].(type) {
case int64:
t.cols[i] = append(t.cols[i].([]values.Time), values.Time(val))
b.Append(val)
case float64:
t.cols[i] = append(t.cols[i].([]values.Time), values.Time(val))
b.Append(int64(val))
case string:
tm, err := time.Parse(time.RFC3339, val)
if err != nil {
return fmt.Errorf("could not parse string %q as time: %v", val, err)
}
t.cols[i] = append(t.cols[i].([]values.Time), values.ConvertTime(tm))
b.Append(tm.UnixNano())
default:
return fmt.Errorf("unsupported type %T found in column %s", val, col.Label)
}
default:
return fmt.Errorf("invalid type %T found in column %s", el, col.Label)
}
t.cols[i] = b.NewArray()
b.Release()
default:
return fmt.Errorf("invalid type %T found in column %s", col.Type, col.Label)
}
j := len(t.row.Columns)
for j < len(t.row.Columns)+len(t.row.Tags) {
col := cols[j]
t.cols[j] = append(t.cols[j].([]string), t.row.Tags[col.Label])
j++
}
t.cols[j] = append(t.cols[j].([]string), t.row.Name)
}
for j := len(t.row.Columns); j < len(t.Cols()); j++ {
b := arrow.NewStringBuilder(&memory.Allocator{})
b.Reserve(t.Len())
var value string
if key := t.Cols()[j].Label; key == "_measurement" {
value = t.row.Name
} else {
value = t.row.Tags[key]
}
for i := 0; i < t.Len(); i++ {
b.AppendString(value)
}
t.cols[j] = b.NewArray()
b.Release()
}
return nil
}
@ -313,6 +341,12 @@ func (r *queryTable) Cols() []flux.ColMeta {
// Do applies f to itself. This is because Row is a flux.ColReader.
// It is used to implement flux.Table.
func (r *queryTable) Do(f func(flux.ColReader) error) error {
return r.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (r *queryTable) DoArrow(f func(flux.ArrowColReader) error) error {
return f(r)
}
@ -333,41 +367,41 @@ func (r *queryTable) Len() int {
// Bools returns the values in column index j as bools.
// It will panic if the column is not a []bool.
// It is used to implement flux.ColReader.
func (r *queryTable) Bools(j int) []bool {
return r.cols[j].([]bool)
func (r *queryTable) Bools(j int) *array.Boolean {
return r.cols[j].(*array.Boolean)
}
// Ints returns the values in column index j as ints.
// It will panic if the column is not a []int64.
// It is used to implement flux.ColReader.
func (r *queryTable) Ints(j int) []int64 {
return r.cols[j].([]int64)
func (r *queryTable) Ints(j int) *array.Int64 {
return r.cols[j].(*array.Int64)
}
// UInts returns the values in column index j as ints.
// It will panic if the column is not a []uint64.
// It is used to implement flux.ColReader.
func (r *queryTable) UInts(j int) []uint64 {
return r.cols[j].([]uint64)
func (r *queryTable) UInts(j int) *array.Uint64 {
return r.cols[j].(*array.Uint64)
}
// Floats returns the values in column index j as floats.
// It will panic if the column is not a []float64.
// It is used to implement flux.ColReader.
func (r *queryTable) Floats(j int) []float64 {
return r.cols[j].([]float64)
func (r *queryTable) Floats(j int) *array.Float64 {
return r.cols[j].(*array.Float64)
}
// Strings returns the values in column index j as strings.
// It will panic if the column is not a []string.
// It is used to implement flux.ColReader.
func (r *queryTable) Strings(j int) []string {
return r.cols[j].([]string)
func (r *queryTable) Strings(j int) *array.Binary {
return r.cols[j].(*array.Binary)
}
// Times returns the values in column index j as values.Times.
// It will panic if the column is not a []values.Time.
// It is used to implement flux.ColReader.
func (r *queryTable) Times(j int) []values.Time {
return r.cols[j].([]values.Time)
func (r *queryTable) Times(j int) *array.Int64 {
return r.cols[j].(*array.Int64)
}

View File

@ -7,6 +7,8 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
@ -70,6 +72,12 @@ func (t *floatTable) Statistics() flux.Statistics {
}
func (t *floatTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *floatTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -94,24 +102,21 @@ func (t *floatTable) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]float64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -162,6 +167,12 @@ func (t *floatGroupTable) Close() {
}
func (t *floatGroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *floatGroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -191,24 +202,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]float64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -302,6 +310,12 @@ func (t *integerTable) Statistics() flux.Statistics {
}
func (t *integerTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *integerTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -326,24 +340,21 @@ func (t *integerTable) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]int64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -394,6 +405,12 @@ func (t *integerGroupTable) Close() {
}
func (t *integerGroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *integerGroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -423,24 +440,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]int64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -534,6 +548,12 @@ func (t *unsignedTable) Statistics() flux.Statistics {
}
func (t *unsignedTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *unsignedTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -558,24 +578,21 @@ func (t *unsignedTable) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]uint64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -626,6 +643,12 @@ func (t *unsignedGroupTable) Close() {
}
func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *unsignedGroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -655,24 +678,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]uint64, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -766,6 +786,12 @@ func (t *stringTable) Statistics() flux.Statistics {
}
func (t *stringTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *stringTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -790,24 +816,21 @@ func (t *stringTable) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]string, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -858,6 +881,12 @@ func (t *stringGroupTable) Close() {
}
func (t *stringGroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *stringGroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -887,24 +916,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]string, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -998,6 +1024,12 @@ func (t *booleanTable) Statistics() flux.Statistics {
}
func (t *booleanTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *booleanTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -1022,24 +1054,21 @@ func (t *booleanTable) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]bool, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -1090,6 +1119,12 @@ func (t *booleanGroupTable) Close() {
}
func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *booleanGroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -1119,24 +1154,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]bool, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true

View File

@ -1,6 +1,8 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
@ -64,6 +66,12 @@ func (t *{{.name}}Table) Statistics() flux.Statistics {
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *{{.name}}Table) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -88,24 +96,21 @@ func (t *{{.name}}Table) advance() bool {
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true
@ -156,6 +161,12 @@ func (t *{{.name}}GroupTable) Close() {
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *{{.name}}GroupTable) DoArrow(f func(flux.ArrowColReader) error) error {
t.mu.Lock()
defer func() {
t.closeDone()
@ -185,24 +196,21 @@ RETRY:
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
t.timeBuf = make([]int64, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
copy(t.timeBuf, a.Timestamps)
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.colBufs[timeColIdx] = arrow.NewInt(t.timeBuf, &memory.Allocator{})
t.colBufs[valueColIdx] = t.toArrowBuffer(t.valBuf)
t.appendTags()
t.appendBounds()
return true

View File

@ -6,8 +6,11 @@ import (
"fmt"
"sync/atomic"
"github.com/apache/arrow/go/arrow/array"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb/cursors"
)
@ -27,8 +30,8 @@ type table struct {
// The current number of records in memory
l int
colBufs []interface{}
timeBuf []execute.Time
colBufs []array.Interface
timeBuf []int64
err error
@ -48,7 +51,7 @@ func newTable(
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
colBufs: make([]array.Interface, len(cols)),
cols: cols,
}
}
@ -72,34 +75,34 @@ func (t *table) isCancelled() bool {
return atomic.LoadInt32(&t.cancelled) != 0
}
func (t *table) Bools(j int) []bool {
func (t *table) Bools(j int) *array.Boolean {
execute.CheckColType(t.cols[j], flux.TBool)
return t.colBufs[j].([]bool)
return t.colBufs[j].(*array.Boolean)
}
func (t *table) Ints(j int) []int64 {
func (t *table) Ints(j int) *array.Int64 {
execute.CheckColType(t.cols[j], flux.TInt)
return t.colBufs[j].([]int64)
return t.colBufs[j].(*array.Int64)
}
func (t *table) UInts(j int) []uint64 {
func (t *table) UInts(j int) *array.Uint64 {
execute.CheckColType(t.cols[j], flux.TUInt)
return t.colBufs[j].([]uint64)
return t.colBufs[j].(*array.Uint64)
}
func (t *table) Floats(j int) []float64 {
func (t *table) Floats(j int) *array.Float64 {
execute.CheckColType(t.cols[j], flux.TFloat)
return t.colBufs[j].([]float64)
return t.colBufs[j].(*array.Float64)
}
func (t *table) Strings(j int) []string {
func (t *table) Strings(j int) *array.Binary {
execute.CheckColType(t.cols[j], flux.TString)
return t.colBufs[j].([]string)
return t.colBufs[j].(*array.Binary)
}
func (t *table) Times(j int) []execute.Time {
func (t *table) Times(j int) *array.Int64 {
execute.CheckColType(t.cols[j], flux.TTime)
return t.colBufs[j].([]execute.Time)
return t.colBufs[j].(*array.Int64)
}
// readTags populates b.tags with the provided tags
@ -123,20 +126,13 @@ func (t *table) appendTags() {
for j := range t.cols {
v := t.tags[j]
if v != nil {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]string, len(t.cols))
b := arrow.NewStringBuilder(&memory.Allocator{})
b.Reserve(t.l)
for i := 0; i < t.l; i++ {
b.Append(v)
}
colBuf := t.colBufs[j].([]string)
if cap(colBuf) < t.l {
colBuf = make([]string, t.l)
} else {
colBuf = colBuf[:t.l]
}
vStr := string(v)
for i := range colBuf {
colBuf[i] = vStr
}
t.colBufs[j] = colBuf
t.colBufs[j] = b.NewArray()
b.Release()
}
}
}
@ -145,19 +141,13 @@ func (t *table) appendTags() {
func (t *table) appendBounds() {
bounds := []execute.Time{t.bounds.Start, t.bounds.Stop}
for j := range []int{startColIdx, stopColIdx} {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]execute.Time, len(t.cols))
b := arrow.NewIntBuilder(&memory.Allocator{})
b.Reserve(t.l)
for i := 0; i < t.l; i++ {
b.UnsafeAppend(int64(bounds[j]))
}
colBuf := t.colBufs[j].([]execute.Time)
if cap(colBuf) < t.l {
colBuf = make([]execute.Time, t.l)
} else {
colBuf = colBuf[:t.l]
}
for i := range colBuf {
colBuf[i] = bounds[j]
}
t.colBufs[j] = colBuf
t.colBufs[j] = b.NewArray()
b.Release()
}
}
@ -224,6 +214,12 @@ func (t *tableNoPoints) Close() {}
func (t *tableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *tableNoPoints) DoArrow(f func(flux.ArrowColReader) error) error {
if t.isCancelled() {
return nil
}
@ -253,6 +249,12 @@ func newGroupTableNoPoints(
func (t *groupTableNoPoints) Close() {}
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
return t.DoArrow(func(cr flux.ArrowColReader) error {
return f(arrow.ColReader(cr))
})
}
func (t *groupTableNoPoints) DoArrow(f func(flux.ArrowColReader) error) error {
if t.isCancelled() {
return nil
}
@ -262,3 +264,34 @@ func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
}
func (t *groupTableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 {
return arrow.NewFloat(vs, &memory.Allocator{})
}
func (t *floatGroupTable) toArrowBuffer(vs []float64) *array.Float64 {
return arrow.NewFloat(vs, &memory.Allocator{})
}
func (t *integerTable) toArrowBuffer(vs []int64) *array.Int64 {
return arrow.NewInt(vs, &memory.Allocator{})
}
func (t *integerGroupTable) toArrowBuffer(vs []int64) *array.Int64 {
return arrow.NewInt(vs, &memory.Allocator{})
}
func (t *unsignedTable) toArrowBuffer(vs []uint64) *array.Uint64 {
return arrow.NewUint(vs, &memory.Allocator{})
}
func (t *unsignedGroupTable) toArrowBuffer(vs []uint64) *array.Uint64 {
return arrow.NewUint(vs, &memory.Allocator{})
}
func (t *stringTable) toArrowBuffer(vs []string) *array.Binary {
return arrow.NewString(vs, &memory.Allocator{})
}
func (t *stringGroupTable) toArrowBuffer(vs []string) *array.Binary {
return arrow.NewString(vs, &memory.Allocator{})
}
func (t *booleanTable) toArrowBuffer(vs []bool) *array.Boolean {
return arrow.NewBool(vs, &memory.Allocator{})
}
func (t *booleanGroupTable) toArrowBuffer(vs []bool) *array.Boolean {
return arrow.NewBool(vs, &memory.Allocator{})
}