chore: refactor reads service and make it consumable externally

This pulls the code for doing reads with flux into the platform
repo and removes extra.go.

The reusable portion lives under storage/reads, and the concrete
implementation for one of the platform's engines is in
storage/readservice.
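
As a rough sketch (mirroring the command changes in this diff; engine,
bucketSvc, orgSvc, and logger are assumed to be constructed elsewhere),
a consumer wires an engine into the reusable reads code like this:

    // Build a proxy query service backed by the reusable reads code.
    // engine, bucketSvc, orgSvc, and logger already exist in the caller.
    var storageQueryService query.ProxyQueryService
    service, err := readservice.NewProxyQueryService(
        engine, bucketSvc, orgSvc, logger.With(zap.String("service", "storage-reads")))
    if err != nil {
        logger.Error("failed to create query service", zap.Error(err))
        os.Exit(1)
    }
    storageQueryService = service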

To make this more reusable, the cursors had to move into their own
package, decoupling them from the rest of the code in the tsdb
package. tsdb/cursors is that new package, and type/function aliases
have been added to the tsdb package to point at it.
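
As a sketch of the aliasing approach (the exact set of aliased names
in tsdb may differ), the tsdb package keeps old identifiers compiling
by forwarding them to the new package:

    package tsdb

    import "github.com/influxdata/platform/tsdb/cursors"

    // Type aliases keep the old tsdb names pointing at the new cursors package.
    type (
        Cursor             = cursors.Cursor
        IntegerArrayCursor = cursors.IntegerArrayCursor
    )

    // Function aliases forward constructors the same way.
    var NewIntegerArrayLen = cursors.NewIntegerArrayLen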

The models package is already very light on transitive dependencies,
so the cursors package is allowed to depend on it directly.

Finally, the protobuf definitions for issuing GRPC reads have been
moved into their own package for two reasons:
    1. It's a clean separation, and helps keep it that way.
    2. Many/most consumers will not be using GRPC. We just use
       the datatypes to express the API, which makes building a
       GRPC server easier (see the sketch below).
It is left up to future refactorings (specifically ones that involve
GRPC) to determine if these types should remain, or if there is a
cleaner way.
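
For example (a sketch based on the predicate test in this diff; no
GRPC server or connection is involved), a consumer can build and
render a predicate using only the datatypes package:

    // Build the predicate 'host' = "host1" from datatypes values alone.
    pred := &datatypes.Predicate{
        Root: &datatypes.Node{
            NodeType: datatypes.NodeTypeComparisonExpression,
            Value:    &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual},
            Children: []*datatypes.Node{
                {NodeType: datatypes.NodeTypeTagRef, Value: &datatypes.Node_TagRefValue{TagRefValue: "host"}},
                {NodeType: datatypes.NodeTypeLiteral, Value: &datatypes.Node_StringValue{StringValue: "host1"}},
            },
        },
    }
    s := reads.PredicateToExprString(pred) // s == `'host' = "host1"`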

There are still some dependencies on both github.com/influxdata/influxql
and github.com/influxdata/influxdb/logger that we can hopefully remove
in future refactorings.
pull/10616/head
Jeff Wendling 2018-10-05 16:02:31 -06:00
parent 555c454cd8
commit 810833f33f
50 changed files with 12771 additions and 11033 deletions

File diff suppressed because it is too large.


@@ -28,6 +28,7 @@ import (
pcontrol "github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/source"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/readservice"
"github.com/influxdata/platform/task"
taskbackend "github.com/influxdata/platform/task/backend"
taskbolt "github.com/influxdata/platform/task/backend/bolt"
@@ -209,8 +210,6 @@ func platformF(cmd *cobra.Command, args []string) {
var onboardingSvc platform.OnboardingService = c
// TODO(jeff): this block is hacky support for a storage engine. it is not intended to
// be a long term solution.
var storageQueryService query.ProxyQueryService
var pointsWriter storage.PointsWriter
{
@@ -227,18 +226,15 @@ func platformF(cmd *cobra.Command, args []string) {
}
pointsWriter = engine
storageQueryService = query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: &queryAdapter{
Controller: NewController(
&store{engine: engine},
query.FromBucketService(c),
query.FromOrganizationService(c),
logger.With(zap.String("service", "storage")),
),
},
},
service, err := readservice.NewProxyQueryService(
engine, bucketSvc, orgSvc, logger.With(zap.String("service", "storage-reads")))
if err != nil {
logger.Error("failed to create query service", zap.Error(err))
os.Exit(1)
}
storageQueryService = service
}
var queryService query.QueryService


@@ -216,8 +216,8 @@ func (e *Engine) MustOpen() {
// This allows us to use the old `models` package helper functions and still write
// the points in the correct format.
func (e *Engine) Write1xPoints(pts []models.Point) error {
org, _ := platform.IDFromString("1111111111111111")
bucket, _ := platform.IDFromString("2222222222222222")
org, _ := platform.IDFromString("3131313131313131")
bucket, _ := platform.IDFromString("3232323232323232")
points, err := tsdb.ExplodePoints(*org, *bucket, pts)
if err != nil {
return err

File diff suppressed because it is too large.


@@ -0,0 +1,236 @@
package reads
import (
"errors"
"github.com/influxdata/platform/tsdb/cursors"
"github.com/influxdata/platform/tsdb/defaults"
)
{{range .}}
{{$arrayType := print "*cursors." .Name "Array"}}
{{$type := print .name "ArrayFilterCursor"}}
{{$Type := print .Name "ArrayFilterCursor"}}
// ********************
// {{.Name}} Array Cursor
type {{$type}} struct {
cursors.{{.Name}}ArrayCursor
cond expression
m *singleValue
res {{$arrayType}}
tmp {{$arrayType}}
}
func new{{.Name}}FilterArrayCursor(cond expression) *{{$type}} {
return &{{$type}}{
cond: cond,
m: &singleValue{},
res: cursors.New{{.Name}}ArrayLen(defaults.DefaultMaxPointsPerBlock),
tmp: &cursors.{{.Name}}Array{},
}
}
func (c *{{$type}}) reset(cur cursors.{{.Name}}ArrayCursor) {
c.{{.Name}}ArrayCursor = cur
c.tmp.Timestamps, c.tmp.Values = nil, nil
}
func (c *{{$type}}) Next() {{$arrayType}} {
pos := 0
var a {{$arrayType}}
if c.tmp.Len() > 0 {
a = c.tmp
c.tmp.Timestamps = nil
c.tmp.Values = nil
} else {
a = c.{{.Name}}ArrayCursor.Next()
}
LOOP:
for len(a.Timestamps) > 0 {
for i, v := range a.Values {
c.m.v = v
if c.cond.EvalBool(c.m) {
c.res.Timestamps[pos] = a.Timestamps[i]
c.res.Values[pos] = v
pos++
if pos >= defaults.DefaultMaxPointsPerBlock {
c.tmp.Timestamps = a.Timestamps[i+1:]
c.tmp.Values = a.Values[i+1:]
break LOOP
}
}
}
a = c.{{.Name}}ArrayCursor.Next()
}
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type {{.name}}MultiShardArrayCursor struct {
cursors.{{.Name}}ArrayCursor
cursorContext
filter *{{$type}}
}
func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, itrs cursors.CursorIterators, cond expression) {
if cond != nil {
if c.filter == nil {
c.filter = new{{.Name}}FilterArrayCursor(cond)
}
c.filter.reset(cur)
cur = c.filter
}
c.{{.Name}}ArrayCursor = cur
c.itrs = itrs
c.err = nil
c.count = 0
}
func (c *{{.name}}MultiShardArrayCursor) Err() error { return c.err }
func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} {
for {
a := c.{{.Name}}ArrayCursor.Next()
if a.Len() == 0 {
if c.nextArrayCursor() {
continue
}
}
c.count += int64(a.Len())
if c.count > c.limit {
diff := c.count - c.limit
c.count -= diff
rem := int64(a.Len()) - diff
a.Timestamps = a.Timestamps[:rem]
a.Values = a.Values[:rem]
}
return a
}
}
func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {
if len(c.itrs) == 0 {
return false
}
c.{{.Name}}ArrayCursor.Close()
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
}
var ok bool
if cur != nil {
var next cursors.{{.Name}}ArrayCursor
next, ok = cur.(cursors.{{.Name}}ArrayCursor)
if !ok {
cur.Close()
next = {{.Name}}EmptyArrayCursor
c.itrs = nil
c.err = errors.New("expected {{.name}} cursor")
} else {
if c.filter != nil {
c.filter.reset(next)
next = c.filter
}
}
c.{{.Name}}ArrayCursor = next
} else {
c.{{.Name}}ArrayCursor = {{.Name}}EmptyArrayCursor
}
return ok
}
{{if .Agg}}
{{$type := print .name "ArraySumCursor"}}
{{$Type := print .Name "ArraySumCursor"}}
type {{$type}} struct {
cursors.{{.Name}}ArrayCursor
ts [1]int64
vs [1]{{.Type}}
res {{$arrayType}}
}
func new{{$Type}}(cur cursors.{{.Name}}ArrayCursor) *{{$type}} {
return &{{$type}}{
{{.Name}}ArrayCursor: cur,
res: &cursors.{{.Name}}Array{},
}
}
func (c {{$type}}) Next() {{$arrayType}} {
a := c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
return a
}
ts := a.Timestamps[0]
var acc {{.Type}}
for {
for _, v := range a.Values {
acc += v
}
a = c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
c.ts[0] = ts
c.vs[0] = acc
c.res.Timestamps = c.ts[:]
c.res.Values = c.vs[:]
return c.res
}
}
}
{{end}}
type integer{{.Name}}CountArrayCursor struct {
cursors.{{.Name}}ArrayCursor
}
func (c *integer{{.Name}}CountArrayCursor) Next() *cursors.IntegerArray {
a := c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type {{.name}}EmptyArrayCursor struct {
res cursors.{{.Name}}Array
}
var {{.Name}}EmptyArrayCursor cursors.{{.Name}}ArrayCursor = &{{.name}}EmptyArrayCursor{}
func (c *{{.name}}EmptyArrayCursor) Err() error { return nil }
func (c *{{.name}}EmptyArrayCursor) Close() {}
func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res }
{{end}}


@@ -0,0 +1,40 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64",
"ValueType":"FloatValue",
"Nil":"0",
"Agg":true
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64",
"ValueType":"IntegerValue",
"Nil":"0",
"Agg":true
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64",
"ValueType":"UnsignedValue",
"Nil":"0",
"Agg":true
},
{
"Name":"String",
"name":"string",
"Type":"string",
"ValueType":"StringValue",
"Nil":"\"\""
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool",
"ValueType":"BooleanValue",
"Nil":"false"
}
]


@@ -0,0 +1,163 @@
package reads
import (
"context"
"fmt"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
type singleValue struct {
v interface{}
}
func (v *singleValue) Value(key string) (interface{}, bool) {
return v.v, true
}
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
if cursor == nil {
return nil
}
switch agg.Type {
case datatypes.AggregateTypeSum:
return newSumArrayCursor(cursor)
case datatypes.AggregateTypeCount:
return newCountArrayCursor(cursor)
default:
// TODO(sgc): should be validated higher up
panic("invalid aggregate")
}
}
func newSumArrayCursor(cur cursors.Cursor) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return newFloatArraySumCursor(cur)
case cursors.IntegerArrayCursor:
return newIntegerArraySumCursor(cur)
case cursors.UnsignedArrayCursor:
return newUnsignedArraySumCursor(cur)
default:
// TODO(sgc): propagate an error instead?
return nil
}
}
func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return &integerFloatCountArrayCursor{FloatArrayCursor: cur}
case cursors.IntegerArrayCursor:
return &integerIntegerCountArrayCursor{IntegerArrayCursor: cur}
case cursors.UnsignedArrayCursor:
return &integerUnsignedCountArrayCursor{UnsignedArrayCursor: cur}
case cursors.StringArrayCursor:
return &integerStringCountArrayCursor{StringArrayCursor: cur}
case cursors.BooleanArrayCursor:
return &integerBooleanCountArrayCursor{BooleanArrayCursor: cur}
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}
type cursorContext struct {
ctx context.Context
req *cursors.CursorRequest
itrs cursors.CursorIterators
limit int64
count int64
err error
}
type multiShardArrayCursors struct {
ctx context.Context
limit int64
req cursors.CursorRequest
cursors struct {
i integerMultiShardArrayCursor
f floatMultiShardArrayCursor
u unsignedMultiShardArrayCursor
b booleanMultiShardArrayCursor
s stringMultiShardArrayCursor
}
}
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors {
if limit < 0 {
limit = 1
}
m := &multiShardArrayCursors{
ctx: ctx,
limit: limit,
req: cursors.CursorRequest{
Ascending: asc,
StartTime: start,
EndTime: end,
},
}
cc := cursorContext{
ctx: ctx,
limit: limit,
req: &m.req,
}
m.cursors.i.cursorContext = cc
m.cursors.f.cursorContext = cc
m.cursors.u.cursorContext = cc
m.cursors.b.cursorContext = cc
m.cursors.s.cursorContext = cc
return m
}
func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
m.req.Name = row.Name
m.req.Tags = row.SeriesTags
m.req.Field = row.Field
var cond expression
if row.ValueCond != nil {
cond = &astExpr{row.ValueCond}
}
var shard cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(row.Query) > 0 {
shard, row.Query = row.Query[0], row.Query[1:]
cur, _ = shard.Next(m.ctx, &m.req)
}
if cur == nil {
return nil
}
switch c := cur.(type) {
case cursors.IntegerArrayCursor:
m.cursors.i.reset(c, row.Query, cond)
return &m.cursors.i
case cursors.FloatArrayCursor:
m.cursors.f.reset(c, row.Query, cond)
return &m.cursors.f
case cursors.UnsignedArrayCursor:
m.cursors.u.reset(c, row.Query, cond)
return &m.cursors.u
case cursors.StringArrayCursor:
m.cursors.s.reset(c, row.Query, cond)
return &m.cursors.s
case cursors.BooleanArrayCursor:
m.cursors.b.reset(c, row.Query, cond)
return &m.cursors.b
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}
func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
return newAggregateArrayCursor(ctx, agg, cursor)
}


@@ -0,0 +1,3 @@
package datatypes
//go:generate protoc -I$GOPATH/src/github.com/influxdata/influxdb/vendor -I. --gogofaster_out=Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,plugins=grpc:. storage_common.proto storage.proto predicate.proto


@@ -0,0 +1,55 @@
package datatypes
import (
"strings"
"github.com/gogo/protobuf/proto"
)
type HintFlags uint32
func (h HintFlags) NoPoints() bool {
return uint32(h)&uint32(HintNoPoints) != 0
}
func (h *HintFlags) SetNoPoints() {
*h |= HintFlags(HintNoPoints)
}
func (h HintFlags) NoSeries() bool {
return uint32(h)&uint32(HintNoSeries) != 0
}
func (h *HintFlags) SetNoSeries() {
*h |= HintFlags(HintNoSeries)
}
func (h HintFlags) HintSchemaAllTime() bool {
return uint32(h)&uint32(HintSchemaAllTime) != 0
}
func (h *HintFlags) SetHintSchemaAllTime() {
*h |= HintFlags(HintSchemaAllTime)
}
func (h HintFlags) String() string {
f := uint32(h)
var s []string
enums := proto.EnumValueMap("influxdata.platform.storage.ReadRequest_HintFlags")
if h == 0 {
return "HINT_NONE"
}
for k, v := range enums {
if v == 0 {
continue
}
v := uint32(v)
if f&v == v {
s = append(s, k)
}
}
return strings.Join(s, ",")
}

File diff suppressed because it is too large.


@@ -0,0 +1,60 @@
syntax = "proto3";
package influxdata.platform.storage;
option go_package = "datatypes";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Node {
enum Type {
option (gogoproto.goproto_enum_prefix) = false;
LOGICAL_EXPRESSION = 0 [(gogoproto.enumvalue_customname) = "NodeTypeLogicalExpression"];
COMPARISON_EXPRESSION = 1 [(gogoproto.enumvalue_customname) = "NodeTypeComparisonExpression"];
PAREN_EXPRESSION = 2 [(gogoproto.enumvalue_customname) = "NodeTypeParenExpression"];
TAG_REF = 3 [(gogoproto.enumvalue_customname) = "NodeTypeTagRef"];
LITERAL = 4 [(gogoproto.enumvalue_customname) = "NodeTypeLiteral"];
FIELD_REF = 5 [(gogoproto.enumvalue_customname) = "NodeTypeFieldRef"];
}
enum Comparison {
option (gogoproto.goproto_enum_prefix) = false;
EQUAL = 0 [(gogoproto.enumvalue_customname) = "ComparisonEqual"];
NOT_EQUAL = 1 [(gogoproto.enumvalue_customname) = "ComparisonNotEqual"];
STARTS_WITH = 2 [(gogoproto.enumvalue_customname) = "ComparisonStartsWith"];
REGEX = 3 [(gogoproto.enumvalue_customname) = "ComparisonRegex"];
NOT_REGEX = 4 [(gogoproto.enumvalue_customname) = "ComparisonNotRegex"];
LT = 5 [(gogoproto.enumvalue_customname) = "ComparisonLess"];
LTE = 6 [(gogoproto.enumvalue_customname) = "ComparisonLessEqual"];
GT = 7 [(gogoproto.enumvalue_customname) = "ComparisonGreater"];
GTE = 8 [(gogoproto.enumvalue_customname) = "ComparisonGreaterEqual"];
}
// Logical operators apply to boolean values and combine to produce a single boolean result.
enum Logical {
option (gogoproto.goproto_enum_prefix) = false;
AND = 0 [(gogoproto.enumvalue_customname) = "LogicalAnd"];
OR = 1 [(gogoproto.enumvalue_customname) = "LogicalOr"];
}
Type node_type = 1 [(gogoproto.customname) = "NodeType", (gogoproto.jsontag) = "nodeType"];
repeated Node children = 2;
oneof value {
string string_value = 3 [(gogoproto.customname) = "StringValue"];
bool bool_value = 4 [(gogoproto.customname) = "BooleanValue"];
int64 int_value = 5 [(gogoproto.customname) = "IntegerValue"];
uint64 uint_value = 6 [(gogoproto.customname) = "UnsignedValue"];
double float_value = 7 [(gogoproto.customname) = "FloatValue"];
string regex_value = 8 [(gogoproto.customname) = "RegexValue"];
string tag_ref_value = 9 [(gogoproto.customname) = "TagRefValue"];
string field_ref_value = 10 [(gogoproto.customname) = "FieldRefValue"];
Logical logical = 11;
Comparison comparison = 12;
}
}
message Predicate {
Node root = 1;
}

File diff suppressed because it is too large.


@@ -0,0 +1,206 @@
syntax = "proto3";
package influxdata.platform.storage;
option go_package = "datatypes";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "predicate.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
service Storage {
// Read performs a read operation using the given ReadRequest
rpc Read (ReadRequest) returns (stream ReadResponse);
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse);
rpc Hints (google.protobuf.Empty) returns (HintsResponse);
// Explain describes the costs associated with executing a given Read request
// rpc Explain(google.protobuf.Empty) returns (ExplainResponse){}
}
// Request message for Storage.Read.
message ReadRequest {
enum Group {
option (gogoproto.goproto_enum_prefix) = false;
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GROUP_NONE = 0 [(gogoproto.enumvalue_customname) = "GroupNone"];
// GroupAll returns a unique group for each series.
// As an optimization, no GroupFrames will be generated.
GROUP_ALL = 1 [(gogoproto.enumvalue_customname) = "GroupAll"];
// GroupBy returns a group for each unique value of the specified GroupKeys.
GROUP_BY = 2 [(gogoproto.enumvalue_customname) = "GroupBy"];
// GroupExcept is not implemented.
GROUP_EXCEPT = 3 [(gogoproto.enumvalue_customname) = "GroupExcept"];
}
enum HintFlags {
option (gogoproto.goproto_enum_prefix) = false;
HINT_NONE = 0x00 [(gogoproto.enumvalue_customname) = "HintNone"];
HINT_NO_POINTS = 0x01 [(gogoproto.enumvalue_customname) = "HintNoPoints"];
HINT_NO_SERIES = 0x02 [(gogoproto.enumvalue_customname) = "HintNoSeries"];
// HintSchemaAllTime performs schema queries without using time ranges
HINT_SCHEMA_ALL_TIME = 0x04 [(gogoproto.enumvalue_customname) = "HintSchemaAllTime"];
}
google.protobuf.Any read_source = 13 [(gogoproto.customname) = "ReadSource"];
TimestampRange timestamp_range = 2 [(gogoproto.customname) = "TimestampRange", (gogoproto.nullable) = false];
// Descending indicates whether points should be returned in descending order.
bool descending = 3;
// GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine
// its behavior.
repeated string group_keys = 4 [(gogoproto.customname) = "GroupKeys"];
//
Group group = 11;
// Aggregate specifies an optional aggregate to apply to the data.
// TODO(sgc): switch to slice for multiple aggregates in a single request
Aggregate aggregate = 9;
Predicate predicate = 5;
// SeriesLimit determines the maximum number of series to be returned for the request. Specify 0 for no limit.
int64 series_limit = 6 [(gogoproto.customname) = "SeriesLimit"];
// SeriesOffset determines how many series to skip before processing the request.
int64 series_offset = 7 [(gogoproto.customname) = "SeriesOffset"];
// PointsLimit determines the maximum number of values per series to be returned for the request.
// Specify 0 for no limit. -1 to return series frames only.
int64 points_limit = 8 [(gogoproto.customname) = "PointsLimit"];
// Trace contains opaque data if a trace is active.
map<string, string> trace = 10 [(gogoproto.customname) = "Trace"];
// Hints is a bitwise OR of HintFlags to control the behavior
// of the read request.
fixed32 hints = 12 [(gogoproto.customname) = "Hints", (gogoproto.casttype) = "HintFlags"];
}
message Aggregate {
enum AggregateType {
option (gogoproto.goproto_enum_prefix) = false;
NONE = 0 [(gogoproto.enumvalue_customname) = "AggregateTypeNone"];
SUM = 1 [(gogoproto.enumvalue_customname) = "AggregateTypeSum"];
COUNT = 2 [(gogoproto.enumvalue_customname) = "AggregateTypeCount"];
}
AggregateType type = 1;
// additional arguments?
}
message Tag {
bytes key = 1;
bytes value = 2;
}
// Response message for Storage.Read.
message ReadResponse {
enum FrameType {
option (gogoproto.goproto_enum_prefix) = false;
SERIES = 0 [(gogoproto.enumvalue_customname) = "FrameTypeSeries"];
POINTS = 1 [(gogoproto.enumvalue_customname) = "FrameTypePoints"];
}
enum DataType {
option (gogoproto.goproto_enum_prefix) = false;
FLOAT = 0 [(gogoproto.enumvalue_customname) = "DataTypeFloat"];
INTEGER = 1 [(gogoproto.enumvalue_customname) = "DataTypeInteger"];
UNSIGNED = 2 [(gogoproto.enumvalue_customname) = "DataTypeUnsigned"];
BOOLEAN = 3 [(gogoproto.enumvalue_customname) = "DataTypeBoolean"];
STRING = 4 [(gogoproto.enumvalue_customname) = "DataTypeString"];
}
message Frame {
oneof data {
GroupFrame group = 7;
SeriesFrame series = 1;
FloatPointsFrame float_points = 2 [(gogoproto.customname) = "FloatPoints"];
IntegerPointsFrame integer_points = 3 [(gogoproto.customname) = "IntegerPoints"];
UnsignedPointsFrame unsigned_points = 4 [(gogoproto.customname) = "UnsignedPoints"];
BooleanPointsFrame boolean_points = 5 [(gogoproto.customname) = "BooleanPoints"];
StringPointsFrame string_points = 6 [(gogoproto.customname) = "StringPoints"];
}
}
message GroupFrame {
// TagKeys
repeated bytes tag_keys = 1 [(gogoproto.customname) = "TagKeys"];
// PartitionKeyVals contains the values of the partition key for this group, in the same order as ReadRequest.GroupKeys
repeated bytes partition_key_vals = 2 [(gogoproto.customname) = "PartitionKeyVals"];
}
message SeriesFrame {
repeated Tag tags = 1 [(gogoproto.nullable) = false];
DataType data_type = 2;
}
message FloatPointsFrame {
repeated sfixed64 timestamps = 1;
repeated double values = 2;
}
message IntegerPointsFrame {
repeated sfixed64 timestamps = 1;
repeated int64 values = 2;
}
message UnsignedPointsFrame {
repeated sfixed64 timestamps = 1;
repeated uint64 values = 2;
}
message BooleanPointsFrame {
repeated sfixed64 timestamps = 1;
repeated bool values = 2;
}
message StringPointsFrame {
repeated sfixed64 timestamps = 1;
repeated string values = 2;
}
repeated Frame frames = 1 [(gogoproto.nullable) = false];
}
message CapabilitiesResponse {
map<string, string> caps = 1;
}
message HintsResponse {
}
// Specifies a continuous range of nanosecond timestamps.
message TimestampRange {
// Start defines the inclusive lower bound.
int64 start = 1;
// End defines the inclusive upper bound.
int64 end = 2;
}
//message ExplainRequest {
// ReadRequest read_request = 1 [(gogoproto.customname) = "ReadRequest"];
//}
//
//message ExplainResponse {}

storage/reads/eval.go (new file, 284 lines)

@@ -0,0 +1,284 @@
package reads
import (
"math"
"regexp"
"github.com/influxdata/influxql"
)
// evalExpr evaluates expr against a map.
func evalExpr(expr influxql.Expr, m Valuer) interface{} {
if expr == nil {
return nil
}
switch expr := expr.(type) {
case *influxql.BinaryExpr:
return evalBinaryExpr(expr, m)
case *influxql.BooleanLiteral:
return expr.Val
case *influxql.IntegerLiteral:
return expr.Val
case *influxql.UnsignedLiteral:
return expr.Val
case *influxql.NumberLiteral:
return expr.Val
case *influxql.ParenExpr:
return evalExpr(expr.Expr, m)
case *influxql.RegexLiteral:
return expr.Val
case *influxql.StringLiteral:
return expr.Val
case *influxql.VarRef:
v, _ := m.Value(expr.Val)
return v
default:
return nil
}
}
func evalBinaryExpr(expr *influxql.BinaryExpr, m Valuer) interface{} {
lhs := evalExpr(expr.LHS, m)
rhs := evalExpr(expr.RHS, m)
if lhs == nil && rhs != nil {
// When the LHS is nil and the RHS is a boolean, implicitly cast the
// nil to false.
if _, ok := rhs.(bool); ok {
lhs = false
}
} else if lhs != nil && rhs == nil {
// Implicit cast of the RHS nil to false when the LHS is a boolean.
if _, ok := lhs.(bool); ok {
rhs = false
}
}
// Evaluate if both sides are simple types.
switch lhs := lhs.(type) {
case bool:
rhs, ok := rhs.(bool)
switch expr.Op {
case influxql.AND:
return ok && (lhs && rhs)
case influxql.OR:
return ok && (lhs || rhs)
case influxql.BITWISE_AND:
return ok && (lhs && rhs)
case influxql.BITWISE_OR:
return ok && (lhs || rhs)
case influxql.BITWISE_XOR:
return ok && (lhs != rhs)
case influxql.EQ:
return ok && (lhs == rhs)
case influxql.NEQ:
return ok && (lhs != rhs)
}
case float64:
// Try the rhs as a float64 or int64
rhsf, ok := rhs.(float64)
if !ok {
var rhsi int64
if rhsi, ok = rhs.(int64); ok {
rhsf = float64(rhsi)
}
}
rhs := rhsf
switch expr.Op {
case influxql.EQ:
return ok && (lhs == rhs)
case influxql.NEQ:
return ok && (lhs != rhs)
case influxql.LT:
return ok && (lhs < rhs)
case influxql.LTE:
return ok && (lhs <= rhs)
case influxql.GT:
return ok && (lhs > rhs)
case influxql.GTE:
return ok && (lhs >= rhs)
case influxql.ADD:
if !ok {
return nil
}
return lhs + rhs
case influxql.SUB:
if !ok {
return nil
}
return lhs - rhs
case influxql.MUL:
if !ok {
return nil
}
return lhs * rhs
case influxql.DIV:
if !ok {
return nil
} else if rhs == 0 {
return float64(0)
}
return lhs / rhs
case influxql.MOD:
if !ok {
return nil
}
return math.Mod(lhs, rhs)
}
case int64:
// Try as a float64 to see if a float cast is required.
rhsf, ok := rhs.(float64)
if ok {
lhs := float64(lhs)
rhs := rhsf
switch expr.Op {
case influxql.EQ:
return lhs == rhs
case influxql.NEQ:
return lhs != rhs
case influxql.LT:
return lhs < rhs
case influxql.LTE:
return lhs <= rhs
case influxql.GT:
return lhs > rhs
case influxql.GTE:
return lhs >= rhs
case influxql.ADD:
return lhs + rhs
case influxql.SUB:
return lhs - rhs
case influxql.MUL:
return lhs * rhs
case influxql.DIV:
if rhs == 0 {
return float64(0)
}
return lhs / rhs
case influxql.MOD:
return math.Mod(lhs, rhs)
}
} else {
rhs, ok := rhs.(int64)
switch expr.Op {
case influxql.EQ:
return ok && (lhs == rhs)
case influxql.NEQ:
return ok && (lhs != rhs)
case influxql.LT:
return ok && (lhs < rhs)
case influxql.LTE:
return ok && (lhs <= rhs)
case influxql.GT:
return ok && (lhs > rhs)
case influxql.GTE:
return ok && (lhs >= rhs)
case influxql.ADD:
if !ok {
return nil
}
return lhs + rhs
case influxql.SUB:
if !ok {
return nil
}
return lhs - rhs
case influxql.MUL:
if !ok {
return nil
}
return lhs * rhs
case influxql.DIV:
if !ok {
return nil
} else if rhs == 0 {
return float64(0)
}
return lhs / rhs
case influxql.MOD:
if !ok {
return nil
} else if rhs == 0 {
return int64(0)
}
return lhs % rhs
case influxql.BITWISE_AND:
if !ok {
return nil
}
return lhs & rhs
case influxql.BITWISE_OR:
if !ok {
return nil
}
return lhs | rhs
case influxql.BITWISE_XOR:
if !ok {
return nil
}
return lhs ^ rhs
}
}
case string:
switch expr.Op {
case influxql.EQ:
rhs, ok := rhs.(string)
if !ok {
return nil
}
return lhs == rhs
case influxql.NEQ:
rhs, ok := rhs.(string)
if !ok {
return nil
}
return lhs != rhs
case influxql.EQREGEX:
rhs, ok := rhs.(*regexp.Regexp)
if !ok {
return nil
}
return rhs.MatchString(lhs)
case influxql.NEQREGEX:
rhs, ok := rhs.(*regexp.Regexp)
if !ok {
return nil
}
return !rhs.MatchString(lhs)
}
case []byte:
switch expr.Op {
case influxql.EQ:
rhs, ok := rhs.(string)
if !ok {
return nil
}
return string(lhs) == rhs
case influxql.NEQ:
rhs, ok := rhs.(string)
if !ok {
return nil
}
return string(lhs) != rhs
case influxql.EQREGEX:
rhs, ok := rhs.(*regexp.Regexp)
if !ok {
return nil
}
return rhs.Match(lhs)
case influxql.NEQREGEX:
rhs, ok := rhs.(*regexp.Regexp)
if !ok {
return nil
}
return !rhs.Match(lhs)
}
}
return nil
}
func EvalExprBool(expr influxql.Expr, m Valuer) bool {
v, _ := evalExpr(expr, m).(bool)
return v
}

storage/reads/expr.go (new file, 25 lines)

@@ -0,0 +1,25 @@
package reads
import (
"github.com/influxdata/influxql"
)
// TODO(sgc): build expression evaluator that does not use influxql AST
type expression interface {
EvalBool(v Valuer) bool
}
type astExpr struct {
expr influxql.Expr
}
func (e *astExpr) EvalBool(v Valuer) bool {
return EvalExprBool(e.expr, v)
}
// Valuer is the interface that wraps the Value() method.
type Valuer interface {
// Value returns the value and existence flag for a given key.
Value(key string) (interface{}, bool)
}

storage/reads/gen.go (new file, 4 lines)

@@ -0,0 +1,4 @@
package reads
//go:generate tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
//go:generate tmpl -data=@array_cursor.gen.go.tmpldata response_writer.gen.go.tmpl


@@ -0,0 +1,424 @@
package reads
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type groupResultSet struct {
ctx context.Context
req *datatypes.ReadRequest
agg *datatypes.Aggregate
mb multiShardCursors
i int
rows []*SeriesRow
keys [][]byte
rgc groupByCursor
km keyMerger
newCursorFn func() (SeriesCursor, error)
nextGroupFn func(c *groupResultSet) GroupCursor
sortFn func(c *groupResultSet) (int, error)
eof bool
}
func NewGroupResultSet(ctx context.Context, req *datatypes.ReadRequest, newCursorFn func() (SeriesCursor, error)) GroupResultSet {
g := &groupResultSet{
ctx: ctx,
req: req,
agg: req.Aggregate,
keys: make([][]byte, len(req.GroupKeys)),
newCursorFn: newCursorFn,
}
g.mb = newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit)
for i, k := range req.GroupKeys {
g.keys[i] = []byte(k)
}
switch req.Group {
case datatypes.GroupBy:
g.sortFn = groupBySort
g.nextGroupFn = groupByNextGroup
g.rgc = groupByCursor{
ctx: ctx,
mb: g.mb,
agg: req.Aggregate,
vals: make([][]byte, len(req.GroupKeys)),
}
case datatypes.GroupNone:
g.sortFn = groupNoneSort
g.nextGroupFn = groupNoneNextGroup
default:
panic("not implemented")
}
n, err := g.sort()
if n == 0 || err != nil {
return nil
}
return g
}
var nilKey = [...]byte{0xff}
func (g *groupResultSet) Close() {}
func (g *groupResultSet) Next() GroupCursor {
if g.eof {
return nil
}
return g.nextGroupFn(g)
}
func (g *groupResultSet) sort() (int, error) {
log := logger.LoggerFromContext(g.ctx)
if log != nil {
var f func()
log, f = logger.NewOperation(log, "Sort", "group.sort", zap.String("group_type", g.req.Group.String()))
defer f()
}
span := opentracing.SpanFromContext(g.ctx)
if span != nil {
span = opentracing.StartSpan(
"group.sort",
opentracing.ChildOf(span.Context()),
opentracing.Tag{Key: "group_type", Value: g.req.Group.String()})
defer span.Finish()
}
n, err := g.sortFn(g)
if span != nil {
span.SetTag("rows", n)
}
if log != nil {
log.Info("Sort completed", zap.Int("rows", n))
}
return n, err
}
// seriesHasPoints reads the first block of TSM data to verify the series has points for
// the time range of the query.
func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
// TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys.
cur := g.mb.createCursor(*row)
var ts []int64
switch c := cur.(type) {
case cursors.IntegerArrayCursor:
a := c.Next()
ts = a.Timestamps
case cursors.FloatArrayCursor:
a := c.Next()
ts = a.Timestamps
case cursors.UnsignedArrayCursor:
a := c.Next()
ts = a.Timestamps
case cursors.BooleanArrayCursor:
a := c.Next()
ts = a.Timestamps
case cursors.StringArrayCursor:
a := c.Next()
ts = a.Timestamps
case nil:
return false
default:
panic(fmt.Sprintf("unreachable: %T", c))
}
cur.Close()
return len(ts) > 0
}
func groupNoneNextGroup(g *groupResultSet) GroupCursor {
cur, err := g.newCursorFn()
if err != nil {
// TODO(sgc): store error
return nil
} else if cur == nil {
return nil
}
g.eof = true
return &groupNoneCursor{
ctx: g.ctx,
mb: g.mb,
agg: g.agg,
cur: cur,
keys: g.km.get(),
}
}
func groupNoneSort(g *groupResultSet) (int, error) {
cur, err := g.newCursorFn()
if err != nil {
return 0, err
} else if cur == nil {
return 0, nil
}
allTime := g.req.Hints.HintSchemaAllTime()
g.km.clear()
n := 0
row := cur.Next()
for row != nil {
n++
if allTime || g.seriesHasPoints(row) {
g.km.mergeTagKeys(row.Tags)
}
row = cur.Next()
}
cur.Close()
return n, nil
}
func groupByNextGroup(g *groupResultSet) GroupCursor {
next:
row := g.rows[g.i]
for i := range g.keys {
g.rgc.vals[i] = row.Tags.Get(g.keys[i])
}
g.km.clear()
allTime := g.req.Hints.HintSchemaAllTime()
c := 0
rowKey := row.SortKey
j := g.i
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) {
if allTime || g.seriesHasPoints(g.rows[j]) {
g.km.mergeTagKeys(g.rows[j].Tags)
c++
}
j++
}
g.rgc.reset(g.rows[g.i:j])
g.rgc.keys = g.km.get()
g.i = j
if j == len(g.rows) {
g.eof = true
} else if c == 0 {
// no rows with points
goto next
}
return &g.rgc
}
func groupBySort(g *groupResultSet) (int, error) {
cur, err := g.newCursorFn()
if err != nil {
return 0, err
} else if cur == nil {
return 0, nil
}
var rows []*SeriesRow
vals := make([][]byte, len(g.keys))
tagsBuf := &tagsBuffer{sz: 4096}
row := cur.Next()
for row != nil {
nr := *row
nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags)
nr.Tags = tagsBuf.copyTags(nr.Tags)
l := 0
for i, k := range g.keys {
vals[i] = nr.Tags.Get(k)
if len(vals[i]) == 0 {
vals[i] = nilKey[:] // if there was no value, ensure it sorts last
}
l += len(vals[i])
}
nr.SortKey = make([]byte, 0, l)
for _, v := range vals {
nr.SortKey = append(nr.SortKey, v...)
}
rows = append(rows, &nr)
row = cur.Next()
}
sort.Slice(rows, func(i, j int) bool {
return bytes.Compare(rows[i].SortKey, rows[j].SortKey) == -1
})
g.rows = rows
cur.Close()
return len(rows), nil
}
type groupNoneCursor struct {
ctx context.Context
mb multiShardCursors
agg *datatypes.Aggregate
cur SeriesCursor
row SeriesRow
keys [][]byte
}
func (c *groupNoneCursor) Tags() models.Tags { return c.row.Tags }
func (c *groupNoneCursor) Keys() [][]byte { return c.keys }
func (c *groupNoneCursor) PartitionKeyVals() [][]byte { return nil }
func (c *groupNoneCursor) Close() { c.cur.Close() }
func (c *groupNoneCursor) Next() bool {
row := c.cur.Next()
if row == nil {
return false
}
c.row = *row
return true
}
func (c *groupNoneCursor) Cursor() cursors.Cursor {
cur := c.mb.createCursor(c.row)
if c.agg != nil {
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
}
return cur
}
type groupByCursor struct {
ctx context.Context
mb multiShardCursors
agg *datatypes.Aggregate
i int
rows []*SeriesRow
keys [][]byte
vals [][]byte
}
func (c *groupByCursor) reset(rows []*SeriesRow) {
c.i = 0
c.rows = rows
}
func (c *groupByCursor) Keys() [][]byte { return c.keys }
func (c *groupByCursor) PartitionKeyVals() [][]byte { return c.vals }
func (c *groupByCursor) Tags() models.Tags { return c.rows[c.i-1].Tags }
func (c *groupByCursor) Close() {}
func (c *groupByCursor) Next() bool {
if c.i < len(c.rows) {
c.i++
return true
}
return false
}
func (c *groupByCursor) Cursor() cursors.Cursor {
cur := c.mb.createCursor(*c.rows[c.i-1])
if c.agg != nil {
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
}
return cur
}
// keyMerger is responsible for determining a merged set of tag keys
type keyMerger struct {
i int
keys [2][][]byte
}
func (km *keyMerger) clear() {
km.i = 0
km.keys[0] = km.keys[0][:0]
}
func (km *keyMerger) get() [][]byte { return km.keys[km.i&1] }
func (km *keyMerger) String() string {
var s []string
for _, k := range km.get() {
s = append(s, string(k))
}
return strings.Join(s, ",")
}
func (km *keyMerger) mergeTagKeys(tags models.Tags) {
keys := km.keys[km.i&1]
i, j := 0, 0
for i < len(keys) && j < len(tags) && bytes.Equal(keys[i], tags[j].Key) {
i++
j++
}
if j == len(tags) {
// no new tags
return
}
km.i = (km.i + 1) & 1
l := len(keys) + len(tags)
if cap(km.keys[km.i]) < l {
km.keys[km.i] = make([][]byte, l)
} else {
km.keys[km.i] = km.keys[km.i][:l]
}
keya := km.keys[km.i]
// back up the pointers
if i > 0 {
i--
j--
}
k := i
copy(keya[:k], keys[:k])
for i < len(keys) && j < len(tags) {
cmp := bytes.Compare(keys[i], tags[j].Key)
if cmp < 0 {
keya[k] = keys[i]
i++
} else if cmp > 0 {
keya[k] = tags[j].Key
j++
} else {
keya[k] = keys[i]
i++
j++
}
k++
}
if i < len(keys) {
k += copy(keya[k:], keys[i:])
}
for j < len(tags) {
keya[k] = tags[j].Key
j++
k++
}
km.keys[km.i] = keya[:k]
}


@@ -0,0 +1,222 @@
package reads
import (
"context"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
)
func TestGroupGroupResultSetSorting(t *testing.T) {
tests := []struct {
name string
cur SeriesCursor
group datatypes.ReadRequest_Group
keys []string
exp []SeriesRow
}{
{
name: "group by tag1 in all series",
cur: &sliceSeriesCursor{
rows: newSeriesRows(
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"cpu,tag0=val1,tag1=val0",
"cpu,tag0=val1,tag1=val1",
"cpu,tag0=val1,tag1=val2",
)},
group: datatypes.GroupBy,
keys: []string{"tag1"},
exp: newSeriesRows(
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val1,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val1,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"cpu,tag0=val1,tag1=val2",
),
},
{
name: "group by tag1 in partial series",
cur: &sliceSeriesCursor{
rows: newSeriesRows(
"aaa,tag0=val0",
"aaa,tag0=val1",
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"cpu,tag0=val1,tag1=val0",
"cpu,tag0=val1,tag1=val1",
"cpu,tag0=val1,tag1=val2",
)},
group: datatypes.GroupBy,
keys: []string{"tag1"},
exp: newSeriesRows(
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val1,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val1,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"cpu,tag0=val1,tag1=val2",
"aaa,tag0=val0",
"aaa,tag0=val1",
),
},
{
name: "group by tag2,tag1 with partial series",
cur: &sliceSeriesCursor{
rows: newSeriesRows(
"aaa,tag0=val0",
"aaa,tag0=val1",
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"mem,tag1=val0,tag2=val0",
"mem,tag1=val1,tag2=val0",
"mem,tag1=val1,tag2=val1",
)},
group: datatypes.GroupBy,
keys: []string{"tag2,tag1"},
exp: newSeriesRows(
"mem,tag1=val0,tag2=val0",
"mem,tag1=val1,tag2=val0",
"mem,tag1=val1,tag2=val1",
"cpu,tag0=val0,tag1=val0",
"cpu,tag0=val0,tag1=val1",
"cpu,tag0=val0,tag1=val2",
"aaa,tag0=val0",
"aaa,tag0=val1",
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
newCursor := func() (SeriesCursor, error) {
return tt.cur, nil
}
rs := NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: tt.group, GroupKeys: tt.keys}, newCursor).(*groupResultSet)
var rows []SeriesRow
for i := range rs.rows {
rows = append(rows, *rs.rows[i])
}
got := selectTags(rows, tt.keys)
exp := selectTags(tt.exp, tt.keys)
if !cmp.Equal(got, exp) {
t.Errorf("unexpected rows -got/+exp\n%s", cmp.Diff(got, exp))
}
})
}
}
func TestKeyMerger(t *testing.T) {
tests := []struct {
name string
tags []models.Tags
exp string
}{
{
name: "mixed",
tags: []models.Tags{
models.ParseTags([]byte("foo,tag0=v0,tag1=v0,tag2=v0")),
models.ParseTags([]byte("foo,tag0=v0,tag1=v0,tag2=v1")),
models.ParseTags([]byte("foo,tag0=v0")),
models.ParseTags([]byte("foo,tag0=v0,tag3=v0")),
},
exp: "tag0,tag1,tag2,tag3",
},
{
name: "mixed 2",
tags: []models.Tags{
models.ParseTags([]byte("foo,tag0=v0")),
models.ParseTags([]byte("foo,tag0=v0,tag3=v0")),
models.ParseTags([]byte("foo,tag0=v0,tag1=v0,tag2=v0")),
models.ParseTags([]byte("foo,tag0=v0,tag1=v0,tag2=v1")),
},
exp: "tag0,tag1,tag2,tag3",
},
{
name: "all different",
tags: []models.Tags{
models.ParseTags([]byte("foo,tag0=v0")),
models.ParseTags([]byte("foo,tag1=v0")),
models.ParseTags([]byte("foo,tag2=v1")),
models.ParseTags([]byte("foo,tag3=v0")),
},
exp: "tag0,tag1,tag2,tag3",
},
{
name: "new tags,verify clear",
tags: []models.Tags{
models.ParseTags([]byte("foo,tag9=v0")),
models.ParseTags([]byte("foo,tag8=v0")),
},
exp: "tag8,tag9",
},
}
var km keyMerger
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
km.clear()
for _, tags := range tt.tags {
km.mergeTagKeys(tags)
}
if got := km.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected keys -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}
func selectTags(rows []SeriesRow, keys []string) string {
var srows []string
for _, row := range rows {
var ss []string
for _, key := range keys {
for _, tag := range row.Tags {
if key == string(tag.Key) {
ss = append(ss, string(tag.Key)+"="+string(tag.Value))
}
}
}
srows = append(srows, strings.Join(ss, ","))
}
return strings.Join(srows, "\n")
}
type sliceSeriesCursor struct {
rows []SeriesRow
i int
}
func newSeriesRows(keys ...string) []SeriesRow {
rows := make([]SeriesRow, len(keys))
for i := range keys {
rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Tags = rows[i].SeriesTags.Clone()
rows[i].Tags.Set([]byte("_m"), rows[i].Name)
}
return rows
}
func (s *sliceSeriesCursor) Close() {}
func (s *sliceSeriesCursor) Err() error { return nil }
func (s *sliceSeriesCursor) Next() *SeriesRow {
if s.i < len(s.rows) {
s.i++
return &s.rows[s.i-1]
}
return nil
}

storage/reads/predicate.go (new file, 319 lines)

@@ -0,0 +1,319 @@
package reads
import (
"bytes"
"fmt"
"strconv"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb"
"github.com/pkg/errors"
)
// NodeVisitor can be called by Walk to traverse the Node hierarchy.
// The Visit() function is called once per node.
type NodeVisitor interface {
Visit(*datatypes.Node) NodeVisitor
}
func WalkChildren(v NodeVisitor, node *datatypes.Node) {
for _, n := range node.Children {
WalkNode(v, n)
}
}
func WalkNode(v NodeVisitor, node *datatypes.Node) {
if v = v.Visit(node); v == nil {
return
}
WalkChildren(v, node)
}
func PredicateToExprString(p *datatypes.Predicate) string {
if p == nil {
return "[none]"
}
var v predicateExpressionPrinter
WalkNode(&v, p.Root)
return v.Buffer.String()
}
type predicateExpressionPrinter struct {
bytes.Buffer
}
func (v *predicateExpressionPrinter) Visit(n *datatypes.Node) NodeVisitor {
switch n.NodeType {
case datatypes.NodeTypeLogicalExpression:
if len(n.Children) > 0 {
var op string
if n.GetLogical() == datatypes.LogicalAnd {
op = " AND "
} else {
op = " OR "
}
WalkNode(v, n.Children[0])
for _, e := range n.Children[1:] {
v.Buffer.WriteString(op)
WalkNode(v, e)
}
}
return nil
case datatypes.NodeTypeParenExpression:
if len(n.Children) == 1 {
v.Buffer.WriteString("( ")
WalkNode(v, n.Children[0])
v.Buffer.WriteString(" )")
}
return nil
case datatypes.NodeTypeComparisonExpression:
WalkNode(v, n.Children[0])
v.Buffer.WriteByte(' ')
switch n.GetComparison() {
case datatypes.ComparisonEqual:
v.Buffer.WriteByte('=')
case datatypes.ComparisonNotEqual:
v.Buffer.WriteString("!=")
case datatypes.ComparisonStartsWith:
v.Buffer.WriteString("startsWith")
case datatypes.ComparisonRegex:
v.Buffer.WriteString("=~")
case datatypes.ComparisonNotRegex:
v.Buffer.WriteString("!~")
case datatypes.ComparisonLess:
v.Buffer.WriteByte('<')
case datatypes.ComparisonLessEqual:
v.Buffer.WriteString("<=")
case datatypes.ComparisonGreater:
v.Buffer.WriteByte('>')
case datatypes.ComparisonGreaterEqual:
v.Buffer.WriteString(">=")
}
v.Buffer.WriteByte(' ')
WalkNode(v, n.Children[1])
return nil
case datatypes.NodeTypeTagRef:
v.Buffer.WriteByte('\'')
v.Buffer.WriteString(n.GetTagRefValue())
v.Buffer.WriteByte('\'')
return nil
case datatypes.NodeTypeFieldRef:
v.Buffer.WriteByte('$')
return nil
case datatypes.NodeTypeLiteral:
switch val := n.Value.(type) {
case *datatypes.Node_StringValue:
v.Buffer.WriteString(strconv.Quote(val.StringValue))
case *datatypes.Node_RegexValue:
v.Buffer.WriteByte('/')
v.Buffer.WriteString(val.RegexValue)
v.Buffer.WriteByte('/')
case *datatypes.Node_IntegerValue:
v.Buffer.WriteString(strconv.FormatInt(val.IntegerValue, 10))
case *datatypes.Node_UnsignedValue:
v.Buffer.WriteString(strconv.FormatUint(val.UnsignedValue, 10))
case *datatypes.Node_FloatValue:
v.Buffer.WriteString(strconv.FormatFloat(val.FloatValue, 'f', 10, 64))
case *datatypes.Node_BooleanValue:
if val.BooleanValue {
v.Buffer.WriteString("true")
} else {
v.Buffer.WriteString("false")
}
}
return nil
default:
return v
}
}
func toStoragePredicate(f *semantic.FunctionExpression) (*datatypes.Predicate, error) {
if len(f.Params) != 1 {
return nil, errors.New("storage predicate functions must have exactly one parameter")
}
root, err := toStoragePredicateHelper(f.Body.(semantic.Expression), f.Params[0].Key.Name)
if err != nil {
return nil, err
}
return &datatypes.Predicate{
Root: root,
}, nil
}
func toStoragePredicateHelper(n semantic.Expression, objectName string) (*datatypes.Node, error) {
switch n := n.(type) {
case *semantic.LogicalExpression:
left, err := toStoragePredicateHelper(n.Left, objectName)
if err != nil {
return nil, errors.Wrap(err, "left hand side")
}
right, err := toStoragePredicateHelper(n.Right, objectName)
if err != nil {
return nil, errors.Wrap(err, "right hand side")
}
children := []*datatypes.Node{left, right}
switch n.Operator {
case ast.AndOperator:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLogicalExpression,
Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd},
Children: children,
}, nil
case ast.OrOperator:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLogicalExpression,
Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalOr},
Children: children,
}, nil
default:
return nil, fmt.Errorf("unknown logical operator %v", n.Operator)
}
case *semantic.BinaryExpression:
left, err := toStoragePredicateHelper(n.Left, objectName)
if err != nil {
return nil, errors.Wrap(err, "left hand side")
}
right, err := toStoragePredicateHelper(n.Right, objectName)
if err != nil {
return nil, errors.Wrap(err, "right hand side")
}
children := []*datatypes.Node{left, right}
op, err := toComparisonOperator(n.Operator)
if err != nil {
return nil, err
}
return &datatypes.Node{
NodeType: datatypes.NodeTypeComparisonExpression,
Value: &datatypes.Node_Comparison_{Comparison: op},
Children: children,
}, nil
case *semantic.StringLiteral:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_StringValue{
StringValue: n.Value,
},
}, nil
case *semantic.IntegerLiteral:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_IntegerValue{
IntegerValue: n.Value,
},
}, nil
case *semantic.BooleanLiteral:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_BooleanValue{
BooleanValue: n.Value,
},
}, nil
case *semantic.FloatLiteral:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_FloatValue{
FloatValue: n.Value,
},
}, nil
case *semantic.RegexpLiteral:
return &datatypes.Node{
NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_RegexValue{
RegexValue: n.Value.String(),
},
}, nil
case *semantic.MemberExpression:
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
// Sanity check that the object is the objectName identifier
if ident, ok := n.Object.(*semantic.IdentifierExpression); !ok || ident.Name != objectName {
return nil, fmt.Errorf("unknown object %q", n.Object)
}
switch n.Property {
case fieldKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{
TagRefValue: tsdb.FieldKeyTagKey,
},
}, nil
case measurementKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{
TagRefValue: tsdb.MeasurementTagKey,
},
}, nil
case valueKey:
return &datatypes.Node{
NodeType: datatypes.NodeTypeFieldRef,
Value: &datatypes.Node_FieldRefValue{
FieldRefValue: valueKey,
},
}, nil
}
return &datatypes.Node{
NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{
TagRefValue: n.Property,
},
}, nil
case *semantic.DurationLiteral:
return nil, errors.New("duration literals not supported in storage predicates")
case *semantic.DateTimeLiteral:
return nil, errors.New("time literals not supported in storage predicates")
default:
return nil, fmt.Errorf("unsupported semantic expression type %T", n)
}
}
func toComparisonOperator(o ast.OperatorKind) (datatypes.Node_Comparison, error) {
switch o {
case ast.EqualOperator:
return datatypes.ComparisonEqual, nil
case ast.NotEqualOperator:
return datatypes.ComparisonNotEqual, nil
case ast.RegexpMatchOperator:
return datatypes.ComparisonRegex, nil
case ast.NotRegexpMatchOperator:
return datatypes.ComparisonNotRegex, nil
case ast.StartsWithOperator:
return datatypes.ComparisonStartsWith, nil
case ast.LessThanOperator:
return datatypes.ComparisonLess, nil
case ast.LessThanEqualOperator:
return datatypes.ComparisonLessEqual, nil
case ast.GreaterThanOperator:
return datatypes.ComparisonGreater, nil
case ast.GreaterThanEqualOperator:
return datatypes.ComparisonGreaterEqual, nil
default:
return 0, fmt.Errorf("unknown operator %v", o)
}
}


@@ -0,0 +1,58 @@
package reads_test
import (
"testing"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
)
func TestPredicateToExprString(t *testing.T) {
cases := []struct {
n string
r *datatypes.Predicate
e string
}{
{
n: "returns [none] for nil",
r: nil,
e: "[none]",
},
{
n: "logical AND",
r: &datatypes.Predicate{
Root: &datatypes.Node{
NodeType: datatypes.NodeTypeLogicalExpression,
Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd},
Children: []*datatypes.Node{
{
NodeType: datatypes.NodeTypeComparisonExpression,
Value: &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual},
Children: []*datatypes.Node{
{NodeType: datatypes.NodeTypeTagRef, Value: &datatypes.Node_TagRefValue{TagRefValue: "host"}},
{NodeType: datatypes.NodeTypeLiteral, Value: &datatypes.Node_StringValue{StringValue: "host1"}},
},
},
{
NodeType: datatypes.NodeTypeComparisonExpression,
Value: &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonRegex},
Children: []*datatypes.Node{
{NodeType: datatypes.NodeTypeTagRef, Value: &datatypes.Node_TagRefValue{TagRefValue: "region"}},
{NodeType: datatypes.NodeTypeLiteral, Value: &datatypes.Node_RegexValue{RegexValue: "^us-west"}},
},
},
},
},
},
e: `'host' = "host1" AND 'region' =~ /^us-west/`,
},
}
for _, tc := range cases {
t.Run(tc.n, func(t *testing.T) {
if got, wanted := reads.PredicateToExprString(tc.r), tc.e; got != wanted {
t.Fatal("got:", got, "wanted:", wanted)
}
})
}
}

storage/reads/reader.go (new file, 456 lines)

@@ -0,0 +1,456 @@
package reads
import (
"bytes"
"context"
"fmt"
"strings"
"github.com/gogo/protobuf/types"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
fstorage "github.com/influxdata/flux/functions/inputs/storage"
"github.com/influxdata/flux/values"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
type storageTable interface {
flux.Table
Close()
Done() chan struct{}
}
type storeReader struct {
s Store
}
func NewReader(s Store) fstorage.Reader {
return &storeReader{s: s}
}
func (r *storeReader) Read(ctx context.Context, rs fstorage.ReadSpec, start, stop execute.Time) (flux.TableIterator, error) {
var predicate *datatypes.Predicate
if rs.Predicate != nil {
p, err := toStoragePredicate(rs.Predicate)
if err != nil {
return nil, err
}
predicate = p
}
return &tableIterator{
ctx: ctx,
bounds: execute.Bounds{Start: start, Stop: stop},
s: r.s,
readSpec: rs,
predicate: predicate,
}, nil
}
func (r *storeReader) Close() {}
type tableIterator struct {
ctx context.Context
bounds execute.Bounds
s Store
readSpec fstorage.ReadSpec
predicate *datatypes.Predicate
}
func (bi *tableIterator) Do(f func(flux.Table) error) error {
src, err := bi.s.GetSource(bi.readSpec)
if err != nil {
return err
}
// Setup read request
var req datatypes.ReadRequest
if any, err := types.MarshalAny(src); err != nil {
return err
} else {
req.ReadSource = any
}
req.Predicate = bi.predicate
req.Descending = bi.readSpec.Descending
req.TimestampRange.Start = int64(bi.bounds.Start)
req.TimestampRange.End = int64(bi.bounds.Stop)
req.Group = convertGroupMode(bi.readSpec.GroupMode)
req.GroupKeys = bi.readSpec.GroupKeys
req.SeriesLimit = bi.readSpec.SeriesLimit
req.PointsLimit = bi.readSpec.PointsLimit
req.SeriesOffset = bi.readSpec.SeriesOffset
if req.PointsLimit == -1 {
req.Hints.SetNoPoints()
}
if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil {
return err
} else if agg != datatypes.AggregateTypeNone {
req.Aggregate = &datatypes.Aggregate{Type: agg}
}
switch {
case req.Group != datatypes.GroupAll:
rs, err := bi.s.GroupRead(bi.ctx, &req)
if err != nil {
return err
}
if rs == nil {
return nil
}
if req.Hints.NoPoints() {
return bi.handleGroupReadNoPoints(f, rs)
}
return bi.handleGroupRead(f, rs)
default:
rs, err := bi.s.Read(bi.ctx, &req)
if err != nil {
return err
}
if rs == nil {
return nil
}
if req.Hints.NoPoints() {
return bi.handleReadNoPoints(f, rs)
}
return bi.handleRead(f, rs)
}
}
func (bi *tableIterator) handleRead(f func(flux.Table) error, rs ResultSet) error {
defer func() {
rs.Close()
}()
READ:
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
var table storageTable
switch cur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(cur, bi.bounds, key, cols, rs.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
if table.Empty() {
table.Close()
continue
}
if err := f(table); err != nil {
table.Close()
return err
}
select {
case <-table.Done():
case <-bi.ctx.Done():
break READ
}
}
return nil
}
func (bi *tableIterator) handleReadNoPoints(f func(flux.Table) error, rs ResultSet) error {
defer func() {
rs.Close()
}()
READ:
for rs.Next() {
cur := rs.Cursor()
if !hasPoints(cur) {
// no data for series key + field combination
continue
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table := newTableNoPoints(bi.bounds, key, cols, rs.Tags(), defs)
if err := f(table); err != nil {
table.Close()
return err
}
select {
case <-table.Done():
case <-bi.ctx.Done():
break READ
}
}
return nil
}
func (bi *tableIterator) handleGroupRead(f func(flux.Table) error, rs GroupResultSet) error {
defer func() {
rs.Close()
}()
gc := rs.Next()
READ:
for gc != nil {
var cur cursors.Cursor
for gc.Next() {
cur = gc.Cursor()
if cur != nil {
break
}
}
if cur == nil {
gc = rs.Next()
continue
}
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
var table storageTable
switch cur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt)
table = newIntegerGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat)
table = newFloatGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt)
table = newUnsignedGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool)
table = newBooleanGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newStringGroupTable(gc, cur, bi.bounds, key, cols, gc.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
if err := f(table); err != nil {
table.Close()
return err
}
// Wait until the table has been read.
select {
case <-table.Done():
case <-bi.ctx.Done():
break READ
}
gc = rs.Next()
}
return nil
}
func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs GroupResultSet) error {
defer func() {
rs.Close()
}()
gc := rs.Next()
READ:
for gc != nil {
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table := newGroupTableNoPoints(bi.bounds, key, cols, defs)
gc.Close()
if err := f(table); err != nil {
table.Close()
return err
}
// Wait until the table has been read.
select {
case <-table.Done():
case <-bi.ctx.Done():
break READ
}
gc = rs.Next()
}
return nil
}
func determineAggregateMethod(agg string) (datatypes.Aggregate_AggregateType, error) {
if agg == "" {
return datatypes.AggregateTypeNone, nil
}
if t, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(agg)]; ok {
return datatypes.Aggregate_AggregateType(t), nil
}
return 0, fmt.Errorf("unknown aggregate type %q", agg)
}
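For example (a sketch; it assumes the read protocol registers a SUM aggregate, which is what the _value lookup above resolves):

	agg, err := determineAggregateMethod("sum") // the enum value registered under "SUM", err == nil
	_, err = determineAggregateMethod("p99")    // err: unknown aggregate type "p99"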
func convertGroupMode(m fstorage.GroupMode) datatypes.ReadRequest_Group {
switch m {
case fstorage.GroupModeNone:
return datatypes.GroupNone
case fstorage.GroupModeBy:
return datatypes.GroupBy
case fstorage.GroupModeExcept:
return datatypes.GroupExcept
case fstorage.GroupModeDefault, fstorage.GroupModeAll:
fallthrough
default:
return datatypes.GroupAll
}
}
const (
startColIdx = 0
stopColIdx = 1
timeColIdx = 2
valueColIdx = 3
)
func determineTableColsForSeries(tags models.Tags, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
cols := make([]flux.ColMeta, 4+len(tags))
defs := make([][]byte, 4+len(tags))
cols[startColIdx] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
cols[stopColIdx] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
cols[timeColIdx] = flux.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: flux.TTime,
}
cols[valueColIdx] = flux.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range tags {
cols[4+j] = flux.ColMeta{
Label: string(tag.Key),
Type: flux.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func groupKeyForSeries(tags models.Tags, readSpec *fstorage.ReadSpec, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(tags))
vs := make([]values.Value, 2, len(tags))
cols[0] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
vs[1] = values.NewTimeValue(bnds.Stop)
switch readSpec.GroupMode {
case fstorage.GroupModeBy:
// group key in GroupKeys order, including tags in the GroupKeys slice
for _, k := range readSpec.GroupKeys {
bk := []byte(k)
for _, t := range tags {
if bytes.Equal(t.Key, bk) && len(t.Value) > 0 {
cols = append(cols, flux.ColMeta{
Label: k,
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(t.Value)))
}
}
}
case fstorage.GroupModeExcept:
// group key in GroupKeys order, skipping tags in the GroupKeys slice
panic("not implemented")
case fstorage.GroupModeDefault, fstorage.GroupModeAll:
for i := range tags {
cols = append(cols, flux.ColMeta{
Label: string(tags[i].Key),
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(tags[i].Value)))
}
}
return execute.NewGroupKey(cols, vs)
}
func determineTableColsForGroup(tagKeys [][]byte, typ flux.DataType) ([]flux.ColMeta, [][]byte) {
cols := make([]flux.ColMeta, 4+len(tagKeys))
defs := make([][]byte, 4+len(tagKeys))
cols[startColIdx] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
cols[stopColIdx] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
cols[timeColIdx] = flux.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: flux.TTime,
}
cols[valueColIdx] = flux.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range tagKeys {
cols[4+j] = flux.ColMeta{
Label: string(tag),
Type: flux.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func groupKeyForGroup(kv [][]byte, readSpec *fstorage.ReadSpec, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(readSpec.GroupKeys)+2)
vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2)
cols[0] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
vs[0] = values.NewTimeValue(bnds.Start)
cols[1] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
vs[1] = values.NewTimeValue(bnds.Stop)
for i := range readSpec.GroupKeys {
cols = append(cols, flux.ColMeta{
Label: readSpec.GroupKeys[i],
Type: flux.TString,
})
vs = append(vs, values.NewStringValue(string(kv[i])))
}
return execute.NewGroupKey(cols, vs)
}
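Downstream, every produced table exposes the fixed column layout above: start, stop, _time, _value, then one string column per tag. A minimal sketch of draining such a table inside the f callback handed to the iterator, assuming a float-typed series and the package's existing imports plus fmt:

func printFloatTable(tbl flux.Table) error {
	return tbl.Do(func(cr flux.ColReader) error {
		times := cr.Times(timeColIdx)  // the _time column
		vals := cr.Floats(valueColIdx) // the _value column; float-typed series assumed
		for i := 0; i < cr.Len(); i++ {
			fmt.Println(times[i], vals[i])
		}
		return nil
	})
}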


@ -0,0 +1,457 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: response_writer.gen.go.tmpl
package reads
import (
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
func (w *ResponseWriter) getFloatPointsFrame() *datatypes.ReadResponse_Frame_FloatPoints {
var res *datatypes.ReadResponse_Frame_FloatPoints
if len(w.buffer.Float) > 0 {
i := len(w.buffer.Float) - 1
res = w.buffer.Float[i]
w.buffer.Float[i] = nil
w.buffer.Float = w.buffer.Float[:i]
} else {
res = &datatypes.ReadResponse_Frame_FloatPoints{
FloatPoints: &datatypes.ReadResponse_FloatPointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]float64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putFloatPointsFrame(f *datatypes.ReadResponse_Frame_FloatPoints) {
f.FloatPoints.Timestamps = f.FloatPoints.Timestamps[:0]
f.FloatPoints.Values = f.FloatPoints.Values[:0]
w.buffer.Float = append(w.buffer.Float, f)
}
func (w *ResponseWriter) streamFloatArraySeries(cur cursors.FloatArrayCursor) {
w.sf.DataType = datatypes.DataTypeFloat
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamFloatArrayPoints(cur cursors.FloatArrayCursor) {
w.sf.DataType = datatypes.DataTypeFloat
ss := len(w.res.Frames) - 1
p := w.getFloatPointsFrame()
frame := p.FloatPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.getFloatPointsFrame()
frame = p.FloatPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) getIntegerPointsFrame() *datatypes.ReadResponse_Frame_IntegerPoints {
var res *datatypes.ReadResponse_Frame_IntegerPoints
if len(w.buffer.Integer) > 0 {
i := len(w.buffer.Integer) - 1
res = w.buffer.Integer[i]
w.buffer.Integer[i] = nil
w.buffer.Integer = w.buffer.Integer[:i]
} else {
res = &datatypes.ReadResponse_Frame_IntegerPoints{
IntegerPoints: &datatypes.ReadResponse_IntegerPointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]int64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putIntegerPointsFrame(f *datatypes.ReadResponse_Frame_IntegerPoints) {
f.IntegerPoints.Timestamps = f.IntegerPoints.Timestamps[:0]
f.IntegerPoints.Values = f.IntegerPoints.Values[:0]
w.buffer.Integer = append(w.buffer.Integer, f)
}
func (w *ResponseWriter) streamIntegerArraySeries(cur cursors.IntegerArrayCursor) {
w.sf.DataType = datatypes.DataTypeInteger
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamIntegerArrayPoints(cur cursors.IntegerArrayCursor) {
w.sf.DataType = datatypes.DataTypeInteger
ss := len(w.res.Frames) - 1
p := w.getIntegerPointsFrame()
frame := p.IntegerPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.getIntegerPointsFrame()
frame = p.IntegerPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) getUnsignedPointsFrame() *datatypes.ReadResponse_Frame_UnsignedPoints {
var res *datatypes.ReadResponse_Frame_UnsignedPoints
if len(w.buffer.Unsigned) > 0 {
i := len(w.buffer.Unsigned) - 1
res = w.buffer.Unsigned[i]
w.buffer.Unsigned[i] = nil
w.buffer.Unsigned = w.buffer.Unsigned[:i]
} else {
res = &datatypes.ReadResponse_Frame_UnsignedPoints{
UnsignedPoints: &datatypes.ReadResponse_UnsignedPointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]uint64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putUnsignedPointsFrame(f *datatypes.ReadResponse_Frame_UnsignedPoints) {
f.UnsignedPoints.Timestamps = f.UnsignedPoints.Timestamps[:0]
f.UnsignedPoints.Values = f.UnsignedPoints.Values[:0]
w.buffer.Unsigned = append(w.buffer.Unsigned, f)
}
func (w *ResponseWriter) streamUnsignedArraySeries(cur cursors.UnsignedArrayCursor) {
w.sf.DataType = datatypes.DataTypeUnsigned
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamUnsignedArrayPoints(cur cursors.UnsignedArrayCursor) {
w.sf.DataType = datatypes.DataTypeUnsigned
ss := len(w.res.Frames) - 1
p := w.getUnsignedPointsFrame()
frame := p.UnsignedPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.getUnsignedPointsFrame()
frame = p.UnsignedPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) getStringPointsFrame() *datatypes.ReadResponse_Frame_StringPoints {
var res *datatypes.ReadResponse_Frame_StringPoints
if len(w.buffer.String) > 0 {
i := len(w.buffer.String) - 1
res = w.buffer.String[i]
w.buffer.String[i] = nil
w.buffer.String = w.buffer.String[:i]
} else {
res = &datatypes.ReadResponse_Frame_StringPoints{
StringPoints: &datatypes.ReadResponse_StringPointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]string, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putStringPointsFrame(f *datatypes.ReadResponse_Frame_StringPoints) {
f.StringPoints.Timestamps = f.StringPoints.Timestamps[:0]
f.StringPoints.Values = f.StringPoints.Values[:0]
w.buffer.String = append(w.buffer.String, f)
}
func (w *ResponseWriter) streamStringArraySeries(cur cursors.StringArrayCursor) {
w.sf.DataType = datatypes.DataTypeString
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamStringArrayPoints(cur cursors.StringArrayCursor) {
w.sf.DataType = datatypes.DataTypeString
ss := len(w.res.Frames) - 1
p := w.getStringPointsFrame()
frame := p.StringPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.getStringPointsFrame()
frame = p.StringPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) getBooleanPointsFrame() *datatypes.ReadResponse_Frame_BooleanPoints {
var res *datatypes.ReadResponse_Frame_BooleanPoints
if len(w.buffer.Boolean) > 0 {
i := len(w.buffer.Boolean) - 1
res = w.buffer.Boolean[i]
w.buffer.Boolean[i] = nil
w.buffer.Boolean = w.buffer.Boolean[:i]
} else {
res = &datatypes.ReadResponse_Frame_BooleanPoints{
BooleanPoints: &datatypes.ReadResponse_BooleanPointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]bool, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putBooleanPointsFrame(f *datatypes.ReadResponse_Frame_BooleanPoints) {
f.BooleanPoints.Timestamps = f.BooleanPoints.Timestamps[:0]
f.BooleanPoints.Values = f.BooleanPoints.Values[:0]
w.buffer.Boolean = append(w.buffer.Boolean, f)
}
func (w *ResponseWriter) streamBooleanArraySeries(cur cursors.BooleanArrayCursor) {
w.sf.DataType = datatypes.DataTypeBoolean
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamBooleanArrayPoints(cur cursors.BooleanArrayCursor) {
w.sf.DataType = datatypes.DataTypeBoolean
ss := len(w.res.Frames) - 1
p := w.getBooleanPointsFrame()
frame := p.BooleanPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.getBooleanPointsFrame()
frame = p.BooleanPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}


@ -0,0 +1,98 @@
package reads
import (
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
{{range .}}
func (w *ResponseWriter) get{{.Name}}PointsFrame() *datatypes.ReadResponse_Frame_{{.Name}}Points {
var res *datatypes.ReadResponse_Frame_{{.Name}}Points
if len(w.buffer.{{.Name}}) > 0 {
i := len(w.buffer.{{.Name}}) - 1
res = w.buffer.{{.Name}}[i]
w.buffer.{{.Name}}[i] = nil
w.buffer.{{.Name}} = w.buffer.{{.Name}}[:i]
} else {
res = &datatypes.ReadResponse_Frame_{{.Name}}Points{
{{.Name}}Points: &datatypes.ReadResponse_{{.Name}}PointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]{{.Type}}, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) put{{.Name}}PointsFrame(f *datatypes.ReadResponse_Frame_{{.Name}}Points) {
f.{{.Name}}Points.Timestamps = f.{{.Name}}Points.Timestamps[:0]
f.{{.Name}}Points.Values = f.{{.Name}}Points.Values[:0]
w.buffer.{{.Name}} = append(w.buffer.{{.Name}}, f)
}
func (w *ResponseWriter) stream{{.Name}}ArraySeries(cur cursors.{{.Name}}ArrayCursor) {
w.sf.DataType = datatypes.DataType{{.Name}}
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) stream{{.Name}}ArrayPoints(cur cursors.{{.Name}}ArrayCursor) {
w.sf.DataType = datatypes.DataType{{.Name}}
ss := len(w.res.Frames) - 1
p := w.get{{.Name}}PointsFrame()
frame := p.{{.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var (
seriesValueCount = 0
b = 0
)
for {
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps)
if b >= batchSize {
seriesValueCount += b
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
p = w.get{{.Name}}PointsFrame()
frame = p.{{.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
{{end}}
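The checked-in response_writer.gen.go above is presumably regenerated from this template the same way table.gen.go is; a plausible directive, assuming the types.tmpldata data file referenced by table.go sits alongside it:

//go:generate tmpl -data=@types.tmpldata response_writer.gen.go.tmpl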


@ -0,0 +1,266 @@
package reads
import (
"fmt"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
type ResponseStream interface {
Send(*datatypes.ReadResponse) error
}
const (
batchSize = 1000
frameCount = 50
writeSize = 64 << 10 // 64k
)
type ResponseWriter struct {
stream ResponseStream
res *datatypes.ReadResponse
err error
// current series
sf *datatypes.ReadResponse_SeriesFrame
ss int // index of the current series frame; used to drop the frame when the series has no points
sz int // estimated size in bytes for pending write
vc int // total value count
buffer struct {
Float []*datatypes.ReadResponse_Frame_FloatPoints
Integer []*datatypes.ReadResponse_Frame_IntegerPoints
Unsigned []*datatypes.ReadResponse_Frame_UnsignedPoints
Boolean []*datatypes.ReadResponse_Frame_BooleanPoints
String []*datatypes.ReadResponse_Frame_StringPoints
Series []*datatypes.ReadResponse_Frame_Series
Group []*datatypes.ReadResponse_Frame_Group
}
hints datatypes.HintFlags
}
func NewResponseWriter(stream ResponseStream, hints datatypes.HintFlags) *ResponseWriter {
rw := &ResponseWriter{stream: stream,
res: &datatypes.ReadResponse{
Frames: make([]datatypes.ReadResponse_Frame, 0, frameCount),
},
hints: hints,
}
return rw
}
// WrittenN returns the number of values written to the response stream.
func (w *ResponseWriter) WrittenN() int { return w.vc }
func (w *ResponseWriter) WriteResultSet(rs ResultSet) error {
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(rs.Tags())
w.streamCursor(cur)
if w.err != nil {
cur.Close()
return w.err
}
}
return nil
}
func (w *ResponseWriter) WriteGroupResultSet(rs GroupResultSet) error {
gc := rs.Next()
for gc != nil {
w.startGroup(gc.Keys(), gc.PartitionKeyVals())
for gc.Next() {
cur := gc.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(gc.Tags())
w.streamCursor(cur)
if w.err != nil {
gc.Close()
return w.err
}
}
gc.Close()
gc = rs.Next()
}
return nil
}
func (w *ResponseWriter) Err() error { return w.err }
func (w *ResponseWriter) getGroupFrame(keys, partitionKey [][]byte) *datatypes.ReadResponse_Frame_Group {
var res *datatypes.ReadResponse_Frame_Group
if len(w.buffer.Group) > 0 {
i := len(w.buffer.Group) - 1
res = w.buffer.Group[i]
w.buffer.Group[i] = nil
w.buffer.Group = w.buffer.Group[:i]
} else {
res = &datatypes.ReadResponse_Frame_Group{Group: &datatypes.ReadResponse_GroupFrame{}}
}
if cap(res.Group.TagKeys) < len(keys) {
res.Group.TagKeys = make([][]byte, len(keys))
} else if len(res.Group.TagKeys) != len(keys) {
res.Group.TagKeys = res.Group.TagKeys[:len(keys)]
}
if cap(res.Group.PartitionKeyVals) < len(partitionKey) {
res.Group.PartitionKeyVals = make([][]byte, len(partitionKey))
} else if len(res.Group.PartitionKeyVals) != len(partitionKey) {
res.Group.PartitionKeyVals = res.Group.PartitionKeyVals[:len(partitionKey)]
}
return res
}
func (w *ResponseWriter) putGroupFrame(f *datatypes.ReadResponse_Frame_Group) {
for i := range f.Group.TagKeys {
f.Group.TagKeys[i] = nil
}
for i := range f.Group.PartitionKeyVals {
f.Group.PartitionKeyVals[i] = nil
}
w.buffer.Group = append(w.buffer.Group, f)
}
func (w *ResponseWriter) getSeriesFrame(next models.Tags) *datatypes.ReadResponse_Frame_Series {
var res *datatypes.ReadResponse_Frame_Series
if len(w.buffer.Series) > 0 {
i := len(w.buffer.Series) - 1
res = w.buffer.Series[i]
w.buffer.Series[i] = nil
w.buffer.Series = w.buffer.Series[:i]
} else {
res = &datatypes.ReadResponse_Frame_Series{Series: &datatypes.ReadResponse_SeriesFrame{}}
}
if cap(res.Series.Tags) < len(next) {
res.Series.Tags = make([]datatypes.Tag, len(next))
} else if len(res.Series.Tags) != len(next) {
res.Series.Tags = res.Series.Tags[:len(next)]
}
return res
}
func (w *ResponseWriter) putSeriesFrame(f *datatypes.ReadResponse_Frame_Series) {
tags := f.Series.Tags
for i := range tags {
tags[i].Key = nil
tags[i].Value = nil
}
w.buffer.Series = append(w.buffer.Series, f)
}
func (w *ResponseWriter) startGroup(keys, partitionKey [][]byte) {
f := w.getGroupFrame(keys, partitionKey)
copy(f.Group.TagKeys, keys)
copy(f.Group.PartitionKeyVals, partitionKey)
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: f})
w.sz += f.Size()
}
func (w *ResponseWriter) startSeries(next models.Tags) {
if w.hints.NoSeries() {
return
}
w.ss = len(w.res.Frames)
f := w.getSeriesFrame(next)
w.sf = f.Series
for i, t := range next {
w.sf.Tags[i] = datatypes.Tag(t)
}
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: f})
w.sz += w.sf.Size()
}
func (w *ResponseWriter) streamCursor(cur cursors.Cursor) {
switch {
case w.hints.NoSeries():
// skip
case w.hints.NoPoints():
switch cur := cur.(type) {
case cursors.IntegerArrayCursor:
w.streamIntegerArraySeries(cur)
case cursors.FloatArrayCursor:
w.streamFloatArraySeries(cur)
case cursors.UnsignedArrayCursor:
w.streamUnsignedArraySeries(cur)
case cursors.BooleanArrayCursor:
w.streamBooleanArraySeries(cur)
case cursors.StringArrayCursor:
w.streamStringArraySeries(cur)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
default:
switch cur := cur.(type) {
case cursors.IntegerArrayCursor:
w.streamIntegerArrayPoints(cur)
case cursors.FloatArrayCursor:
w.streamFloatArrayPoints(cur)
case cursors.UnsignedArrayCursor:
w.streamUnsignedArrayPoints(cur)
case cursors.BooleanArrayCursor:
w.streamBooleanArrayPoints(cur)
case cursors.StringArrayCursor:
w.streamStringArrayPoints(cur)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}
cur.Close()
}
func (w *ResponseWriter) Flush() {
if w.err != nil || w.sz == 0 {
return
}
w.sz = 0
if w.err = w.stream.Send(w.res); w.err != nil {
return
}
for i := range w.res.Frames {
d := w.res.Frames[i].Data
w.res.Frames[i].Data = nil
switch p := d.(type) {
case *datatypes.ReadResponse_Frame_FloatPoints:
w.putFloatPointsFrame(p)
case *datatypes.ReadResponse_Frame_IntegerPoints:
w.putIntegerPointsFrame(p)
case *datatypes.ReadResponse_Frame_UnsignedPoints:
w.putUnsignedPointsFrame(p)
case *datatypes.ReadResponse_Frame_BooleanPoints:
w.putBooleanPointsFrame(p)
case *datatypes.ReadResponse_Frame_StringPoints:
w.putStringPointsFrame(p)
case *datatypes.ReadResponse_Frame_Series:
w.putSeriesFrame(p)
case *datatypes.ReadResponse_Frame_Group:
w.putGroupFrame(p)
}
}
w.res.Frames = w.res.Frames[:0]
}
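A minimal sketch of driving the writer end to end over a hypothetical ResponseStream implementation (countingStream below is illustrative, not part of this package), reusing the package's existing imports plus context:

type countingStream struct{ frames int }

func (s *countingStream) Send(r *datatypes.ReadResponse) error {
	s.frames += len(r.Frames)
	return nil
}

func streamRead(ctx context.Context, store Store, req *datatypes.ReadRequest) error {
	rs, err := store.Read(ctx, req)
	if err != nil || rs == nil {
		return err
	}
	defer rs.Close()

	w := NewResponseWriter(&countingStream{}, req.Hints)
	if err := w.WriteResultSet(rs); err != nil {
		return err
	}
	w.Flush() // push any frames still buffered below writeSize
	return w.Err()
}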


@ -0,0 +1,68 @@
package reads
import (
"context"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
type multiShardCursors interface {
createCursor(row SeriesRow) cursors.Cursor
newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor
}
type resultSet struct {
ctx context.Context
agg *datatypes.Aggregate
cur SeriesCursor
row SeriesRow
mb multiShardCursors
}
func NewResultSet(ctx context.Context, req *datatypes.ReadRequest, cur SeriesCursor) ResultSet {
return &resultSet{
ctx: ctx,
agg: req.Aggregate,
cur: cur,
mb: newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit),
}
}
// Close closes the result set. Close is idempotent.
func (r *resultSet) Close() {
if r == nil {
return // Nothing to do.
}
r.row.Query = nil
r.cur.Close()
}
// Next returns true if there are more results available.
func (r *resultSet) Next() bool {
if r == nil {
return false
}
row := r.cur.Next()
if row == nil {
return false
}
r.row = *row
return true
}
func (r *resultSet) Cursor() cursors.Cursor {
cur := r.mb.createCursor(r.row)
if r.agg != nil {
cur = r.mb.newAggregateCursor(r.ctx, r.agg, cur)
}
return cur
}
func (r *resultSet) Tags() models.Tags {
return r.row.Tags
}
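A sketch of consuming a ResultSet directly, without the flux table machinery; it assumes a float-typed series, and the other cursor types follow the same pattern:

func drainFloats(rs ResultSet) {
	defer rs.Close()
	for rs.Next() {
		cur := rs.Cursor()
		if cur == nil {
			continue // no data for this series key + field combination
		}
		if fc, ok := cur.(cursors.FloatArrayCursor); ok {
			for a := fc.Next(); a.Len() > 0; a = fc.Next() {
				_ = a.Timestamps // batches of timestamps and values
				_ = a.Values
			}
		}
		cur.Close()
	}
}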


@ -0,0 +1,51 @@
package reads
import (
"context"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb/cursors"
)
type SeriesCursor interface {
Close()
Next() *SeriesRow
Err() error
}
type SeriesRow struct {
SortKey []byte
Name []byte // measurement name
SeriesTags models.Tags // unmodified series tags
Tags models.Tags
Field string
Query cursors.CursorIterators
ValueCond influxql.Expr
}
type limitSeriesCursor struct {
SeriesCursor
n, o, c int64
}
func NewLimitSeriesCursor(ctx context.Context, cur SeriesCursor, n, o int64) SeriesCursor {
return &limitSeriesCursor{SeriesCursor: cur, o: o, n: n}
}
func (c *limitSeriesCursor) Next() *SeriesRow {
if c.o > 0 {
for i := int64(0); i < c.o; i++ {
if c.SeriesCursor.Next() == nil {
break
}
}
c.o = 0
}
if c.c >= c.n {
return nil
}
c.c++
return c.SeriesCursor.Next()
}
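For example, skipping the first 10 series and then returning at most 5 (a sketch):

	cur = NewLimitSeriesCursor(ctx, cur, 5, 10) // n=5, o=10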

storage/reads/store.go (new file, 38 lines)

@ -0,0 +1,38 @@
package reads
import (
"context"
"github.com/gogo/protobuf/proto"
fstorage "github.com/influxdata/flux/functions/inputs/storage"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb/cursors"
)
type ResultSet interface {
Close()
Next() bool
Cursor() cursors.Cursor
Tags() models.Tags
}
type GroupResultSet interface {
Next() GroupCursor
Close()
}
type GroupCursor interface {
Tags() models.Tags
Keys() [][]byte
PartitionKeyVals() [][]byte
Next() bool
Cursor() cursors.Cursor
Close()
}
type Store interface {
Read(ctx context.Context, req *datatypes.ReadRequest) (ResultSet, error)
GroupRead(ctx context.Context, req *datatypes.ReadRequest) (GroupResultSet, error)
GetSource(rs fstorage.ReadSpec) (proto.Message, error)
}
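A sketch of draining a grouped read through these interfaces; each group cursor yields one series cursor per matching series within the group:

func drainGroups(ctx context.Context, store Store, req *datatypes.ReadRequest) error {
	grs, err := store.GroupRead(ctx, req)
	if err != nil || grs == nil {
		return err
	}
	defer grs.Close()
	for gc := grs.Next(); gc != nil; gc = grs.Next() {
		_ = gc.PartitionKeyVals() // the grouping key values for this group
		for gc.Next() {
			if cur := gc.Cursor(); cur != nil {
				cur.Close() // decode points here, as in the ResultSet sketch
			}
		}
		gc.Close()
	}
	return nil
}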

storage/reads/table.gen.go (new file, 1060 lines)

File diff suppressed because it is too large


@ -0,0 +1,220 @@
package reads
import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb/cursors"
"github.com/pkg/errors"
)
{{range .}}
//
// *********** {{.Name}} ***********
//
type {{.name}}Table struct {
table
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}Table(
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
return t
}
func (t *{{.name}}Table) Close() {
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
defer t.Close()
if !t.more {
return t.err
}
f(t)
for t.advance() {
if err := f(t); err != nil {
return err
}
}
return t.err
}
func (t *{{.name}}Table) advance() bool {
a := t.cur.Next()
t.l = a.Len()
if t.l == 0 {
return false
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
// group table
type {{.name}}GroupTable struct {
table
gc GroupCursor
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}GroupTable(
gc GroupCursor,
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
return t
}
func (t *{{.name}}GroupTable) Close() {
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.gc != nil {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
if !t.more {
return t.err
}
f(t)
for t.advance() {
if err := f(t); err != nil {
return err
}
}
return t.err
}
func (t *{{.name}}GroupTable) advance() bool {
RETRY:
a := t.cur.Next()
t.l = a.Len()
if t.l == 0 {
if t.advanceCursor() {
goto RETRY
}
return false
}
if cap(t.timeBuf) < t.l {
t.timeBuf = make([]execute.Time, t.l)
} else {
t.timeBuf = t.timeBuf[:t.l]
}
for i := range a.Timestamps {
t.timeBuf[i] = execute.Time(a.Timestamps[i])
}
if cap(t.valBuf) < t.l {
t.valBuf = make([]{{.Type}}, t.l)
} else {
t.valBuf = t.valBuf[:t.l]
}
copy(t.valBuf, a.Values)
t.colBufs[timeColIdx] = t.timeBuf
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
if cur == nil {
continue
}
if cur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok {
// TODO(sgc): error or skip?
cur.Close()
t.err = errors.Errorf("expected {{.name}} cursor type, got %T", cur)
return false
} else {
t.readTags(t.gc.Tags())
t.cur = cur
return true
}
}
return false
}
{{end}}

storage/reads/table.go (new file, 242 lines)

@ -0,0 +1,242 @@
package reads
//go:generate tmpl -data=@types.tmpldata table.gen.go.tmpl
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb/cursors"
)
type table struct {
bounds execute.Bounds
key flux.GroupKey
cols []flux.ColMeta
// cache of the tags on the current series.
// len(tags) == len(cols)
tags [][]byte
defs [][]byte
done chan struct{}
// The current number of records in memory
l int
colBufs []interface{}
timeBuf []execute.Time
err error
empty bool
more bool
}
func newTable(
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) table {
return table{
bounds: bounds,
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
cols: cols,
done: make(chan struct{}),
empty: true,
}
}
func (t *table) Done() chan struct{} { return t.done }
func (t *table) Key() flux.GroupKey { return t.key }
func (t *table) Cols() []flux.ColMeta { return t.cols }
func (t *table) RefCount(n int) {}
func (t *table) Err() error { return t.err }
func (t *table) Empty() bool { return t.empty }
func (t *table) Len() int { return t.l }
func (t *table) Bools(j int) []bool {
execute.CheckColType(t.cols[j], flux.TBool)
return t.colBufs[j].([]bool)
}
func (t *table) Ints(j int) []int64 {
execute.CheckColType(t.cols[j], flux.TInt)
return t.colBufs[j].([]int64)
}
func (t *table) UInts(j int) []uint64 {
execute.CheckColType(t.cols[j], flux.TUInt)
return t.colBufs[j].([]uint64)
}
func (t *table) Floats(j int) []float64 {
execute.CheckColType(t.cols[j], flux.TFloat)
return t.colBufs[j].([]float64)
}
func (t *table) Strings(j int) []string {
execute.CheckColType(t.cols[j], flux.TString)
return t.colBufs[j].([]string)
}
func (t *table) Times(j int) []execute.Time {
execute.CheckColType(t.cols[j], flux.TTime)
return t.colBufs[j].([]execute.Time)
}
// readTags populates t.tags with the provided tag values
func (t *table) readTags(tags models.Tags) {
for j := range t.tags {
t.tags[j] = t.defs[j]
}
if len(tags) == 0 {
return
}
for _, tag := range tags {
j := execute.ColIdx(string(tag.Key), t.cols)
t.tags[j] = tag.Value
}
}
// appendTags fills the colBufs for the tag columns with the tag value.
func (t *table) appendTags() {
for j := range t.cols {
v := t.tags[j]
if v != nil {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]string, len(t.cols))
}
colBuf := t.colBufs[j].([]string)
if cap(colBuf) < t.l {
colBuf = make([]string, t.l)
} else {
colBuf = colBuf[:t.l]
}
vStr := string(v)
for i := range colBuf {
colBuf[i] = vStr
}
t.colBufs[j] = colBuf
}
}
}
// appendBounds fills the colBufs for the time bounds
func (t *table) appendBounds() {
bounds := []execute.Time{t.bounds.Start, t.bounds.Stop}
for j := range []int{startColIdx, stopColIdx} {
if t.colBufs[j] == nil {
t.colBufs[j] = make([]execute.Time, len(t.cols))
}
colBuf := t.colBufs[j].([]execute.Time)
if cap(colBuf) < t.l {
colBuf = make([]execute.Time, t.l)
} else {
colBuf = colBuf[:t.l]
}
for i := range colBuf {
colBuf[i] = bounds[j]
}
t.colBufs[j] = colBuf
}
}
// hasPoints reports whether the cursor yields at least one value.
// Note that it consumes the first batch and closes the cursor in the process.
func hasPoints(cur cursors.Cursor) bool {
if cur == nil {
return false
}
res := false
switch cur := cur.(type) {
case cursors.IntegerArrayCursor:
a := cur.Next()
res = a.Len() > 0
case cursors.FloatArrayCursor:
a := cur.Next()
res = a.Len() > 0
case cursors.UnsignedArrayCursor:
a := cur.Next()
res = a.Len() > 0
case cursors.BooleanArrayCursor:
a := cur.Next()
res = a.Len() > 0
case cursors.StringArrayCursor:
a := cur.Next()
res = a.Len() > 0
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
cur.Close()
return res
}
type tableNoPoints struct {
table
}
func newTableNoPoints(
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
) *tableNoPoints {
t := &tableNoPoints{
table: newTable(bounds, key, cols, defs),
}
t.readTags(tags)
return t
}
func (t *tableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
t.err = f(t)
t.Close()
return t.err
}
type groupTableNoPoints struct {
table
}
func newGroupTableNoPoints(
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) *groupTableNoPoints {
t := &groupTableNoPoints{
table: newTable(bounds, key, cols, defs),
}
return t
}
func (t *groupTableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
t.err = f(t)
t.Close()
return t.err
}


@ -0,0 +1,30 @@
package reads
import (
"github.com/influxdata/platform/models"
)
type tagsBuffer struct {
sz int
i int
buf models.Tags
}
func (tb *tagsBuffer) copyTags(src models.Tags) models.Tags {
var buf models.Tags
if len(src) > tb.sz {
buf = make(models.Tags, len(src))
} else {
if tb.i+len(src) > len(tb.buf) {
tb.buf = make(models.Tags, tb.sz)
tb.i = 0
}
buf = tb.buf[tb.i : tb.i+len(src)]
tb.i += len(src)
}
copy(buf, src)
return buf
}


@ -0,0 +1,27 @@
package readservice
import "github.com/influxdata/platform/models"
const (
fieldKey = "_field"
measurementKey = "_measurement"
valueKey = "_value"
)
var (
fieldKeyBytes = []byte(fieldKey)
measurementKeyBytes = []byte(measurementKey)
)
func normalizeTags(tags models.Tags) {
for i, tag := range tags {
if len(tag.Key) == 2 && tag.Key[0] == '_' {
switch tag.Key[1] {
case 'f':
tags[i].Key = fieldKeyBytes
case 'm':
tags[i].Key = measurementKeyBytes
}
}
}
}
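Within this package, the effect is to rewrite the engine's short internal keys to the external names (a sketch; models.NewTags is assumed to be the usual constructor from the models package):

	tags := models.NewTags(map[string]string{"_m": "cpu", "_f": "usage_user", "host": "server01"})
	normalizeTags(tags)
	// tags now carry _measurement=cpu, _field=usage_user, host=server01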


@ -0,0 +1,141 @@
package readservice
import (
"context"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/platform/tsdb"
opentracing "github.com/opentracing/opentracing-go"
)
type indexSeriesCursor struct {
sqry storage.SeriesCursor
err error
tags models.Tags
cond influxql.Expr
row reads.SeriesRow
eof bool
hasValueExpr bool
}
func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.ReadRequest, engine *storage.Engine) (*indexSeriesCursor, error) {
queries, err := engine.CreateCursorIterator(ctx)
if err != nil {
return nil, err
}
if queries == nil {
return nil, nil
}
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan("index_cursor.create", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
opt := query.IteratorOptions{
Aux: []influxql.VarRef{{Val: "key"}},
Authorizer: query.OpenAuthorizer,
Ascending: true,
Ordered: true,
}
p := &indexSeriesCursor{row: reads.SeriesRow{Query: tsdb.CursorIterators{queries}}}
m := append(append([]byte(nil), src.OrganizationID...), src.BucketID...)
mi := tsdb.NewMeasurementSliceIterator([][]byte{m})
if root := req.Predicate.GetRoot(); root != nil {
if p.cond, err = nodeToExpr(root, nil); err != nil {
return nil, err
}
p.hasValueExpr = hasFieldValueKey(p.cond)
if !p.hasValueExpr {
opt.Condition = p.cond
} else {
opt.Condition = influxql.Reduce(rewriteExprRemoveFieldValue(influxql.CloneExpr(p.cond)), nil)
if isTrueBooleanLiteral(opt.Condition) {
opt.Condition = nil
}
}
}
p.sqry, err = engine.CreateSeriesCursor(ctx, storage.SeriesCursorRequest{Measurements: mi}, opt.Condition)
if err != nil {
p.Close()
return nil, err
}
return p, nil
}
func (c *indexSeriesCursor) Close() {
if !c.eof {
c.eof = true
if c.sqry != nil {
c.sqry.Close()
c.sqry = nil
}
}
}
func copyTags(dst, src models.Tags) models.Tags {
if cap(dst) < src.Len() {
dst = make(models.Tags, src.Len())
} else {
dst = dst[:src.Len()]
}
copy(dst, src)
return dst
}
func (c *indexSeriesCursor) Next() *reads.SeriesRow {
if c.eof {
return nil
}
// next series key
sr, err := c.sqry.Next()
if err != nil {
c.err = err
c.Close()
return nil
} else if sr == nil {
c.Close()
return nil
}
c.row.Name = sr.Name
//TODO(edd): check this.
c.row.SeriesTags = copyTags(c.row.SeriesTags, sr.Tags)
c.row.Tags = copyTags(c.row.Tags, sr.Tags)
c.row.Field = string(c.row.Tags.Get(tsdb.FieldKeyTagKeyBytes))
normalizeTags(c.row.Tags)
if c.cond != nil && c.hasValueExpr {
// TODO(sgc): lazily evaluate valueCond
c.row.ValueCond = influxql.Reduce(c.cond, c)
if isTrueBooleanLiteral(c.row.ValueCond) {
// we've reduced the expression to "true"
c.row.ValueCond = nil
}
}
return &c.row
}
func (c *indexSeriesCursor) Value(key string) (interface{}, bool) {
res := c.row.Tags.Get([]byte(key))
// Return res as a string so it compares correctly with the string literals
return string(res), res != nil
}
func (c *indexSeriesCursor) Err() error {
return c.err
}


@ -0,0 +1,273 @@
package readservice
import (
"errors"
"regexp"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
)
var measurementRemap = map[string]string{"_measurement": "_name"}
// nodeToExpr transforms a predicate node to an influxql.Expr.
func nodeToExpr(node *datatypes.Node, remap map[string]string) (influxql.Expr, error) {
v := &nodeToExprVisitor{remap: remap}
reads.WalkNode(v, node)
if err := v.Err(); err != nil {
return nil, err
}
if len(v.exprs) > 1 {
return nil, errors.New("invalid expression")
}
if len(v.exprs) == 0 {
return nil, nil
}
// TODO(edd): It would be preferable if RewriteRegexConditions was a
// package level function in influxql.
stmt := &influxql.SelectStatement{
Condition: v.exprs[0],
}
stmt.RewriteRegexConditions()
return stmt.Condition, nil
}
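For example, a predicate equivalent to host = 'server01' converts roughly as follows (a sketch; the oneof wrapper names Node_Comparison_ and Node_TagRefValue are assumptions about the generated protobuf code):

	node := &datatypes.Node{
		NodeType: datatypes.NodeTypeComparisonExpression,
		Value:    &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual},
		Children: []*datatypes.Node{
			{NodeType: datatypes.NodeTypeTagRef, Value: &datatypes.Node_TagRefValue{TagRefValue: "host"}},
			{NodeType: datatypes.NodeTypeLiteral, Value: &datatypes.Node_StringValue{StringValue: "server01"}},
		},
	}
	expr, err := nodeToExpr(node, nil)
	// expr is an *influxql.BinaryExpr comparing the host tag against 'server01'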
type nodeToExprVisitor struct {
remap map[string]string
exprs []influxql.Expr
err error
}
func (v *nodeToExprVisitor) Visit(n *datatypes.Node) reads.NodeVisitor {
if v.err != nil {
return nil
}
switch n.NodeType {
case datatypes.NodeTypeLogicalExpression:
if len(n.Children) > 1 {
op := influxql.AND
if n.GetLogical() == datatypes.LogicalOr {
op = influxql.OR
}
reads.WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
for i := 1; i < len(n.Children); i++ {
reads.WalkNode(v, n.Children[i])
if v.err != nil {
return nil
}
if len(v.exprs) >= 2 {
lhs, rhs := v.pop2()
v.exprs = append(v.exprs, &influxql.BinaryExpr{LHS: lhs, Op: op, RHS: rhs})
}
}
return nil
}
case datatypes.NodeTypeParenExpression:
if len(n.Children) != 1 {
v.err = errors.New("ParenExpression expects one child")
return nil
}
reads.WalkNode(v, n.Children[0])
if v.err != nil {
return nil
}
if len(v.exprs) > 0 {
v.exprs = append(v.exprs, &influxql.ParenExpr{Expr: v.pop()})
}
return nil
case datatypes.NodeTypeComparisonExpression:
reads.WalkChildren(v, n)
if len(v.exprs) < 2 {
v.err = errors.New("ComparisonExpression expects two children")
return nil
}
lhs, rhs := v.pop2()
be := &influxql.BinaryExpr{LHS: lhs, RHS: rhs}
switch n.GetComparison() {
case datatypes.ComparisonEqual:
be.Op = influxql.EQ
case datatypes.ComparisonNotEqual:
be.Op = influxql.NEQ
case datatypes.ComparisonStartsWith:
// TODO(sgc): rewrite to anchored RE, as index does not support startsWith yet
v.err = errors.New("startsWith not implemented")
return nil
case datatypes.ComparisonRegex:
be.Op = influxql.EQREGEX
case datatypes.ComparisonNotRegex:
be.Op = influxql.NEQREGEX
case datatypes.ComparisonLess:
be.Op = influxql.LT
case datatypes.ComparisonLessEqual:
be.Op = influxql.LTE
case datatypes.ComparisonGreater:
be.Op = influxql.GT
case datatypes.ComparisonGreaterEqual:
be.Op = influxql.GTE
default:
v.err = errors.New("invalid comparison operator")
return nil
}
v.exprs = append(v.exprs, be)
return nil
case datatypes.NodeTypeTagRef:
ref := n.GetTagRefValue()
if v.remap != nil {
if nk, ok := v.remap[ref]; ok {
ref = nk
}
}
v.exprs = append(v.exprs, &influxql.VarRef{Val: ref, Type: influxql.Tag})
return nil
case datatypes.NodeTypeFieldRef:
v.exprs = append(v.exprs, &influxql.VarRef{Val: "$"})
return nil
case datatypes.NodeTypeLiteral:
switch val := n.Value.(type) {
case *datatypes.Node_StringValue:
v.exprs = append(v.exprs, &influxql.StringLiteral{Val: val.StringValue})
case *datatypes.Node_RegexValue:
// TODO(sgc): consider hashing the RegexValue and caching the compiled version
re, err := regexp.Compile(val.RegexValue)
if err != nil {
v.err = err
}
v.exprs = append(v.exprs, &influxql.RegexLiteral{Val: re})
return nil
case *datatypes.Node_IntegerValue:
v.exprs = append(v.exprs, &influxql.IntegerLiteral{Val: val.IntegerValue})
case *datatypes.Node_UnsignedValue:
v.exprs = append(v.exprs, &influxql.UnsignedLiteral{Val: val.UnsignedValue})
case *datatypes.Node_FloatValue:
v.exprs = append(v.exprs, &influxql.NumberLiteral{Val: val.FloatValue})
case *datatypes.Node_BooleanValue:
v.exprs = append(v.exprs, &influxql.BooleanLiteral{Val: val.BooleanValue})
default:
v.err = errors.New("unexpected literal type")
return nil
}
return nil
default:
return v
}
return nil
}
func (v *nodeToExprVisitor) Err() error {
return v.err
}
func (v *nodeToExprVisitor) pop() influxql.Expr {
if len(v.exprs) == 0 {
panic("stack empty")
}
var top influxql.Expr
top, v.exprs = v.exprs[len(v.exprs)-1], v.exprs[:len(v.exprs)-1]
return top
}
func (v *nodeToExprVisitor) pop2() (influxql.Expr, influxql.Expr) {
if len(v.exprs) < 2 {
panic("stack empty")
}
rhs := v.exprs[len(v.exprs)-1]
lhs := v.exprs[len(v.exprs)-2]
v.exprs = v.exprs[:len(v.exprs)-2]
return lhs, rhs
}
func isTrueBooleanLiteral(expr influxql.Expr) bool {
b, ok := expr.(*influxql.BooleanLiteral)
if ok {
return b.Val
}
return false
}
func rewriteExprRemoveFieldValue(expr influxql.Expr) influxql.Expr {
return influxql.RewriteExpr(expr, func(expr influxql.Expr) influxql.Expr {
if be, ok := expr.(*influxql.BinaryExpr); ok {
if ref, ok := be.LHS.(*influxql.VarRef); ok {
if ref.Val == "$" {
return &influxql.BooleanLiteral{Val: true}
}
}
}
return expr
})
}
type hasRefs struct {
refs []string
found []bool
}
func (v *hasRefs) allFound() bool {
for _, val := range v.found {
if !val {
return false
}
}
return true
}
func (v *hasRefs) Visit(node influxql.Node) influxql.Visitor {
if v.allFound() {
return nil
}
if n, ok := node.(*influxql.VarRef); ok {
for i, r := range v.refs {
if !v.found[i] && r == n.Val {
v.found[i] = true
if v.allFound() {
return nil
}
}
}
}
return v
}
func hasFieldValueKey(expr influxql.Expr) bool {
refs := hasRefs{refs: []string{valueKey}, found: make([]bool, 1)}
influxql.Walk(&refs, expr)
return refs.found[0]
}


@ -0,0 +1,58 @@
package readservice
import (
"context"
"github.com/influxdata/flux"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions/inputs"
fstorage "github.com/influxdata/flux/functions/inputs/storage"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/reads"
"go.uber.org/zap"
)
func NewProxyQueryService(engine *storage.Engine, bucketSvc platform.BucketService, orgSvc platform.OrganizationService, logger *zap.Logger) (query.ProxyQueryService, error) {
var ( // flux
concurrencyQuota = 10
memoryBytesQuota = 1e6
)
cc := control.Config{
ExecutorDependencies: make(execute.Dependencies),
ConcurrencyQuota: concurrencyQuota,
MemoryBytesQuota: int64(memoryBytesQuota),
Logger: logger,
Verbose: false,
}
err := inputs.InjectFromDependencies(cc.ExecutorDependencies, fstorage.Dependencies{
Reader: reads.NewReader(newStore(engine)),
BucketLookup: query.FromBucketService(bucketSvc),
OrganizationLookup: query.FromOrganizationService(orgSvc),
})
if err != nil {
return nil, err
}
return query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: &queryAdapter{
Controller: control.New(cc),
},
},
}, nil
}
type queryAdapter struct {
Controller *control.Controller
}
func (q *queryAdapter) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
ctx = query.ContextWithRequest(ctx, req)
ctx = context.WithValue(ctx, "org", req.OrganizationID.String())
return q.Controller.Query(ctx, req.Compiler)
}


@ -0,0 +1,133 @@
package readservice
import (
"context"
"errors"
"math"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
fstorage "github.com/influxdata/flux/functions/inputs/storage"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
)
type store struct {
engine *storage.Engine
}
func newStore(engine *storage.Engine) *store {
return &store{engine: engine}
}
func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) {
if len(req.GroupKeys) > 0 {
panic("Read: len(Grouping) > 0")
}
if req.Hints.NoPoints() {
req.PointsLimit = -1
}
if req.PointsLimit == 0 {
req.PointsLimit = math.MaxInt64
}
source, err := getReadSource(req)
if err != nil {
return nil, err
}
if req.TimestampRange.Start == 0 {
req.TimestampRange.Start = models.MinNanoTime
}
if req.TimestampRange.End == 0 {
req.TimestampRange.End = models.MaxNanoTime
}
var cur reads.SeriesCursor
if ic, err := newIndexSeriesCursor(ctx, source, req, s.engine); err != nil {
return nil, err
} else if ic == nil {
return nil, nil
} else {
cur = ic
}
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
cur = reads.NewLimitSeriesCursor(ctx, cur, req.SeriesLimit, req.SeriesOffset)
}
return reads.NewResultSet(ctx, req, cur), nil
}
func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (reads.GroupResultSet, error) {
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
return nil, errors.New("GroupRead: SeriesLimit and SeriesOffset not supported when Grouping")
}
if req.Hints.NoPoints() {
req.PointsLimit = -1
}
if req.PointsLimit == 0 {
req.PointsLimit = math.MaxInt64
}
source, err := getReadSource(req)
if err != nil {
return nil, err
}
if req.TimestampRange.Start <= 0 {
req.TimestampRange.Start = models.MinNanoTime
}
if req.TimestampRange.End <= 0 {
req.TimestampRange.End = models.MaxNanoTime
}
newCursor := func() (reads.SeriesCursor, error) {
cur, err := newIndexSeriesCursor(ctx, source, req, s.engine)
if cur == nil || err != nil {
return nil, err
}
return cur, nil
}
return reads.NewGroupResultSet(ctx, req, newCursor), nil
}
// readSource is a hand-rolled protobuf message; defining it here is easier than fooling around with .proto files.
type readSource struct {
BucketID []byte `protobuf:"bytes,1,opt,name=bucket_id,proto3"`
OrganizationID []byte `protobuf:"bytes,2,opt,name=organization_id,proto3"`
}
func (r *readSource) XXX_MessageName() string { return "readSource" }
func (r *readSource) Reset() { *r = readSource{} }
func (r *readSource) String() string { return "readSource{}" }
func (r *readSource) ProtoMessage() {}
func (s *store) GetSource(rs fstorage.ReadSpec) (proto.Message, error) {
return &readSource{
BucketID: rs.BucketID,
OrganizationID: rs.OrganizationID,
}, nil
}
func getReadSource(req *datatypes.ReadRequest) (*readSource, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")
}
var source readSource
if err := types.UnmarshalAny(req.ReadSource, &source); err != nil {
return nil, err
}
return &source, nil
}
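The flux storage dependencies call GetSource and embed the result in the read request as an Any, which getReadSource unpacks on the way back in. A sketch of that round trip (buildReadRequest is illustrative; types.MarshalAny is gogo's counterpart to the UnmarshalAny call above):

func buildReadRequest(s *store, spec fstorage.ReadSpec) (*datatypes.ReadRequest, error) {
	src, err := s.GetSource(spec)
	if err != nil {
		return nil, err
	}
	any, err := types.MarshalAny(src)
	if err != nil {
		return nil, err
	}
	// getReadSource on the receiving side recovers the same org/bucket pair
	return &datatypes.ReadRequest{ReadSource: any}, nil
}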


@ -3,58 +3,29 @@ package tsdb
import (
"errors"
"fmt"
"time"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/toml"
"github.com/influxdata/platform/tsdb/defaults"
)
const (
// DefaultEngine is the default engine for new shards
DefaultEngine = "tsm1"
// TODO(jeff): port things to use the defaults package
// DefaultIndex is the default index for new shards
DefaultIndex = TSI1IndexName
// EOF represents a "not found" key returned by a Cursor.
const EOF = query.ZeroTime
// tsdb/engine/wal configuration options
// Default settings for TSM
// DefaultCacheMaxMemorySize is the maximum size a shard's cache can
// reach before it starts rejecting writes.
DefaultCacheMaxMemorySize = 1024 * 1024 * 1024 // 1GB
// DefaultCacheSnapshotMemorySize is the size at which the engine will
// snapshot the cache and write it to a TSM file, freeing up memory
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
// DefaultCacheSnapshotWriteColdDuration is the length of time at which
// the engine will snapshot the cache and write it to a new TSM file if
// the shard hasn't received writes or deletes
DefaultCacheSnapshotWriteColdDuration = time.Duration(10 * time.Minute)
// DefaultCompactFullWriteColdDuration is the duration at which the engine
// will compact all TSM files in a shard if it hasn't received a write or delete
DefaultCompactFullWriteColdDuration = time.Duration(4 * time.Hour)
// DefaultCompactThroughput is the rate limit in bytes per second that we
// will allow TSM compactions to write to disk. Note that short bursts are allowed
// to happen at a possibly larger value, set by DefaultCompactThroughputBurst.
// A value of 0 here will disable compaction rate limiting
DefaultCompactThroughput = 48 * 1024 * 1024
// DefaultCompactThroughputBurst is the rate limit in bytes per second that we
// will allow TSM compactions to write to disk. If this is not set, the burst value
// will be set to equal the normal throughput
DefaultCompactThroughputBurst = 48 * 1024 * 1024
// DefaultMaxPointsPerBlock is the maximum number of points in an encoded
// block in a TSM file
DefaultMaxPointsPerBlock = 1000
// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
DefaultMaxConcurrentCompactions = 0
const ( // See the defaults package for explanations of what these mean
DefaultEngine = defaults.DefaultEngine
DefaultIndex = defaults.DefaultIndex
DefaultCacheMaxMemorySize = defaults.DefaultCacheMaxMemorySize
DefaultCacheSnapshotMemorySize = defaults.DefaultCacheSnapshotMemorySize
DefaultCacheSnapshotWriteColdDuration = defaults.DefaultCacheSnapshotWriteColdDuration
DefaultCompactFullWriteColdDuration = defaults.DefaultCompactFullWriteColdDuration
DefaultCompactThroughput = defaults.DefaultCompactThroughput
DefaultCompactThroughputBurst = defaults.DefaultCompactThroughputBurst
DefaultMaxPointsPerBlock = defaults.DefaultMaxPointsPerBlock
DefaultMaxConcurrentCompactions = defaults.DefaultMaxConcurrentCompactions
)
// Config holds the configuration for the tsdb package.


@ -1,57 +1,32 @@
package tsdb
import (
"context"
import "github.com/influxdata/platform/tsdb/cursors"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/platform/models"
// These aliases exist to maintain API compatibility now that the cursor
// types have moved into their own package, which avoids taking on a heavy
// dependency just to talk about consuming data.
type (
IntegerArray = cursors.IntegerArray
FloatArray = cursors.FloatArray
UnsignedArray = cursors.UnsignedArray
StringArray = cursors.StringArray
BooleanArray = cursors.BooleanArray
IntegerArrayCursor = cursors.IntegerArrayCursor
FloatArrayCursor = cursors.FloatArrayCursor
UnsignedArrayCursor = cursors.UnsignedArrayCursor
StringArrayCursor = cursors.StringArrayCursor
BooleanArrayCursor = cursors.BooleanArrayCursor
Cursor = cursors.Cursor
CursorRequest = cursors.CursorRequest
CursorIterator = cursors.CursorIterator
CursorIterators = cursors.CursorIterators
)
// EOF represents a "not found" key returned by a Cursor.
const EOF = query.ZeroTime
// Cursor represents an iterator over a series.
type Cursor interface {
Close()
Err() error
}
type IntegerArrayCursor interface {
Cursor
Next() *IntegerArray
}
type FloatArrayCursor interface {
Cursor
Next() *FloatArray
}
type UnsignedArrayCursor interface {
Cursor
Next() *UnsignedArray
}
type StringArrayCursor interface {
Cursor
Next() *StringArray
}
type BooleanArrayCursor interface {
Cursor
Next() *BooleanArray
}
type CursorRequest struct {
Name []byte
Tags models.Tags
Field string
Ascending bool
StartTime int64
EndTime int64
}
type CursorIterator interface {
Next(ctx context.Context, r *CursorRequest) (Cursor, error)
}
type CursorIterators []CursorIterator
func NewIntegerArrayLen(sz int) *IntegerArray { return cursors.NewIntegerArrayLen(sz) }
func NewFloatArrayLen(sz int) *FloatArray { return cursors.NewFloatArrayLen(sz) }
func NewUnsignedArrayLen(sz int) *UnsignedArray { return cursors.NewUnsignedArrayLen(sz) }
func NewStringArrayLen(sz int) *StringArray { return cursors.NewStringArrayLen(sz) }
func NewBooleanArrayLen(sz int) *BooleanArray { return cursors.NewBooleanArrayLen(sz) }


@ -4,7 +4,7 @@
// DO NOT EDIT!
// Source: arrayvalues.gen.go.tmpl
package tsdb
package cursors
type FloatArray struct {
Timestamps []int64


@ -1,4 +1,4 @@
package tsdb
package cursors
{{range .}}
@ -57,9 +57,9 @@ func (a *{{ $typename }}) Exclude(min, max int64) {
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
}
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}


@ -0,0 +1,27 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64"
},
{
"Name":"String",
"name":"string",
"Type":"string"
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool"
}
]


@ -1,4 +1,4 @@
package tsdb
package cursors
import (
"fmt"


@ -1,18 +1,18 @@
package tsdb_test
package cursors_test
import (
"strconv"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/cursors"
)
func makeBooleanArray(v ...interface{}) *tsdb.BooleanArray {
func makeBooleanArray(v ...interface{}) *cursors.BooleanArray {
if len(v)&1 == 1 {
panic("invalid array length")
}
a := tsdb.NewBooleanArrayLen(len(v) / 2)
a := cursors.NewBooleanArrayLen(len(v) / 2)
for i := 0; i < len(v); i += 2 {
a.Timestamps[i/2] = int64(v[i].(int))
a.Values[i/2] = v[i+1].(bool)
@ -20,11 +20,11 @@ func makeBooleanArray(v ...interface{}) *tsdb.BooleanArray {
return a
}
func makeFloatArray(v ...interface{}) *tsdb.FloatArray {
func makeFloatArray(v ...interface{}) *cursors.FloatArray {
if len(v)&1 == 1 {
panic("invalid array length")
}
a := tsdb.NewFloatArrayLen(len(v) / 2)
a := cursors.NewFloatArrayLen(len(v) / 2)
for i := 0; i < len(v); i += 2 {
a.Timestamps[i/2] = int64(v[i].(int))
a.Values[i/2] = v[i+1].(float64)
@ -32,11 +32,11 @@ func makeFloatArray(v ...interface{}) *tsdb.FloatArray {
return a
}
func makeIntegerArray(v ...interface{}) *tsdb.IntegerArray {
func makeIntegerArray(v ...interface{}) *cursors.IntegerArray {
if len(v)&1 == 1 {
panic("invalid array length")
}
a := tsdb.NewIntegerArrayLen(len(v) / 2)
a := cursors.NewIntegerArrayLen(len(v) / 2)
for i := 0; i < len(v); i += 2 {
a.Timestamps[i/2] = int64(v[i].(int))
a.Values[i/2] = int64(v[i+1].(int))
@ -44,11 +44,11 @@ func makeIntegerArray(v ...interface{}) *tsdb.IntegerArray {
return a
}
func makeUnsignedArray(v ...interface{}) *tsdb.UnsignedArray {
func makeUnsignedArray(v ...interface{}) *cursors.UnsignedArray {
if len(v)&1 == 1 {
panic("invalid array length")
}
a := tsdb.NewUnsignedArrayLen(len(v) / 2)
a := cursors.NewUnsignedArrayLen(len(v) / 2)
for i := 0; i < len(v); i += 2 {
a.Timestamps[i/2] = int64(v[i].(int))
a.Values[i/2] = uint64(v[i+1].(int))
@ -56,11 +56,11 @@ func makeUnsignedArray(v ...interface{}) *tsdb.UnsignedArray {
return a
}
func makeStringArray(v ...interface{}) *tsdb.StringArray {
func makeStringArray(v ...interface{}) *cursors.StringArray {
if len(v)&1 == 1 {
panic("invalid array length")
}
a := tsdb.NewStringArrayLen(len(v) / 2)
a := cursors.NewStringArrayLen(len(v) / 2)
for i := 0; i < len(v); i += 2 {
a.Timestamps[i/2] = int64(v[i].(int))
a.Values[i/2] = strconv.Itoa(v[i+1].(int))
@ -71,7 +71,7 @@ func makeStringArray(v ...interface{}) *tsdb.StringArray {
func TestBooleanArray_Merge(t *testing.T) {
tests := []struct {
name string
a, b, exp *tsdb.BooleanArray
a, b, exp *cursors.BooleanArray
}{
{
name: "empty a",
@ -149,7 +149,7 @@ func TestBooleanArray_Merge(t *testing.T) {
func TestFloatArray_Merge(t *testing.T) {
tests := []struct {
name string
a, b, exp *tsdb.FloatArray
a, b, exp *cursors.FloatArray
}{
{
name: "empty a",
@ -227,7 +227,7 @@ func TestFloatArray_Merge(t *testing.T) {
func TestIntegerArray_Merge(t *testing.T) {
tests := []struct {
name string
a, b, exp *tsdb.IntegerArray
a, b, exp *cursors.IntegerArray
}{
{
name: "empty a",
@ -305,7 +305,7 @@ func TestIntegerArray_Merge(t *testing.T) {
func TestUnsignedArray_Merge(t *testing.T) {
tests := []struct {
name string
a, b, exp *tsdb.UnsignedArray
a, b, exp *cursors.UnsignedArray
}{
{
name: "empty a",
@ -383,7 +383,7 @@ func TestUnsignedArray_Merge(t *testing.T) {
func TestStringArray_Merge(t *testing.T) {
tests := []struct {
name string
a, b, exp *tsdb.StringArray
a, b, exp *cursors.StringArray
}{
{
name: "empty a",

54
tsdb/cursors/cursor.go Normal file
View File

@ -0,0 +1,54 @@
package cursors
import (
"context"
"github.com/influxdata/platform/models"
)
const DefaultMaxPointsPerBlock = 1000
type Cursor interface {
Close()
Err() error
}
type IntegerArrayCursor interface {
Cursor
Next() *IntegerArray
}
type FloatArrayCursor interface {
Cursor
Next() *FloatArray
}
type UnsignedArrayCursor interface {
Cursor
Next() *UnsignedArray
}
type StringArrayCursor interface {
Cursor
Next() *StringArray
}
type BooleanArrayCursor interface {
Cursor
Next() *BooleanArray
}
type CursorRequest struct {
Name []byte
Tags models.Tags
Field string
Ascending bool
StartTime int64
EndTime int64
}
type CursorIterator interface {
Next(ctx context.Context, r *CursorRequest) (Cursor, error)
}
type CursorIterators []CursorIterator
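A minimal consumer-side sketch (the function and variable names are hypothetical): callers ask a CursorIterator for a cursor and type-assert it to one of the typed interfaces above; a nil cursor means nothing matched the request.

package example

import (
	"context"
	"fmt"

	"github.com/influxdata/platform/tsdb/cursors"
)

// sumInts issues a request against an iterator and, if the resulting cursor
// is integer-typed, sums every value it returns.
func sumInts(ctx context.Context, itr cursors.CursorIterator, req *cursors.CursorRequest) (int64, error) {
	cur, err := itr.Next(ctx, req)
	if err != nil {
		return 0, err
	}
	if cur == nil {
		return 0, nil // nothing matched the request
	}
	defer cur.Close()

	ic, ok := cur.(cursors.IntegerArrayCursor)
	if !ok {
		return 0, fmt.Errorf("expected an integer cursor, got %T", cur)
	}
	var sum int64
	// Cursors signal exhaustion with an empty array rather than nil.
	for a := ic.Next(); a.Len() > 0; a = ic.Next() {
		for _, v := range a.Values {
			sum += v
		}
	}
	return sum, cur.Err()
}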

51
tsdb/defaults/defaults.go Normal file
View File

@ -0,0 +1,51 @@
package defaults
import "time"
const (
// DefaultEngine is the default engine for new shards
DefaultEngine = "tsm1"
// DefaultIndex is the default index for new shards
DefaultIndex = "tsi1"
// tsdb/engine/wal configuration options
// Default settings for TSM
// DefaultCacheMaxMemorySize is the maximum size a shard's cache can
// reach before it starts rejecting writes.
DefaultCacheMaxMemorySize = 1024 * 1024 * 1024 // 1GB
// DefaultCacheSnapshotMemorySize is the size at which the engine will
// snapshot the cache and write it to a TSM file, freeing up memory
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
// DefaultCacheSnapshotWriteColdDuration is the length of time at which
// the engine will snapshot the cache and write it to a new TSM file if
// the shard hasn't received writes or deletes
DefaultCacheSnapshotWriteColdDuration = time.Duration(10 * time.Minute)
// DefaultCompactFullWriteColdDuration is the duration at which the engine
// will compact all TSM files in a shard if it hasn't received a write or delete
DefaultCompactFullWriteColdDuration = time.Duration(4 * time.Hour)
// DefaultCompactThroughput is the rate limit in bytes per second that we
// will allow TSM compactions to write to disk. Note that short bursts are allowed
// to happen at a possibly larger value, set by DefaultCompactThroughputBurst.
// A value of 0 here will disable compaction rate limiting
DefaultCompactThroughput = 48 * 1024 * 1024
// DefaultCompactThroughputBurst is the rate limit in bytes per second that we
// will allow TSM compactions to write to disk. If this is not set, the burst value
// will be set to equal the normal throughput
DefaultCompactThroughputBurst = 48 * 1024 * 1024
// DefaultMaxPointsPerBlock is the maximum number of points in an encoded
// block in a TSM file
DefaultMaxPointsPerBlock = 1000
// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
DefaultMaxConcurrentCompactions = 0
)
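A hypothetical sketch of how these constants might seed an options struct; the Config type below is illustrative and not something this package defines.

package example

import (
	"time"

	"github.com/influxdata/platform/tsdb/defaults"
)

// Config is a stand-in for whatever options struct an engine exposes.
type Config struct {
	CacheMaxMemorySize             uint64
	CacheSnapshotMemorySize        uint64
	CacheSnapshotWriteColdDuration time.Duration
	CompactFullWriteColdDuration   time.Duration
	MaxConcurrentCompactions       int
}

// NewConfig seeds every field from the shared defaults so callers only
// override what they care about.
func NewConfig() Config {
	return Config{
		CacheMaxMemorySize:             defaults.DefaultCacheMaxMemorySize,
		CacheSnapshotMemorySize:        defaults.DefaultCacheSnapshotMemorySize,
		CacheSnapshotWriteColdDuration: defaults.DefaultCacheSnapshotWriteColdDuration,
		CompactFullWriteColdDuration:   defaults.DefaultCompactFullWriteColdDuration,
		MaxConcurrentCompactions:       defaults.DefaultMaxConcurrentCompactions,
	}
}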

View File

@ -210,14 +210,24 @@ func (f *SeriesFile) Series(id SeriesID) ([]byte, models.Tags) {
return ParseSeriesKey(key)
}
// SeriesID return the series id for the series.
// SeriesID returns the series id for the series.
func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) SeriesID {
return f.SeriesIDTyped(name, tags, buf).SeriesID()
}
// SeriesIDTyped returns the typed series id for the series.
func (f *SeriesFile) SeriesIDTyped(name []byte, tags models.Tags, buf []byte) SeriesIDTyped {
key := AppendSeriesKey(buf[:0], name, tags)
return f.SeriesIDTypedBySeriesKey(key)
}
// SeriesIDTypedBySeriesKey returns the typed series id for the series.
func (f *SeriesFile) SeriesIDTypedBySeriesKey(key []byte) SeriesIDTyped {
keyPartition := f.SeriesKeyPartition(key)
if keyPartition == nil {
return SeriesID{}
return SeriesIDTyped{}
}
return keyPartition.FindIDBySeriesKey(key)
return keyPartition.FindIDTypedBySeriesKey(key)
}
// HasSeries return true if the series exists.
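A hedged sketch of what the typed lookup enables for callers (the helper below is hypothetical): the field type of a series can now be recovered from the series file alone, without consulting a measurement fieldset.

package example

import (
	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/tsdb"
)

// fieldTypeOf looks a series up by name and tags and reports the stored
// field type, or false if the series has never been written.
func fieldTypeOf(sfile *tsdb.SeriesFile, name []byte, tags models.Tags) (models.FieldType, bool) {
	key := tsdb.AppendSeriesKey(nil, name, tags)
	id := sfile.SeriesIDTypedBySeriesKey(key)
	if id.IsZero() {
		return 0, false
	}
	return id.Type(), true
}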

View File

@ -383,14 +383,19 @@ func (p *SeriesPartition) Series(id SeriesID) ([]byte, models.Tags) {
// FindIDBySeriesKey returns the series id for the series key.
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID {
return p.FindIDTypedBySeriesKey(key).SeriesID()
}
// FindIDTypedBySeriesKey returns the typed series id for the series key.
func (p *SeriesPartition) FindIDTypedBySeriesKey(key []byte) SeriesIDTyped {
p.mu.RLock()
if p.closed {
p.mu.RUnlock()
return SeriesID{}
return SeriesIDTyped{}
}
id := p.index.FindIDBySeriesKey(p.segments, key)
p.mu.RUnlock()
return id.SeriesID()
return id
}
// SeriesCount returns the number of series.

View File

@ -6,7 +6,6 @@ import (
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
)
@ -33,16 +32,9 @@ type arrayCursorIterator struct {
}
func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (tsdb.Cursor, error) {
// Look up fields for measurement.
mf := q.e.fieldset.Fields(r.Name)
if mf == nil {
return nil, nil
}
// Find individual field.
f := mf.Field(r.Field)
if f == nil {
// field doesn't exist for this measurement
q.key = tsdb.AppendSeriesKey(q.key[:0], r.Name, r.Tags)
id := q.e.sfile.SeriesIDTypedBySeriesKey(q.key)
if id.IsZero() {
return nil, nil
}
@ -56,19 +48,19 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (
opt.EndTime = r.EndTime
// Return appropriate cursor based on type.
switch f.Type {
case influxql.Float:
switch typ := id.Type(); typ {
case models.Float:
return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
case influxql.Integer:
case models.Integer:
return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
case influxql.Unsigned:
case models.Unsigned:
return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
case influxql.String:
case models.String:
return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
case influxql.Boolean:
case models.Boolean:
return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
default:
panic(fmt.Sprintf("unreachable: %T", f.Type))
panic(fmt.Sprintf("unreachable: %v", typ))
}
}

View File

@ -1194,7 +1194,6 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
// names from composite keys, and add them to the database index and measurement
// fields.
func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType) error {
var field []byte
collection := &tsdb.SeriesCollection{
Keys: keys,
Names: make([][]byte, 0, len(keys)),
@ -1204,13 +1203,8 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
for i := 0; i < len(keys); i++ {
// Replace tsm key format with index key format.
collection.Keys[i], field = SeriesAndFieldFromCompositeKey(collection.Keys[i])
collection.Keys[i], _ = SeriesAndFieldFromCompositeKey(collection.Keys[i])
name := models.ParseName(collection.Keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}
collection.Names = append(collection.Names, name)
collection.Tags = append(collection.Tags, models.ParseTags(keys[i]))
collection.Types = append(collection.Types, fieldTypeFromDataType(fieldTypes[i]))
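The fieldTypeFromDataType helper referenced here maps influxql's query data types onto the models field types stored in the index. A plausible shape is sketched below; this is illustrative only, and the engine's actual implementation may differ.

package example

import (
	"github.com/influxdata/influxql"
	"github.com/influxdata/platform/models"
)

// fieldTypeFromDataType translates an influxql data type into the models
// field type, mirroring the type switches used elsewhere in this diff.
func fieldTypeFromDataType(typ influxql.DataType) models.FieldType {
	switch typ {
	case influxql.Float:
		return models.Float
	case influxql.Integer:
		return models.Integer
	case influxql.Unsigned:
		return models.Unsigned
	case influxql.String:
		return models.String
	case influxql.Boolean:
		return models.Boolean
	default:
		return models.Empty
	}
}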
@ -1644,40 +1638,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
if err := e.deleteMeasurement(name); err != nil {
return err
}
// A sentinel error message to cause DeleteWithLock to not delete the measurement
abortErr := fmt.Errorf("measurements still exist")
// Under write lock, delete the measurement if we no longer have any data stored for
// the measurement. If data exists, we can't delete the field set yet as there
// were writes to the measurement while we are deleting it.
if err := e.fieldset.DeleteWithLock(string(name), func() error {
encodedName := models.EscapeMeasurement(name)
// First scan the cache to see if any series exists for this measurement.
if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
if bytes.HasPrefix(k, encodedName) {
return abortErr
}
return nil
}); err != nil {
return err
}
// Check the filestore.
return e.FileStore.WalkKeys(name, func(k []byte, typ byte) error {
if bytes.HasPrefix(k, encodedName) {
return abortErr
}
return nil
})
}); err != nil && err != abortErr {
// Something else failed, return it
return err
}
return e.fieldset.Save()
return nil
}
// DeleteMeasurement deletes a measurement and all related series.