feat(flux): Add initial support for executing Flux queries

pull/10299/head
Stuart Carnie 2018-09-17 17:31:44 -07:00
parent 1a236cf629
commit 62b9791da8
14 changed files with 2604 additions and 10 deletions

Gopkg.lock (generated)

@@ -142,6 +142,32 @@
revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
version = "v0.2.0"
[[projects]]
branch = "master"
digest = "1:60c9794469b42f5dd51df4652a93fbcba5ddcd990a02fd234be3cb44c18b0e67"
name = "github.com/influxdata/flux"
packages = [
".",
"ast",
"builtin",
"compiler",
"control",
"csv",
"execute",
"functions",
"functions/storage",
"interpreter",
"iocounter",
"lang",
"options",
"parser",
"plan",
"semantic",
"values",
]
pruneopts = "UT"
revision = "272ec9cad23d26d93339feca224ee1f9a29a21c4"
[[projects]]
branch = "master"
digest = "1:ac08c9f0449762095e72fcf636b8b2cfd1e16fea10d4f69776efddc95dc3a7be"
@@ -153,6 +179,25 @@
pruneopts = "UT"
revision = "c661ab7db8ad858626cc7a2114e786f4e7463564"
[[projects]]
branch = "master"
digest = "1:b263f6201ed97bab3221ed8af2dc7575b2f07e7410b3e311a3149515ba3780f6"
name = "github.com/influxdata/platform"
packages = [
".",
"query",
]
pruneopts = "UT"
revision = "34fd898d6ced9f42dd5111526dfbdc2b62fd3279"
[[projects]]
branch = "master"
digest = "1:b07e7ba2bb99a70ce3318ea387a62666371c1ab4f9bfcae192a59e3f4b8ffa56"
name = "github.com/influxdata/tdigest"
packages = ["."]
pruneopts = "UT"
revision = "a7d76c6f093a59b94a01c6c2b8429122d444a8cc"
[[projects]]
branch = "master"
digest = "1:a6411d501f20aa4325c2cef806205a4b4802aec94b296f495db662c6ef46c787"
@@ -233,6 +278,18 @@
revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c"
version = "v1.0.1"
[[projects]]
digest = "1:d4d60a22dc6c84bf1f8d2991cddcb98fb25f433f8a856fb6888fc21885998a13"
name = "github.com/mna/pigeon"
packages = [
".",
"ast",
"builder",
]
pruneopts = "UT"
revision = "9df264905d4734c0133161d8c67828ff522b154a"
version = "v1.0.0"
[[projects]]
branch = "master"
digest = "1:c22fdadfe2de0e1df75987cac34f1c721157dc24f180c76b4ef66a3c9637a799"
@@ -277,6 +334,14 @@
revision = "bb6d471dc95d4fe11e432687f8b70ff496cf3136"
version = "v1.0.0"
[[projects]]
digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747"
name = "github.com/pkg/errors"
packages = ["."]
pruneopts = "UT"
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
version = "v0.8.0"
[[projects]]
digest = "1:70f78dea42b8c0ff38ecf5487eaa79006fa2193fc804fc7c1d7222745d9e2522"
name = "github.com/prometheus/client_golang"
@@ -328,6 +393,14 @@
pruneopts = "UT"
revision = "101a6d2f8b52abfc409ac188958e7e7be0116331"
[[projects]]
digest = "1:274f67cb6fed9588ea2521ecdac05a6d62a8c51c074c1fccc6a49a40ba80e925"
name = "github.com/satori/go.uuid"
packages = ["."]
pruneopts = "UT"
revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3"
version = "v1.2.0"
[[projects]]
digest = "1:eaa6698f44de8f2977e93c9b946e60a8af75f565058658aad2df8032b55c84e5"
name = "github.com/tinylib/msgp"
@@ -530,7 +603,20 @@
"github.com/golang/snappy",
"github.com/google/go-cmp/cmp",
"github.com/google/go-cmp/cmp/cmpopts",
"github.com/influxdata/flux",
"github.com/influxdata/flux/ast",
"github.com/influxdata/flux/builtin",
"github.com/influxdata/flux/control",
"github.com/influxdata/flux/csv",
"github.com/influxdata/flux/execute",
"github.com/influxdata/flux/functions",
"github.com/influxdata/flux/functions/storage",
"github.com/influxdata/flux/lang",
"github.com/influxdata/flux/semantic",
"github.com/influxdata/flux/values",
"github.com/influxdata/influxql",
"github.com/influxdata/platform",
"github.com/influxdata/platform/query",
"github.com/influxdata/usage-client/v1",
"github.com/jsternberg/zap-logfmt",
"github.com/jwilder/encoding/simple8b",
@@ -540,6 +626,7 @@
"github.com/opentracing/opentracing-go/ext",
"github.com/paulbellamy/ratecounter",
"github.com/peterh/liner",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/retailnext/hllpp",
"github.com/tinylib/msgp/msgp",


@@ -69,3 +69,7 @@
[prune]
go-tests = true
unused-packages = true
[[constraint]]
branch = "master"
name = "github.com/influxdata/flux"


@@ -14,6 +14,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/flux/control"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
@@ -286,7 +287,9 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.PointsWriter = s.PointsWriter
srv.Handler.Version = s.buildInfo.Version
srv.Handler.BuildType = "OSS"
-srv.Handler.Store = storage.NewStore(s.TSDBStore, s.MetaClient)
+ss := storage.NewStore(s.TSDBStore, s.MetaClient)
+srv.Handler.Store = ss
+srv.Handler.Controller = control.NewController(ss, s.Logger)
s.Services = append(s.Services, srv)
}


@@ -0,0 +1,53 @@
package control
import (
"context"
_ "github.com/influxdata/flux/builtin"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions"
fstorage "github.com/influxdata/flux/functions/storage"
"github.com/influxdata/influxdb/flux/functions/store"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/platform"
"go.uber.org/zap"
)
func NewController(s storage.Store, logger *zap.Logger) *control.Controller {
// Flux controller resource limits
var (
concurrencyQuota = 10
memoryBytesQuota = 1e6
)
cc := control.Config{
ExecutorDependencies: make(execute.Dependencies),
ConcurrencyQuota: concurrencyQuota,
MemoryBytesQuota: int64(memoryBytesQuota),
Logger: logger,
Verbose: false,
}
err := functions.InjectFromDependencies(cc.ExecutorDependencies, fstorage.Dependencies{
Reader: store.NewReader(s),
BucketLookup: bucketLookup{},
OrganizationLookup: orgLookup{},
})
if err != nil {
panic(err)
}
return control.New(cc)
}
type orgLookup struct{}
func (l orgLookup) Lookup(ctx context.Context, name string) (platform.ID, bool) {
return platform.ID(name), true
}
type bucketLookup struct{}
func (l bucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool) {
return platform.ID(name), true
}
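For context, the *control.Controller built here is what the httpd handler below uses to execute queries. A minimal sketch of driving it directly, mirroring the handler's usage in this commit (the script and bucket are illustrative, and this assumes the flux.ResultIterator at this revision exposes More/Next):

    ctrl := control.NewController(store, logger)
    q, err := ctrl.Query(ctx, lang.FluxCompiler{
        Query: `from(bucket:"telegraf/autogen") |> range(start: -5m)`,
    })
    if err != nil {
        return err
    }
    defer func() {
        q.Cancel()
        q.Done()
    }()
    results := flux.NewResultIteratorFromQuery(q)
    for results.More() {
        _ = results.Next() // encode or inspect each flux.Result here
    }

Note that orgLookup and bucketLookup are identity stubs, so the bucket name in a script passes straight through to the storage reader as a platform.ID.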


@@ -0,0 +1,160 @@
package store
import (
"fmt"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/semantic"
ostorage "github.com/influxdata/influxdb/services/storage"
"github.com/pkg/errors"
)
func ToStoragePredicate(f *semantic.FunctionExpression) (*ostorage.Predicate, error) {
if len(f.Params) != 1 {
return nil, errors.New("storage predicate functions must have exactly one parameter")
}
root, err := toStoragePredicate(f.Body.(semantic.Expression), f.Params[0].Key.Name)
if err != nil {
return nil, err
}
return &ostorage.Predicate{
Root: root,
}, nil
}
func toStoragePredicate(n semantic.Expression, objectName string) (*ostorage.Node, error) {
switch n := n.(type) {
case *semantic.LogicalExpression:
left, err := toStoragePredicate(n.Left, objectName)
if err != nil {
return nil, errors.Wrap(err, "left hand side")
}
right, err := toStoragePredicate(n.Right, objectName)
if err != nil {
return nil, errors.Wrap(err, "right hand side")
}
children := []*ostorage.Node{left, right}
switch n.Operator {
case ast.AndOperator:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLogicalExpression,
Value: &ostorage.Node_Logical_{Logical: ostorage.LogicalAnd},
Children: children,
}, nil
case ast.OrOperator:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLogicalExpression,
Value: &ostorage.Node_Logical_{Logical: ostorage.LogicalOr},
Children: children,
}, nil
default:
return nil, fmt.Errorf("unknown logical operator %v", n.Operator)
}
case *semantic.BinaryExpression:
left, err := toStoragePredicate(n.Left, objectName)
if err != nil {
return nil, errors.Wrap(err, "left hand side")
}
right, err := toStoragePredicate(n.Right, objectName)
if err != nil {
return nil, errors.Wrap(err, "right hand side")
}
children := []*ostorage.Node{left, right}
op, err := toComparisonOperator(n.Operator)
if err != nil {
return nil, err
}
return &ostorage.Node{
NodeType: ostorage.NodeTypeComparisonExpression,
Value: &ostorage.Node_Comparison_{Comparison: op},
Children: children,
}, nil
case *semantic.StringLiteral:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLiteral,
Value: &ostorage.Node_StringValue{
StringValue: n.Value,
},
}, nil
case *semantic.IntegerLiteral:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLiteral,
Value: &ostorage.Node_IntegerValue{
IntegerValue: n.Value,
},
}, nil
case *semantic.BooleanLiteral:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLiteral,
Value: &ostorage.Node_BooleanValue{
BooleanValue: n.Value,
},
}, nil
case *semantic.FloatLiteral:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLiteral,
Value: &ostorage.Node_FloatValue{
FloatValue: n.Value,
},
}, nil
case *semantic.RegexpLiteral:
return &ostorage.Node{
NodeType: ostorage.NodeTypeLiteral,
Value: &ostorage.Node_RegexValue{
RegexValue: n.Value.String(),
},
}, nil
case *semantic.MemberExpression:
// Sanity check that the object is the objectName identifier
if ident, ok := n.Object.(*semantic.IdentifierExpression); !ok || ident.Name != objectName {
return nil, fmt.Errorf("unknown object %q", n.Object)
}
if n.Property == "_value" {
return &ostorage.Node{
NodeType: ostorage.NodeTypeFieldRef,
Value: &ostorage.Node_FieldRefValue{
FieldRefValue: "_value",
},
}, nil
}
return &ostorage.Node{
NodeType: ostorage.NodeTypeTagRef,
Value: &ostorage.Node_TagRefValue{
TagRefValue: n.Property,
},
}, nil
case *semantic.DurationLiteral:
return nil, errors.New("duration literals not supported in storage predicates")
case *semantic.DateTimeLiteral:
return nil, errors.New("time literals not supported in storage predicates")
default:
return nil, fmt.Errorf("unsupported semantic expression type %T", n)
}
}
func toComparisonOperator(o ast.OperatorKind) (ostorage.Node_Comparison, error) {
switch o {
case ast.EqualOperator:
return ostorage.ComparisonEqual, nil
case ast.NotEqualOperator:
return ostorage.ComparisonNotEqual, nil
case ast.RegexpMatchOperator:
return ostorage.ComparisonRegex, nil
case ast.NotRegexpMatchOperator:
return ostorage.ComparisonNotRegex, nil
case ast.StartsWithOperator:
return ostorage.ComparisonStartsWith, nil
case ast.LessThanOperator:
return ostorage.ComparisonLess, nil
case ast.LessThanEqualOperator:
return ostorage.ComparisonLessEqual, nil
case ast.GreaterThanOperator:
return ostorage.ComparisonGreater, nil
case ast.GreaterThanEqualOperator:
return ostorage.ComparisonGreaterEqual, nil
default:
return 0, fmt.Errorf("unknown operator %v", o)
}
}
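To make the translation concrete, here is a hedged sketch of the semantic tree for the Flux predicate (r) => r._measurement == "cpu" and r._value > 0.5 and its conversion; the FunctionParam and Identifier shapes are inferred from the f.Params[0].Key.Name access above, not confirmed against the flux sources:

    f := &semantic.FunctionExpression{
        Params: []*semantic.FunctionParam{{Key: &semantic.Identifier{Name: "r"}}},
        Body: &semantic.LogicalExpression{
            Operator: ast.AndOperator,
            Left: &semantic.BinaryExpression{
                Operator: ast.EqualOperator,
                Left:     &semantic.MemberExpression{Object: &semantic.IdentifierExpression{Name: "r"}, Property: "_measurement"},
                Right:    &semantic.StringLiteral{Value: "cpu"},
            },
            Right: &semantic.BinaryExpression{
                Operator: ast.GreaterThanOperator,
                Left:     &semantic.MemberExpression{Object: &semantic.IdentifierExpression{Name: "r"}, Property: "_value"},
                Right:    &semantic.FloatLiteral{Value: 0.5},
            },
        },
    }
    p, err := ToStoragePredicate(f)
    // p.Root is a LogicalAnd node whose children are a tag-ref comparison
    // (_measurement == "cpu") and a field-ref comparison (_value > 0.5).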


@@ -0,0 +1,453 @@
package store
import (
"context"
"fmt"
"strings"
"github.com/gogo/protobuf/types"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
fstorage "github.com/influxdata/flux/functions/storage"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
ostorage "github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
)
type storageTable interface {
flux.Table
Close()
Done() chan struct{}
}
type storeReader struct {
s storage.Store
}
func NewReader(s storage.Store) fstorage.Reader {
return &storeReader{s: s}
}
func (r *storeReader) Read(ctx context.Context, rs fstorage.ReadSpec, start, stop execute.Time) (flux.TableIterator, error) {
var predicate *ostorage.Predicate
if rs.Predicate != nil {
p, err := ToStoragePredicate(rs.Predicate)
if err != nil {
return nil, err
}
predicate = p
}
return &tableIterator{
ctx: ctx,
bounds: execute.Bounds{Start: start, Stop: stop},
s: r.s,
readSpec: rs,
predicate: predicate,
}, nil
}
func (r *storeReader) Close() {}
type tableIterator struct {
ctx context.Context
bounds execute.Bounds
s storage.Store
readSpec fstorage.ReadSpec
predicate *ostorage.Predicate
}
func (bi *tableIterator) Do(f func(flux.Table) error) error {
src := ostorage.ReadSource{Database: string(bi.readSpec.BucketID)}
if i := strings.IndexByte(src.Database, '/'); i > -1 {
src.RetentionPolicy = src.Database[i+1:]
src.Database = src.Database[:i]
}
// Setup read request
var req ostorage.ReadRequest
if any, err := types.MarshalAny(&src); err != nil {
return err
} else {
req.ReadSource = any
}
req.Predicate = bi.predicate
req.Descending = bi.readSpec.Descending
req.TimestampRange.Start = int64(bi.bounds.Start)
req.TimestampRange.End = int64(bi.bounds.Stop)
req.Group = convertGroupMode(bi.readSpec.GroupMode)
req.GroupKeys = bi.readSpec.GroupKeys
req.SeriesLimit = bi.readSpec.SeriesLimit
req.PointsLimit = bi.readSpec.PointsLimit
req.SeriesOffset = bi.readSpec.SeriesOffset
if req.PointsLimit == -1 {
req.Hints.SetNoPoints()
}
if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil {
return err
} else if agg != ostorage.AggregateTypeNone {
req.Aggregate = &ostorage.Aggregate{Type: agg}
}
switch {
case req.Group != ostorage.GroupAll:
rs, err := bi.s.GroupRead(bi.ctx, &req)
if err != nil {
return err
}
if req.Hints.NoPoints() {
return bi.handleGroupReadNoPoints(f, rs)
}
return bi.handleGroupRead(f, rs)
default:
rs, err := bi.s.Read(bi.ctx, &req)
if err != nil {
return err
}
if req.Hints.NoPoints() {
return bi.handleReadNoPoints(f, rs)
}
return bi.handleRead(f, rs)
}
}
func (bi *tableIterator) handleRead(f func(flux.Table) error, rs ostorage.ResultSet) error {
defer func() {
rs.Close()
fmt.Println("handleRead: DONE")
}()
READ:
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
var table storageTable
switch cur := cur.(type) {
case tsdb.IntegerArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case tsdb.FloatArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case tsdb.UnsignedArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case tsdb.BooleanArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case tsdb.StringArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
if table.Empty() {
table.Close()
continue
}
if err := f(table); err != nil {
table.Close()
return err
}
select {
case <-table.Done():
case <-bi.ctx.Done():
fmt.Println("CANCELED")
break READ
}
}
return nil
}
func (bi *tableIterator) handleReadNoPoints(f func(flux.Table) error, rs ostorage.ResultSet) error {
defer func() {
rs.Close()
fmt.Println("handleReadNoPoints: DONE")
}()
READ:
for rs.Next() {
cur := rs.Cursor()
if !hasPoints(cur) {
// no data for series key + field combination
continue
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table := newTableNoPoints(bi.bounds, key, cols, rs.Tags(), defs)
if err := f(table); err != nil {
table.Close()
return err
}
select {
case <-table.Done():
case <-bi.ctx.Done():
fmt.Println("CANCELED")
break READ
}
}
return nil
}
func (bi *tableIterator) handleGroupRead(f func(flux.Table) error, rs ostorage.GroupResultSet) error {
defer func() {
rs.Close()
fmt.Println("handleGroupRead: DONE")
}()
gc := rs.Next()
READ:
for gc != nil {
var cur tsdb.Cursor
for gc.Next() {
cur = gc.Cursor()
if cur != nil {
break
}
}
if cur == nil {
gc = rs.Next()
continue
}
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
var table storageTable
switch cur := cur.(type) {
case tsdb.IntegerArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt)
table = newIntegerGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case tsdb.FloatArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat)
table = newFloatGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case tsdb.UnsignedArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt)
table = newUnsignedGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case tsdb.BooleanArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool)
table = newBooleanGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case tsdb.StringArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newStringGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
if err := f(table); err != nil {
table.Close()
return err
}
// Wait until the table has been read.
select {
case <-table.Done():
case <-bi.ctx.Done():
fmt.Println("CANCELED")
break READ
}
gc = rs.Next()
}
return nil
}
func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs ostorage.GroupResultSet) error {
defer func() {
rs.Close()
fmt.Println("handleGroupReadNoPoints: DONE")
}()
gc := rs.Next()
READ:
for gc != nil {
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table := newGroupTableNoPoints(gc, bi.bounds, key, cols, gc.Tags(), defs)
if err := f(table); err != nil {
table.Close()
return err
}
// Wait until the table has been read.
select {
case <-table.Done():
case <-bi.ctx.Done():
fmt.Println("CANCELED")
break READ
}
gc = rs.Next()
}
return nil
}
func determineAggregateMethod(agg string) (ostorage.Aggregate_AggregateType, error) {
if agg == "" {
return ostorage.AggregateTypeNone, nil
}
if t, ok := ostorage.Aggregate_AggregateType_value[strings.ToUpper(agg)]; ok {
return ostorage.Aggregate_AggregateType(t), nil
}
return 0, fmt.Errorf("unknown aggregate type %q", agg)
}
func convertGroupMode(m fstorage.GroupMode) ostorage.ReadRequest_Group {
switch m {
case fstorage.GroupModeNone:
return ostorage.GroupNone
case fstorage.GroupModeBy:
return ostorage.GroupBy
case fstorage.GroupModeExcept:
return ostorage.GroupExcept
case fstorage.GroupModeDefault, fstorage.GroupModeAll:
fallthrough
default:
return ostorage.GroupAll
}
}
const (
startColIdx = 0
stopColIdx = 1
timeColIdx = 2
valueColIdx = 3
)
func determineTableColsForSeries(tags models.Tags, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
cols := make([]flux.ColMeta, 4+len(tags))
defs := make([][]byte, 4+len(tags))
cols[startColIdx] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
cols[stopColIdx] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
cols[timeColIdx] = flux.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: flux.TTime,
}
cols[valueColIdx] = flux.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range tags {
cols[4+j] = flux.ColMeta{
Label: string(tag.Key),
Type: flux.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func groupKeyForSeries(tags models.Tags, readSpec *fstorage.ReadSpec, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(tags)+2)
vs := make([]values.Value, 2, len(tags)+2)
cols[0] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
vs[1] = values.NewTimeValue(bnds.Stop)
switch readSpec.GroupMode {
case fstorage.GroupModeBy:
// group key in GroupKeys order, including tags in the GroupKeys slice
for _, k := range readSpec.GroupKeys {
if v := tags.Get([]byte(k)); len(v) > 0 {
cols = append(cols, flux.ColMeta{
Label: k,
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(v)))
}
}
case fstorage.GroupModeExcept:
// group key in GroupKeys order, skipping tags in the GroupKeys slice
panic("not implemented")
case fstorage.GroupModeDefault, fstorage.GroupModeAll:
for i := range tags {
cols = append(cols, flux.ColMeta{
Label: string(tags[i].Key),
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(tags[i].Value)))
}
}
return execute.NewGroupKey(cols, vs)
}
func determineTableColsForGroup(tagKeys [][]byte, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
cols := make([]flux.ColMeta, 4+len(tagKeys))
defs := make([][]byte, 4+len(tagKeys))
cols[startColIdx] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
cols[stopColIdx] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
cols[timeColIdx] = flux.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: flux.TTime,
}
cols[valueColIdx] = flux.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range tagKeys {
cols[4+j] = flux.ColMeta{
Label: string(tag),
Type: flux.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func groupKeyForGroup(kv [][]byte, readSpec *fstorage.ReadSpec, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(readSpec.GroupKeys)+2)
vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2)
cols[0] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
vs[1] = values.NewTimeValue(bnds.Stop)
for i := range readSpec.GroupKeys {
cols = append(cols, flux.ColMeta{
Label: readSpec.GroupKeys[i],
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(kv[i])))
}
return execute.NewGroupKey(cols, vs)
}
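One subtlety in the handle* loops above: each table streams from a cursor that shares the underlying result set, so the iterator hands a table to f and then blocks on table.Done() before advancing. A consumer must therefore drain each table inside the callback; a minimal sketch against the interfaces used in this file, iter being the tableIterator returned by storeReader.Read:

    err := iter.Do(func(tbl flux.Table) error {
        return tbl.Do(func(cr flux.ColReader) error {
            // read cr.Times(timeColIdx) and the value column for this batch
            return nil
        })
    })

Because the table implementations defer Close inside Do, and Close closes the done channel, returning from the inner callback is what unblocks the iterator's select on table.Done().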

File diff suppressed because it is too large


@@ -0,0 +1,221 @@
package store
import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
"github.com/pkg/errors"
)
{{range .In}}
//
// *********** {{.Name}} ***********
//
type {{.name}}Table struct {
table
cur tsdb.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}Table(
cur tsdb.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
return t
}
func (t *{{.name}}Table) Close() {
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
defer t.Close()
if !t.more {
return t.err
}
if err := f(t); err != nil {
return err
}
for t.advance() {
if err := f(t); err != nil {
return err
}
}
return t.err
}
func (t *{{.name}}Table) advance() bool {
a := t.cur.Next()
t.l = a.Len()
if t.l == 0 {
return false
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
// group table
type {{.name}}GroupTable struct {
table
gc storage.GroupCursor
cur tsdb.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}GroupTable(
gc storage.GroupCursor,
cur tsdb.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
return t
}
func (t *{{.name}}GroupTable) Close() {
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.gc != nil {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
if !t.more {
return t.err
}
if err := f(t); err != nil {
return err
}
for t.advance() {
if err := f(t); err != nil {
return err
}
}
return t.err
}
func (t *{{.name}}GroupTable) advance() bool {
RETRY:
a := t.cur.Next()
t.l = a.Len()
if t.l == 0 {
if t.advanceCursor() {
goto RETRY
}
return false
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
if cur == nil {
continue
}
typed, ok := cur.(tsdb.{{.Name}}ArrayCursor)
if !ok {
// TODO(sgc): error or skip?
cur.Close()
t.err = errors.Errorf("expected {{.name}} cursor type, got %T", cur)
return false
}
t.readTags(t.gc.Tags())
t.cur = typed
return true
}
return false
}
{{end}}


@@ -0,0 +1,270 @@
package store
//go:generate go run ../../../_tools/tmpl/main.go -i -data=types.tmpldata table.gen.go.tmpl=table.gen.go
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
)
type table struct {
bounds execute.Bounds
key flux.GroupKey
cols []flux.ColMeta
// cache of the tags on the current series.
// len(tags) == len(cols)
tags [][]byte
defs [][]byte
done chan struct{}
// The current number of records in memory
l int
colBufs []interface{}
timeBuf []execute.Time
err error
empty bool
more bool
}
func newTable(
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) table {
return table{
bounds: bounds,
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
cols: cols,
done: make(chan struct{}),
empty: true,
}
}
func (t *table) Done() chan struct{} { return t.done }
func (t *table) Key() flux.GroupKey { return t.key }
func (t *table) Cols() []flux.ColMeta { return t.cols }
func (t *table) RefCount(n int) {}
func (t *table) Err() error { return t.err }
func (t *table) Empty() bool { return t.empty }
func (t *table) Len() int { return t.l }
func (t *table) Bools(j int) []bool {
execute.CheckColType(t.cols[j], flux.TBool)
return t.colBufs[j].([]bool)
}
func (t *table) Ints(j int) []int64 {
execute.CheckColType(t.cols[j], flux.TInt)
return t.colBufs[j].([]int64)
}
func (t *table) UInts(j int) []uint64 {
execute.CheckColType(t.cols[j], flux.TUInt)
return t.colBufs[j].([]uint64)
}
func (t *table) Floats(j int) []float64 {
execute.CheckColType(t.cols[j], flux.TFloat)
return t.colBufs[j].([]float64)
}
func (t *table) Strings(j int) []string {
execute.CheckColType(t.cols[j], flux.TString)
return t.colBufs[j].([]string)
}
func (t *table) Times(j int) []execute.Time {
execute.CheckColType(t.cols[j], flux.TTime)
return t.colBufs[j].([]execute.Time)
}
// readTags populates t.tags with the provided tags
func (t *table) readTags(tags models.Tags) {
for j := range t.tags {
t.tags[j] = t.defs[j]
}
if len(tags) == 0 {
return
}
for _, tag := range tags {
j := execute.ColIdx(string(tag.Key), t.cols)
t.tags[j] = tag.Value
}
}
// appendTags fills the colBufs for the tag columns with the tag value.
func (t *table) appendTags() {
for j := range t.cols {
v := t.tags[j]
if v != nil {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]string, len(t.cols))
}
colBuf := t.colBufs[j].([]string)
if cap(colBuf) < t.l {
colBuf = make([]string, t.l)
} else {
colBuf = colBuf[:t.l]
}
vStr := string(v)
for i := range colBuf {
colBuf[i] = vStr
}
t.colBufs[j] = colBuf
}
}
}
// appendBounds fills the colBufs for the time bounds
func (t *table) appendBounds() {
bounds := []execute.Time{t.bounds.Start, t.bounds.Stop}
for j := range []int{startColIdx, stopColIdx} {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]execute.Time, len(t.cols))
}
colBuf := t.colBufs[j].([]execute.Time)
if cap(colBuf) < t.l {
colBuf = make([]execute.Time, t.l)
} else {
colBuf = colBuf[:t.l]
}
for i := range colBuf {
colBuf[i] = bounds[j]
}
t.colBufs[j] = colBuf
}
}
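// hasPoints reports whether cur has at least one array of values.
// Note that it consumes the cursor's first array and then closes the cursor,
// so it is only safe on the no-points code paths, where values are never read.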
func hasPoints(cur tsdb.Cursor) bool {
if cur == nil {
return false
}
res := false
switch cur := cur.(type) {
case tsdb.IntegerArrayCursor:
a := cur.Next()
res = a.Len() > 0
case tsdb.FloatArrayCursor:
a := cur.Next()
res = a.Len() > 0
case tsdb.UnsignedArrayCursor:
a := cur.Next()
res = a.Len() > 0
case tsdb.BooleanArrayCursor:
a := cur.Next()
res = a.Len() > 0
case tsdb.StringArrayCursor:
a := cur.Next()
res = a.Len() > 0
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
cur.Close()
return res
}
type tableNoPoints struct {
table
}
func newTableNoPoints(
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *tableNoPoints {
t := &tableNoPoints{
table: newTable(bounds, key, cols, defs),
}
t.readTags(tags)
return t
}
func (t *tableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
defer t.Close()
if err := f(t); err != nil {
return err
}
return t.err
}
type groupTableNoPoints struct {
table
gc storage.GroupCursor
}
func newGroupTableNoPoints(
gc storage.GroupCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *groupTableNoPoints {
t := &groupTableNoPoints{
table: newTable(bounds, key, cols, defs),
gc: gc,
}
t.readTags(tags)
return t
}
func (t *groupTableNoPoints) Close() {
if t.gc != nil {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
defer t.Close()
for t.advanceCursor() {
if err := f(t); err != nil {
return err
}
}
return t.err
}
func (t *groupTableNoPoints) advanceCursor() bool {
for t.gc.Next() {
if hasPoints(t.gc.Cursor()) {
t.readTags(t.gc.Tags())
return true
}
}
return false
}


@@ -0,0 +1,27 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64"
},
{
"Name":"String",
"name":"string",
"Type":"string"
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool"
}
]
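This data file feeds the go:generate directive at the top of table.go; the tmpl tool expands table.gen.go.tmpl once per entry to produce the five typed table implementations. After editing either file, regeneration would presumably be run as (package path assumed from this commit's imports):

    go generate ./flux/functions/store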

services/httpd/flux.go (new file)

@@ -0,0 +1,172 @@
package httpd
import (
"encoding/json"
"fmt"
"io/ioutil"
"mime"
"net/http"
"unicode/utf8"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/platform/query"
"github.com/pkg/errors"
)
// QueryRequest is a flux query request.
type QueryRequest struct {
Spec *flux.Spec `json:"spec,omitempty"`
Query string `json:"query"`
Type string `json:"type"`
Dialect QueryDialect `json:"dialect"`
}
// QueryDialect describes the formatting options for the query response.
type QueryDialect struct {
Header *bool `json:"header"`
Delimiter string `json:"delimiter"`
CommentPrefix string `json:"commentPrefix"`
DateTimeFormat string `json:"dateTimeFormat"`
Annotations []string `json:"annotations"`
}
// WithDefaults adds default values to the request.
func (r QueryRequest) WithDefaults() QueryRequest {
if r.Type == "" {
r.Type = "flux"
}
if r.Dialect.Delimiter == "" {
r.Dialect.Delimiter = ","
}
if r.Dialect.DateTimeFormat == "" {
r.Dialect.DateTimeFormat = "RFC3339"
}
if r.Dialect.Header == nil {
header := true
r.Dialect.Header = &header
}
return r
}
// Validate checks the query request and returns an error if the request is invalid.
func (r QueryRequest) Validate() error {
if r.Query == "" && r.Spec == nil {
return errors.New(`request body requires either spec or query`)
}
if r.Type != "flux" {
return fmt.Errorf(`unknown query type: %s`, r.Type)
}
if len(r.Dialect.CommentPrefix) > 1 {
return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1")
}
if len(r.Dialect.Delimiter) != 1 {
return fmt.Errorf("invalid dialect delimeter: must be length 1")
}
rn, size := utf8.DecodeRuneInString(r.Dialect.Delimiter)
if rn == utf8.RuneError && size == 1 {
return fmt.Errorf("invalid dialect delimeter character")
}
for _, a := range r.Dialect.Annotations {
switch a {
case "group", "datatype", "default":
default:
return fmt.Errorf(`unknown dialect annotation type: %s`, a)
}
}
switch r.Dialect.DateTimeFormat {
case "RFC3339", "RFC3339Nano":
default:
return fmt.Errorf(`unknown dialect date time format: %s`, r.Dialect.DateTimeFormat)
}
return nil
}
// ProxyRequest specifies a query request and the dialect for the results.
type ProxyRequest struct {
// Request is the basic query request
Request query.Request `json:"request"`
// Dialect is the result encoder
Dialect flux.Dialect `json:"dialect"`
}
// ProxyRequest converts the QueryRequest into a ProxyRequest.
func (r QueryRequest) ProxyRequest() *ProxyRequest {
// Query is preferred over spec
var compiler flux.Compiler
if r.Query != "" {
compiler = lang.FluxCompiler{
Query: r.Query,
}
} else if r.Spec != nil {
compiler = lang.SpecCompiler{
Spec: r.Spec,
}
}
delimiter, _ := utf8.DecodeRuneInString(r.Dialect.Delimiter)
noHeader := false
if r.Dialect.Header != nil {
noHeader = !*r.Dialect.Header
}
cfg := csv.DefaultEncoderConfig()
cfg.NoHeader = noHeader
cfg.Delimiter = delimiter
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
// once they are supported.
return &ProxyRequest{
Request: query.Request{
Compiler: compiler,
},
Dialect: csv.Dialect{
ResultEncoderConfig: cfg,
},
}
}
// httpDialect is an encoding dialect that can write metadata to HTTP headers
type httpDialect interface {
SetHeaders(w http.ResponseWriter)
}
func decodeQueryRequest(r *http.Request) (*QueryRequest, error) {
ct := r.Header.Get("Content-Type")
mt, _, err := mime.ParseMediaType(ct)
if err != nil {
return nil, err
}
var req QueryRequest
switch mt {
case "application/vnd.flux":
if d, err := ioutil.ReadAll(r.Body); err != nil {
return nil, err
} else {
req.Query = string(d)
}
default:
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
}
req = req.WithDefaults()
err = req.Validate()
if err != nil {
return nil, err
}
return &req, err
}
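To make the schema concrete, a minimal JSON body that decodeQueryRequest, WithDefaults, and Validate accept (the Flux script and bucket are illustrative):

    {
        "query": "from(bucket:\"telegraf/autogen\") |> range(start: -5m)",
        "type": "flux",
        "dialect": {
            "header": true,
            "delimiter": ",",
            "annotations": ["group", "datatype", "default"]
        }
    }

Everything other than query is optional; WithDefaults supplies type, delimiter, dateTimeFormat, and header when they are omitted.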


@@ -24,6 +24,8 @@ import (
"github.com/dgrijalva/jwt-go"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/influxdata/flux"
"github.com/influxdata/flux/control"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
@@ -37,6 +39,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
"github.com/influxdata/influxql"
pquery "github.com/influxdata/platform/query"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
@@ -112,6 +115,10 @@ type Handler struct {
Store Store
// Flux services
Controller *control.Controller
CompilerMappings flux.CompilerMappings
Config *Config
Logger *zap.Logger
CLFLogger *log.Logger
@@ -172,6 +179,10 @@ func NewHandler(c Config) *Handler {
"prometheus-read", // Prometheus remote read
"POST", "/api/v1/prom/read", true, true, h.servePromRead,
},
Route{
"flux-read", // Prometheus remote read
"POST", "/v2/query", true, true, h.serveFluxQuery,
},
Route{ // Ping
"ping",
"GET", "/ping", false, true, h.servePing,
@@ -1092,6 +1103,74 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
}
func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeQueryRequest(r)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
pr := req.ProxyRequest()
ctx = pquery.ContextWithRequest(ctx, &pr.Request)
q, err := h.Controller.Query(ctx, pr.Request.Compiler)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() {
q.Cancel()
q.Done()
}()
// Setup headers
//stats, hasStats := results.(flux.Statisticser)
//if hasStats {
// w.Header().Set("Trailer", statsTrailer)
//}
// NOTE: We do not write out the headers here.
// If the encoding step fails before anything has been
// written, we can still write an error header.
// As such we rely on the http.ResponseWriter behavior
// to write a StatusOK header with the first write.
switch r.Header.Get("Accept") {
case "text/csv":
fallthrough
default:
if hd, ok := pr.Dialect.(httpDialect); !ok {
h.httpError(w, fmt.Sprintf("unsupported dialect over HTTP %T", req.Dialect), http.StatusBadRequest)
return
} else {
hd.SetHeaders(w)
}
encoder := pr.Dialect.Encoder()
results := flux.NewResultIteratorFromQuery(q)
n, err := encoder.Encode(w, results)
if err != nil {
results.Cancel()
if n == 0 {
// If the encoder did not write anything, we can write an error header.
h.httpError(w, err.Error(), http.StatusInternalServerError)
}
}
}
//if hasStats {
// data, err := json.Marshal(stats.Statistics())
// if err != nil {
// h.Logger.Info("Failed to encode statistics", zap.Error(err))
// return
// }
// // Write statistics trailer
// w.Header().Set(statsTrailer, string(data))
//}
}
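Putting the pieces together, a hedged end-to-end example against a default OSS build (the port and bucket are assumptions; with the application/vnd.flux content type, decodeQueryRequest treats the raw body as the query string):

    curl -XPOST 'http://localhost:8086/v2/query' \
        -H 'Content-Type: application/vnd.flux' \
        -d 'from(bucket:"telegraf/autogen") |> range(start: -5m)'

The response is annotated CSV produced by the csv.Dialect encoder configured in ProxyRequest.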
// serveExpvar serves internal metrics in /debug/vars format over HTTP.
func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
// Retrieve statistics from the monitor.


@@ -3,7 +3,6 @@ package storage
import (
"context"
"errors"
-"math"
"strings"
"github.com/gogo/protobuf/types"
@@ -93,14 +92,6 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
)
}
-if req.Hints.NoPoints() {
-req.PointsLimit = -1
-}
-if req.PointsLimit == 0 {
-req.PointsLimit = math.MaxInt64
-}
w := NewResponseWriter(stream, req.Hints)
switch req.Group {


@@ -3,6 +3,7 @@ package storage
import (
"context"
"errors"
"math"
"sort"
"time"
@@ -125,6 +126,14 @@ func (s *localStore) Read(ctx context.Context, req *ReadRequest) (ResultSet, err
panic("Read: len(Grouping) > 0")
}
if req.Hints.NoPoints() {
req.PointsLimit = -1
}
if req.PointsLimit == 0 {
req.PointsLimit = math.MaxInt64
}
source, err := getReadSource(req)
if err != nil {
return nil, err
@@ -167,6 +176,14 @@ func (s *localStore) GroupRead(ctx context.Context, req *ReadRequest) (GroupResu
return nil, errors.New("GroupRead: SeriesLimit and SeriesOffset not supported when Grouping")
}
if req.Hints.NoPoints() {
req.PointsLimit = -1
}
if req.PointsLimit == 0 {
req.PointsLimit = math.MaxInt64
}
source, err := getReadSource(req)
if err != nil {
return nil, err