feat(storageflux): move flux components out to separate package

pull/17164/head
Sebastian Borza 2020-03-10 00:00:52 -05:00
parent 49c1d85770
commit a50e69451e
No known key found for this signature in database
GPG Key ID: CA2A58DAF0E754A7
14 changed files with 115 additions and 56 deletions

View File

@ -42,7 +42,7 @@ import (
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/source"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads"
storageflux "github.com/influxdata/influxdb/storage/flux"
"github.com/influxdata/influxdb/storage/readservice"
taskbackend "github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
@ -594,7 +594,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
)
deps, err := influxdb.NewDependencies(
reads.NewReader(readservice.NewStore(m.engine)),
storageflux.NewReader(readservice.NewStore(m.engine)),
m.engine,
authorizer.NewBucketService(bucketSvc),
authorizer.NewOrgService(orgSvc),

View File

@ -3,7 +3,7 @@ TARGETS =
# List any source files used to generate the targets here
SOURCES =
# List any directories that have their own Makefile here
SUBDIRS = reads
SUBDIRS = reads flux
# Default target
all: $(SUBDIRS) $(TARGETS)

28
storage/flux/Makefile Normal file
View File

@ -0,0 +1,28 @@
# List any generated files here
TARGETS = table.gen.go
# List any source files used to generate the targets here
SOURCES = table.gen.go.tmpl
# List any directories that have their own Makefile here
SUBDIRS =
# Default target
all: $(SUBDIRS) $(TARGETS)
# Recurse into subdirs for same make goal
$(SUBDIRS):
$(MAKE) -C $@ $(MAKECMDGOALS)
# Clean all targets recursively
clean: $(SUBDIRS)
rm -f $(TARGETS)
# Define go generate if not already defined
GO_GENERATE := go generate
# Run go generate for the targets
$(TARGETS): $(SOURCES)
$(GO_GENERATE) -x
.PHONY: all clean $(SUBDIRS)

View File

@ -1,4 +1,4 @@
package reads
package storageflux
import (
"fmt"
@ -10,12 +10,6 @@ import (
"github.com/pkg/errors"
)
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
func toStoragePredicate(f *semantic.FunctionExpression) (*datatypes.Predicate, error) {
if f.Block.Parameters == nil || len(f.Block.Parameters.List) != 1 {
return nil, errors.New("storage predicate functions must have exactly one parameter")
@ -119,25 +113,25 @@ func toStoragePredicateHelper(n semantic.Expression, objectName string) (*dataty
return nil, fmt.Errorf("unknown object %q", n.Object)
}
switch n.Property {
case fieldKey:
case datatypes.FieldKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{
TagRefValue: models.FieldKeyTagKey,
},
}, nil
case measurementKey:
case datatypes.MeasurementKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{
TagRefValue: models.MeasurementTagKey,
},
}, nil
case valueKey:
case datatypes.ValueKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeFieldRef,
Value: &datatypes.Node_FieldRefValue{
FieldRefValue: valueKey,
FieldRefValue: datatypes.ValueKey,
},
}, nil

View File

@ -1,4 +1,4 @@
package reads
package storageflux
import (
"context"
@ -12,6 +12,7 @@ import (
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
storage "github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
@ -24,10 +25,11 @@ type storageTable interface {
}
type storeReader struct {
s Store
s storage.Store
}
func NewReader(s Store) influxdb.Reader {
// NewReader returns a new storageflux reader
func NewReader(s storage.Store) influxdb.Reader {
return &storeReader{s: s}
}
@ -95,7 +97,7 @@ func (r *storeReader) Close() {}
type filterIterator struct {
ctx context.Context
s Store
s storage.Store
spec influxdb.ReadFilterSpec
stats cursors.CursorStats
cache *tagsCache
@ -143,7 +145,7 @@ func (fi *filterIterator) Do(f func(flux.Table) error) error {
return fi.handleRead(f, rs)
}
func (fi *filterIterator) handleRead(f func(flux.Table) error, rs ResultSet) error {
func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error {
// these resources must be closed if not nil on return
var (
cur cursors.Cursor
@ -219,7 +221,7 @@ READ:
type groupIterator struct {
ctx context.Context
s Store
s storage.Store
spec influxdb.ReadGroupSpec
stats cursors.CursorStats
cache *tagsCache
@ -275,10 +277,10 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error {
return gi.handleRead(f, rs)
}
func (gi *groupIterator) handleRead(f func(flux.Table) error, rs GroupResultSet) error {
func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupResultSet) error {
// these resources must be closed if not nil on return
var (
gc GroupCursor
gc storage.GroupCursor
cur cursors.Cursor
table storageTable
)
@ -502,7 +504,7 @@ func groupKeyForGroup(kv [][]byte, spec *influxdb.ReadGroupSpec, bnds execute.Bo
type tagKeysIterator struct {
ctx context.Context
bounds execute.Bounds
s Store
s storage.Store
readSpec influxdb.ReadTagKeysSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
@ -515,11 +517,12 @@ func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
)
var req datatypes.TagKeysRequest
if any, err := types.MarshalAny(src); err != nil {
any, err := types.MarshalAny(src)
if err != nil {
return err
} else {
req.TagsSource = any
}
req.TagsSource = any
req.Predicate = ti.predicate
req.Range.Start = int64(ti.bounds.Start)
req.Range.End = int64(ti.bounds.Stop)
@ -584,7 +587,7 @@ func (ti *tagKeysIterator) Statistics() cursors.CursorStats {
type tagValuesIterator struct {
ctx context.Context
bounds execute.Bounds
s Store
s storage.Store
readSpec influxdb.ReadTagValuesSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
@ -597,11 +600,12 @@ func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
)
var req datatypes.TagValuesRequest
if any, err := types.MarshalAny(src); err != nil {
any, err := types.MarshalAny(src)
if err != nil {
return err
} else {
req.TagsSource = any
}
req.TagsSource = any
switch ti.readSpec.TagKey {
case "_measurement":
req.TagKey = models.MeasurementTagKey

View File

@ -2,9 +2,9 @@
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: flux_table.gen.go.tmpl
// Source: table.gen.go.tmpl
package reads
package storageflux
import (
"sync"
@ -14,6 +14,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
storage "github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
)
@ -101,13 +102,13 @@ func (t *floatTable) advance() bool {
type floatGroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.FloatArrayCursor
}
func newFloatGroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.FloatArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -286,13 +287,13 @@ func (t *integerTable) advance() bool {
type integerGroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.IntegerArrayCursor
}
func newIntegerGroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.IntegerArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -471,13 +472,13 @@ func (t *unsignedTable) advance() bool {
type unsignedGroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.UnsignedArrayCursor
}
func newUnsignedGroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.UnsignedArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -656,13 +657,13 @@ func (t *stringTable) advance() bool {
type stringGroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.StringArrayCursor
}
func newStringGroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.StringArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -841,13 +842,13 @@ func (t *booleanTable) advance() bool {
type booleanGroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.BooleanArrayCursor
}
func newBooleanGroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.BooleanArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,

View File

@ -1,4 +1,4 @@
package reads
package storageflux
import (
"sync"
@ -8,6 +8,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
storage "github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
)
@ -95,13 +96,13 @@ func (t *{{.name}}Table) advance() bool {
type {{.name}}GroupTable struct {
table
mu sync.Mutex
gc GroupCursor
gc storage.GroupCursor
cur cursors.{{.Name}}ArrayCursor
}
func new{{.Name}}GroupTable(
done chan struct{},
gc GroupCursor,
gc storage.GroupCursor,
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,

View File

@ -1,6 +1,6 @@
package reads
package storageflux
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@types.tmpldata flux_table.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@types.tmpldata table.gen.go.tmpl
import (
"errors"

View File

@ -1,4 +1,4 @@
package reads_test
package storageflux_test
import (
"context"
@ -20,7 +20,6 @@ import (
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/readservice"
"go.uber.org/zap/zaptest"
)
@ -154,7 +153,7 @@ func benchmarkRead(b *testing.B, sg gen.SeriesGenerator, f func(r influxdb.Reade
if err := engine.Open(context.Background()); err != nil {
b.Fatal(err)
}
reader := reads.NewReader(readservice.NewStore(engine))
reader := NewReader(readservice.NewStore(engine))
b.ResetTimer()
b.ReportAllocs()

View File

@ -1,4 +1,4 @@
package reads
package storageflux
import (
"container/list"

View File

@ -0,0 +1,27 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64"
},
{
"Name":"String",
"name":"string",
"Type":"string"
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool"
}
]

View File

@ -1,12 +1,10 @@
# List any generated files here
TARGETS = array_cursor.gen.go \
flux_table.gen.go
TARGETS = array_cursor.gen.go
# List any source files used to generate the targets here
SOURCES = gen.go \
array_cursor.gen.go.tmpl \
array_cursor.gen.go.tmpldata \
flux_table.gen.go.tmpl \
types.tmpldata
# List any directories that have their own Makefile here

View File

@ -0,0 +1,7 @@
package datatypes
const (
FieldKey = "_field"
MeasurementKey = "_measurement"
ValueKey = "_value"
)

View File

@ -31,8 +31,8 @@ type SeriesRow struct {
}
var (
fieldKeyBytes = []byte(fieldKey)
measurementKeyBytes = []byte(measurementKey)
fieldKeyBytes = []byte(datatypes.FieldKey)
measurementKeyBytes = []byte(datatypes.MeasurementKey)
)
type indexSeriesCursor struct {