Merge pull request #43 from influxdata/sgc-metaqueries

fix(query): Utilize improvements storage RPC API
pull/10616/head
Stuart Carnie 2018-05-24 15:11:00 -07:00 committed by GitHub
commit 11004024cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 985 additions and 291 deletions

View File

@ -117,7 +117,7 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {
// Write rows // Write rows
r := 0 r := 0
f.b.Do(func(cr query.ColReader) error { w.err = f.b.Do(func(cr query.ColReader) error {
if r == 0 { if r == 0 {
l := cr.Len() l := cr.Len()
for i := 0; i < l; i++ { for i := 0; i < l; i++ {

View File

@ -103,7 +103,9 @@ func (r DistinctPointLimitRewriteRule) Rewrite(pr *plan.Procedure, planner plan.
} }
groupStar := !fromSpec.GroupingSet && distinct.Column != execute.DefaultValueColLabel groupStar := !fromSpec.GroupingSet && distinct.Column != execute.DefaultValueColLabel
groupByColumn := fromSpec.GroupingSet && ((len(fromSpec.GroupKeys) > 0 && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) || (len(fromSpec.GroupExcept) > 0 && !execute.ContainsStr(fromSpec.GroupExcept, distinct.Column))) groupByColumn := fromSpec.GroupingSet && len(fromSpec.GroupKeys) > 0 &&
((fromSpec.GroupMode == GroupModeBy && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) ||
(fromSpec.GroupMode == GroupModeExcept && !execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)))
if groupStar || groupByColumn { if groupStar || groupByColumn {
fromSpec.LimitSet = true fromSpec.LimitSet = true
fromSpec.PointsLimit = -1 fromSpec.PointsLimit = -1
@ -150,8 +152,20 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er
colIdx := execute.ColIdx(t.column, b.Cols()) colIdx := execute.ColIdx(t.column, b.Cols())
if colIdx < 0 { if colIdx < 0 {
return fmt.Errorf("no column %q exists", t.column) // doesn't exist in this block, so add an empty value
execute.AddBlockKeyCols(b.Key(), builder)
colIdx = builder.AddCol(query.ColMeta{
Label: execute.DefaultValueColLabel,
Type: query.TString,
})
builder.AppendString(colIdx, "")
execute.AppendKeyValues(b.Key(), builder)
// TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error {
return nil
})
} }
col := b.Cols()[colIdx] col := b.Cols()[colIdx]
execute.AddBlockKeyCols(b.Key(), builder) execute.AddBlockKeyCols(b.Key(), builder)
@ -178,7 +192,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er
} }
execute.AppendKeyValues(b.Key(), builder) execute.AppendKeyValues(b.Key(), builder)
// TODO: this is a hack // TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error { return b.Do(func(query.ColReader) error {
return nil return nil
}) })

View File

@ -101,9 +101,8 @@ type FromProcedureSpec struct {
GroupingSet bool GroupingSet bool
OrderByTime bool OrderByTime bool
MergeAll bool GroupMode GroupMode
GroupKeys []string GroupKeys []string
GroupExcept []string
AggregateSet bool AggregateSet bool
AggregateMethod string AggregateMethod string
@ -214,9 +213,8 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
SeriesOffset: spec.SeriesOffset, SeriesOffset: spec.SeriesOffset,
Descending: spec.Descending, Descending: spec.Descending,
OrderByTime: spec.OrderByTime, OrderByTime: spec.OrderByTime,
MergeAll: spec.MergeAll, GroupMode: storage.GroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys, GroupKeys: spec.GroupKeys,
GroupExcept: spec.GroupExcept,
AggregateMethod: spec.AggregateMethod, AggregateMethod: spec.AggregateMethod,
}, },
bounds, bounds,

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/platform/query/interpreter" "github.com/influxdata/platform/query/interpreter"
"github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic" "github.com/influxdata/platform/query/semantic"
"math/bits"
) )
const GroupKind = "group" const GroupKind = "group"
@ -17,6 +18,8 @@ const GroupKind = "group"
type GroupOpSpec struct { type GroupOpSpec struct {
By []string `json:"by"` By []string `json:"by"`
Except []string `json:"except"` Except []string `json:"except"`
All bool `json:"all"`
None bool `json:"none"`
} }
var groupSignature = query.DefaultFunctionSignature() var groupSignature = query.DefaultFunctionSignature()
@ -24,6 +27,8 @@ var groupSignature = query.DefaultFunctionSignature()
func init() { func init() {
groupSignature.Params["by"] = semantic.NewArrayType(semantic.String) groupSignature.Params["by"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["except"] = semantic.NewArrayType(semantic.String) groupSignature.Params["except"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["none"] = semantic.Bool
groupSignature.Params["all"] = semantic.Bool
query.RegisterFunction(GroupKind, createGroupOpSpec, groupSignature) query.RegisterFunction(GroupKind, createGroupOpSpec, groupSignature)
query.RegisterOpSpec(GroupKind, newGroupOp) query.RegisterOpSpec(GroupKind, newGroupOp)
@ -38,6 +43,18 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
} }
spec := new(GroupOpSpec) spec := new(GroupOpSpec)
if val, ok, err := args.GetBool("none"); err != nil {
return nil, err
} else if ok && val {
spec.None = true
}
if val, ok, err := args.GetBool("all"); err != nil {
return nil, err
} else if ok && val {
spec.All = true
}
if array, ok, err := args.GetArray("by", semantic.String); err != nil { if array, ok, err := args.GetArray("by", semantic.String); err != nil {
return nil, err return nil, err
} else if ok { } else if ok {
@ -55,9 +72,16 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
} }
} }
if len(spec.By) > 0 && len(spec.Except) > 0 { switch bits.OnesCount(uint(groupModeFromSpec(spec))) {
return nil, errors.New(`cannot specify both "by" and "except" keyword arguments`) case 0:
// empty args
spec.All = true
case 1:
// all good
default:
return nil, errors.New(`specify one of "by", "except", "none" or "all" keyword arguments`)
} }
return spec, nil return spec, nil
} }
@ -69,9 +93,45 @@ func (s *GroupOpSpec) Kind() query.OperationKind {
return GroupKind return GroupKind
} }
type GroupMode int
const (
// GroupModeDefault will use the default grouping of GroupModeAll.
GroupModeDefault GroupMode = 0
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate block for each series.
GroupModeAll
// GroupModeBy produces a block for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a block for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)
func groupModeFromSpec(spec *GroupOpSpec) GroupMode {
var mode GroupMode
if spec.All {
mode |= GroupModeAll
}
if spec.None {
mode |= GroupModeNone
}
if len(spec.By) > 0 {
mode |= GroupModeBy
}
if len(spec.Except) > 0 {
mode |= GroupModeExcept
}
if mode == GroupModeDefault {
mode = GroupModeAll
}
return mode
}
type GroupProcedureSpec struct { type GroupProcedureSpec struct {
By []string GroupMode GroupMode
Except []string GroupKeys []string
} }
func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) { func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
@ -80,9 +140,22 @@ func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pro
return nil, fmt.Errorf("invalid spec type %T", qs) return nil, fmt.Errorf("invalid spec type %T", qs)
} }
mode := groupModeFromSpec(spec)
var keys []string
switch mode {
case GroupModeAll:
case GroupModeNone:
case GroupModeBy:
keys = spec.By
case GroupModeExcept:
keys = spec.Except
default:
return nil, fmt.Errorf("invalid GroupOpSpec; multiple modes detected")
}
p := &GroupProcedureSpec{ p := &GroupProcedureSpec{
By: spec.By, GroupMode: mode,
Except: spec.Except, GroupKeys: keys,
} }
return p, nil return p, nil
} }
@ -93,11 +166,10 @@ func (s *GroupProcedureSpec) Kind() plan.ProcedureKind {
func (s *GroupProcedureSpec) Copy() plan.ProcedureSpec { func (s *GroupProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(GroupProcedureSpec) ns := new(GroupProcedureSpec)
ns.By = make([]string, len(s.By)) ns.GroupMode = s.GroupMode
copy(ns.By, s.By)
ns.Except = make([]string, len(s.Except)) ns.GroupKeys = make([]string, len(s.GroupKeys))
copy(ns.Except, s.Except) copy(ns.GroupKeys, s.GroupKeys)
return ns return ns
} }
@ -120,19 +192,16 @@ func (s *GroupProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
selectSpec = root.Spec.(*FromProcedureSpec) selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.OrderByTime = false selectSpec.OrderByTime = false
selectSpec.GroupingSet = false selectSpec.GroupingSet = false
selectSpec.MergeAll = false selectSpec.GroupMode = GroupModeDefault
selectSpec.GroupKeys = nil selectSpec.GroupKeys = nil
selectSpec.GroupExcept = nil
return return
} }
selectSpec.GroupingSet = true selectSpec.GroupingSet = true
// TODO implement OrderByTime // TODO implement OrderByTime
//selectSpec.OrderByTime = true //selectSpec.OrderByTime = true
// Merge all series into a single group if we have no specific grouping dimensions. selectSpec.GroupMode = s.GroupMode
selectSpec.MergeAll = len(s.By) == 0 && len(s.Except) == 0 selectSpec.GroupKeys = s.GroupKeys
selectSpec.GroupKeys = s.By
selectSpec.GroupExcept = s.Except
} }
type AggregateGroupRewriteRule struct { type AggregateGroupRewriteRule struct {
@ -196,23 +265,18 @@ type groupTransformation struct {
d execute.Dataset d execute.Dataset
cache execute.BlockBuilderCache cache execute.BlockBuilderCache
keys []string mode GroupMode
except []string keys []string
// Ignoring is true of len(keys) == 0 && len(except) > 0
ignoring bool
} }
func NewGroupTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *GroupProcedureSpec) *groupTransformation { func NewGroupTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *GroupProcedureSpec) *groupTransformation {
t := &groupTransformation{ t := &groupTransformation{
d: d, d: d,
cache: cache, cache: cache,
keys: spec.By, mode: spec.GroupMode,
except: spec.Except, keys: spec.GroupKeys,
ignoring: len(spec.By) == 0 && len(spec.Except) > 0,
} }
sort.Strings(t.keys) sort.Strings(t.keys)
sort.Strings(t.except)
return t return t
} }
@ -233,14 +297,14 @@ func (t *groupTransformation) RetractBlock(id execute.DatasetID, key query.Parti
func (t *groupTransformation) Process(id execute.DatasetID, b query.Block) error { func (t *groupTransformation) Process(id execute.DatasetID, b query.Block) error {
cols := b.Cols() cols := b.Cols()
on := make(map[string]bool, len(cols)) on := make(map[string]bool, len(cols))
if len(t.keys) > 0 { if t.mode == GroupModeBy && len(t.keys) > 0 {
for _, k := range t.keys { for _, k := range t.keys {
on[k] = true on[k] = true
} }
} else if len(t.except) > 0 { } else if t.mode == GroupModeExcept && len(t.keys) > 0 {
COLS: COLS:
for _, c := range cols { for _, c := range cols {
for _, label := range t.except { for _, label := range t.keys {
if c.Label == label { if c.Label == label {
continue COLS continue COLS
} }

View File

@ -33,7 +33,8 @@ func TestGroup_Process(t *testing.T) {
{ {
name: "fan in", name: "fan in",
spec: &functions.GroupProcedureSpec{ spec: &functions.GroupProcedureSpec{
By: []string{"t1"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
}, },
data: []query.Block{ data: []query.Block{
&executetest.Block{ &executetest.Block{
@ -117,7 +118,8 @@ func TestGroup_Process(t *testing.T) {
{ {
name: "fan in ignoring", name: "fan in ignoring",
spec: &functions.GroupProcedureSpec{ spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"}, GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
}, },
data: []query.Block{ data: []query.Block{
&executetest.Block{ &executetest.Block{
@ -207,7 +209,8 @@ func TestGroup_Process(t *testing.T) {
{ {
name: "fan out", name: "fan out",
spec: &functions.GroupProcedureSpec{ spec: &functions.GroupProcedureSpec{
By: []string{"t1"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
}, },
data: []query.Block{&executetest.Block{ data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{ ColMeta: []query.ColMeta{
@ -248,7 +251,8 @@ func TestGroup_Process(t *testing.T) {
{ {
name: "fan out ignoring", name: "fan out ignoring",
spec: &functions.GroupProcedureSpec{ spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"}, GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
}, },
data: []query.Block{&executetest.Block{ data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{ ColMeta: []query.ColMeta{
@ -310,7 +314,8 @@ func TestGroup_Process(t *testing.T) {
func TestGroup_PushDown(t *testing.T) { func TestGroup_PushDown(t *testing.T) {
spec := &functions.GroupProcedureSpec{ spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
} }
root := &plan.Procedure{ root := &plan.Procedure{
Spec: new(functions.FromProcedureSpec), Spec: new(functions.FromProcedureSpec),
@ -318,7 +323,7 @@ func TestGroup_PushDown(t *testing.T) {
want := &plan.Procedure{ want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{ Spec: &functions.FromProcedureSpec{
GroupingSet: true, GroupingSet: true,
MergeAll: false, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"}, GroupKeys: []string{"t1", "t2"},
}, },
} }
@ -327,12 +332,13 @@ func TestGroup_PushDown(t *testing.T) {
} }
func TestGroup_PushDown_Duplicate(t *testing.T) { func TestGroup_PushDown_Duplicate(t *testing.T) {
spec := &functions.GroupProcedureSpec{ spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
} }
root := &plan.Procedure{ root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{ Spec: &functions.FromProcedureSpec{
GroupingSet: true, GroupingSet: true,
MergeAll: true, GroupMode: functions.GroupModeAll,
}, },
} }
want := &plan.Procedure{ want := &plan.Procedure{

View File

@ -113,7 +113,7 @@ func (x Node_Logical) String() string {
func (Node_Logical) EnumDescriptor() ([]byte, []int) { return fileDescriptorPredicate, []int{0, 2} } func (Node_Logical) EnumDescriptor() ([]byte, []int) { return fileDescriptorPredicate, []int{0, 2} }
type Node struct { type Node struct {
NodeType Node_Type `protobuf:"varint,1,opt,name=node_type,json=nodeType,proto3,enum=storage.Node_Type" json:"nodeType"` NodeType Node_Type `protobuf:"varint,1,opt,name=node_type,json=nodeType,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Type" json:"nodeType"`
Children []*Node `protobuf:"bytes,2,rep,name=children" json:"children,omitempty"` Children []*Node `protobuf:"bytes,2,rep,name=children" json:"children,omitempty"`
// Types that are valid to be assigned to Value: // Types that are valid to be assigned to Value:
// *Node_StringValue // *Node_StringValue
@ -165,10 +165,10 @@ type Node_FieldRefValue struct {
FieldRefValue string `protobuf:"bytes,10,opt,name=field_ref_value,json=fieldRefValue,proto3,oneof"` FieldRefValue string `protobuf:"bytes,10,opt,name=field_ref_value,json=fieldRefValue,proto3,oneof"`
} }
type Node_Logical_ struct { type Node_Logical_ struct {
Logical Node_Logical `protobuf:"varint,11,opt,name=logical,proto3,enum=storage.Node_Logical,oneof"` Logical Node_Logical `protobuf:"varint,11,opt,name=logical,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Logical,oneof"`
} }
type Node_Comparison_ struct { type Node_Comparison_ struct {
Comparison Node_Comparison `protobuf:"varint,12,opt,name=comparison,proto3,enum=storage.Node_Comparison,oneof"` Comparison Node_Comparison `protobuf:"varint,12,opt,name=comparison,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Comparison,oneof"`
} }
func (*Node_StringValue) isNode_Value() {} func (*Node_StringValue) isNode_Value() {}
@ -474,11 +474,11 @@ func (m *Predicate) GetRoot() *Node {
} }
func init() { func init() {
proto.RegisterType((*Node)(nil), "storage.Node") proto.RegisterType((*Node)(nil), "com.github.influxdata.influxdb.services.storage.Node")
proto.RegisterType((*Predicate)(nil), "storage.Predicate") proto.RegisterType((*Predicate)(nil), "com.github.influxdata.influxdb.services.storage.Predicate")
proto.RegisterEnum("storage.Node_Type", Node_Type_name, Node_Type_value) proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Type", Node_Type_name, Node_Type_value)
proto.RegisterEnum("storage.Node_Comparison", Node_Comparison_name, Node_Comparison_value) proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Comparison", Node_Comparison_name, Node_Comparison_value)
proto.RegisterEnum("storage.Node_Logical", Node_Logical_name, Node_Logical_value) proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Logical", Node_Logical_name, Node_Logical_value)
} }
func (m *Node) Marshal() (dAtA []byte, err error) { func (m *Node) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
@ -1287,58 +1287,61 @@ var (
func init() { proto.RegisterFile("predicate.proto", fileDescriptorPredicate) } func init() { proto.RegisterFile("predicate.proto", fileDescriptorPredicate) }
var fileDescriptorPredicate = []byte{ var fileDescriptorPredicate = []byte{
// 845 bytes of a gzipped FileDescriptorProto // 883 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x94, 0xcf, 0x6e, 0xdb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x95, 0xcb, 0x6e, 0xdb, 0x46,
0x10, 0xc6, 0x45, 0x49, 0xb6, 0xc4, 0x91, 0x65, 0x33, 0x9b, 0x38, 0x56, 0xd9, 0x46, 0xda, 0x3a, 0x14, 0x86, 0x45, 0x5d, 0x2c, 0xe9, 0xc8, 0x17, 0x66, 0x12, 0xc7, 0x0a, 0xdb, 0x48, 0x03, 0x07,
0x28, 0xa0, 0x1c, 0x2a, 0xc3, 0x6e, 0x73, 0x69, 0x0e, 0x05, 0xe5, 0xd0, 0xb2, 0x00, 0x56, 0x52, 0x05, 0xd4, 0x45, 0x65, 0xd8, 0xad, 0x37, 0x0d, 0x8a, 0x56, 0x72, 0x68, 0x59, 0x00, 0x2b, 0x29,
0x29, 0xa6, 0xc9, 0x4d, 0xa0, 0xa5, 0x15, 0x4d, 0x80, 0xe1, 0xaa, 0xcb, 0x55, 0x91, 0xbc, 0x41, 0x14, 0x73, 0x69, 0x37, 0x02, 0x25, 0x8d, 0x68, 0x02, 0x34, 0x47, 0x19, 0x8e, 0x02, 0xe7, 0x0d,
0xc1, 0x53, 0xef, 0x05, 0x4f, 0x7d, 0x99, 0x02, 0x45, 0x81, 0x3e, 0x81, 0x50, 0xa8, 0xb7, 0x3e, 0x0a, 0xae, 0xba, 0x2f, 0xb8, 0xea, 0xcb, 0x74, 0x53, 0xa0, 0x4f, 0x20, 0x14, 0xea, 0xae, 0x7d,
0x45, 0xc1, 0xe5, 0x3f, 0xa9, 0xc9, 0x6d, 0x67, 0xbe, 0xef, 0x37, 0xb3, 0xbb, 0x1c, 0x2e, 0x9c, 0x89, 0x82, 0xc3, 0x9b, 0xdc, 0x76, 0x53, 0xef, 0xe6, 0x9c, 0xf9, 0xbf, 0xff, 0xcc, 0x1c, 0x1e,
0xac, 0x18, 0x59, 0xb8, 0x73, 0x9b, 0x93, 0xde, 0x8a, 0x51, 0x4e, 0x51, 0x2d, 0xe0, 0x94, 0xd9, 0x92, 0x70, 0xb0, 0x64, 0x64, 0x6e, 0xcf, 0x4c, 0x4e, 0xda, 0x4b, 0x46, 0x39, 0x45, 0x27, 0x33,
0x0e, 0x51, 0xbf, 0x74, 0x5c, 0x7e, 0xbf, 0xbe, 0xeb, 0xcd, 0xe9, 0xdb, 0x0b, 0x87, 0x3a, 0xf4, 0x7a, 0xd3, 0xb6, 0x6c, 0x7e, 0xbd, 0x9a, 0xb6, 0x6d, 0x77, 0xe1, 0xac, 0x6e, 0xe7, 0x26, 0x37,
0x42, 0xe8, 0x77, 0xeb, 0xa5, 0x88, 0x44, 0x20, 0x56, 0x09, 0x77, 0xfe, 0x07, 0x40, 0x75, 0x44, 0x93, 0xe5, 0xb4, 0xed, 0x11, 0xf6, 0xde, 0x9e, 0x11, 0xaf, 0xed, 0x71, 0xca, 0x4c, 0x8b, 0x28,
0x17, 0x04, 0x0d, 0x41, 0xf6, 0xe9, 0x82, 0xcc, 0xf8, 0xfb, 0x15, 0x69, 0x49, 0x58, 0xea, 0x1e, 0x9f, 0xc5, 0xe2, 0x19, 0xbd, 0x39, 0xb1, 0xa8, 0x45, 0x4f, 0x84, 0xcf, 0x74, 0xb5, 0x10, 0x91,
0x5f, 0xa1, 0x5e, 0x5a, 0xb4, 0x17, 0x3b, 0x7a, 0xd6, 0xfb, 0x15, 0xe9, 0xb7, 0xb6, 0x9b, 0x4e, 0x08, 0xc4, 0x2a, 0xf2, 0x3f, 0xfe, 0xab, 0x06, 0xc5, 0x01, 0x9d, 0x13, 0xf4, 0x0e, 0xaa, 0x2e,
0x3d, 0x0e, 0xe3, 0xe8, 0xdf, 0x4d, 0xa7, 0xee, 0xa7, 0x6b, 0x33, 0x5f, 0xa1, 0x67, 0x50, 0x9f, 0x9d, 0x93, 0x09, 0xff, 0xb0, 0x24, 0x75, 0x09, 0x4b, 0xad, 0xfd, 0xb3, 0x2f, 0xdb, 0xff, 0xb3,
0xdf, 0xbb, 0xde, 0x82, 0x11, 0xbf, 0x55, 0xc6, 0x95, 0x6e, 0xe3, 0xaa, 0xb9, 0x57, 0xc9, 0xcc, 0x78, 0x3b, 0x74, 0x6a, 0x1b, 0x1f, 0x96, 0xa4, 0x5b, 0xdf, 0xac, 0x9b, 0x95, 0x30, 0x0c, 0xa3,
0x65, 0xf4, 0x35, 0x1c, 0x05, 0x9c, 0xb9, 0xbe, 0x33, 0xfb, 0xc9, 0xf6, 0xd6, 0xa4, 0x55, 0xc1, 0x3f, 0xd7, 0xcd, 0x8a, 0x1b, 0xaf, 0xf5, 0x74, 0x85, 0x5e, 0x42, 0x65, 0x76, 0x6d, 0x3b, 0x73,
0x52, 0x57, 0xee, 0x9f, 0x6c, 0x37, 0x9d, 0xc6, 0x54, 0xe4, 0x7f, 0x88, 0xd3, 0xb7, 0x25, 0xb3, 0x46, 0xdc, 0x7a, 0x1e, 0x17, 0x5a, 0xb5, 0xb3, 0xf3, 0x7b, 0x55, 0xd4, 0x53, 0x1b, 0xf4, 0x05,
0x11, 0x14, 0x21, 0xba, 0x04, 0xb8, 0xa3, 0xd4, 0x4b, 0x99, 0x2a, 0x96, 0xba, 0xf5, 0xbe, 0xb2, 0xec, 0x7a, 0x9c, 0xd9, 0xae, 0x35, 0x79, 0x6f, 0x3a, 0x2b, 0x52, 0x2f, 0x60, 0xa9, 0x55, 0xed,
0xdd, 0x74, 0x8e, 0xfa, 0x94, 0x7a, 0xc4, 0xf6, 0x33, 0x48, 0x8e, 0x5d, 0x09, 0x72, 0x01, 0xb2, 0x1e, 0x6c, 0xd6, 0xcd, 0xda, 0x58, 0xe4, 0x5f, 0x87, 0xe9, 0xab, 0x9c, 0x5e, 0xf3, 0xb2, 0x10,
0xeb, 0xf3, 0x94, 0x38, 0xc0, 0x52, 0xb7, 0x92, 0x10, 0x43, 0x9f, 0x13, 0x87, 0xb0, 0x8c, 0xa8, 0x9d, 0x02, 0x4c, 0x29, 0x75, 0x62, 0xa6, 0x88, 0xa5, 0x56, 0xa5, 0x2b, 0x6f, 0xd6, 0xcd, 0xdd,
0xbb, 0x3e, 0x4f, 0x80, 0x2b, 0x80, 0x75, 0x41, 0x1c, 0x62, 0xa9, 0x5b, 0xed, 0x3f, 0xd8, 0x6e, 0x2e, 0xa5, 0x0e, 0x31, 0xdd, 0x04, 0xaa, 0x86, 0xaa, 0x08, 0x39, 0x81, 0xaa, 0xed, 0xf2, 0x98,
0x3a, 0xcd, 0x57, 0x7e, 0xe0, 0x3a, 0x3e, 0x59, 0xe4, 0x4d, 0xd6, 0x39, 0x73, 0x09, 0x8d, 0xa5, 0x28, 0x61, 0xa9, 0x55, 0x88, 0x88, 0xbe, 0xcb, 0x89, 0x45, 0x58, 0x42, 0x54, 0x6c, 0x97, 0x47,
0x47, 0xed, 0x0c, 0xaa, 0x61, 0xa9, 0x2b, 0xf5, 0x8f, 0xb7, 0x9b, 0x0e, 0xdc, 0xc4, 0xe9, 0x8c, 0xc0, 0x19, 0xc0, 0x2a, 0x23, 0x76, 0xb0, 0xd4, 0x2a, 0x76, 0x1f, 0x6c, 0xd6, 0xcd, 0xbd, 0x57,
0x80, 0x65, 0x1e, 0xc5, 0x08, 0x23, 0x0e, 0x79, 0x97, 0x22, 0x75, 0x71, 0x7e, 0x81, 0x98, 0x71, 0xae, 0x67, 0x5b, 0x2e, 0x99, 0xa7, 0x45, 0x56, 0x29, 0x73, 0x0a, 0xb5, 0x85, 0x43, 0xcd, 0x04,
0x3a, 0x47, 0x58, 0x1e, 0xa1, 0xe7, 0xd0, 0xe4, 0xb6, 0x33, 0x63, 0x64, 0x99, 0x42, 0x72, 0x71, 0x2a, 0x63, 0xa9, 0x25, 0x75, 0xf7, 0x37, 0xeb, 0x26, 0x5c, 0x86, 0xe9, 0x84, 0x80, 0x45, 0x1a,
0x69, 0x96, 0xed, 0x98, 0x64, 0x99, 0x5f, 0x1a, 0x2f, 0x42, 0xf4, 0x02, 0x4e, 0x96, 0x2e, 0xf1, 0x85, 0x08, 0x23, 0x16, 0xb9, 0x8d, 0x91, 0x8a, 0xb8, 0xbf, 0x40, 0xf4, 0x30, 0x9d, 0x22, 0x2c,
0x16, 0x3b, 0x20, 0x08, 0x50, 0x9c, 0xea, 0x26, 0x96, 0x76, 0xd0, 0xe6, 0x72, 0x37, 0x81, 0x2e, 0x8d, 0xd0, 0x39, 0xec, 0x71, 0xd3, 0x9a, 0x30, 0xb2, 0x88, 0xa1, 0x6a, 0xd6, 0x34, 0xc3, 0xb4,
0xa1, 0xe6, 0x51, 0xc7, 0x9d, 0xdb, 0x5e, 0xab, 0x21, 0x66, 0xe3, 0x74, 0x7f, 0x36, 0x8c, 0x44, 0x74, 0xb2, 0x48, 0x9b, 0xc6, 0xb3, 0x10, 0x3d, 0x87, 0x83, 0x85, 0x4d, 0x9c, 0xf9, 0x16, 0x08,
0xbc, 0x2d, 0x99, 0x99, 0x0f, 0x7d, 0x03, 0x30, 0xa7, 0x6f, 0x57, 0x36, 0x73, 0x03, 0xea, 0xb7, 0x02, 0x14, 0xb7, 0xba, 0x0c, 0xb7, 0xb6, 0xd0, 0xbd, 0xc5, 0x76, 0x02, 0x7d, 0x07, 0x65, 0x87,
0x8e, 0x04, 0xd5, 0xda, 0xa7, 0xae, 0x73, 0x3d, 0x3e, 0x62, 0xe1, 0x3e, 0xff, 0xb5, 0x0c, 0x55, 0x5a, 0xf6, 0xcc, 0x74, 0xea, 0x35, 0x31, 0x6b, 0x5f, 0xdd, 0x6f, 0xd6, 0xb4, 0xc8, 0xe4, 0x2a,
0x31, 0x4a, 0xcf, 0x01, 0x19, 0xe3, 0xc1, 0xf0, 0x5a, 0x33, 0x66, 0xfa, 0x9b, 0x89, 0xa9, 0x4f, 0xa7, 0x27, 0x7e, 0x68, 0x0a, 0x30, 0xa3, 0x37, 0x4b, 0x93, 0xd9, 0x1e, 0x75, 0xeb, 0xbb, 0xc2,
0xa7, 0xc3, 0xf1, 0x48, 0x29, 0xa9, 0x4f, 0xc2, 0x08, 0x7f, 0x92, 0x8d, 0x61, 0xda, 0x5c, 0x7f, 0xfd, 0x9b, 0xfb, 0xb9, 0x5f, 0xa4, 0x3e, 0x61, 0xcb, 0x32, 0xd7, 0xe3, 0x9f, 0xf2, 0x50, 0x14,
0xb7, 0x62, 0x24, 0x08, 0x5c, 0xea, 0xa3, 0x17, 0x70, 0x7a, 0x3d, 0xfe, 0x6e, 0xa2, 0x99, 0xc3, 0x23, 0x7c, 0x0e, 0x48, 0x1b, 0xf6, 0xfa, 0x17, 0x1d, 0x6d, 0xa2, 0xbe, 0x1d, 0xe9, 0xea, 0x78,
0xe9, 0x78, 0xb4, 0x4b, 0x4a, 0x2a, 0x0e, 0x23, 0xfc, 0x59, 0x46, 0x16, 0x1b, 0xd8, 0x81, 0x2f, 0xdc, 0x1f, 0x0e, 0xe4, 0x9c, 0xf2, 0xd4, 0x0f, 0xf0, 0x93, 0x64, 0xfc, 0xe3, 0x43, 0xaa, 0xb7,
0x41, 0x99, 0x68, 0xa6, 0xbe, 0xc7, 0x95, 0xd5, 0x4f, 0xc3, 0x08, 0x9f, 0x65, 0xdc, 0xc4, 0x66, 0x4b, 0x46, 0x3c, 0xcf, 0xa6, 0x2e, 0x7a, 0x0e, 0x87, 0x17, 0xc3, 0x6f, 0x47, 0x1d, 0xbd, 0x3f,
0x64, 0x17, 0xe9, 0x40, 0xcd, 0xd2, 0x06, 0x33, 0x53, 0xbf, 0x51, 0x2a, 0x2a, 0x0a, 0x23, 0x7c, 0x1e, 0x0e, 0xb6, 0x49, 0x49, 0xc1, 0x7e, 0x80, 0x3f, 0x4e, 0xc8, 0xec, 0x00, 0x5b, 0xf0, 0x29,
0x9c, 0x39, 0x93, 0x0f, 0x82, 0x30, 0xd4, 0x8c, 0xa1, 0xa5, 0x9b, 0x9a, 0xa1, 0x54, 0xd5, 0x87, 0xc8, 0xa3, 0x8e, 0xae, 0xde, 0xe1, 0xf2, 0xca, 0x47, 0x7e, 0x80, 0x8f, 0x12, 0x6e, 0x64, 0x32,
0x61, 0x84, 0x4f, 0xf2, 0xcd, 0xbb, 0x9c, 0x30, 0xdb, 0x43, 0x4f, 0x41, 0xbe, 0x19, 0xea, 0xc6, 0xb2, 0x8d, 0x34, 0xa1, 0x6c, 0x74, 0x7a, 0x13, 0x5d, 0xbd, 0x94, 0x0b, 0x0a, 0xf2, 0x03, 0xbc,
0x4b, 0x51, 0xe4, 0x40, 0x7d, 0x14, 0x46, 0x58, 0xc9, 0x3c, 0xd9, 0xc7, 0x51, 0xab, 0x3f, 0xff, 0x9f, 0x28, 0xa3, 0x07, 0x8c, 0x30, 0x94, 0xb5, 0xbe, 0xa1, 0xea, 0x1d, 0x4d, 0x2e, 0x2a, 0x0f,
0xd6, 0x2e, 0x9d, 0xff, 0x59, 0x06, 0x28, 0x76, 0x8e, 0xda, 0x70, 0xa0, 0x7f, 0xff, 0x4a, 0x33, 0xfd, 0x00, 0x1f, 0xa4, 0x87, 0xb7, 0x39, 0x61, 0xa6, 0x83, 0x9e, 0x41, 0xf5, 0xb2, 0xaf, 0x6a,
0x94, 0x52, 0x52, 0x79, 0xe7, 0x50, 0x3f, 0xae, 0x6d, 0x0f, 0x7d, 0x01, 0xf2, 0x68, 0x6c, 0xcd, 0x2f, 0x84, 0x49, 0x49, 0x79, 0xe4, 0x07, 0x58, 0x4e, 0x34, 0xc9, 0xc3, 0x56, 0x8a, 0x3f, 0xfc,
0x12, 0x8f, 0xa4, 0x3e, 0x0e, 0x23, 0x8c, 0x0a, 0xcf, 0x88, 0xf2, 0xc4, 0xf6, 0x0c, 0x1a, 0x53, 0xdc, 0xc8, 0x1d, 0xff, 0x9a, 0x07, 0xc8, 0x4e, 0x8e, 0x1a, 0x50, 0x52, 0x5f, 0xbe, 0xea, 0x68,
0x4b, 0x33, 0xad, 0xe9, 0xec, 0xf5, 0xd0, 0xba, 0x55, 0xca, 0x6a, 0x2b, 0x8c, 0xf0, 0xa3, 0xc2, 0x72, 0x2e, 0x72, 0xde, 0xba, 0xd4, 0xbb, 0x95, 0xe9, 0xa0, 0x4f, 0xa0, 0x3a, 0x18, 0x1a, 0x93,
0x38, 0xe5, 0x36, 0xe3, 0xc1, 0x6b, 0x97, 0xdf, 0xc7, 0x1d, 0x4d, 0x7d, 0xa0, 0xbf, 0x51, 0x2a, 0x48, 0x23, 0x29, 0x8f, 0xfd, 0x00, 0xa3, 0x4c, 0x33, 0xa0, 0x3c, 0x92, 0x7d, 0x0a, 0xb5, 0xb1,
0xff, 0xef, 0x28, 0x86, 0x36, 0xeb, 0x98, 0x78, 0xaa, 0x1f, 0xe9, 0x98, 0xd8, 0x54, 0x28, 0x1b, 0xd1, 0xd1, 0x8d, 0xf1, 0xe4, 0x4d, 0xdf, 0xb8, 0x92, 0xf3, 0x4a, 0xdd, 0x0f, 0xf0, 0xa3, 0x4c,
0x96, 0x72, 0x90, 0x5c, 0x58, 0xa1, 0x1b, 0x24, 0x08, 0x10, 0x86, 0x8a, 0x61, 0xe9, 0xca, 0xa1, 0x38, 0xe6, 0x26, 0xe3, 0xde, 0x1b, 0x9b, 0x5f, 0x87, 0x15, 0x75, 0xb5, 0xa7, 0xbe, 0x95, 0x0b,
0x7a, 0x16, 0x46, 0xf8, 0xe1, 0xbe, 0x98, 0xec, 0xf7, 0x09, 0x94, 0x07, 0x96, 0x52, 0x53, 0x4f, 0xff, 0xac, 0x28, 0x5e, 0x82, 0xa4, 0x62, 0xa4, 0x29, 0xfe, 0x47, 0xc5, 0x48, 0xa6, 0x40, 0x5e,
0xc3, 0x08, 0x3f, 0x28, 0x0c, 0x03, 0x46, 0x6c, 0x4e, 0x18, 0x7a, 0x0a, 0x95, 0x81, 0xa5, 0x2b, 0x33, 0xe4, 0x52, 0xd4, 0xb0, 0x6c, 0x5f, 0x23, 0x9e, 0x87, 0x30, 0x14, 0x34, 0x43, 0x95, 0x77,
0x75, 0x55, 0x0d, 0x23, 0xfc, 0xf8, 0x03, 0x5d, 0xd4, 0x48, 0xef, 0xf3, 0x5b, 0xa8, 0xa5, 0x23, 0x94, 0x23, 0x3f, 0xc0, 0x0f, 0xef, 0x6e, 0x46, 0xe7, 0x7d, 0x0a, 0xf9, 0x9e, 0x21, 0x97, 0x95,
0x84, 0xce, 0xa0, 0xa2, 0x8d, 0x5e, 0x2a, 0x25, 0xf5, 0x38, 0x8c, 0x30, 0xa4, 0x59, 0xcd, 0x5f, 0x43, 0x3f, 0xc0, 0x0f, 0x32, 0x41, 0x8f, 0x11, 0x93, 0x13, 0x86, 0x9e, 0x41, 0xa1, 0x67, 0xa8,
0xa0, 0x53, 0x28, 0x8f, 0x4d, 0x45, 0x52, 0x9b, 0x61, 0x84, 0xe5, 0x34, 0x3f, 0x66, 0x49, 0x81, 0x72, 0x45, 0x51, 0xfc, 0x00, 0x3f, 0xfe, 0xd7, 0xbe, 0xf0, 0x88, 0xfb, 0xf9, 0x35, 0x94, 0xe3,
0x7e, 0x0d, 0x0e, 0xc4, 0x0f, 0x75, 0xde, 0x03, 0x79, 0x92, 0x3d, 0xcc, 0xe8, 0x73, 0xa8, 0x32, 0x11, 0x42, 0x47, 0x50, 0xe8, 0x0c, 0x5e, 0xc8, 0x39, 0x65, 0xdf, 0x0f, 0x30, 0xc4, 0xd9, 0x8e,
0x4a, 0xb9, 0x78, 0x4c, 0x3f, 0x78, 0x02, 0x85, 0xd4, 0x57, 0x7e, 0xdf, 0xb6, 0xa5, 0xbf, 0xb6, 0x3b, 0x47, 0x87, 0x90, 0x1f, 0xea, 0xb2, 0xa4, 0xec, 0xf9, 0x01, 0xae, 0xc6, 0xf9, 0x21, 0x8b,
0x6d, 0xe9, 0xef, 0x6d, 0x5b, 0xfa, 0xe5, 0x9f, 0x76, 0xe9, 0xee, 0x50, 0x3c, 0xcb, 0x5f, 0xfd, 0x0c, 0xba, 0x65, 0x28, 0x89, 0x17, 0xf4, 0xf8, 0x35, 0x54, 0x47, 0xc9, 0x0f, 0x06, 0xf5, 0xa1,
0x17, 0x00, 0x00, 0xff, 0xff, 0xfb, 0xde, 0x9f, 0x18, 0xe1, 0x05, 0x00, 0x00, 0xc8, 0x28, 0xe5, 0xe2, 0x63, 0x7f, 0xef, 0x4f, 0xaf, 0xb0, 0xe8, 0x3e, 0xf9, 0x65, 0xd3, 0x90,
0x7e, 0xdb, 0x34, 0xa4, 0xdf, 0x37, 0x0d, 0xe9, 0xc7, 0x3f, 0x1a, 0xb9, 0xef, 0xcb, 0xb1, 0x6a,
0xba, 0x23, 0xfe, 0x33, 0x9f, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x37, 0x3b, 0xbc, 0x43, 0xda,
0x06, 0x00, 0x00,
} }

View File

@ -90,19 +90,23 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
req.Descending = bi.readSpec.Descending req.Descending = bi.readSpec.Descending
req.TimestampRange.Start = int64(bi.bounds.Start) req.TimestampRange.Start = int64(bi.bounds.Start)
req.TimestampRange.End = int64(bi.bounds.Stop) req.TimestampRange.End = int64(bi.bounds.Stop)
req.Grouping = bi.readSpec.GroupKeys req.Group = convertGroupMode(bi.readSpec.GroupMode)
req.GroupKeys = bi.readSpec.GroupKeys
req.SeriesLimit = bi.readSpec.SeriesLimit req.SeriesLimit = bi.readSpec.SeriesLimit
req.PointsLimit = bi.readSpec.PointsLimit req.PointsLimit = bi.readSpec.PointsLimit
req.SeriesOffset = bi.readSpec.SeriesOffset req.SeriesOffset = bi.readSpec.SeriesOffset
req.Trace = bi.trace req.Trace = bi.trace
if req.PointsLimit == -1 {
req.Hints.SetNoPoints()
}
if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil { if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil {
return err return err
} else if agg != AggregateTypeNone { } else if agg != AggregateTypeNone {
req.Aggregate = &Aggregate{Type: agg} req.Aggregate = &Aggregate{Type: agg}
} }
isGrouping := req.Group != GroupAll
streams := make([]*streamState, 0, len(bi.conns)) streams := make([]*streamState, 0, len(bi.conns))
for _, c := range bi.conns { for _, c := range bi.conns {
if len(bi.readSpec.Hosts) > 0 { if len(bi.readSpec.Hosts) > 0 {
@ -126,12 +130,21 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
bounds: bi.bounds, bounds: bi.bounds,
stream: stream, stream: stream,
readSpec: &bi.readSpec, readSpec: &bi.readSpec,
group: isGrouping,
}) })
} }
ms := &mergedStreams{ ms := &mergedStreams{
streams: streams, streams: streams,
} }
if isGrouping {
return bi.handleGroupRead(f, ms)
}
return bi.handleRead(f, ms)
}
func (bi *bockIterator) handleRead(f func(query.Block) error, ms *mergedStreams) error {
for ms.more() { for ms.more() {
if p := ms.peek(); readFrameType(p) != seriesType { if p := ms.peek(); readFrameType(p) != seriesType {
//This means the consumer didn't read all the data off the block //This means the consumer didn't read all the data off the block
@ -141,8 +154,38 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
s := frame.GetSeries() s := frame.GetSeries()
typ := convertDataType(s.DataType) typ := convertDataType(s.DataType)
key := partitionKeyForSeries(s, &bi.readSpec, bi.bounds) key := partitionKeyForSeries(s, &bi.readSpec, bi.bounds)
cols := bi.determineBlockCols(s, typ) cols, defs := determineBlockColsForSeries(s, typ)
block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, s.Tags) block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, s.Tags, defs)
if err := f(block); err != nil {
// TODO(nathanielc): Close streams since we have abandoned the request
return err
}
// Wait until the block has been read.
block.wait()
}
return nil
}
func (bi *bockIterator) handleGroupRead(f func(query.Block) error, ms *mergedStreams) error {
for ms.more() {
if p := ms.peek(); readFrameType(p) != groupType {
//This means the consumer didn't read all the data off the block
return errors.New("internal error: short read")
}
frame := ms.next()
s := frame.GetGroup()
key := partitionKeyForGroup(s, &bi.readSpec, bi.bounds)
// try to infer type
// TODO(sgc): this is a hack
typ := query.TString
if p := ms.peek(); readFrameType(p) == seriesType {
typ = convertDataType(p.GetSeries().DataType)
}
cols, defs := determineBlockColsForGroup(s, typ)
block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, nil, defs)
if err := f(block); err != nil { if err := f(block); err != nil {
// TODO(nathanielc): Close streams since we have abandoned the request // TODO(nathanielc): Close streams since we have abandoned the request
@ -165,6 +208,22 @@ func determineAggregateMethod(agg string) (Aggregate_AggregateType, error) {
return 0, fmt.Errorf("unknown aggregate type %q", agg) return 0, fmt.Errorf("unknown aggregate type %q", agg)
} }
func convertGroupMode(m storage.GroupMode) ReadRequest_Group {
switch m {
case storage.GroupModeNone:
return GroupNone
case storage.GroupModeBy:
return GroupBy
case storage.GroupModeExcept:
return GroupExcept
case storage.GroupModeDefault, storage.GroupModeAll:
fallthrough
default:
return GroupAll
}
}
func convertDataType(t ReadResponse_DataType) query.DataType { func convertDataType(t ReadResponse_DataType) query.DataType {
switch t { switch t {
case DataTypeFloat: case DataTypeFloat:
@ -189,8 +248,9 @@ const (
valueColIdx = 3 valueColIdx = 3
) )
func (bi *bockIterator) determineBlockCols(s *ReadResponse_SeriesFrame, typ query.DataType) []query.ColMeta { func determineBlockColsForSeries(s *ReadResponse_SeriesFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {
cols := make([]query.ColMeta, 4+len(s.Tags)) cols := make([]query.ColMeta, 4+len(s.Tags))
defs := make([][]byte, 4+len(s.Tags))
cols[startColIdx] = query.ColMeta{ cols[startColIdx] = query.ColMeta{
Label: execute.DefaultStartColLabel, Label: execute.DefaultStartColLabel,
Type: query.TTime, Type: query.TTime,
@ -212,8 +272,9 @@ func (bi *bockIterator) determineBlockCols(s *ReadResponse_SeriesFrame, typ quer
Label: string(tag.Key), Label: string(tag.Key),
Type: query.TString, Type: query.TString,
} }
defs[4+j] = []byte("")
} }
return cols return cols, defs
} }
func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey { func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
@ -229,37 +290,90 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
Type: query.TTime, Type: query.TTime,
} }
values[1] = bnds.Stop values[1] = bnds.Stop
if len(readSpec.GroupKeys) > 0 { switch readSpec.GroupMode {
for _, tag := range s.Tags { case storage.GroupModeBy:
if !execute.ContainsStr(readSpec.GroupKeys, string(tag.Key)) { // partition key in GroupKeys order, including tags in the GroupKeys slice
continue for _, k := range readSpec.GroupKeys {
if i := indexOfTag(s.Tags, k); i < len(s.Tags) {
cols = append(cols, query.ColMeta{
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(s.Tags[i].Value))
} }
cols = append(cols, query.ColMeta{
Label: string(tag.Key),
Type: query.TString,
})
values = append(values, string(tag.Value))
} }
} else if len(readSpec.GroupExcept) > 0 { case storage.GroupModeExcept:
for _, tag := range s.Tags { // partition key in GroupKeys order, skipping tags in the GroupKeys slice
if !execute.ContainsStr(readSpec.GroupExcept, string(tag.Key)) { for _, k := range readSpec.GroupKeys {
continue if i := indexOfTag(s.Tags, k); i == len(s.Tags) {
cols = append(cols, query.ColMeta{
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(s.Tags[i].Value))
} }
}
case storage.GroupModeAll:
for i := range s.Tags {
cols = append(cols, query.ColMeta{ cols = append(cols, query.ColMeta{
Label: string(tag.Key), Label: string(s.Tags[i].Key),
Type: query.TString, Type: query.TString,
}) })
values = append(values, string(tag.Value)) values = append(values, string(s.Tags[i].Value))
}
} else if !readSpec.MergeAll {
for _, tag := range s.Tags {
cols = append(cols, query.ColMeta{
Label: string(tag.Key),
Type: query.TString,
})
values = append(values, string(tag.Value))
} }
}
return execute.NewPartitionKey(cols, values)
}
func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {
cols := make([]query.ColMeta, 4+len(f.TagKeys))
defs := make([][]byte, 4+len(f.TagKeys))
cols[startColIdx] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
cols[stopColIdx] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
cols[timeColIdx] = query.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: query.TTime,
}
cols[valueColIdx] = query.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range f.TagKeys {
cols[4+j] = query.ColMeta{
Label: string(tag),
Type: query.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
cols := make([]query.ColMeta, 2, len(readSpec.GroupKeys)+2)
values := make([]interface{}, 2, len(readSpec.GroupKeys)+2)
cols[0] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
values[0] = bnds.Start
cols[1] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
values[1] = bnds.Stop
for i := range readSpec.GroupKeys {
cols = append(cols, query.ColMeta{
Label: readSpec.GroupKeys[i],
Type: query.TString,
})
values = append(values, string(g.PartitionKeyVals[i]))
} }
return execute.NewPartitionKey(cols, values) return execute.NewPartitionKey(cols, values)
} }
@ -277,6 +391,7 @@ type block struct {
// cache of the tags on the current series. // cache of the tags on the current series.
// len(tags) == len(colMeta) // len(tags) == len(colMeta)
tags [][]byte tags [][]byte
defs [][]byte
readSpec *storage.ReadSpec readSpec *storage.ReadSpec
@ -309,11 +424,13 @@ func newBlock(
ms *mergedStreams, ms *mergedStreams,
readSpec *storage.ReadSpec, readSpec *storage.ReadSpec,
tags []Tag, tags []Tag,
defs [][]byte,
) *block { ) *block {
b := &block{ b := &block{
bounds: bounds, bounds: bounds,
key: key, key: key,
tags: make([][]byte, len(cols)), tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)), colBufs: make([]interface{}, len(cols)),
cols: cols, cols: cols,
readSpec: readSpec, readSpec: readSpec,
@ -395,8 +512,13 @@ func (b *block) Times(j int) []execute.Time {
// readTags populates b.tags with the provided tags // readTags populates b.tags with the provided tags
func (b *block) readTags(tags []Tag) { func (b *block) readTags(tags []Tag) {
for j := range b.tags { for j := range b.tags {
b.tags[j] = nil b.tags[j] = b.defs[j]
} }
if len(tags) == 0 {
return
}
for _, t := range tags { for _, t := range tags {
k := string(t.Key) k := string(t.Key)
j := execute.ColIdx(k, b.cols) j := execute.ColIdx(k, b.cols)
@ -415,6 +537,8 @@ func (b *block) advance() bool {
b.floatBuf = b.floatBuf[0:0] b.floatBuf = b.floatBuf[0:0]
switch p := b.ms.peek(); readFrameType(p) { switch p := b.ms.peek(); readFrameType(p) {
case groupType:
return false
case seriesType: case seriesType:
if !b.ms.key().Equal(b.key) { if !b.ms.key().Equal(b.key) {
// We have reached the end of data for this block // We have reached the end of data for this block
@ -657,6 +781,7 @@ type streamState struct {
currentKey query.PartitionKey currentKey query.PartitionKey
readSpec *storage.ReadSpec readSpec *storage.ReadSpec
finished bool finished bool
group bool
} }
func (s *streamState) peek() ReadResponse_Frame { func (s *streamState) peek() ReadResponse_Frame {
@ -692,11 +817,21 @@ func (s *streamState) key() query.PartitionKey {
func (s *streamState) computeKey() { func (s *streamState) computeKey() {
// Determine new currentKey // Determine new currentKey
if p := s.peek(); readFrameType(p) == seriesType { p := s.peek()
series := p.GetSeries() ft := readFrameType(p)
s.currentKey = partitionKeyForSeries(series, s.readSpec, s.bounds) if s.group {
if ft == groupType {
group := p.GetGroup()
s.currentKey = partitionKeyForGroup(group, s.readSpec, s.bounds)
}
} else {
if ft == seriesType {
series := p.GetSeries()
s.currentKey = partitionKeyForSeries(series, s.readSpec, s.bounds)
}
} }
} }
func (s *streamState) next() ReadResponse_Frame { func (s *streamState) next() ReadResponse_Frame {
frame := s.rep.Frames[0] frame := s.rep.Frames[0]
s.rep.Frames = s.rep.Frames[1:] s.rep.Frames = s.rep.Frames[1:]
@ -779,6 +914,7 @@ type frameType int
const ( const (
seriesType frameType = iota seriesType frameType = iota
groupType
boolPointsType boolPointsType
intPointsType intPointsType
uintPointsType uintPointsType
@ -790,6 +926,8 @@ func readFrameType(frame ReadResponse_Frame) frameType {
switch frame.Data.(type) { switch frame.Data.(type) {
case *ReadResponse_Frame_Series: case *ReadResponse_Frame_Series:
return seriesType return seriesType
case *ReadResponse_Frame_Group:
return groupType
case *ReadResponse_Frame_BooleanPoints: case *ReadResponse_Frame_BooleanPoints:
return boolPointsType return boolPointsType
case *ReadResponse_Frame_IntegerPoints: case *ReadResponse_Frame_IntegerPoints:

View File

@ -0,0 +1,52 @@
package pb
import (
"strings"
"github.com/gogo/protobuf/proto"
"sort"
)
type HintFlags uint32
func (h HintFlags) NoPoints() bool {
return uint32(h)&uint32(HintNoPoints) != 0
}
func (h *HintFlags) SetNoPoints() {
*h |= HintFlags(HintNoPoints)
}
func (h HintFlags) NoSeries() bool {
return uint32(h)&uint32(HintNoSeries) != 0
}
func (h *HintFlags) SetNoSeries() {
*h |= HintFlags(HintNoSeries)
}
func (h HintFlags) String() string {
f := uint32(h)
var s []string
enums := proto.EnumValueMap("com.github.influxdata.influxdb.services.storage.ReadRequest_HintFlags")
if h == 0 {
return "HINT_NONE"
}
for k, v := range enums {
if v == 0 {
continue
}
v := uint32(v)
if f&v == v {
s = append(s, k)
}
}
return strings.Join(s, ",")
}
func indexOfTag(t []Tag, k string) int {
return sort.Search(len(t), func(i int) bool { return string(t[i].Key) >= k })
}

View File

@ -41,6 +41,65 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type ReadRequest_Group int32
const (
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GroupNone ReadRequest_Group = 0
// GroupAll returns a unique group for each series.
// As an optimization, no GroupFrames will be generated.
GroupAll ReadRequest_Group = 1
// GroupBy returns a group for each unique value of the specified GroupKeys.
GroupBy ReadRequest_Group = 2
// GroupExcept in not implemented.
GroupExcept ReadRequest_Group = 3
)
var ReadRequest_Group_name = map[int32]string{
0: "GROUP_NONE",
1: "GROUP_ALL",
2: "GROUP_BY",
3: "GROUP_EXCEPT",
}
var ReadRequest_Group_value = map[string]int32{
"GROUP_NONE": 0,
"GROUP_ALL": 1,
"GROUP_BY": 2,
"GROUP_EXCEPT": 3,
}
func (x ReadRequest_Group) String() string {
return proto.EnumName(ReadRequest_Group_name, int32(x))
}
func (ReadRequest_Group) EnumDescriptor() ([]byte, []int) { return fileDescriptorStorage, []int{0, 0} }
type ReadRequest_HintFlags int32
const (
HintNone ReadRequest_HintFlags = 0
HintNoPoints ReadRequest_HintFlags = 1
HintNoSeries ReadRequest_HintFlags = 2
)
var ReadRequest_HintFlags_name = map[int32]string{
0: "HINT_NONE",
1: "HINT_NO_POINTS",
2: "HINT_NO_SERIES",
}
var ReadRequest_HintFlags_value = map[string]int32{
"HINT_NONE": 0,
"HINT_NO_POINTS": 1,
"HINT_NO_SERIES": 2,
}
func (x ReadRequest_HintFlags) String() string {
return proto.EnumName(ReadRequest_HintFlags_name, int32(x))
}
func (ReadRequest_HintFlags) EnumDescriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{0, 1}
}
type Aggregate_AggregateType int32 type Aggregate_AggregateType int32
const ( const (
@ -129,8 +188,11 @@ type ReadRequest struct {
TimestampRange TimestampRange `protobuf:"bytes,2,opt,name=timestamp_range,json=timestampRange" json:"timestamp_range"` TimestampRange TimestampRange `protobuf:"bytes,2,opt,name=timestamp_range,json=timestampRange" json:"timestamp_range"`
// Descending indicates whether points should be returned in descending order. // Descending indicates whether points should be returned in descending order.
Descending bool `protobuf:"varint,3,opt,name=descending,proto3" json:"descending,omitempty"` Descending bool `protobuf:"varint,3,opt,name=descending,proto3" json:"descending,omitempty"`
// Grouping specifies a list of tags used to order the data // GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine
Grouping []string `protobuf:"bytes,4,rep,name=grouping" json:"grouping,omitempty"` // its behavior.
GroupKeys []string `protobuf:"bytes,4,rep,name=group_keys,json=groupKeys" json:"group_keys,omitempty"`
//
Group ReadRequest_Group `protobuf:"varint,11,opt,name=group,proto3,enum=com.github.influxdata.influxdb.services.storage.ReadRequest_Group" json:"group,omitempty"`
// Aggregate specifies an optional aggregate to apply to the data. // Aggregate specifies an optional aggregate to apply to the data.
// TODO(sgc): switch to slice for multiple aggregates in a single request // TODO(sgc): switch to slice for multiple aggregates in a single request
Aggregate *Aggregate `protobuf:"bytes,9,opt,name=aggregate" json:"aggregate,omitempty"` Aggregate *Aggregate `protobuf:"bytes,9,opt,name=aggregate" json:"aggregate,omitempty"`
@ -144,6 +206,9 @@ type ReadRequest struct {
PointsLimit int64 `protobuf:"varint,8,opt,name=points_limit,json=pointsLimit,proto3" json:"points_limit,omitempty"` PointsLimit int64 `protobuf:"varint,8,opt,name=points_limit,json=pointsLimit,proto3" json:"points_limit,omitempty"`
// Trace contains opaque data if a trace is active. // Trace contains opaque data if a trace is active.
Trace map[string]string `protobuf:"bytes,10,rep,name=trace" json:"trace,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Trace map[string]string `protobuf:"bytes,10,rep,name=trace" json:"trace,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// Hints is a bitwise OR of HintFlags to control the behavior
// of the read request.
Hints HintFlags `protobuf:"fixed32,12,opt,name=hints,proto3,casttype=HintFlags" json:"hints,omitempty"`
} }
func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) Reset() { *m = ReadRequest{} }
@ -152,7 +217,7 @@ func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{0} } func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{0} }
type Aggregate struct { type Aggregate struct {
Type Aggregate_AggregateType `protobuf:"varint,1,opt,name=type,proto3,enum=storage.Aggregate_AggregateType" json:"type,omitempty"` Type Aggregate_AggregateType `protobuf:"varint,1,opt,name=type,proto3,enum=com.github.influxdata.influxdb.services.storage.Aggregate_AggregateType" json:"type,omitempty"`
} }
func (m *Aggregate) Reset() { *m = Aggregate{} } func (m *Aggregate) Reset() { *m = Aggregate{} }
@ -182,6 +247,7 @@ func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptorStorage
type ReadResponse_Frame struct { type ReadResponse_Frame struct {
// Types that are valid to be assigned to Data: // Types that are valid to be assigned to Data:
// *ReadResponse_Frame_Group
// *ReadResponse_Frame_Series // *ReadResponse_Frame_Series
// *ReadResponse_Frame_FloatPoints // *ReadResponse_Frame_FloatPoints
// *ReadResponse_Frame_IntegerPoints // *ReadResponse_Frame_IntegerPoints
@ -202,6 +268,9 @@ type isReadResponse_Frame_Data interface {
Size() int Size() int
} }
type ReadResponse_Frame_Group struct {
Group *ReadResponse_GroupFrame `protobuf:"bytes,7,opt,name=group,oneof"`
}
type ReadResponse_Frame_Series struct { type ReadResponse_Frame_Series struct {
Series *ReadResponse_SeriesFrame `protobuf:"bytes,1,opt,name=series,oneof"` Series *ReadResponse_SeriesFrame `protobuf:"bytes,1,opt,name=series,oneof"`
} }
@ -221,6 +290,7 @@ type ReadResponse_Frame_StringPoints struct {
StringPoints *ReadResponse_StringPointsFrame `protobuf:"bytes,6,opt,name=string_points,json=stringPoints,oneof"` StringPoints *ReadResponse_StringPointsFrame `protobuf:"bytes,6,opt,name=string_points,json=stringPoints,oneof"`
} }
func (*ReadResponse_Frame_Group) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_Series) isReadResponse_Frame_Data() {} func (*ReadResponse_Frame_Series) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_FloatPoints) isReadResponse_Frame_Data() {} func (*ReadResponse_Frame_FloatPoints) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_IntegerPoints) isReadResponse_Frame_Data() {} func (*ReadResponse_Frame_IntegerPoints) isReadResponse_Frame_Data() {}
@ -235,6 +305,13 @@ func (m *ReadResponse_Frame) GetData() isReadResponse_Frame_Data {
return nil return nil
} }
func (m *ReadResponse_Frame) GetGroup() *ReadResponse_GroupFrame {
if x, ok := m.GetData().(*ReadResponse_Frame_Group); ok {
return x.Group
}
return nil
}
func (m *ReadResponse_Frame) GetSeries() *ReadResponse_SeriesFrame { func (m *ReadResponse_Frame) GetSeries() *ReadResponse_SeriesFrame {
if x, ok := m.GetData().(*ReadResponse_Frame_Series); ok { if x, ok := m.GetData().(*ReadResponse_Frame_Series); ok {
return x.Series return x.Series
@ -280,6 +357,7 @@ func (m *ReadResponse_Frame) GetStringPoints() *ReadResponse_StringPointsFrame {
// XXX_OneofFuncs is for the internal use of the proto package. // XXX_OneofFuncs is for the internal use of the proto package.
func (*ReadResponse_Frame) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { func (*ReadResponse_Frame) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _ReadResponse_Frame_OneofMarshaler, _ReadResponse_Frame_OneofUnmarshaler, _ReadResponse_Frame_OneofSizer, []interface{}{ return _ReadResponse_Frame_OneofMarshaler, _ReadResponse_Frame_OneofUnmarshaler, _ReadResponse_Frame_OneofSizer, []interface{}{
(*ReadResponse_Frame_Group)(nil),
(*ReadResponse_Frame_Series)(nil), (*ReadResponse_Frame_Series)(nil),
(*ReadResponse_Frame_FloatPoints)(nil), (*ReadResponse_Frame_FloatPoints)(nil),
(*ReadResponse_Frame_IntegerPoints)(nil), (*ReadResponse_Frame_IntegerPoints)(nil),
@ -293,6 +371,11 @@ func _ReadResponse_Frame_OneofMarshaler(msg proto.Message, b *proto.Buffer) erro
m := msg.(*ReadResponse_Frame) m := msg.(*ReadResponse_Frame)
// data // data
switch x := m.Data.(type) { switch x := m.Data.(type) {
case *ReadResponse_Frame_Group:
_ = b.EncodeVarint(7<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Group); err != nil {
return err
}
case *ReadResponse_Frame_Series: case *ReadResponse_Frame_Series:
_ = b.EncodeVarint(1<<3 | proto.WireBytes) _ = b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Series); err != nil { if err := b.EncodeMessage(x.Series); err != nil {
@ -333,6 +416,14 @@ func _ReadResponse_Frame_OneofMarshaler(msg proto.Message, b *proto.Buffer) erro
func _ReadResponse_Frame_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { func _ReadResponse_Frame_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*ReadResponse_Frame) m := msg.(*ReadResponse_Frame)
switch tag { switch tag {
case 7: // data.group
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ReadResponse_GroupFrame)
err := b.DecodeMessage(msg)
m.Data = &ReadResponse_Frame_Group{msg}
return true, err
case 1: // data.series case 1: // data.series
if wire != proto.WireBytes { if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
@ -390,6 +481,11 @@ func _ReadResponse_Frame_OneofSizer(msg proto.Message) (n int) {
m := msg.(*ReadResponse_Frame) m := msg.(*ReadResponse_Frame)
// data // data
switch x := m.Data.(type) { switch x := m.Data.(type) {
case *ReadResponse_Frame_Group:
s := proto.Size(x.Group)
n += proto.SizeVarint(7<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case *ReadResponse_Frame_Series: case *ReadResponse_Frame_Series:
s := proto.Size(x.Series) s := proto.Size(x.Series)
n += proto.SizeVarint(1<<3 | proto.WireBytes) n += proto.SizeVarint(1<<3 | proto.WireBytes)
@ -427,16 +523,30 @@ func _ReadResponse_Frame_OneofSizer(msg proto.Message) (n int) {
return n return n
} }
type ReadResponse_GroupFrame struct {
// TagKeys
TagKeys [][]byte `protobuf:"bytes,1,rep,name=tag_keys,json=tagKeys" json:"tag_keys,omitempty"`
// PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys
PartitionKeyVals [][]byte `protobuf:"bytes,2,rep,name=partition_key_vals,json=partitionKeyVals" json:"partition_key_vals,omitempty"`
}
func (m *ReadResponse_GroupFrame) Reset() { *m = ReadResponse_GroupFrame{} }
func (m *ReadResponse_GroupFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_GroupFrame) ProtoMessage() {}
func (*ReadResponse_GroupFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 1}
}
type ReadResponse_SeriesFrame struct { type ReadResponse_SeriesFrame struct {
Tags []Tag `protobuf:"bytes,1,rep,name=tags" json:"tags"` Tags []Tag `protobuf:"bytes,1,rep,name=tags" json:"tags"`
DataType ReadResponse_DataType `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3,enum=storage.ReadResponse_DataType" json:"data_type,omitempty"` DataType ReadResponse_DataType `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3,enum=com.github.influxdata.influxdb.services.storage.ReadResponse_DataType" json:"data_type,omitempty"`
} }
func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFrame{} } func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFrame{} }
func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_SeriesFrame) ProtoMessage() {} func (*ReadResponse_SeriesFrame) ProtoMessage() {}
func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 1} return fileDescriptorStorage, []int{3, 2}
} }
type ReadResponse_FloatPointsFrame struct { type ReadResponse_FloatPointsFrame struct {
@ -448,7 +558,7 @@ func (m *ReadResponse_FloatPointsFrame) Reset() { *m = ReadResponse_Floa
func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_FloatPointsFrame) ProtoMessage() {} func (*ReadResponse_FloatPointsFrame) ProtoMessage() {}
func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 2} return fileDescriptorStorage, []int{3, 3}
} }
type ReadResponse_IntegerPointsFrame struct { type ReadResponse_IntegerPointsFrame struct {
@ -460,7 +570,7 @@ func (m *ReadResponse_IntegerPointsFrame) Reset() { *m = ReadResponse_In
func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {} func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {}
func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 3} return fileDescriptorStorage, []int{3, 4}
} }
type ReadResponse_UnsignedPointsFrame struct { type ReadResponse_UnsignedPointsFrame struct {
@ -472,7 +582,7 @@ func (m *ReadResponse_UnsignedPointsFrame) Reset() { *m = ReadResponse_U
func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {} func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {}
func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 4} return fileDescriptorStorage, []int{3, 5}
} }
type ReadResponse_BooleanPointsFrame struct { type ReadResponse_BooleanPointsFrame struct {
@ -484,7 +594,7 @@ func (m *ReadResponse_BooleanPointsFrame) Reset() { *m = ReadResponse_Bo
func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {} func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {}
func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 5} return fileDescriptorStorage, []int{3, 6}
} }
type ReadResponse_StringPointsFrame struct { type ReadResponse_StringPointsFrame struct {
@ -496,7 +606,7 @@ func (m *ReadResponse_StringPointsFrame) Reset() { *m = ReadResponse_Str
func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) } func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_StringPointsFrame) ProtoMessage() {} func (*ReadResponse_StringPointsFrame) ProtoMessage() {}
func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) { func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 6} return fileDescriptorStorage, []int{3, 7}
} }
type CapabilitiesResponse struct { type CapabilitiesResponse struct {
@ -530,23 +640,26 @@ func (*TimestampRange) ProtoMessage() {}
func (*TimestampRange) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{6} } func (*TimestampRange) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{6} }
func init() { func init() {
proto.RegisterType((*ReadRequest)(nil), "storage.ReadRequest") proto.RegisterType((*ReadRequest)(nil), "com.github.influxdata.influxdb.services.storage.ReadRequest")
proto.RegisterType((*Aggregate)(nil), "storage.Aggregate") proto.RegisterType((*Aggregate)(nil), "com.github.influxdata.influxdb.services.storage.Aggregate")
proto.RegisterType((*Tag)(nil), "storage.Tag") proto.RegisterType((*Tag)(nil), "com.github.influxdata.influxdb.services.storage.Tag")
proto.RegisterType((*ReadResponse)(nil), "storage.ReadResponse") proto.RegisterType((*ReadResponse)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse")
proto.RegisterType((*ReadResponse_Frame)(nil), "storage.ReadResponse.Frame") proto.RegisterType((*ReadResponse_Frame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.Frame")
proto.RegisterType((*ReadResponse_SeriesFrame)(nil), "storage.ReadResponse.SeriesFrame") proto.RegisterType((*ReadResponse_GroupFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.GroupFrame")
proto.RegisterType((*ReadResponse_FloatPointsFrame)(nil), "storage.ReadResponse.FloatPointsFrame") proto.RegisterType((*ReadResponse_SeriesFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.SeriesFrame")
proto.RegisterType((*ReadResponse_IntegerPointsFrame)(nil), "storage.ReadResponse.IntegerPointsFrame") proto.RegisterType((*ReadResponse_FloatPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.FloatPointsFrame")
proto.RegisterType((*ReadResponse_UnsignedPointsFrame)(nil), "storage.ReadResponse.UnsignedPointsFrame") proto.RegisterType((*ReadResponse_IntegerPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.IntegerPointsFrame")
proto.RegisterType((*ReadResponse_BooleanPointsFrame)(nil), "storage.ReadResponse.BooleanPointsFrame") proto.RegisterType((*ReadResponse_UnsignedPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.UnsignedPointsFrame")
proto.RegisterType((*ReadResponse_StringPointsFrame)(nil), "storage.ReadResponse.StringPointsFrame") proto.RegisterType((*ReadResponse_BooleanPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.BooleanPointsFrame")
proto.RegisterType((*CapabilitiesResponse)(nil), "storage.CapabilitiesResponse") proto.RegisterType((*ReadResponse_StringPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.StringPointsFrame")
proto.RegisterType((*HintsResponse)(nil), "storage.HintsResponse") proto.RegisterType((*CapabilitiesResponse)(nil), "com.github.influxdata.influxdb.services.storage.CapabilitiesResponse")
proto.RegisterType((*TimestampRange)(nil), "storage.TimestampRange") proto.RegisterType((*HintsResponse)(nil), "com.github.influxdata.influxdb.services.storage.HintsResponse")
proto.RegisterEnum("storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value) proto.RegisterType((*TimestampRange)(nil), "com.github.influxdata.influxdb.services.storage.TimestampRange")
proto.RegisterEnum("storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value) proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadRequest_Group", ReadRequest_Group_name, ReadRequest_Group_value)
proto.RegisterEnum("storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value) proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadRequest_HintFlags", ReadRequest_HintFlags_name, ReadRequest_HintFlags_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value)
} }
func (m *ReadRequest) Marshal() (dAtA []byte, err error) { func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
@ -587,8 +700,8 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
} }
i++ i++
} }
if len(m.Grouping) > 0 { if len(m.GroupKeys) > 0 {
for _, s := range m.Grouping { for _, s := range m.GroupKeys {
dAtA[i] = 0x22 dAtA[i] = 0x22
i++ i++
l = len(s) l = len(s)
@ -654,6 +767,16 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
i += copy(dAtA[i:], v) i += copy(dAtA[i:], v)
} }
} }
if m.Group != 0 {
dAtA[i] = 0x58
i++
i = encodeVarintStorage(dAtA, i, uint64(m.Group))
}
if m.Hints != 0 {
dAtA[i] = 0x65
i++
i = encodeFixed32Storage(dAtA, i, uint32(m.Hints))
}
return i, nil return i, nil
} }
@ -849,6 +972,54 @@ func (m *ReadResponse_Frame_StringPoints) MarshalTo(dAtA []byte) (int, error) {
} }
return i, nil return i, nil
} }
func (m *ReadResponse_Frame_Group) MarshalTo(dAtA []byte) (int, error) {
i := 0
if m.Group != nil {
dAtA[i] = 0x3a
i++
i = encodeVarintStorage(dAtA, i, uint64(m.Group.Size()))
n11, err := m.Group.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n11
}
return i, nil
}
func (m *ReadResponse_GroupFrame) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadResponse_GroupFrame) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.TagKeys) > 0 {
for _, b := range m.TagKeys {
dAtA[i] = 0xa
i++
i = encodeVarintStorage(dAtA, i, uint64(len(b)))
i += copy(dAtA[i:], b)
}
}
if len(m.PartitionKeyVals) > 0 {
for _, b := range m.PartitionKeyVals {
dAtA[i] = 0x12
i++
i = encodeVarintStorage(dAtA, i, uint64(len(b)))
i += copy(dAtA[i:], b)
}
}
return i, nil
}
func (m *ReadResponse_SeriesFrame) Marshal() (dAtA []byte, err error) { func (m *ReadResponse_SeriesFrame) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -927,22 +1098,22 @@ func (m *ReadResponse_FloatPointsFrame) MarshalTo(dAtA []byte) (int, error) {
i++ i++
i = encodeVarintStorage(dAtA, i, uint64(len(m.Values)*8)) i = encodeVarintStorage(dAtA, i, uint64(len(m.Values)*8))
for _, num := range m.Values { for _, num := range m.Values {
f11 := math.Float64bits(float64(num)) f12 := math.Float64bits(float64(num))
dAtA[i] = uint8(f11) dAtA[i] = uint8(f12)
i++ i++
dAtA[i] = uint8(f11 >> 8) dAtA[i] = uint8(f12 >> 8)
i++ i++
dAtA[i] = uint8(f11 >> 16) dAtA[i] = uint8(f12 >> 16)
i++ i++
dAtA[i] = uint8(f11 >> 24) dAtA[i] = uint8(f12 >> 24)
i++ i++
dAtA[i] = uint8(f11 >> 32) dAtA[i] = uint8(f12 >> 32)
i++ i++
dAtA[i] = uint8(f11 >> 40) dAtA[i] = uint8(f12 >> 40)
i++ i++
dAtA[i] = uint8(f11 >> 48) dAtA[i] = uint8(f12 >> 48)
i++ i++
dAtA[i] = uint8(f11 >> 56) dAtA[i] = uint8(f12 >> 56)
i++ i++
} }
} }
@ -988,22 +1159,22 @@ func (m *ReadResponse_IntegerPointsFrame) MarshalTo(dAtA []byte) (int, error) {
} }
} }
if len(m.Values) > 0 { if len(m.Values) > 0 {
dAtA13 := make([]byte, len(m.Values)*10) dAtA14 := make([]byte, len(m.Values)*10)
var j12 int var j13 int
for _, num1 := range m.Values { for _, num1 := range m.Values {
num := uint64(num1) num := uint64(num1)
for num >= 1<<7 { for num >= 1<<7 {
dAtA13[j12] = uint8(uint64(num)&0x7f | 0x80) dAtA14[j13] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7 num >>= 7
j12++ j13++
} }
dAtA13[j12] = uint8(num) dAtA14[j13] = uint8(num)
j12++ j13++
} }
dAtA[i] = 0x12 dAtA[i] = 0x12
i++ i++
i = encodeVarintStorage(dAtA, i, uint64(j12)) i = encodeVarintStorage(dAtA, i, uint64(j13))
i += copy(dAtA[i:], dAtA13[:j12]) i += copy(dAtA[i:], dAtA14[:j13])
} }
return i, nil return i, nil
} }
@ -1047,21 +1218,21 @@ func (m *ReadResponse_UnsignedPointsFrame) MarshalTo(dAtA []byte) (int, error) {
} }
} }
if len(m.Values) > 0 { if len(m.Values) > 0 {
dAtA15 := make([]byte, len(m.Values)*10) dAtA16 := make([]byte, len(m.Values)*10)
var j14 int var j15 int
for _, num := range m.Values { for _, num := range m.Values {
for num >= 1<<7 { for num >= 1<<7 {
dAtA15[j14] = uint8(uint64(num)&0x7f | 0x80) dAtA16[j15] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7 num >>= 7
j14++ j15++
} }
dAtA15[j14] = uint8(num) dAtA16[j15] = uint8(num)
j14++ j15++
} }
dAtA[i] = 0x12 dAtA[i] = 0x12
i++ i++
i = encodeVarintStorage(dAtA, i, uint64(j14)) i = encodeVarintStorage(dAtA, i, uint64(j15))
i += copy(dAtA[i:], dAtA15[:j14]) i += copy(dAtA[i:], dAtA16[:j15])
} }
return i, nil return i, nil
} }
@ -1296,8 +1467,8 @@ func (m *ReadRequest) Size() (n int) {
if m.Descending { if m.Descending {
n += 2 n += 2
} }
if len(m.Grouping) > 0 { if len(m.GroupKeys) > 0 {
for _, s := range m.Grouping { for _, s := range m.GroupKeys {
l = len(s) l = len(s)
n += 1 + l + sovStorage(uint64(l)) n += 1 + l + sovStorage(uint64(l))
} }
@ -1327,6 +1498,12 @@ func (m *ReadRequest) Size() (n int) {
n += mapEntrySize + 1 + sovStorage(uint64(mapEntrySize)) n += mapEntrySize + 1 + sovStorage(uint64(mapEntrySize))
} }
} }
if m.Group != 0 {
n += 1 + sovStorage(uint64(m.Group))
}
if m.Hints != 0 {
n += 5
}
return n return n
} }
@ -1428,6 +1605,33 @@ func (m *ReadResponse_Frame_StringPoints) Size() (n int) {
} }
return n return n
} }
func (m *ReadResponse_Frame_Group) Size() (n int) {
var l int
_ = l
if m.Group != nil {
l = m.Group.Size()
n += 1 + l + sovStorage(uint64(l))
}
return n
}
func (m *ReadResponse_GroupFrame) Size() (n int) {
var l int
_ = l
if len(m.TagKeys) > 0 {
for _, b := range m.TagKeys {
l = len(b)
n += 1 + l + sovStorage(uint64(l))
}
}
if len(m.PartitionKeyVals) > 0 {
for _, b := range m.PartitionKeyVals {
l = len(b)
n += 1 + l + sovStorage(uint64(l))
}
}
return n
}
func (m *ReadResponse_SeriesFrame) Size() (n int) { func (m *ReadResponse_SeriesFrame) Size() (n int) {
var l int var l int
_ = l _ = l
@ -1669,7 +1873,7 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
m.Descending = bool(v != 0) m.Descending = bool(v != 0)
case 4: case 4:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Grouping", wireType) return fmt.Errorf("proto: wrong wireType = %d for field GroupKeys", wireType)
} }
var stringLen uint64 var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -1694,7 +1898,7 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Grouping = append(m.Grouping, string(dAtA[iNdEx:postIndex])) m.GroupKeys = append(m.GroupKeys, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex iNdEx = postIndex
case 5: case 5:
if wireType != 2 { if wireType != 2 {
@ -1937,6 +2141,38 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
} }
m.Trace[mapkey] = mapvalue m.Trace[mapkey] = mapvalue
iNdEx = postIndex iNdEx = postIndex
case 11:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType)
}
m.Group = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Group |= (ReadRequest_Group(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 12:
if wireType != 5 {
return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType)
}
m.Hints = 0
if (iNdEx + 4) > l {
return io.ErrUnexpectedEOF
}
iNdEx += 4
m.Hints = HintFlags(dAtA[iNdEx-4])
m.Hints |= HintFlags(dAtA[iNdEx-3]) << 8
m.Hints |= HintFlags(dAtA[iNdEx-2]) << 16
m.Hints |= HintFlags(dAtA[iNdEx-1]) << 24
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:]) skippy, err := skipStorage(dAtA[iNdEx:])
@ -2441,6 +2677,146 @@ func (m *ReadResponse_Frame) Unmarshal(dAtA []byte) error {
} }
m.Data = &ReadResponse_Frame_StringPoints{v} m.Data = &ReadResponse_Frame_StringPoints{v}
iNdEx = postIndex iNdEx = postIndex
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &ReadResponse_GroupFrame{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Data = &ReadResponse_Frame_Group{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthStorage
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadResponse_GroupFrame) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GroupFrame: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GroupFrame: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TagKeys", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TagKeys = append(m.TagKeys, make([]byte, postIndex-iNdEx))
copy(m.TagKeys[len(m.TagKeys)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PartitionKeyVals", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PartitionKeyVals = append(m.PartitionKeyVals, make([]byte, postIndex-iNdEx))
copy(m.PartitionKeyVals[len(m.PartitionKeyVals)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:]) skippy, err := skipStorage(dAtA[iNdEx:])
@ -3791,81 +4167,101 @@ var (
func init() { proto.RegisterFile("storage.proto", fileDescriptorStorage) } func init() { proto.RegisterFile("storage.proto", fileDescriptorStorage) }
var fileDescriptorStorage = []byte{ var fileDescriptorStorage = []byte{
// 1206 bytes of a gzipped FileDescriptorProto // 1530 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x41, 0x8f, 0xdb, 0x44, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x57, 0x4f, 0x8f, 0x1a, 0xc9,
0x14, 0xb6, 0xd7, 0x4e, 0x76, 0xf3, 0x92, 0xec, 0x7a, 0xa7, 0xdb, 0x25, 0x72, 0x69, 0xe2, 0xe6, 0x15, 0xa7, 0xf9, 0xcf, 0xe3, 0x5f, 0x4f, 0x79, 0x32, 0x22, 0xed, 0x18, 0xda, 0x28, 0x8a, 0x38,
0x50, 0xc2, 0xa1, 0x69, 0x15, 0x40, 0x14, 0x2a, 0x24, 0x9a, 0x36, 0xed, 0x2e, 0xdd, 0x26, 0xd5, 0xd8, 0x4c, 0x44, 0x12, 0xc5, 0xb2, 0xf2, 0x47, 0xc3, 0x98, 0x19, 0x88, 0xc7, 0x30, 0x29, 0x98,
0x24, 0x2b, 0x71, 0x40, 0x5a, 0x26, 0x9b, 0x89, 0x6b, 0x91, 0xd8, 0xc6, 0x9e, 0xa0, 0xee, 0x8d, 0xc8, 0x89, 0x22, 0x91, 0x82, 0xa9, 0x69, 0xb7, 0x0c, 0xdd, 0x9d, 0xee, 0xc6, 0x1a, 0x72, 0xf2,
0x23, 0x5a, 0x71, 0xe0, 0xc0, 0x35, 0x27, 0x7e, 0x03, 0x5c, 0x90, 0x38, 0x70, 0xea, 0x91, 0x23, 0x21, 0x52, 0x2c, 0xb2, 0x07, 0x5f, 0xf7, 0xc0, 0x69, 0xbf, 0xc2, 0xee, 0x7e, 0x80, 0x3d, 0xf9,
0xa7, 0x08, 0xc2, 0x1f, 0x41, 0x33, 0x63, 0x3b, 0xf6, 0x6e, 0x5a, 0x69, 0x2f, 0xd1, 0xbc, 0xf7, 0xb8, 0x9f, 0x00, 0xed, 0xe2, 0x4f, 0xb1, 0x7b, 0xd9, 0x55, 0x55, 0x75, 0x43, 0xe3, 0xf1, 0x1e,
0xbe, 0xf7, 0xbd, 0xf7, 0x66, 0xde, 0x7b, 0x31, 0x94, 0x43, 0xe6, 0x05, 0xc4, 0xa6, 0x4d, 0x3f, 0x30, 0xbe, 0xb4, 0xea, 0xbd, 0x57, 0xf5, 0x7b, 0x7f, 0xea, 0xbd, 0x57, 0xaf, 0x21, 0xeb, 0xb8,
0xf0, 0x98, 0x87, 0x36, 0x23, 0xd1, 0xbc, 0x63, 0x3b, 0xec, 0xe5, 0x6c, 0xd8, 0x3c, 0xf5, 0xa6, 0xa6, 0x4d, 0x34, 0x5a, 0xb5, 0x6c, 0xd3, 0x35, 0xd1, 0xe1, 0xd0, 0x1c, 0x57, 0x35, 0xdd, 0x7d,
0x77, 0x6d, 0xcf, 0xf6, 0xee, 0x0a, 0xfb, 0x70, 0x36, 0x16, 0x92, 0x10, 0xc4, 0x49, 0xfa, 0x99, 0x36, 0x19, 0x54, 0x75, 0xe3, 0x6a, 0x34, 0xb9, 0xbe, 0x24, 0x2e, 0xf1, 0x97, 0x83, 0xaa, 0x43,
0x37, 0x6c, 0xcf, 0xb3, 0x27, 0x74, 0x85, 0xa2, 0x53, 0x9f, 0x9d, 0x45, 0xc6, 0x56, 0x8a, 0xcb, 0xed, 0x17, 0xfa, 0x90, 0x3a, 0x55, 0xef, 0x98, 0x72, 0xdf, 0xdb, 0x3c, 0x34, 0xc7, 0x87, 0x9a,
0x71, 0xc7, 0x93, 0xd9, 0xab, 0x11, 0x61, 0xe4, 0xee, 0x19, 0x09, 0xfc, 0x53, 0xf9, 0x2b, 0xf9, 0xa9, 0x99, 0x87, 0x1c, 0x67, 0x30, 0xb9, 0xe2, 0x14, 0x27, 0xf8, 0x4a, 0xe0, 0x2b, 0xb7, 0x35,
0xc4, 0x31, 0xf2, 0xd9, 0xf1, 0x03, 0x3a, 0x72, 0x4e, 0x09, 0x8b, 0x32, 0xab, 0xff, 0xa1, 0x43, 0xd3, 0xd4, 0x46, 0x74, 0xbd, 0x8b, 0x8e, 0x2d, 0x77, 0xea, 0x09, 0x6b, 0x01, 0xac, 0xb5, 0xf2,
0x11, 0x53, 0x32, 0xc2, 0xf4, 0xdb, 0x19, 0x0d, 0x19, 0x32, 0x61, 0x8b, 0xb3, 0x0c, 0x49, 0x48, 0xc3, 0x29, 0xb1, 0xad, 0xa1, 0xf8, 0x0a, 0x3c, 0xbe, 0xf4, 0xce, 0xe4, 0x2d, 0x9b, 0x5e, 0xea,
0x2b, 0xaa, 0xa5, 0x36, 0x0a, 0x38, 0x91, 0xd1, 0x97, 0xb0, 0xc3, 0x9c, 0x29, 0x0d, 0x19, 0x99, 0x43, 0xe2, 0x7a, 0x1e, 0x94, 0x7f, 0x48, 0x42, 0x1a, 0x53, 0x72, 0x89, 0xe9, 0xbf, 0x27, 0xd4,
0xfa, 0x27, 0x01, 0x71, 0x6d, 0x5a, 0xd9, 0xb0, 0xd4, 0x46, 0xb1, 0xf5, 0x4e, 0x33, 0x2e, 0x77, 0x71, 0x91, 0x02, 0x49, 0x86, 0x32, 0x20, 0x0e, 0x2d, 0x48, 0xaa, 0x54, 0x49, 0xe1, 0x15, 0x8d,
0x10, 0xdb, 0x31, 0x37, 0xb7, 0xf7, 0x5f, 0x2f, 0x6a, 0xca, 0x72, 0x51, 0xdb, 0xce, 0xea, 0xf1, 0x5e, 0x4a, 0x90, 0x77, 0xf5, 0x31, 0x75, 0x5c, 0x32, 0xb6, 0xfa, 0x36, 0x31, 0x34, 0x5a, 0x08,
0x36, 0xcb, 0xc8, 0xa8, 0x0a, 0x30, 0xa2, 0xe1, 0x29, 0x75, 0x47, 0x8e, 0x6b, 0x57, 0x34, 0x4b, 0xab, 0x52, 0x25, 0x5d, 0xfb, 0x73, 0x75, 0xcb, 0x40, 0x54, 0x7b, 0x3e, 0x0e, 0x66, 0x30, 0xf5,
0x6d, 0x6c, 0xe1, 0x94, 0x86, 0x67, 0x65, 0x07, 0xde, 0xcc, 0xe7, 0x56, 0xdd, 0xd2, 0x78, 0x56, 0x83, 0x37, 0x8b, 0x52, 0x68, 0xb9, 0x28, 0xe5, 0x36, 0xf9, 0x38, 0xe7, 0x6e, 0xd0, 0xa8, 0x08,
0xb1, 0x8c, 0xee, 0x41, 0x21, 0x29, 0xaa, 0x92, 0x13, 0xf9, 0xa0, 0x24, 0x9f, 0x17, 0xb1, 0x05, 0x70, 0x49, 0x9d, 0x21, 0x35, 0x2e, 0x75, 0x43, 0x2b, 0x44, 0x54, 0xa9, 0x92, 0xc4, 0x01, 0x0e,
0xaf, 0x40, 0xa8, 0x05, 0xa5, 0x90, 0x06, 0x0e, 0x0d, 0x4f, 0x26, 0xce, 0xd4, 0x61, 0x95, 0xbc, 0xba, 0x07, 0xa0, 0xd9, 0xe6, 0xc4, 0xea, 0x3f, 0xa7, 0x53, 0xa7, 0x10, 0x55, 0x23, 0x95, 0x54,
0xa5, 0x36, 0xb4, 0xf6, 0xce, 0x72, 0x51, 0x2b, 0xf6, 0x85, 0xfe, 0x88, 0xab, 0x71, 0x31, 0x5c, 0x3d, 0xbb, 0x5c, 0x94, 0x52, 0xa7, 0x8c, 0xfb, 0x98, 0x4e, 0x1d, 0x9c, 0xd2, 0xfc, 0x25, 0x7a,
0x09, 0xe8, 0x23, 0x28, 0x47, 0x3e, 0xde, 0x78, 0x1c, 0x52, 0x56, 0xd9, 0x14, 0x4e, 0xc6, 0x72, 0x0a, 0xa9, 0x55, 0x3c, 0x0a, 0x31, 0xee, 0xc9, 0xc3, 0xad, 0x3d, 0x39, 0xf7, 0x11, 0xf0, 0x1a,
0x51, 0x2b, 0x49, 0xa7, 0x9e, 0xd0, 0xe3, 0x88, 0x5a, 0x4a, 0x3c, 0x94, 0xef, 0x39, 0x2e, 0x8b, 0x0c, 0xd5, 0x20, 0xe3, 0x50, 0x5b, 0xa7, 0x4e, 0x7f, 0xa4, 0x8f, 0x75, 0xb7, 0x10, 0x57, 0xa5,
0x43, 0x6d, 0xad, 0x42, 0xbd, 0x10, 0xfa, 0x28, 0x94, 0xbf, 0x12, 0x78, 0x41, 0xc4, 0xb6, 0x03, 0x4a, 0xa4, 0x9e, 0x5f, 0x2e, 0x4a, 0xe9, 0x2e, 0xe7, 0x9f, 0x31, 0x36, 0x4e, 0x3b, 0x6b, 0x02,
0x6a, 0xf3, 0x82, 0x0a, 0x17, 0x0a, 0x7a, 0x18, 0x5b, 0xf0, 0x0a, 0x84, 0x3e, 0x87, 0x1c, 0x0b, 0xfd, 0x0e, 0xb2, 0xde, 0x19, 0xf3, 0xea, 0xca, 0xa1, 0x6e, 0x21, 0xc1, 0x0f, 0xc9, 0xcb, 0x45,
0xc8, 0x29, 0xad, 0x80, 0xa5, 0x35, 0x8a, 0xad, 0x5a, 0x82, 0x4e, 0xbd, 0x6c, 0x73, 0xc0, 0x11, 0x29, 0x23, 0x0e, 0x75, 0x38, 0x1f, 0x7b, 0xd0, 0x82, 0x62, 0xaa, 0x2c, 0x53, 0x37, 0x5c, 0x5f,
0x1d, 0x97, 0x05, 0x67, 0xed, 0xc2, 0x72, 0x51, 0xcb, 0x09, 0x19, 0x4b, 0x47, 0xf3, 0x3e, 0xc0, 0x55, 0x72, 0xad, 0xea, 0x9c, 0xf3, 0x3d, 0x55, 0xd6, 0x9a, 0x60, 0x8e, 0x13, 0x4d, 0xb3, 0xa9,
0xca, 0x8e, 0x0c, 0xd0, 0xbe, 0xa1, 0x67, 0xd1, 0xfb, 0xf3, 0x23, 0xda, 0x83, 0xdc, 0x77, 0x64, 0xc6, 0x1c, 0x4f, 0x7d, 0xa0, 0xe3, 0x47, 0x3e, 0x02, 0x5e, 0x83, 0xa1, 0x67, 0x10, 0x73, 0x6d,
0x32, 0x93, 0x0f, 0x5e, 0xc0, 0x52, 0xf8, 0x74, 0xe3, 0xbe, 0x5a, 0xff, 0x5d, 0x85, 0x42, 0x92, 0x32, 0xa4, 0x05, 0x50, 0x23, 0x95, 0x74, 0xed, 0x74, 0x6b, 0xd4, 0x40, 0x32, 0x56, 0x7b, 0x0c,
0x14, 0xfa, 0x10, 0x74, 0x76, 0xe6, 0xcb, 0xd6, 0xd9, 0x6e, 0x59, 0x97, 0xd3, 0x5e, 0x9d, 0x06, 0xa9, 0x61, 0xb8, 0xf6, 0xb4, 0x9e, 0x5a, 0x2e, 0x4a, 0x31, 0x4e, 0x63, 0xa1, 0x00, 0x3d, 0x85,
0x67, 0x3e, 0xc5, 0x02, 0x5d, 0x7f, 0x05, 0xe5, 0x8c, 0x1a, 0xd5, 0x40, 0xef, 0xf6, 0xba, 0x1d, 0x18, 0xbf, 0xc9, 0x42, 0x5a, 0x95, 0x2a, 0xb9, 0x5a, 0x7d, 0x27, 0x4d, 0x3c, 0x3d, 0xb0, 0x00,
0x43, 0x31, 0xaf, 0x9f, 0xcf, 0xad, 0xdd, 0x8c, 0xb1, 0xeb, 0xb9, 0x14, 0xdd, 0x04, 0xad, 0x7f, 0x44, 0xf7, 0x20, 0xf6, 0x8c, 0xc5, 0xaa, 0x90, 0x51, 0xa5, 0x4a, 0xa2, 0x7e, 0xc0, 0x54, 0x37,
0xfc, 0xdc, 0x50, 0xcd, 0xbd, 0xf3, 0xb9, 0x65, 0x64, 0xec, 0xfd, 0xd9, 0x14, 0xdd, 0x82, 0xdc, 0x19, 0xe3, 0xfb, 0x45, 0x29, 0xc5, 0x16, 0x27, 0x23, 0xa2, 0x39, 0x58, 0x6c, 0x52, 0x1e, 0x00,
0xa3, 0xde, 0x71, 0x77, 0x60, 0x6c, 0x98, 0xfb, 0xe7, 0x73, 0x0b, 0x65, 0x00, 0x8f, 0xbc, 0x99, 0xac, 0xed, 0x44, 0x32, 0x44, 0x9e, 0xd3, 0xa9, 0x57, 0x3a, 0x6c, 0x89, 0xf6, 0x21, 0xf6, 0x82,
0xcb, 0x4c, 0xfd, 0x87, 0x5f, 0xaa, 0x4a, 0xfd, 0x0e, 0x68, 0x03, 0x62, 0xa7, 0x0b, 0x2e, 0xad, 0x8c, 0x26, 0xa2, 0x54, 0x52, 0x58, 0x10, 0x0f, 0xc3, 0x0f, 0xa4, 0xf2, 0xff, 0x24, 0x88, 0x71,
0x29, 0xb8, 0x14, 0x15, 0x5c, 0xff, 0xb9, 0x08, 0x25, 0x79, 0xa7, 0xa1, 0xef, 0xb9, 0x21, 0x45, 0xc5, 0xe8, 0x0e, 0xc0, 0x29, 0xee, 0x5c, 0x9c, 0xf7, 0xdb, 0x9d, 0x76, 0x43, 0x0e, 0x29, 0xd9,
0x9f, 0x40, 0x7e, 0x1c, 0x90, 0x29, 0x0d, 0x2b, 0xaa, 0xb8, 0xfa, 0x1b, 0x17, 0xae, 0x5e, 0xc2, 0xd9, 0x5c, 0x15, 0x29, 0xdb, 0x36, 0x0d, 0x8a, 0x6e, 0x43, 0x4a, 0x88, 0x8f, 0xce, 0xce, 0x64,
0x9a, 0x4f, 0x38, 0xa6, 0xad, 0xf3, 0x69, 0xc0, 0x91, 0x83, 0xf9, 0xa7, 0x0e, 0x39, 0xa1, 0x47, 0x49, 0xc9, 0xcc, 0xe6, 0x6a, 0x92, 0x4b, 0x8f, 0x46, 0x23, 0xf4, 0x73, 0x48, 0x0a, 0x61, 0xfd,
0x0f, 0x20, 0x2f, 0x9b, 0x46, 0x24, 0x50, 0x6c, 0xdd, 0x5a, 0x4f, 0x22, 0xdb, 0x4c, 0xb8, 0x1c, 0xef, 0x72, 0x58, 0x49, 0xcf, 0xe6, 0x6a, 0x82, 0xcb, 0xea, 0x53, 0x74, 0x17, 0x32, 0x42, 0xd4,
0x28, 0x38, 0x72, 0x41, 0x5f, 0x41, 0x69, 0x3c, 0xf1, 0x08, 0x3b, 0x91, 0x2d, 0x14, 0x4d, 0xe4, 0x78, 0x7a, 0xdc, 0x38, 0xef, 0xc9, 0x11, 0x25, 0x3f, 0x9b, 0xab, 0x69, 0x2e, 0x6e, 0x5c, 0x0f,
0xed, 0x37, 0xe4, 0xc1, 0x91, 0xb2, 0xf1, 0x64, 0x4a, 0xa2, 0x13, 0x53, 0xda, 0x03, 0x05, 0x17, 0xa9, 0xe5, 0x2a, 0xd1, 0x57, 0x9f, 0x15, 0x43, 0xe5, 0xff, 0xc0, 0xda, 0x2f, 0xa6, 0xad, 0xd9,
0xc7, 0x2b, 0x11, 0x8d, 0x60, 0xdb, 0x71, 0x19, 0xb5, 0x69, 0x10, 0xf3, 0x6b, 0x82, 0xbf, 0xb1, 0x6a, 0xf7, 0x7c, 0x5b, 0xb8, 0x36, 0x26, 0xe5, 0xa6, 0xfc, 0x12, 0x72, 0x9e, 0xb0, 0x7f, 0xde,
0x9e, 0xff, 0x50, 0x62, 0xd3, 0x11, 0x76, 0x97, 0x8b, 0x5a, 0x39, 0xa3, 0x3f, 0x50, 0x70, 0xd9, 0x69, 0xb5, 0x7b, 0x5d, 0x59, 0x52, 0xe4, 0xd9, 0x5c, 0xcd, 0x88, 0x1d, 0x22, 0xe3, 0x82, 0xbb,
0x49, 0x2b, 0xd0, 0x4b, 0xd8, 0x99, 0xb9, 0xa1, 0x63, 0xbb, 0x74, 0x14, 0x87, 0xd1, 0x45, 0x98, 0xba, 0x0d, 0xdc, 0x6a, 0x74, 0xe5, 0x70, 0x70, 0x97, 0xc8, 0x66, 0x4f, 0xf7, 0x5b, 0x09, 0x52,
0xf7, 0xd7, 0x87, 0x39, 0x8e, 0xc0, 0xe9, 0x38, 0x88, 0xaf, 0x99, 0xac, 0xe1, 0x40, 0xc1, 0xdb, 0xab, 0x54, 0x42, 0xff, 0x84, 0xa8, 0x3b, 0xb5, 0x44, 0xef, 0xc9, 0xd5, 0x9a, 0x1f, 0x9e, 0x94,
0xb3, 0x8c, 0x86, 0xd7, 0x33, 0xf4, 0xbc, 0x09, 0x25, 0x6e, 0x1c, 0x28, 0xf7, 0xb6, 0x7a, 0xda, 0xeb, 0x55, 0x6f, 0x6a, 0x51, 0xcc, 0x51, 0xcb, 0xd7, 0x90, 0xdd, 0x60, 0xa3, 0x12, 0x44, 0x3d,
0x12, 0x7b, 0xa9, 0x9e, 0x8c, 0x9e, 0xd7, 0x33, 0x4c, 0x2b, 0xd0, 0xd7, 0x7c, 0xff, 0x07, 0x8e, 0x37, 0x7f, 0x36, 0x9b, 0xab, 0x7b, 0x1b, 0x42, 0xee, 0xef, 0x1d, 0x88, 0x74, 0x2f, 0x9e, 0xc8,
0x6b, 0xc7, 0x41, 0xf2, 0x22, 0xc8, 0x7b, 0x6f, 0x78, 0x57, 0x01, 0x4d, 0xc7, 0x90, 0x5b, 0x25, 0x92, 0xb2, 0x3f, 0x9b, 0xab, 0xf2, 0x86, 0xbc, 0x3b, 0x19, 0xa3, 0xbb, 0x10, 0x3b, 0xee, 0x5c,
0xa5, 0x3e, 0x50, 0x70, 0x29, 0x4c, 0xc9, 0xed, 0x3c, 0xe8, 0x7c, 0x2d, 0x9b, 0x01, 0x14, 0x53, 0xb4, 0x7b, 0x72, 0x58, 0x39, 0x98, 0xcd, 0x55, 0xb4, 0xb1, 0xe1, 0xd8, 0x9c, 0x18, 0x7e, 0x84,
0x6d, 0x81, 0x6e, 0x83, 0xce, 0x88, 0x1d, 0x37, 0x63, 0x69, 0xb5, 0x96, 0x89, 0x1d, 0x75, 0x9f, 0xef, 0x43, 0xa4, 0x47, 0xb4, 0x60, 0x7a, 0x64, 0xde, 0x93, 0x1e, 0x19, 0x2f, 0x3d, 0xca, 0x8b,
0xb0, 0xa3, 0x07, 0x50, 0xe0, 0xee, 0x27, 0x62, 0x56, 0x37, 0xc4, 0xac, 0x56, 0xd7, 0x27, 0xf7, 0x3c, 0x64, 0x44, 0x7e, 0x3a, 0x96, 0x69, 0x38, 0x14, 0x11, 0x88, 0x5f, 0xd9, 0x64, 0x4c, 0x9d,
0x98, 0x30, 0x22, 0x26, 0x55, 0xfc, 0x0d, 0xf0, 0x93, 0xf9, 0x05, 0x18, 0x17, 0xfb, 0x88, 0x2f, 0x82, 0xc4, 0x0b, 0xeb, 0xf8, 0x03, 0xd3, 0x5d, 0xc0, 0x55, 0x4f, 0x18, 0x56, 0x3d, 0xca, 0xba,
0xf0, 0x64, 0xa5, 0xcb, 0xf0, 0x06, 0x4e, 0x69, 0xd0, 0x3e, 0xe4, 0xc5, 0x04, 0xf1, 0xfe, 0xd4, 0x2e, 0xf6, 0x80, 0x95, 0x4f, 0x13, 0x10, 0xe3, 0x7c, 0x34, 0x84, 0xb8, 0x68, 0x31, 0xdc, 0xd0,
0x1a, 0x2a, 0x8e, 0x24, 0xf3, 0x08, 0xd0, 0xe5, 0x9e, 0xb9, 0x22, 0x9b, 0x96, 0xb0, 0x3d, 0x87, 0x74, 0xad, 0xb5, 0x9b, 0x32, 0x71, 0xdd, 0x1c, 0xba, 0x19, 0xc2, 0x1e, 0x34, 0xfa, 0xaf, 0x04,
0x6b, 0x6b, 0x5a, 0xe3, 0x8a, 0x74, 0x7a, 0x3a, 0xb9, 0xcb, 0x0d, 0x70, 0x45, 0xb6, 0xad, 0x84, 0x99, 0xab, 0x91, 0x49, 0xdc, 0xbe, 0xe8, 0x4c, 0xde, 0x53, 0xd2, 0xde, 0xd1, 0x31, 0x86, 0x28,
0xed, 0x19, 0xec, 0x5e, 0x7a, 0xe9, 0x2b, 0x92, 0x15, 0x62, 0xb2, 0x7a, 0x1f, 0x0a, 0x82, 0x20, 0xb2, 0x50, 0xf8, 0xc8, 0x1b, 0x61, 0x80, 0xdb, 0x0c, 0xe1, 0xf4, 0xd5, 0x9a, 0x44, 0x9f, 0x48,
0xda, 0x96, 0xf9, 0x7e, 0x07, 0x1f, 0x76, 0xfa, 0x86, 0x62, 0x5e, 0x3b, 0x9f, 0x5b, 0x3b, 0x89, 0x90, 0xd3, 0x0d, 0x97, 0x6a, 0xd4, 0xf6, 0x0d, 0x89, 0x70, 0x43, 0xce, 0x77, 0x33, 0xa4, 0x25,
0x49, 0xf6, 0x06, 0x07, 0xbc, 0xe8, 0x1d, 0x76, 0x07, 0x7d, 0x43, 0xbd, 0x00, 0x90, 0xb9, 0x44, 0x30, 0x83, 0xa6, 0xec, 0x2d, 0x17, 0xa5, 0xec, 0x06, 0xbf, 0x19, 0xc2, 0x59, 0x3d, 0xc8, 0x40,
0xcb, 0xf0, 0x37, 0x15, 0xb6, 0xe2, 0xf7, 0x46, 0xef, 0x42, 0xee, 0xc9, 0x51, 0xef, 0xe1, 0xc0, 0xaf, 0x25, 0xc8, 0x4f, 0x0c, 0x47, 0xd7, 0x0c, 0x7a, 0xe9, 0xdb, 0x13, 0xe5, 0xf6, 0xfc, 0x75,
0x50, 0xcc, 0xdd, 0xf3, 0xb9, 0x55, 0x8e, 0x0d, 0xe2, 0xe9, 0x91, 0x05, 0x9b, 0x87, 0xdd, 0x41, 0x37, 0x7b, 0x2e, 0x3c, 0xd0, 0xa0, 0x41, 0x88, 0xbd, 0xb8, 0x9b, 0x82, 0x66, 0x08, 0xe7, 0x26,
0xe7, 0x69, 0x07, 0xc7, 0x94, 0xb1, 0x3d, 0x7a, 0x4e, 0x54, 0x87, 0xad, 0xe3, 0x6e, 0xff, 0xf0, 0x1b, 0x1c, 0x1e, 0xa1, 0x81, 0x69, 0x8e, 0x28, 0x31, 0x7c, 0x8b, 0x62, 0x1f, 0x23, 0x42, 0x75,
0x69, 0xb7, 0xf3, 0xd8, 0xd8, 0x90, 0x6b, 0x3a, 0x86, 0xc4, 0x6f, 0xc4, 0x59, 0xda, 0xbd, 0xde, 0x81, 0x79, 0x23, 0x42, 0x1b, 0x7c, 0x16, 0xa1, 0x41, 0x90, 0x81, 0x5e, 0x49, 0x6c, 0x0a, 0xb3,
0x51, 0xe7, 0x61, 0xd7, 0xd0, 0xb2, 0x2c, 0xd1, 0xbd, 0xa3, 0x2a, 0xe4, 0xfb, 0x03, 0x7c, 0xd8, 0x75, 0x43, 0xf3, 0xad, 0x89, 0x73, 0x6b, 0x3a, 0x3b, 0x26, 0x29, 0x87, 0x0c, 0x1a, 0x23, 0x1e,
0x7d, 0x6a, 0xe8, 0x26, 0x3a, 0x9f, 0x5b, 0xdb, 0x31, 0x40, 0x5e, 0x65, 0x94, 0xf8, 0x8f, 0x2a, 0xde, 0x00, 0xbb, 0x19, 0xc2, 0x19, 0x27, 0x40, 0xa3, 0x7f, 0xf9, 0x4f, 0x50, 0x82, 0x5b, 0xd0,
0xec, 0x3d, 0x22, 0x3e, 0x19, 0x3a, 0x13, 0x87, 0x39, 0x34, 0x4c, 0xd6, 0xf3, 0x03, 0xd0, 0x4f, 0xdc, 0xcd, 0x02, 0xde, 0x96, 0xfd, 0x2a, 0x11, 0xc0, 0xf5, 0x38, 0x44, 0x19, 0x84, 0x72, 0x0d,
0x89, 0x1f, 0xcf, 0xc3, 0x6a, 0xfe, 0xd6, 0x81, 0xb9, 0x32, 0x14, 0xff, 0x7f, 0x58, 0x38, 0x99, 0xb0, 0x16, 0xa3, 0x5f, 0x41, 0xd2, 0x25, 0x9a, 0x98, 0x71, 0x58, 0x3b, 0xc8, 0xd4, 0xd3, 0xcb,
0x1f, 0x43, 0x21, 0x51, 0x5d, 0xe9, 0x2f, 0x71, 0x07, 0xca, 0x07, 0xfc, 0x5a, 0x63, 0xe6, 0xfa, 0x45, 0x29, 0xd1, 0x23, 0x1a, 0x9f, 0x70, 0x12, 0xae, 0x58, 0xa0, 0x3a, 0x20, 0x8b, 0xd8, 0xae,
0x7d, 0xb8, 0xf0, 0x01, 0xc4, 0x9d, 0x43, 0x46, 0x02, 0x26, 0x08, 0x35, 0x2c, 0x05, 0x1e, 0x84, 0xee, 0xea, 0xa6, 0xc1, 0x76, 0xf7, 0x5f, 0x90, 0x11, 0xab, 0x33, 0x76, 0x62, 0x7f, 0xb9, 0x28,
0xba, 0x23, 0x41, 0xa8, 0x61, 0x7e, 0x6c, 0xfd, 0xad, 0xc2, 0x66, 0x5f, 0x26, 0xcd, 0x8b, 0xe1, 0xc9, 0xe7, 0xbe, 0xf4, 0x31, 0x9d, 0xfe, 0x8d, 0x8c, 0x1c, 0x2c, 0x5b, 0xef, 0x70, 0x94, 0xaf,
0xa3, 0x89, 0xf6, 0xd6, 0xfd, 0xbd, 0x9b, 0xd7, 0xd7, 0xce, 0x6f, 0x5d, 0xff, 0xfe, 0xd7, 0x8a, 0x24, 0x48, 0x07, 0x0a, 0x18, 0xb5, 0x21, 0xea, 0x12, 0xcd, 0x6f, 0x43, 0xbf, 0xdd, 0x7e, 0xf0,
0x72, 0x4f, 0x45, 0xcf, 0xa0, 0x94, 0x2e, 0x1a, 0xed, 0x37, 0xe5, 0xa7, 0x65, 0x33, 0xfe, 0xb4, 0x23, 0x9a, 0xd7, 0x77, 0x38, 0x0e, 0x1a, 0x42, 0x8a, 0x9d, 0xe8, 0xf3, 0xae, 0x1f, 0xe6, 0x5d,
0x6c, 0x76, 0xf8, 0xa7, 0xa5, 0x79, 0xf3, 0xad, 0x77, 0x24, 0xe8, 0x54, 0xf4, 0x19, 0xe4, 0x44, 0xff, 0x64, 0xb7, 0x38, 0x3e, 0x22, 0x2e, 0xe1, 0x3d, 0x9f, 0x4f, 0xae, 0x6c, 0xa5, 0xfc, 0x05,
0x81, 0x6f, 0x64, 0xd9, 0x4f, 0x58, 0xb2, 0x17, 0xc1, 0xdd, 0x37, 0x4c, 0x91, 0x53, 0x7b, 0xef, 0xe4, 0x77, 0x1b, 0x03, 0x1b, 0x25, 0x57, 0xc3, 0xa5, 0x70, 0x47, 0xc6, 0x01, 0x0e, 0x3a, 0x80,
0xf5, 0xbf, 0x55, 0xe5, 0xf5, 0xb2, 0xaa, 0xfe, 0xb5, 0xac, 0xaa, 0xff, 0x2c, 0xab, 0xea, 0x4f, 0x38, 0xef, 0xc5, 0x22, 0x60, 0x12, 0xf6, 0x28, 0xe5, 0x0c, 0xd0, 0xcd, 0xda, 0xde, 0x12, 0x2d,
0xff, 0x55, 0x95, 0x61, 0x5e, 0x30, 0x7d, 0xf0, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8b, 0x99, 0xb2, 0x42, 0x7b, 0x02, 0xb7, 0xde, 0x53, 0x99, 0x5b, 0xc2, 0x45, 0x83, 0xc6, 0xdd, 0x2c, 0xab,
0xc4, 0xf8, 0x41, 0x0b, 0x00, 0x00, 0x2d, 0xd1, 0x92, 0x2b, 0xb4, 0xc7, 0xb0, 0x77, 0xa3, 0x2c, 0xb6, 0x04, 0x4b, 0xf9, 0x60, 0xe5,
0x2e, 0xa4, 0x38, 0x80, 0xf7, 0xee, 0xc6, 0xbd, 0xc1, 0x20, 0xa4, 0xdc, 0x9a, 0xcd, 0xd5, 0xfc,
0x4a, 0x24, 0x72, 0x8d, 0x6d, 0x58, 0xcd, 0x17, 0x9b, 0x1b, 0x84, 0x2d, 0xde, 0xb3, 0xfa, 0xa5,
0x04, 0x49, 0xff, 0xbe, 0xd1, 0x2f, 0x20, 0x76, 0x72, 0xd6, 0x39, 0xea, 0xc9, 0x21, 0x65, 0x6f,
0x36, 0x57, 0xb3, 0xbe, 0x80, 0x5f, 0x3d, 0x52, 0x21, 0xd1, 0x6a, 0xf7, 0x1a, 0xa7, 0x0d, 0xec,
0x43, 0xfa, 0x72, 0xef, 0x3a, 0x51, 0x19, 0x92, 0x17, 0xed, 0x6e, 0xeb, 0xb4, 0xdd, 0x78, 0x24,
0x87, 0xc5, 0x83, 0xef, 0x6f, 0xf1, 0xef, 0x88, 0xa1, 0xd4, 0x3b, 0x9d, 0xb3, 0xc6, 0x51, 0x5b,
0x8e, 0x6c, 0xa2, 0x78, 0x71, 0x47, 0x45, 0x88, 0x77, 0x7b, 0xb8, 0xd5, 0x3e, 0x95, 0xa3, 0x0a,
0x9a, 0xcd, 0xd5, 0x9c, 0xbf, 0x41, 0x84, 0xd2, 0x33, 0xfc, 0x73, 0x09, 0xf6, 0x8f, 0x89, 0x45,
0x06, 0xfa, 0x48, 0x77, 0x75, 0xea, 0xac, 0x1e, 0xfa, 0x21, 0x44, 0x87, 0xc4, 0xf2, 0xeb, 0x6b,
0xfb, 0xa6, 0xf6, 0x3e, 0x50, 0xc6, 0x74, 0xf8, 0x7c, 0x8a, 0x39, 0xb8, 0xf2, 0x7b, 0x48, 0xad,
0x58, 0x5b, 0x8d, 0xac, 0x79, 0xc8, 0xf2, 0x49, 0xd8, 0x47, 0x2e, 0x3f, 0x80, 0x77, 0x7e, 0xd9,
0xd8, 0x61, 0xc7, 0x25, 0xb6, 0xcb, 0x01, 0x23, 0x58, 0x10, 0x4c, 0x09, 0x35, 0x2e, 0x39, 0x60,
0x04, 0xb3, 0x65, 0xed, 0xbb, 0x30, 0x24, 0xba, 0xc2, 0x68, 0xf4, 0x7f, 0x09, 0xa2, 0xac, 0x86,
0xd1, 0x1f, 0x76, 0x99, 0xe2, 0x95, 0x3f, 0xee, 0xd4, 0x38, 0xca, 0xd1, 0x97, 0x5f, 0x14, 0x42,
0xbf, 0x96, 0x90, 0x03, 0x99, 0x60, 0x14, 0xd1, 0x41, 0x55, 0xfc, 0x86, 0x57, 0xfd, 0xdf, 0xf0,
0x6a, 0x83, 0xfd, 0x86, 0x2b, 0x8d, 0x8f, 0x72, 0x39, 0x5c, 0xad, 0x84, 0x28, 0x88, 0x7f, 0x8c,
0x9f, 0xd4, 0xf6, 0xa7, 0xad, 0xb5, 0x6d, 0xde, 0x14, 0x53, 0x13, 0x56, 0xb8, 0x8f, 0xf5, 0x3b,
0x6f, 0xbe, 0x2d, 0x86, 0xde, 0x2c, 0x8b, 0xd2, 0xd7, 0xcb, 0xa2, 0xf4, 0xcd, 0xb2, 0x28, 0xbd,
0x7e, 0x5b, 0x0c, 0xfd, 0x23, 0xe1, 0x1d, 0x1c, 0xc4, 0xb9, 0xea, 0xdf, 0xfc, 0x18, 0x00, 0x00,
0xff, 0xff, 0xa9, 0x9b, 0xef, 0x45, 0xee, 0x10, 0x00, 0x00,
} }

View File

@ -23,7 +23,6 @@ package pb
import ( import (
context "context" context "context"
yarpc "github.com/influxdata/yarpc" yarpc "github.com/influxdata/yarpc"
) )
@ -164,7 +163,7 @@ func _Storage_Hints_Handler(srv interface{}, ctx context.Context, dec func(inter
} }
var _Storage_serviceDesc = yarpc.ServiceDesc{ var _Storage_serviceDesc = yarpc.ServiceDesc{
ServiceName: "storage.Storage", ServiceName: "com.github.influxdata.influxdb.services.storage.Storage",
Index: 0, Index: 0,
HandlerType: (*StorageServer)(nil), HandlerType: (*StorageServer)(nil),
Methods: []yarpc.MethodDesc{ Methods: []yarpc.MethodDesc{

View File

@ -143,6 +143,21 @@ func (s *source) next(ctx context.Context, trace map[string]string) (query.Block
return bi, stop, true return bi, stop, true
} }
type GroupMode int
const (
// GroupModeDefault specifies the default grouping mode, which is GroupModeAll.
GroupModeDefault GroupMode = 0
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate block for each series.
GroupModeAll
// GroupModeBy produces a block for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a block for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)
type ReadSpec struct { type ReadSpec struct {
OrganizationID []byte OrganizationID []byte
BucketID []byte BucketID []byte
@ -162,12 +177,13 @@ type ReadSpec struct {
// By default this is false meaning all values of time are produced for a given series, // By default this is false meaning all values of time are produced for a given series,
// before any values are produced from the next series. // before any values are produced from the next series.
OrderByTime bool OrderByTime bool
// MergeAll indicates that all series should be merged into a single group // GroupMode instructs
MergeAll bool GroupMode GroupMode
// GroupKeys is the list of dimensions along which to group // GroupKeys is the list of dimensions along which to group.
//
// When GroupMode is GroupModeBy, the results will be grouped by the specified keys.
// When GroupMode is GroupModeExcept, the results will be grouped by all keys, except those specified.
GroupKeys []string GroupKeys []string
// GroupExcept is the list of dimensions along which to not group
GroupExcept []string
} }
type Reader interface { type Reader interface {

View File

@ -6,8 +6,8 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query" "github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest" "github.com/influxdata/platform/query/plan/plantest"
) )
@ -422,7 +422,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): { plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"), ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{ Spec: &functions.GroupProcedureSpec{
By: []string{"host", "region"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host", "region"},
}, },
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")}, Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")}, Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")},
@ -465,6 +466,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
}, },
}, },
GroupingSet: true, GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host", "region"}, GroupKeys: []string{"host", "region"},
AggregateSet: true, AggregateSet: true,
AggregateMethod: "sum", AggregateMethod: "sum",
@ -523,7 +525,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): { plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"), ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{ Spec: &functions.GroupProcedureSpec{
By: []string{"host"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
}, },
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")}, Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")}, Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -568,6 +571,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
}, },
}, },
GroupingSet: true, GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"}, GroupKeys: []string{"host"},
LimitSet: true, LimitSet: true,
PointsLimit: -1, PointsLimit: -1,
@ -626,7 +630,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): { plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"), ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{ Spec: &functions.GroupProcedureSpec{
By: []string{"host"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
}, },
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")}, Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")}, Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -671,6 +676,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
}, },
}, },
GroupingSet: true, GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"}, GroupKeys: []string{"host"},
}, },
Parents: nil, Parents: nil,
@ -727,7 +733,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): { plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"), ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{ Spec: &functions.GroupProcedureSpec{
By: []string{"host"}, GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
}, },
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")}, Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")}, Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -772,6 +779,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
}, },
}, },
GroupingSet: true, GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"}, GroupKeys: []string{"host"},
}, },
Parents: nil, Parents: nil,

View File

@ -225,8 +225,8 @@ func (r *REPL) doQuery(spec *query.Spec) error {
blocks := r.Blocks() blocks := r.Blocks()
fmt.Println("Result:", name) fmt.Println("Result:", name)
err := blocks.Do(func(b query.Block) error { err := blocks.Do(func(b query.Block) error {
execute.NewFormatter(b, nil).WriteTo(os.Stdout) _, err := execute.NewFormatter(b, nil).WriteTo(os.Stdout)
return nil return err
}) })
if err != nil { if err != nil {
return err return err