fix(query): Utilize improvements storage RPC API

* clarifies grouping behavior in data types, matching RPC
pull/10616/head
Stuart Carnie 2018-05-23 21:59:47 -07:00
parent 5705f33f55
commit 6e12d6634d
13 changed files with 985 additions and 291 deletions

View File

@ -117,7 +117,7 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {
// Write rows
r := 0
f.b.Do(func(cr query.ColReader) error {
w.err = f.b.Do(func(cr query.ColReader) error {
if r == 0 {
l := cr.Len()
for i := 0; i < l; i++ {

View File

@ -103,7 +103,9 @@ func (r DistinctPointLimitRewriteRule) Rewrite(pr *plan.Procedure, planner plan.
}
groupStar := !fromSpec.GroupingSet && distinct.Column != execute.DefaultValueColLabel
groupByColumn := fromSpec.GroupingSet && ((len(fromSpec.GroupKeys) > 0 && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) || (len(fromSpec.GroupExcept) > 0 && !execute.ContainsStr(fromSpec.GroupExcept, distinct.Column)))
groupByColumn := fromSpec.GroupingSet && len(fromSpec.GroupKeys) > 0 &&
((fromSpec.GroupMode == GroupModeBy && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) ||
(fromSpec.GroupMode == GroupModeExcept && !execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)))
if groupStar || groupByColumn {
fromSpec.LimitSet = true
fromSpec.PointsLimit = -1
@ -150,8 +152,20 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er
colIdx := execute.ColIdx(t.column, b.Cols())
if colIdx < 0 {
return fmt.Errorf("no column %q exists", t.column)
// doesn't exist in this block, so add an empty value
execute.AddBlockKeyCols(b.Key(), builder)
colIdx = builder.AddCol(query.ColMeta{
Label: execute.DefaultValueColLabel,
Type: query.TString,
})
builder.AppendString(colIdx, "")
execute.AppendKeyValues(b.Key(), builder)
// TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error {
return nil
})
}
col := b.Cols()[colIdx]
execute.AddBlockKeyCols(b.Key(), builder)
@ -178,7 +192,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er
}
execute.AppendKeyValues(b.Key(), builder)
// TODO: this is a hack
// TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error {
return nil
})

View File

@ -101,9 +101,8 @@ type FromProcedureSpec struct {
GroupingSet bool
OrderByTime bool
MergeAll bool
GroupMode GroupMode
GroupKeys []string
GroupExcept []string
AggregateSet bool
AggregateMethod string
@ -214,9 +213,8 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
SeriesOffset: spec.SeriesOffset,
Descending: spec.Descending,
OrderByTime: spec.OrderByTime,
MergeAll: spec.MergeAll,
GroupMode: storage.GroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys,
GroupExcept: spec.GroupExcept,
AggregateMethod: spec.AggregateMethod,
},
bounds,

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/platform/query/interpreter"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"math/bits"
)
const GroupKind = "group"
@ -17,6 +18,8 @@ const GroupKind = "group"
type GroupOpSpec struct {
By []string `json:"by"`
Except []string `json:"except"`
All bool `json:"all"`
None bool `json:"none"`
}
var groupSignature = query.DefaultFunctionSignature()
@ -24,6 +27,8 @@ var groupSignature = query.DefaultFunctionSignature()
func init() {
groupSignature.Params["by"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["except"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["none"] = semantic.Bool
groupSignature.Params["all"] = semantic.Bool
query.RegisterFunction(GroupKind, createGroupOpSpec, groupSignature)
query.RegisterOpSpec(GroupKind, newGroupOp)
@ -38,6 +43,18 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
}
spec := new(GroupOpSpec)
if val, ok, err := args.GetBool("none"); err != nil {
return nil, err
} else if ok && val {
spec.None = true
}
if val, ok, err := args.GetBool("all"); err != nil {
return nil, err
} else if ok && val {
spec.All = true
}
if array, ok, err := args.GetArray("by", semantic.String); err != nil {
return nil, err
} else if ok {
@ -55,9 +72,16 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
}
}
if len(spec.By) > 0 && len(spec.Except) > 0 {
return nil, errors.New(`cannot specify both "by" and "except" keyword arguments`)
switch bits.OnesCount(uint(groupModeFromSpec(spec))) {
case 0:
// empty args
spec.All = true
case 1:
// all good
default:
return nil, errors.New(`specify one of "by", "except", "none" or "all" keyword arguments`)
}
return spec, nil
}
@ -69,9 +93,45 @@ func (s *GroupOpSpec) Kind() query.OperationKind {
return GroupKind
}
type GroupMode int
const (
// GroupModeDefault will use the default grouping of GroupModeAll.
GroupModeDefault GroupMode = 0
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate block for each series.
GroupModeAll
// GroupModeBy produces a block for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a block for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)
func groupModeFromSpec(spec *GroupOpSpec) GroupMode {
var mode GroupMode
if spec.All {
mode |= GroupModeAll
}
if spec.None {
mode |= GroupModeNone
}
if len(spec.By) > 0 {
mode |= GroupModeBy
}
if len(spec.Except) > 0 {
mode |= GroupModeExcept
}
if mode == GroupModeDefault {
mode = GroupModeAll
}
return mode
}
type GroupProcedureSpec struct {
By []string
Except []string
GroupMode GroupMode
GroupKeys []string
}
func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
@ -80,9 +140,22 @@ func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pro
return nil, fmt.Errorf("invalid spec type %T", qs)
}
mode := groupModeFromSpec(spec)
var keys []string
switch mode {
case GroupModeAll:
case GroupModeNone:
case GroupModeBy:
keys = spec.By
case GroupModeExcept:
keys = spec.Except
default:
return nil, fmt.Errorf("invalid GroupOpSpec; multiple modes detected")
}
p := &GroupProcedureSpec{
By: spec.By,
Except: spec.Except,
GroupMode: mode,
GroupKeys: keys,
}
return p, nil
}
@ -93,11 +166,10 @@ func (s *GroupProcedureSpec) Kind() plan.ProcedureKind {
func (s *GroupProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(GroupProcedureSpec)
ns.By = make([]string, len(s.By))
copy(ns.By, s.By)
ns.GroupMode = s.GroupMode
ns.Except = make([]string, len(s.Except))
copy(ns.Except, s.Except)
ns.GroupKeys = make([]string, len(s.GroupKeys))
copy(ns.GroupKeys, s.GroupKeys)
return ns
}
@ -120,19 +192,16 @@ func (s *GroupProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.OrderByTime = false
selectSpec.GroupingSet = false
selectSpec.MergeAll = false
selectSpec.GroupMode = GroupModeDefault
selectSpec.GroupKeys = nil
selectSpec.GroupExcept = nil
return
}
selectSpec.GroupingSet = true
// TODO implement OrderByTime
//selectSpec.OrderByTime = true
// Merge all series into a single group if we have no specific grouping dimensions.
selectSpec.MergeAll = len(s.By) == 0 && len(s.Except) == 0
selectSpec.GroupKeys = s.By
selectSpec.GroupExcept = s.Except
selectSpec.GroupMode = s.GroupMode
selectSpec.GroupKeys = s.GroupKeys
}
type AggregateGroupRewriteRule struct {
@ -196,23 +265,18 @@ type groupTransformation struct {
d execute.Dataset
cache execute.BlockBuilderCache
mode GroupMode
keys []string
except []string
// Ignoring is true of len(keys) == 0 && len(except) > 0
ignoring bool
}
func NewGroupTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *GroupProcedureSpec) *groupTransformation {
t := &groupTransformation{
d: d,
cache: cache,
keys: spec.By,
except: spec.Except,
ignoring: len(spec.By) == 0 && len(spec.Except) > 0,
mode: spec.GroupMode,
keys: spec.GroupKeys,
}
sort.Strings(t.keys)
sort.Strings(t.except)
return t
}
@ -233,14 +297,14 @@ func (t *groupTransformation) RetractBlock(id execute.DatasetID, key query.Parti
func (t *groupTransformation) Process(id execute.DatasetID, b query.Block) error {
cols := b.Cols()
on := make(map[string]bool, len(cols))
if len(t.keys) > 0 {
if t.mode == GroupModeBy && len(t.keys) > 0 {
for _, k := range t.keys {
on[k] = true
}
} else if len(t.except) > 0 {
} else if t.mode == GroupModeExcept && len(t.keys) > 0 {
COLS:
for _, c := range cols {
for _, label := range t.except {
for _, label := range t.keys {
if c.Label == label {
continue COLS
}

View File

@ -33,7 +33,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan in",
spec: &functions.GroupProcedureSpec{
By: []string{"t1"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
},
data: []query.Block{
&executetest.Block{
@ -117,7 +118,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan in ignoring",
spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"},
GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
},
data: []query.Block{
&executetest.Block{
@ -207,7 +209,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan out",
spec: &functions.GroupProcedureSpec{
By: []string{"t1"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
},
data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{
@ -248,7 +251,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan out ignoring",
spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"},
GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
},
data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{
@ -310,7 +314,8 @@ func TestGroup_Process(t *testing.T) {
func TestGroup_PushDown(t *testing.T) {
spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
}
root := &plan.Procedure{
Spec: new(functions.FromProcedureSpec),
@ -318,7 +323,7 @@ func TestGroup_PushDown(t *testing.T) {
want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
GroupingSet: true,
MergeAll: false,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
},
}
@ -327,12 +332,13 @@ func TestGroup_PushDown(t *testing.T) {
}
func TestGroup_PushDown_Duplicate(t *testing.T) {
spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
}
root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
GroupingSet: true,
MergeAll: true,
GroupMode: functions.GroupModeAll,
},
}
want := &plan.Procedure{

View File

@ -113,7 +113,7 @@ func (x Node_Logical) String() string {
func (Node_Logical) EnumDescriptor() ([]byte, []int) { return fileDescriptorPredicate, []int{0, 2} }
type Node struct {
NodeType Node_Type `protobuf:"varint,1,opt,name=node_type,json=nodeType,proto3,enum=storage.Node_Type" json:"nodeType"`
NodeType Node_Type `protobuf:"varint,1,opt,name=node_type,json=nodeType,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Type" json:"nodeType"`
Children []*Node `protobuf:"bytes,2,rep,name=children" json:"children,omitempty"`
// Types that are valid to be assigned to Value:
// *Node_StringValue
@ -165,10 +165,10 @@ type Node_FieldRefValue struct {
FieldRefValue string `protobuf:"bytes,10,opt,name=field_ref_value,json=fieldRefValue,proto3,oneof"`
}
type Node_Logical_ struct {
Logical Node_Logical `protobuf:"varint,11,opt,name=logical,proto3,enum=storage.Node_Logical,oneof"`
Logical Node_Logical `protobuf:"varint,11,opt,name=logical,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Logical,oneof"`
}
type Node_Comparison_ struct {
Comparison Node_Comparison `protobuf:"varint,12,opt,name=comparison,proto3,enum=storage.Node_Comparison,oneof"`
Comparison Node_Comparison `protobuf:"varint,12,opt,name=comparison,proto3,enum=com.github.influxdata.influxdb.services.storage.Node_Comparison,oneof"`
}
func (*Node_StringValue) isNode_Value() {}
@ -474,11 +474,11 @@ func (m *Predicate) GetRoot() *Node {
}
func init() {
proto.RegisterType((*Node)(nil), "storage.Node")
proto.RegisterType((*Predicate)(nil), "storage.Predicate")
proto.RegisterEnum("storage.Node_Type", Node_Type_name, Node_Type_value)
proto.RegisterEnum("storage.Node_Comparison", Node_Comparison_name, Node_Comparison_value)
proto.RegisterEnum("storage.Node_Logical", Node_Logical_name, Node_Logical_value)
proto.RegisterType((*Node)(nil), "com.github.influxdata.influxdb.services.storage.Node")
proto.RegisterType((*Predicate)(nil), "com.github.influxdata.influxdb.services.storage.Predicate")
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Type", Node_Type_name, Node_Type_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Comparison", Node_Comparison_name, Node_Comparison_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Node_Logical", Node_Logical_name, Node_Logical_value)
}
func (m *Node) Marshal() (dAtA []byte, err error) {
size := m.Size()
@ -1287,58 +1287,61 @@ var (
func init() { proto.RegisterFile("predicate.proto", fileDescriptorPredicate) }
var fileDescriptorPredicate = []byte{
// 845 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x94, 0xcf, 0x6e, 0xdb, 0x46,
0x10, 0xc6, 0x45, 0x49, 0xb6, 0xc4, 0x91, 0x65, 0x33, 0x9b, 0x38, 0x56, 0xd9, 0x46, 0xda, 0x3a,
0x28, 0xa0, 0x1c, 0x2a, 0xc3, 0x6e, 0x73, 0x69, 0x0e, 0x05, 0xe5, 0xd0, 0xb2, 0x00, 0x56, 0x52,
0x29, 0xa6, 0xc9, 0x4d, 0xa0, 0xa5, 0x15, 0x4d, 0x80, 0xe1, 0xaa, 0xcb, 0x55, 0x91, 0xbc, 0x41,
0xc1, 0x53, 0xef, 0x05, 0x4f, 0x7d, 0x99, 0x02, 0x45, 0x81, 0x3e, 0x81, 0x50, 0xa8, 0xb7, 0x3e,
0x45, 0xc1, 0xe5, 0x3f, 0xa9, 0xc9, 0x6d, 0x67, 0xbe, 0xef, 0x37, 0xb3, 0xbb, 0x1c, 0x2e, 0x9c,
0xac, 0x18, 0x59, 0xb8, 0x73, 0x9b, 0x93, 0xde, 0x8a, 0x51, 0x4e, 0x51, 0x2d, 0xe0, 0x94, 0xd9,
0x0e, 0x51, 0xbf, 0x74, 0x5c, 0x7e, 0xbf, 0xbe, 0xeb, 0xcd, 0xe9, 0xdb, 0x0b, 0x87, 0x3a, 0xf4,
0x42, 0xe8, 0x77, 0xeb, 0xa5, 0x88, 0x44, 0x20, 0x56, 0x09, 0x77, 0xfe, 0x07, 0x40, 0x75, 0x44,
0x17, 0x04, 0x0d, 0x41, 0xf6, 0xe9, 0x82, 0xcc, 0xf8, 0xfb, 0x15, 0x69, 0x49, 0x58, 0xea, 0x1e,
0x5f, 0xa1, 0x5e, 0x5a, 0xb4, 0x17, 0x3b, 0x7a, 0xd6, 0xfb, 0x15, 0xe9, 0xb7, 0xb6, 0x9b, 0x4e,
0x3d, 0x0e, 0xe3, 0xe8, 0xdf, 0x4d, 0xa7, 0xee, 0xa7, 0x6b, 0x33, 0x5f, 0xa1, 0x67, 0x50, 0x9f,
0xdf, 0xbb, 0xde, 0x82, 0x11, 0xbf, 0x55, 0xc6, 0x95, 0x6e, 0xe3, 0xaa, 0xb9, 0x57, 0xc9, 0xcc,
0x65, 0xf4, 0x35, 0x1c, 0x05, 0x9c, 0xb9, 0xbe, 0x33, 0xfb, 0xc9, 0xf6, 0xd6, 0xa4, 0x55, 0xc1,
0x52, 0x57, 0xee, 0x9f, 0x6c, 0x37, 0x9d, 0xc6, 0x54, 0xe4, 0x7f, 0x88, 0xd3, 0xb7, 0x25, 0xb3,
0x11, 0x14, 0x21, 0xba, 0x04, 0xb8, 0xa3, 0xd4, 0x4b, 0x99, 0x2a, 0x96, 0xba, 0xf5, 0xbe, 0xb2,
0xdd, 0x74, 0x8e, 0xfa, 0x94, 0x7a, 0xc4, 0xf6, 0x33, 0x48, 0x8e, 0x5d, 0x09, 0x72, 0x01, 0xb2,
0xeb, 0xf3, 0x94, 0x38, 0xc0, 0x52, 0xb7, 0x92, 0x10, 0x43, 0x9f, 0x13, 0x87, 0xb0, 0x8c, 0xa8,
0xbb, 0x3e, 0x4f, 0x80, 0x2b, 0x80, 0x75, 0x41, 0x1c, 0x62, 0xa9, 0x5b, 0xed, 0x3f, 0xd8, 0x6e,
0x3a, 0xcd, 0x57, 0x7e, 0xe0, 0x3a, 0x3e, 0x59, 0xe4, 0x4d, 0xd6, 0x39, 0x73, 0x09, 0x8d, 0xa5,
0x47, 0xed, 0x0c, 0xaa, 0x61, 0xa9, 0x2b, 0xf5, 0x8f, 0xb7, 0x9b, 0x0e, 0xdc, 0xc4, 0xe9, 0x8c,
0x80, 0x65, 0x1e, 0xc5, 0x08, 0x23, 0x0e, 0x79, 0x97, 0x22, 0x75, 0x71, 0x7e, 0x81, 0x98, 0x71,
0x3a, 0x47, 0x58, 0x1e, 0xa1, 0xe7, 0xd0, 0xe4, 0xb6, 0x33, 0x63, 0x64, 0x99, 0x42, 0x72, 0x71,
0x69, 0x96, 0xed, 0x98, 0x64, 0x99, 0x5f, 0x1a, 0x2f, 0x42, 0xf4, 0x02, 0x4e, 0x96, 0x2e, 0xf1,
0x16, 0x3b, 0x20, 0x08, 0x50, 0x9c, 0xea, 0x26, 0x96, 0x76, 0xd0, 0xe6, 0x72, 0x37, 0x81, 0x2e,
0xa1, 0xe6, 0x51, 0xc7, 0x9d, 0xdb, 0x5e, 0xab, 0x21, 0x66, 0xe3, 0x74, 0x7f, 0x36, 0x8c, 0x44,
0xbc, 0x2d, 0x99, 0x99, 0x0f, 0x7d, 0x03, 0x30, 0xa7, 0x6f, 0x57, 0x36, 0x73, 0x03, 0xea, 0xb7,
0x8e, 0x04, 0xd5, 0xda, 0xa7, 0xae, 0x73, 0x3d, 0x3e, 0x62, 0xe1, 0x3e, 0xff, 0xb5, 0x0c, 0x55,
0x31, 0x4a, 0xcf, 0x01, 0x19, 0xe3, 0xc1, 0xf0, 0x5a, 0x33, 0x66, 0xfa, 0x9b, 0x89, 0xa9, 0x4f,
0xa7, 0xc3, 0xf1, 0x48, 0x29, 0xa9, 0x4f, 0xc2, 0x08, 0x7f, 0x92, 0x8d, 0x61, 0xda, 0x5c, 0x7f,
0xb7, 0x62, 0x24, 0x08, 0x5c, 0xea, 0xa3, 0x17, 0x70, 0x7a, 0x3d, 0xfe, 0x6e, 0xa2, 0x99, 0xc3,
0xe9, 0x78, 0xb4, 0x4b, 0x4a, 0x2a, 0x0e, 0x23, 0xfc, 0x59, 0x46, 0x16, 0x1b, 0xd8, 0x81, 0x2f,
0x41, 0x99, 0x68, 0xa6, 0xbe, 0xc7, 0x95, 0xd5, 0x4f, 0xc3, 0x08, 0x9f, 0x65, 0xdc, 0xc4, 0x66,
0x64, 0x17, 0xe9, 0x40, 0xcd, 0xd2, 0x06, 0x33, 0x53, 0xbf, 0x51, 0x2a, 0x2a, 0x0a, 0x23, 0x7c,
0x9c, 0x39, 0x93, 0x0f, 0x82, 0x30, 0xd4, 0x8c, 0xa1, 0xa5, 0x9b, 0x9a, 0xa1, 0x54, 0xd5, 0x87,
0x61, 0x84, 0x4f, 0xf2, 0xcd, 0xbb, 0x9c, 0x30, 0xdb, 0x43, 0x4f, 0x41, 0xbe, 0x19, 0xea, 0xc6,
0x4b, 0x51, 0xe4, 0x40, 0x7d, 0x14, 0x46, 0x58, 0xc9, 0x3c, 0xd9, 0xc7, 0x51, 0xab, 0x3f, 0xff,
0xd6, 0x2e, 0x9d, 0xff, 0x59, 0x06, 0x28, 0x76, 0x8e, 0xda, 0x70, 0xa0, 0x7f, 0xff, 0x4a, 0x33,
0x94, 0x52, 0x52, 0x79, 0xe7, 0x50, 0x3f, 0xae, 0x6d, 0x0f, 0x7d, 0x01, 0xf2, 0x68, 0x6c, 0xcd,
0x12, 0x8f, 0xa4, 0x3e, 0x0e, 0x23, 0x8c, 0x0a, 0xcf, 0x88, 0xf2, 0xc4, 0xf6, 0x0c, 0x1a, 0x53,
0x4b, 0x33, 0xad, 0xe9, 0xec, 0xf5, 0xd0, 0xba, 0x55, 0xca, 0x6a, 0x2b, 0x8c, 0xf0, 0xa3, 0xc2,
0x38, 0xe5, 0x36, 0xe3, 0xc1, 0x6b, 0x97, 0xdf, 0xc7, 0x1d, 0x4d, 0x7d, 0xa0, 0xbf, 0x51, 0x2a,
0xff, 0xef, 0x28, 0x86, 0x36, 0xeb, 0x98, 0x78, 0xaa, 0x1f, 0xe9, 0x98, 0xd8, 0x54, 0x28, 0x1b,
0x96, 0x72, 0x90, 0x5c, 0x58, 0xa1, 0x1b, 0x24, 0x08, 0x10, 0x86, 0x8a, 0x61, 0xe9, 0xca, 0xa1,
0x7a, 0x16, 0x46, 0xf8, 0xe1, 0xbe, 0x98, 0xec, 0xf7, 0x09, 0x94, 0x07, 0x96, 0x52, 0x53, 0x4f,
0xc3, 0x08, 0x3f, 0x28, 0x0c, 0x03, 0x46, 0x6c, 0x4e, 0x18, 0x7a, 0x0a, 0x95, 0x81, 0xa5, 0x2b,
0x75, 0x55, 0x0d, 0x23, 0xfc, 0xf8, 0x03, 0x5d, 0xd4, 0x48, 0xef, 0xf3, 0x5b, 0xa8, 0xa5, 0x23,
0x84, 0xce, 0xa0, 0xa2, 0x8d, 0x5e, 0x2a, 0x25, 0xf5, 0x38, 0x8c, 0x30, 0xa4, 0x59, 0xcd, 0x5f,
0xa0, 0x53, 0x28, 0x8f, 0x4d, 0x45, 0x52, 0x9b, 0x61, 0x84, 0xe5, 0x34, 0x3f, 0x66, 0x49, 0x81,
0x7e, 0x0d, 0x0e, 0xc4, 0x0f, 0x75, 0xde, 0x03, 0x79, 0x92, 0x3d, 0xcc, 0xe8, 0x73, 0xa8, 0x32,
0x4a, 0xb9, 0x78, 0x4c, 0x3f, 0x78, 0x02, 0x85, 0xd4, 0x57, 0x7e, 0xdf, 0xb6, 0xa5, 0xbf, 0xb6,
0x6d, 0xe9, 0xef, 0x6d, 0x5b, 0xfa, 0xe5, 0x9f, 0x76, 0xe9, 0xee, 0x50, 0x3c, 0xcb, 0x5f, 0xfd,
0x17, 0x00, 0x00, 0xff, 0xff, 0xfb, 0xde, 0x9f, 0x18, 0xe1, 0x05, 0x00, 0x00,
// 883 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x95, 0xcb, 0x6e, 0xdb, 0x46,
0x14, 0x86, 0x45, 0x5d, 0x2c, 0xe9, 0xc8, 0x17, 0x66, 0x12, 0xc7, 0x0a, 0xdb, 0x48, 0x03, 0x07,
0x05, 0xd4, 0x45, 0x65, 0xd8, 0xad, 0x37, 0x0d, 0x8a, 0x56, 0x72, 0x68, 0x59, 0x00, 0x2b, 0x29,
0x14, 0x73, 0x69, 0x37, 0x02, 0x25, 0x8d, 0x68, 0x02, 0x34, 0x47, 0x19, 0x8e, 0x02, 0xe7, 0x0d,
0x0a, 0xae, 0xba, 0x2f, 0xb8, 0xea, 0xcb, 0x74, 0x53, 0xa0, 0x4f, 0x20, 0x14, 0xea, 0xae, 0x7d,
0x89, 0x82, 0xc3, 0x9b, 0xdc, 0x76, 0x53, 0xef, 0xe6, 0x9c, 0xf9, 0xbf, 0xff, 0xcc, 0x1c, 0x1e,
0x92, 0x70, 0xb0, 0x64, 0x64, 0x6e, 0xcf, 0x4c, 0x4e, 0xda, 0x4b, 0x46, 0x39, 0x45, 0x27, 0x33,
0x7a, 0xd3, 0xb6, 0x6c, 0x7e, 0xbd, 0x9a, 0xb6, 0x6d, 0x77, 0xe1, 0xac, 0x6e, 0xe7, 0x26, 0x37,
0x93, 0xe5, 0xb4, 0xed, 0x11, 0xf6, 0xde, 0x9e, 0x11, 0xaf, 0xed, 0x71, 0xca, 0x4c, 0x8b, 0x28,
0x9f, 0xc5, 0xe2, 0x19, 0xbd, 0x39, 0xb1, 0xa8, 0x45, 0x4f, 0x84, 0xcf, 0x74, 0xb5, 0x10, 0x91,
0x08, 0xc4, 0x2a, 0xf2, 0x3f, 0xfe, 0xab, 0x06, 0xc5, 0x01, 0x9d, 0x13, 0xf4, 0x0e, 0xaa, 0x2e,
0x9d, 0x93, 0x09, 0xff, 0xb0, 0x24, 0x75, 0x09, 0x4b, 0xad, 0xfd, 0xb3, 0x2f, 0xdb, 0xff, 0xb3,
0x78, 0x3b, 0x74, 0x6a, 0x1b, 0x1f, 0x96, 0xa4, 0x5b, 0xdf, 0xac, 0x9b, 0x95, 0x30, 0x0c, 0xa3,
0x3f, 0xd7, 0xcd, 0x8a, 0x1b, 0xaf, 0xf5, 0x74, 0x85, 0x5e, 0x42, 0x65, 0x76, 0x6d, 0x3b, 0x73,
0x46, 0xdc, 0x7a, 0x1e, 0x17, 0x5a, 0xb5, 0xb3, 0xf3, 0x7b, 0x55, 0xd4, 0x53, 0x1b, 0xf4, 0x05,
0xec, 0x7a, 0x9c, 0xd9, 0xae, 0x35, 0x79, 0x6f, 0x3a, 0x2b, 0x52, 0x2f, 0x60, 0xa9, 0x55, 0xed,
0x1e, 0x6c, 0xd6, 0xcd, 0xda, 0x58, 0xe4, 0x5f, 0x87, 0xe9, 0xab, 0x9c, 0x5e, 0xf3, 0xb2, 0x10,
0x9d, 0x02, 0x4c, 0x29, 0x75, 0x62, 0xa6, 0x88, 0xa5, 0x56, 0xa5, 0x2b, 0x6f, 0xd6, 0xcd, 0xdd,
0x2e, 0xa5, 0x0e, 0x31, 0xdd, 0x04, 0xaa, 0x86, 0xaa, 0x08, 0x39, 0x81, 0xaa, 0xed, 0xf2, 0x98,
0x28, 0x61, 0xa9, 0x55, 0x88, 0x88, 0xbe, 0xcb, 0x89, 0x45, 0x58, 0x42, 0x54, 0x6c, 0x97, 0x47,
0xc0, 0x19, 0xc0, 0x2a, 0x23, 0x76, 0xb0, 0xd4, 0x2a, 0x76, 0x1f, 0x6c, 0xd6, 0xcd, 0xbd, 0x57,
0xae, 0x67, 0x5b, 0x2e, 0x99, 0xa7, 0x45, 0x56, 0x29, 0x73, 0x0a, 0xb5, 0x85, 0x43, 0xcd, 0x04,
0x2a, 0x63, 0xa9, 0x25, 0x75, 0xf7, 0x37, 0xeb, 0x26, 0x5c, 0x86, 0xe9, 0x84, 0x80, 0x45, 0x1a,
0x85, 0x08, 0x23, 0x16, 0xb9, 0x8d, 0x91, 0x8a, 0xb8, 0xbf, 0x40, 0xf4, 0x30, 0x9d, 0x22, 0x2c,
0x8d, 0xd0, 0x39, 0xec, 0x71, 0xd3, 0x9a, 0x30, 0xb2, 0x88, 0xa1, 0x6a, 0xd6, 0x34, 0xc3, 0xb4,
0x74, 0xb2, 0x48, 0x9b, 0xc6, 0xb3, 0x10, 0x3d, 0x87, 0x83, 0x85, 0x4d, 0x9c, 0xf9, 0x16, 0x08,
0x02, 0x14, 0xb7, 0xba, 0x0c, 0xb7, 0xb6, 0xd0, 0xbd, 0xc5, 0x76, 0x02, 0x7d, 0x07, 0x65, 0x87,
0x5a, 0xf6, 0xcc, 0x74, 0xea, 0x35, 0x31, 0x6b, 0x5f, 0xdd, 0x6f, 0xd6, 0xb4, 0xc8, 0xe4, 0x2a,
0xa7, 0x27, 0x7e, 0x68, 0x0a, 0x30, 0xa3, 0x37, 0x4b, 0x93, 0xd9, 0x1e, 0x75, 0xeb, 0xbb, 0xc2,
0xfd, 0x9b, 0xfb, 0xb9, 0x5f, 0xa4, 0x3e, 0x61, 0xcb, 0x32, 0xd7, 0xe3, 0x9f, 0xf2, 0x50, 0x14,
0x23, 0x7c, 0x0e, 0x48, 0x1b, 0xf6, 0xfa, 0x17, 0x1d, 0x6d, 0xa2, 0xbe, 0x1d, 0xe9, 0xea, 0x78,
0xdc, 0x1f, 0x0e, 0xe4, 0x9c, 0xf2, 0xd4, 0x0f, 0xf0, 0x93, 0x64, 0xfc, 0xe3, 0x43, 0xaa, 0xb7,
0x4b, 0x46, 0x3c, 0xcf, 0xa6, 0x2e, 0x7a, 0x0e, 0x87, 0x17, 0xc3, 0x6f, 0x47, 0x1d, 0xbd, 0x3f,
0x1e, 0x0e, 0xb6, 0x49, 0x49, 0xc1, 0x7e, 0x80, 0x3f, 0x4e, 0xc8, 0xec, 0x00, 0x5b, 0xf0, 0x29,
0xc8, 0xa3, 0x8e, 0xae, 0xde, 0xe1, 0xf2, 0xca, 0x47, 0x7e, 0x80, 0x8f, 0x12, 0x6e, 0x64, 0x32,
0xb2, 0x8d, 0x34, 0xa1, 0x6c, 0x74, 0x7a, 0x13, 0x5d, 0xbd, 0x94, 0x0b, 0x0a, 0xf2, 0x03, 0xbc,
0x9f, 0x28, 0xa3, 0x07, 0x8c, 0x30, 0x94, 0xb5, 0xbe, 0xa1, 0xea, 0x1d, 0x4d, 0x2e, 0x2a, 0x0f,
0xfd, 0x00, 0x1f, 0xa4, 0x87, 0xb7, 0x39, 0x61, 0xa6, 0x83, 0x9e, 0x41, 0xf5, 0xb2, 0xaf, 0x6a,
0x2f, 0x84, 0x49, 0x49, 0x79, 0xe4, 0x07, 0x58, 0x4e, 0x34, 0xc9, 0xc3, 0x56, 0x8a, 0x3f, 0xfc,
0xdc, 0xc8, 0x1d, 0xff, 0x9a, 0x07, 0xc8, 0x4e, 0x8e, 0x1a, 0x50, 0x52, 0x5f, 0xbe, 0xea, 0x68,
0x72, 0x2e, 0x72, 0xde, 0xba, 0xd4, 0xbb, 0x95, 0xe9, 0xa0, 0x4f, 0xa0, 0x3a, 0x18, 0x1a, 0x93,
0x48, 0x23, 0x29, 0x8f, 0xfd, 0x00, 0xa3, 0x4c, 0x33, 0xa0, 0x3c, 0x92, 0x7d, 0x0a, 0xb5, 0xb1,
0xd1, 0xd1, 0x8d, 0xf1, 0xe4, 0x4d, 0xdf, 0xb8, 0x92, 0xf3, 0x4a, 0xdd, 0x0f, 0xf0, 0xa3, 0x4c,
0x38, 0xe6, 0x26, 0xe3, 0xde, 0x1b, 0x9b, 0x5f, 0x87, 0x15, 0x75, 0xb5, 0xa7, 0xbe, 0x95, 0x0b,
0xff, 0xac, 0x28, 0x5e, 0x82, 0xa4, 0x62, 0xa4, 0x29, 0xfe, 0x47, 0xc5, 0x48, 0xa6, 0x40, 0x5e,
0x33, 0xe4, 0x52, 0xd4, 0xb0, 0x6c, 0x5f, 0x23, 0x9e, 0x87, 0x30, 0x14, 0x34, 0x43, 0x95, 0x77,
0x94, 0x23, 0x3f, 0xc0, 0x0f, 0xef, 0x6e, 0x46, 0xe7, 0x7d, 0x0a, 0xf9, 0x9e, 0x21, 0x97, 0x95,
0x43, 0x3f, 0xc0, 0x0f, 0x32, 0x41, 0x8f, 0x11, 0x93, 0x13, 0x86, 0x9e, 0x41, 0xa1, 0x67, 0xa8,
0x72, 0x45, 0x51, 0xfc, 0x00, 0x3f, 0xfe, 0xd7, 0xbe, 0xf0, 0x88, 0xfb, 0xf9, 0x35, 0x94, 0xe3,
0x11, 0x42, 0x47, 0x50, 0xe8, 0x0c, 0x5e, 0xc8, 0x39, 0x65, 0xdf, 0x0f, 0x30, 0xc4, 0xd9, 0x8e,
0x3b, 0x47, 0x87, 0x90, 0x1f, 0xea, 0xb2, 0xa4, 0xec, 0xf9, 0x01, 0xae, 0xc6, 0xf9, 0x21, 0x8b,
0x0c, 0xba, 0x65, 0x28, 0x89, 0x17, 0xf4, 0xf8, 0x35, 0x54, 0x47, 0xc9, 0x0f, 0x06, 0xf5, 0xa1,
0xc8, 0x28, 0xe5, 0xe2, 0x63, 0x7f, 0xef, 0x4f, 0xaf, 0xb0, 0xe8, 0x3e, 0xf9, 0x65, 0xd3, 0x90,
0x7e, 0xdb, 0x34, 0xa4, 0xdf, 0x37, 0x0d, 0xe9, 0xc7, 0x3f, 0x1a, 0xb9, 0xef, 0xcb, 0xb1, 0x6a,
0xba, 0x23, 0xfe, 0x33, 0x9f, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x37, 0x3b, 0xbc, 0x43, 0xda,
0x06, 0x00, 0x00,
}

View File

@ -90,19 +90,23 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
req.Descending = bi.readSpec.Descending
req.TimestampRange.Start = int64(bi.bounds.Start)
req.TimestampRange.End = int64(bi.bounds.Stop)
req.Grouping = bi.readSpec.GroupKeys
req.Group = convertGroupMode(bi.readSpec.GroupMode)
req.GroupKeys = bi.readSpec.GroupKeys
req.SeriesLimit = bi.readSpec.SeriesLimit
req.PointsLimit = bi.readSpec.PointsLimit
req.SeriesOffset = bi.readSpec.SeriesOffset
req.Trace = bi.trace
if req.PointsLimit == -1 {
req.Hints.SetNoPoints()
}
if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil {
return err
} else if agg != AggregateTypeNone {
req.Aggregate = &Aggregate{Type: agg}
}
isGrouping := req.Group != GroupAll
streams := make([]*streamState, 0, len(bi.conns))
for _, c := range bi.conns {
if len(bi.readSpec.Hosts) > 0 {
@ -126,12 +130,21 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
bounds: bi.bounds,
stream: stream,
readSpec: &bi.readSpec,
group: isGrouping,
})
}
ms := &mergedStreams{
streams: streams,
}
if isGrouping {
return bi.handleGroupRead(f, ms)
}
return bi.handleRead(f, ms)
}
func (bi *bockIterator) handleRead(f func(query.Block) error, ms *mergedStreams) error {
for ms.more() {
if p := ms.peek(); readFrameType(p) != seriesType {
//This means the consumer didn't read all the data off the block
@ -141,8 +154,38 @@ func (bi *bockIterator) Do(f func(query.Block) error) error {
s := frame.GetSeries()
typ := convertDataType(s.DataType)
key := partitionKeyForSeries(s, &bi.readSpec, bi.bounds)
cols := bi.determineBlockCols(s, typ)
block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, s.Tags)
cols, defs := determineBlockColsForSeries(s, typ)
block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, s.Tags, defs)
if err := f(block); err != nil {
// TODO(nathanielc): Close streams since we have abandoned the request
return err
}
// Wait until the block has been read.
block.wait()
}
return nil
}
func (bi *bockIterator) handleGroupRead(f func(query.Block) error, ms *mergedStreams) error {
for ms.more() {
if p := ms.peek(); readFrameType(p) != groupType {
//This means the consumer didn't read all the data off the block
return errors.New("internal error: short read")
}
frame := ms.next()
s := frame.GetGroup()
key := partitionKeyForGroup(s, &bi.readSpec, bi.bounds)
// try to infer type
// TODO(sgc): this is a hack
typ := query.TString
if p := ms.peek(); readFrameType(p) == seriesType {
typ = convertDataType(p.GetSeries().DataType)
}
cols, defs := determineBlockColsForGroup(s, typ)
block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, nil, defs)
if err := f(block); err != nil {
// TODO(nathanielc): Close streams since we have abandoned the request
@ -165,6 +208,22 @@ func determineAggregateMethod(agg string) (Aggregate_AggregateType, error) {
return 0, fmt.Errorf("unknown aggregate type %q", agg)
}
func convertGroupMode(m storage.GroupMode) ReadRequest_Group {
switch m {
case storage.GroupModeNone:
return GroupNone
case storage.GroupModeBy:
return GroupBy
case storage.GroupModeExcept:
return GroupExcept
case storage.GroupModeDefault, storage.GroupModeAll:
fallthrough
default:
return GroupAll
}
}
func convertDataType(t ReadResponse_DataType) query.DataType {
switch t {
case DataTypeFloat:
@ -189,8 +248,9 @@ const (
valueColIdx = 3
)
func (bi *bockIterator) determineBlockCols(s *ReadResponse_SeriesFrame, typ query.DataType) []query.ColMeta {
func determineBlockColsForSeries(s *ReadResponse_SeriesFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {
cols := make([]query.ColMeta, 4+len(s.Tags))
defs := make([][]byte, 4+len(s.Tags))
cols[startColIdx] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
@ -212,8 +272,9 @@ func (bi *bockIterator) determineBlockCols(s *ReadResponse_SeriesFrame, typ quer
Label: string(tag.Key),
Type: query.TString,
}
defs[4+j] = []byte("")
}
return cols
return cols, defs
}
func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
@ -229,37 +290,90 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
Type: query.TTime,
}
values[1] = bnds.Stop
if len(readSpec.GroupKeys) > 0 {
for _, tag := range s.Tags {
if !execute.ContainsStr(readSpec.GroupKeys, string(tag.Key)) {
continue
}
switch readSpec.GroupMode {
case storage.GroupModeBy:
// partition key in GroupKeys order, including tags in the GroupKeys slice
for _, k := range readSpec.GroupKeys {
if i := indexOfTag(s.Tags, k); i < len(s.Tags) {
cols = append(cols, query.ColMeta{
Label: string(tag.Key),
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(tag.Value))
values = append(values, string(s.Tags[i].Value))
}
} else if len(readSpec.GroupExcept) > 0 {
for _, tag := range s.Tags {
if !execute.ContainsStr(readSpec.GroupExcept, string(tag.Key)) {
continue
}
case storage.GroupModeExcept:
// partition key in GroupKeys order, skipping tags in the GroupKeys slice
for _, k := range readSpec.GroupKeys {
if i := indexOfTag(s.Tags, k); i == len(s.Tags) {
cols = append(cols, query.ColMeta{
Label: string(tag.Key),
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(tag.Value))
values = append(values, string(s.Tags[i].Value))
}
} else if !readSpec.MergeAll {
for _, tag := range s.Tags {
}
case storage.GroupModeAll:
for i := range s.Tags {
cols = append(cols, query.ColMeta{
Label: string(tag.Key),
Label: string(s.Tags[i].Key),
Type: query.TString,
})
values = append(values, string(tag.Value))
values = append(values, string(s.Tags[i].Value))
}
}
return execute.NewPartitionKey(cols, values)
}
func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {
cols := make([]query.ColMeta, 4+len(f.TagKeys))
defs := make([][]byte, 4+len(f.TagKeys))
cols[startColIdx] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
cols[stopColIdx] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
cols[timeColIdx] = query.ColMeta{
Label: execute.DefaultTimeColLabel,
Type: query.TTime,
}
cols[valueColIdx] = query.ColMeta{
Label: execute.DefaultValueColLabel,
Type: typ,
}
for j, tag := range f.TagKeys {
cols[4+j] = query.ColMeta{
Label: string(tag),
Type: query.TString,
}
defs[4+j] = []byte("")
}
return cols, defs
}
func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
cols := make([]query.ColMeta, 2, len(readSpec.GroupKeys)+2)
values := make([]interface{}, 2, len(readSpec.GroupKeys)+2)
cols[0] = query.ColMeta{
Label: execute.DefaultStartColLabel,
Type: query.TTime,
}
values[0] = bnds.Start
cols[1] = query.ColMeta{
Label: execute.DefaultStopColLabel,
Type: query.TTime,
}
values[1] = bnds.Stop
for i := range readSpec.GroupKeys {
cols = append(cols, query.ColMeta{
Label: readSpec.GroupKeys[i],
Type: query.TString,
})
values = append(values, string(g.PartitionKeyVals[i]))
}
return execute.NewPartitionKey(cols, values)
}
@ -274,6 +388,7 @@ type block struct {
// cache of the tags on the current series.
// len(tags) == len(colMeta)
tags [][]byte
defs [][]byte
readSpec *storage.ReadSpec
@ -306,11 +421,13 @@ func newBlock(
ms *mergedStreams,
readSpec *storage.ReadSpec,
tags []Tag,
defs [][]byte,
) *block {
b := &block{
bounds: bounds,
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
cols: cols,
readSpec: readSpec,
@ -383,8 +500,13 @@ func (b *block) Times(j int) []execute.Time {
// readTags populates b.tags with the provided tags
func (b *block) readTags(tags []Tag) {
for j := range b.tags {
b.tags[j] = nil
b.tags[j] = b.defs[j]
}
if len(tags) == 0 {
return
}
for _, t := range tags {
k := string(t.Key)
j := execute.ColIdx(k, b.cols)
@ -403,6 +525,8 @@ func (b *block) advance() bool {
b.floatBuf = b.floatBuf[0:0]
switch p := b.ms.peek(); readFrameType(p) {
case groupType:
return false
case seriesType:
if !b.ms.key().Equal(b.key) {
// We have reached the end of data for this block
@ -636,6 +760,7 @@ type streamState struct {
currentKey query.PartitionKey
readSpec *storage.ReadSpec
finished bool
group bool
}
func (s *streamState) peek() ReadResponse_Frame {
@ -671,11 +796,21 @@ func (s *streamState) key() query.PartitionKey {
func (s *streamState) computeKey() {
// Determine new currentKey
if p := s.peek(); readFrameType(p) == seriesType {
p := s.peek()
ft := readFrameType(p)
if s.group {
if ft == groupType {
group := p.GetGroup()
s.currentKey = partitionKeyForGroup(group, s.readSpec, s.bounds)
}
} else {
if ft == seriesType {
series := p.GetSeries()
s.currentKey = partitionKeyForSeries(series, s.readSpec, s.bounds)
}
}
}
func (s *streamState) next() ReadResponse_Frame {
frame := s.rep.Frames[0]
s.rep.Frames = s.rep.Frames[1:]
@ -758,6 +893,7 @@ type frameType int
const (
seriesType frameType = iota
groupType
boolPointsType
intPointsType
uintPointsType
@ -769,6 +905,8 @@ func readFrameType(frame ReadResponse_Frame) frameType {
switch frame.Data.(type) {
case *ReadResponse_Frame_Series:
return seriesType
case *ReadResponse_Frame_Group:
return groupType
case *ReadResponse_Frame_BooleanPoints:
return boolPointsType
case *ReadResponse_Frame_IntegerPoints:

View File

@ -0,0 +1,52 @@
package pb
import (
"strings"
"github.com/gogo/protobuf/proto"
"sort"
)
type HintFlags uint32
func (h HintFlags) NoPoints() bool {
return uint32(h)&uint32(HintNoPoints) != 0
}
func (h *HintFlags) SetNoPoints() {
*h |= HintFlags(HintNoPoints)
}
func (h HintFlags) NoSeries() bool {
return uint32(h)&uint32(HintNoSeries) != 0
}
func (h *HintFlags) SetNoSeries() {
*h |= HintFlags(HintNoSeries)
}
func (h HintFlags) String() string {
f := uint32(h)
var s []string
enums := proto.EnumValueMap("com.github.influxdata.influxdb.services.storage.ReadRequest_HintFlags")
if h == 0 {
return "HINT_NONE"
}
for k, v := range enums {
if v == 0 {
continue
}
v := uint32(v)
if f&v == v {
s = append(s, k)
}
}
return strings.Join(s, ",")
}
func indexOfTag(t []Tag, k string) int {
return sort.Search(len(t), func(i int) bool { return string(t[i].Key) >= k })
}

View File

@ -41,6 +41,65 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type ReadRequest_Group int32
const (
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GroupNone ReadRequest_Group = 0
// GroupAll returns a unique group for each series.
// As an optimization, no GroupFrames will be generated.
GroupAll ReadRequest_Group = 1
// GroupBy returns a group for each unique value of the specified GroupKeys.
GroupBy ReadRequest_Group = 2
// GroupExcept in not implemented.
GroupExcept ReadRequest_Group = 3
)
var ReadRequest_Group_name = map[int32]string{
0: "GROUP_NONE",
1: "GROUP_ALL",
2: "GROUP_BY",
3: "GROUP_EXCEPT",
}
var ReadRequest_Group_value = map[string]int32{
"GROUP_NONE": 0,
"GROUP_ALL": 1,
"GROUP_BY": 2,
"GROUP_EXCEPT": 3,
}
func (x ReadRequest_Group) String() string {
return proto.EnumName(ReadRequest_Group_name, int32(x))
}
func (ReadRequest_Group) EnumDescriptor() ([]byte, []int) { return fileDescriptorStorage, []int{0, 0} }
type ReadRequest_HintFlags int32
const (
HintNone ReadRequest_HintFlags = 0
HintNoPoints ReadRequest_HintFlags = 1
HintNoSeries ReadRequest_HintFlags = 2
)
var ReadRequest_HintFlags_name = map[int32]string{
0: "HINT_NONE",
1: "HINT_NO_POINTS",
2: "HINT_NO_SERIES",
}
var ReadRequest_HintFlags_value = map[string]int32{
"HINT_NONE": 0,
"HINT_NO_POINTS": 1,
"HINT_NO_SERIES": 2,
}
func (x ReadRequest_HintFlags) String() string {
return proto.EnumName(ReadRequest_HintFlags_name, int32(x))
}
func (ReadRequest_HintFlags) EnumDescriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{0, 1}
}
type Aggregate_AggregateType int32
const (
@ -129,8 +188,11 @@ type ReadRequest struct {
TimestampRange TimestampRange `protobuf:"bytes,2,opt,name=timestamp_range,json=timestampRange" json:"timestamp_range"`
// Descending indicates whether points should be returned in descending order.
Descending bool `protobuf:"varint,3,opt,name=descending,proto3" json:"descending,omitempty"`
// Grouping specifies a list of tags used to order the data
Grouping []string `protobuf:"bytes,4,rep,name=grouping" json:"grouping,omitempty"`
// GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine
// its behavior.
GroupKeys []string `protobuf:"bytes,4,rep,name=group_keys,json=groupKeys" json:"group_keys,omitempty"`
//
Group ReadRequest_Group `protobuf:"varint,11,opt,name=group,proto3,enum=com.github.influxdata.influxdb.services.storage.ReadRequest_Group" json:"group,omitempty"`
// Aggregate specifies an optional aggregate to apply to the data.
// TODO(sgc): switch to slice for multiple aggregates in a single request
Aggregate *Aggregate `protobuf:"bytes,9,opt,name=aggregate" json:"aggregate,omitempty"`
@ -144,6 +206,9 @@ type ReadRequest struct {
PointsLimit int64 `protobuf:"varint,8,opt,name=points_limit,json=pointsLimit,proto3" json:"points_limit,omitempty"`
// Trace contains opaque data if a trace is active.
Trace map[string]string `protobuf:"bytes,10,rep,name=trace" json:"trace,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// Hints is a bitwise OR of HintFlags to control the behavior
// of the read request.
Hints HintFlags `protobuf:"fixed32,12,opt,name=hints,proto3,casttype=HintFlags" json:"hints,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
@ -152,7 +217,7 @@ func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{0} }
type Aggregate struct {
Type Aggregate_AggregateType `protobuf:"varint,1,opt,name=type,proto3,enum=storage.Aggregate_AggregateType" json:"type,omitempty"`
Type Aggregate_AggregateType `protobuf:"varint,1,opt,name=type,proto3,enum=com.github.influxdata.influxdb.services.storage.Aggregate_AggregateType" json:"type,omitempty"`
}
func (m *Aggregate) Reset() { *m = Aggregate{} }
@ -182,6 +247,7 @@ func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptorStorage
type ReadResponse_Frame struct {
// Types that are valid to be assigned to Data:
// *ReadResponse_Frame_Group
// *ReadResponse_Frame_Series
// *ReadResponse_Frame_FloatPoints
// *ReadResponse_Frame_IntegerPoints
@ -202,6 +268,9 @@ type isReadResponse_Frame_Data interface {
Size() int
}
type ReadResponse_Frame_Group struct {
Group *ReadResponse_GroupFrame `protobuf:"bytes,7,opt,name=group,oneof"`
}
type ReadResponse_Frame_Series struct {
Series *ReadResponse_SeriesFrame `protobuf:"bytes,1,opt,name=series,oneof"`
}
@ -221,6 +290,7 @@ type ReadResponse_Frame_StringPoints struct {
StringPoints *ReadResponse_StringPointsFrame `protobuf:"bytes,6,opt,name=string_points,json=stringPoints,oneof"`
}
func (*ReadResponse_Frame_Group) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_Series) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_FloatPoints) isReadResponse_Frame_Data() {}
func (*ReadResponse_Frame_IntegerPoints) isReadResponse_Frame_Data() {}
@ -235,6 +305,13 @@ func (m *ReadResponse_Frame) GetData() isReadResponse_Frame_Data {
return nil
}
func (m *ReadResponse_Frame) GetGroup() *ReadResponse_GroupFrame {
if x, ok := m.GetData().(*ReadResponse_Frame_Group); ok {
return x.Group
}
return nil
}
func (m *ReadResponse_Frame) GetSeries() *ReadResponse_SeriesFrame {
if x, ok := m.GetData().(*ReadResponse_Frame_Series); ok {
return x.Series
@ -280,6 +357,7 @@ func (m *ReadResponse_Frame) GetStringPoints() *ReadResponse_StringPointsFrame {
// XXX_OneofFuncs is for the internal use of the proto package.
func (*ReadResponse_Frame) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _ReadResponse_Frame_OneofMarshaler, _ReadResponse_Frame_OneofUnmarshaler, _ReadResponse_Frame_OneofSizer, []interface{}{
(*ReadResponse_Frame_Group)(nil),
(*ReadResponse_Frame_Series)(nil),
(*ReadResponse_Frame_FloatPoints)(nil),
(*ReadResponse_Frame_IntegerPoints)(nil),
@ -293,6 +371,11 @@ func _ReadResponse_Frame_OneofMarshaler(msg proto.Message, b *proto.Buffer) erro
m := msg.(*ReadResponse_Frame)
// data
switch x := m.Data.(type) {
case *ReadResponse_Frame_Group:
_ = b.EncodeVarint(7<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Group); err != nil {
return err
}
case *ReadResponse_Frame_Series:
_ = b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Series); err != nil {
@ -333,6 +416,14 @@ func _ReadResponse_Frame_OneofMarshaler(msg proto.Message, b *proto.Buffer) erro
func _ReadResponse_Frame_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*ReadResponse_Frame)
switch tag {
case 7: // data.group
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ReadResponse_GroupFrame)
err := b.DecodeMessage(msg)
m.Data = &ReadResponse_Frame_Group{msg}
return true, err
case 1: // data.series
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
@ -390,6 +481,11 @@ func _ReadResponse_Frame_OneofSizer(msg proto.Message) (n int) {
m := msg.(*ReadResponse_Frame)
// data
switch x := m.Data.(type) {
case *ReadResponse_Frame_Group:
s := proto.Size(x.Group)
n += proto.SizeVarint(7<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case *ReadResponse_Frame_Series:
s := proto.Size(x.Series)
n += proto.SizeVarint(1<<3 | proto.WireBytes)
@ -427,16 +523,30 @@ func _ReadResponse_Frame_OneofSizer(msg proto.Message) (n int) {
return n
}
type ReadResponse_GroupFrame struct {
// TagKeys
TagKeys [][]byte `protobuf:"bytes,1,rep,name=tag_keys,json=tagKeys" json:"tag_keys,omitempty"`
// PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys
PartitionKeyVals [][]byte `protobuf:"bytes,2,rep,name=partition_key_vals,json=partitionKeyVals" json:"partition_key_vals,omitempty"`
}
func (m *ReadResponse_GroupFrame) Reset() { *m = ReadResponse_GroupFrame{} }
func (m *ReadResponse_GroupFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_GroupFrame) ProtoMessage() {}
func (*ReadResponse_GroupFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 1}
}
type ReadResponse_SeriesFrame struct {
Tags []Tag `protobuf:"bytes,1,rep,name=tags" json:"tags"`
DataType ReadResponse_DataType `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3,enum=storage.ReadResponse_DataType" json:"data_type,omitempty"`
DataType ReadResponse_DataType `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3,enum=com.github.influxdata.influxdb.services.storage.ReadResponse_DataType" json:"data_type,omitempty"`
}
func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFrame{} }
func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_SeriesFrame) ProtoMessage() {}
func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 1}
return fileDescriptorStorage, []int{3, 2}
}
type ReadResponse_FloatPointsFrame struct {
@ -448,7 +558,7 @@ func (m *ReadResponse_FloatPointsFrame) Reset() { *m = ReadResponse_Floa
func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_FloatPointsFrame) ProtoMessage() {}
func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 2}
return fileDescriptorStorage, []int{3, 3}
}
type ReadResponse_IntegerPointsFrame struct {
@ -460,7 +570,7 @@ func (m *ReadResponse_IntegerPointsFrame) Reset() { *m = ReadResponse_In
func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {}
func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 3}
return fileDescriptorStorage, []int{3, 4}
}
type ReadResponse_UnsignedPointsFrame struct {
@ -472,7 +582,7 @@ func (m *ReadResponse_UnsignedPointsFrame) Reset() { *m = ReadResponse_U
func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {}
func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 4}
return fileDescriptorStorage, []int{3, 5}
}
type ReadResponse_BooleanPointsFrame struct {
@ -484,7 +594,7 @@ func (m *ReadResponse_BooleanPointsFrame) Reset() { *m = ReadResponse_Bo
func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {}
func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 5}
return fileDescriptorStorage, []int{3, 6}
}
type ReadResponse_StringPointsFrame struct {
@ -496,7 +606,7 @@ func (m *ReadResponse_StringPointsFrame) Reset() { *m = ReadResponse_Str
func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_StringPointsFrame) ProtoMessage() {}
func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptorStorage, []int{3, 6}
return fileDescriptorStorage, []int{3, 7}
}
type CapabilitiesResponse struct {
@ -530,23 +640,26 @@ func (*TimestampRange) ProtoMessage() {}
func (*TimestampRange) Descriptor() ([]byte, []int) { return fileDescriptorStorage, []int{6} }
func init() {
proto.RegisterType((*ReadRequest)(nil), "storage.ReadRequest")
proto.RegisterType((*Aggregate)(nil), "storage.Aggregate")
proto.RegisterType((*Tag)(nil), "storage.Tag")
proto.RegisterType((*ReadResponse)(nil), "storage.ReadResponse")
proto.RegisterType((*ReadResponse_Frame)(nil), "storage.ReadResponse.Frame")
proto.RegisterType((*ReadResponse_SeriesFrame)(nil), "storage.ReadResponse.SeriesFrame")
proto.RegisterType((*ReadResponse_FloatPointsFrame)(nil), "storage.ReadResponse.FloatPointsFrame")
proto.RegisterType((*ReadResponse_IntegerPointsFrame)(nil), "storage.ReadResponse.IntegerPointsFrame")
proto.RegisterType((*ReadResponse_UnsignedPointsFrame)(nil), "storage.ReadResponse.UnsignedPointsFrame")
proto.RegisterType((*ReadResponse_BooleanPointsFrame)(nil), "storage.ReadResponse.BooleanPointsFrame")
proto.RegisterType((*ReadResponse_StringPointsFrame)(nil), "storage.ReadResponse.StringPointsFrame")
proto.RegisterType((*CapabilitiesResponse)(nil), "storage.CapabilitiesResponse")
proto.RegisterType((*HintsResponse)(nil), "storage.HintsResponse")
proto.RegisterType((*TimestampRange)(nil), "storage.TimestampRange")
proto.RegisterEnum("storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value)
proto.RegisterEnum("storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value)
proto.RegisterEnum("storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value)
proto.RegisterType((*ReadRequest)(nil), "com.github.influxdata.influxdb.services.storage.ReadRequest")
proto.RegisterType((*Aggregate)(nil), "com.github.influxdata.influxdb.services.storage.Aggregate")
proto.RegisterType((*Tag)(nil), "com.github.influxdata.influxdb.services.storage.Tag")
proto.RegisterType((*ReadResponse)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse")
proto.RegisterType((*ReadResponse_Frame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.Frame")
proto.RegisterType((*ReadResponse_GroupFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.GroupFrame")
proto.RegisterType((*ReadResponse_SeriesFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.SeriesFrame")
proto.RegisterType((*ReadResponse_FloatPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.FloatPointsFrame")
proto.RegisterType((*ReadResponse_IntegerPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.IntegerPointsFrame")
proto.RegisterType((*ReadResponse_UnsignedPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.UnsignedPointsFrame")
proto.RegisterType((*ReadResponse_BooleanPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.BooleanPointsFrame")
proto.RegisterType((*ReadResponse_StringPointsFrame)(nil), "com.github.influxdata.influxdb.services.storage.ReadResponse.StringPointsFrame")
proto.RegisterType((*CapabilitiesResponse)(nil), "com.github.influxdata.influxdb.services.storage.CapabilitiesResponse")
proto.RegisterType((*HintsResponse)(nil), "com.github.influxdata.influxdb.services.storage.HintsResponse")
proto.RegisterType((*TimestampRange)(nil), "com.github.influxdata.influxdb.services.storage.TimestampRange")
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadRequest_Group", ReadRequest_Group_name, ReadRequest_Group_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadRequest_HintFlags", ReadRequest_HintFlags_name, ReadRequest_HintFlags_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value)
proto.RegisterEnum("com.github.influxdata.influxdb.services.storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value)
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
@ -587,8 +700,8 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if len(m.Grouping) > 0 {
for _, s := range m.Grouping {
if len(m.GroupKeys) > 0 {
for _, s := range m.GroupKeys {
dAtA[i] = 0x22
i++
l = len(s)
@ -654,6 +767,16 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
i += copy(dAtA[i:], v)
}
}
if m.Group != 0 {
dAtA[i] = 0x58
i++
i = encodeVarintStorage(dAtA, i, uint64(m.Group))
}
if m.Hints != 0 {
dAtA[i] = 0x65
i++
i = encodeFixed32Storage(dAtA, i, uint32(m.Hints))
}
return i, nil
}
@ -849,6 +972,54 @@ func (m *ReadResponse_Frame_StringPoints) MarshalTo(dAtA []byte) (int, error) {
}
return i, nil
}
func (m *ReadResponse_Frame_Group) MarshalTo(dAtA []byte) (int, error) {
i := 0
if m.Group != nil {
dAtA[i] = 0x3a
i++
i = encodeVarintStorage(dAtA, i, uint64(m.Group.Size()))
n11, err := m.Group.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n11
}
return i, nil
}
func (m *ReadResponse_GroupFrame) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadResponse_GroupFrame) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.TagKeys) > 0 {
for _, b := range m.TagKeys {
dAtA[i] = 0xa
i++
i = encodeVarintStorage(dAtA, i, uint64(len(b)))
i += copy(dAtA[i:], b)
}
}
if len(m.PartitionKeyVals) > 0 {
for _, b := range m.PartitionKeyVals {
dAtA[i] = 0x12
i++
i = encodeVarintStorage(dAtA, i, uint64(len(b)))
i += copy(dAtA[i:], b)
}
}
return i, nil
}
func (m *ReadResponse_SeriesFrame) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -927,22 +1098,22 @@ func (m *ReadResponse_FloatPointsFrame) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintStorage(dAtA, i, uint64(len(m.Values)*8))
for _, num := range m.Values {
f11 := math.Float64bits(float64(num))
dAtA[i] = uint8(f11)
f12 := math.Float64bits(float64(num))
dAtA[i] = uint8(f12)
i++
dAtA[i] = uint8(f11 >> 8)
dAtA[i] = uint8(f12 >> 8)
i++
dAtA[i] = uint8(f11 >> 16)
dAtA[i] = uint8(f12 >> 16)
i++
dAtA[i] = uint8(f11 >> 24)
dAtA[i] = uint8(f12 >> 24)
i++
dAtA[i] = uint8(f11 >> 32)
dAtA[i] = uint8(f12 >> 32)
i++
dAtA[i] = uint8(f11 >> 40)
dAtA[i] = uint8(f12 >> 40)
i++
dAtA[i] = uint8(f11 >> 48)
dAtA[i] = uint8(f12 >> 48)
i++
dAtA[i] = uint8(f11 >> 56)
dAtA[i] = uint8(f12 >> 56)
i++
}
}
@ -988,22 +1159,22 @@ func (m *ReadResponse_IntegerPointsFrame) MarshalTo(dAtA []byte) (int, error) {
}
}
if len(m.Values) > 0 {
dAtA13 := make([]byte, len(m.Values)*10)
var j12 int
dAtA14 := make([]byte, len(m.Values)*10)
var j13 int
for _, num1 := range m.Values {
num := uint64(num1)
for num >= 1<<7 {
dAtA13[j12] = uint8(uint64(num)&0x7f | 0x80)
dAtA14[j13] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j12++
j13++
}
dAtA13[j12] = uint8(num)
j12++
dAtA14[j13] = uint8(num)
j13++
}
dAtA[i] = 0x12
i++
i = encodeVarintStorage(dAtA, i, uint64(j12))
i += copy(dAtA[i:], dAtA13[:j12])
i = encodeVarintStorage(dAtA, i, uint64(j13))
i += copy(dAtA[i:], dAtA14[:j13])
}
return i, nil
}
@ -1047,21 +1218,21 @@ func (m *ReadResponse_UnsignedPointsFrame) MarshalTo(dAtA []byte) (int, error) {
}
}
if len(m.Values) > 0 {
dAtA15 := make([]byte, len(m.Values)*10)
var j14 int
dAtA16 := make([]byte, len(m.Values)*10)
var j15 int
for _, num := range m.Values {
for num >= 1<<7 {
dAtA15[j14] = uint8(uint64(num)&0x7f | 0x80)
dAtA16[j15] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j14++
j15++
}
dAtA15[j14] = uint8(num)
j14++
dAtA16[j15] = uint8(num)
j15++
}
dAtA[i] = 0x12
i++
i = encodeVarintStorage(dAtA, i, uint64(j14))
i += copy(dAtA[i:], dAtA15[:j14])
i = encodeVarintStorage(dAtA, i, uint64(j15))
i += copy(dAtA[i:], dAtA16[:j15])
}
return i, nil
}
@ -1296,8 +1467,8 @@ func (m *ReadRequest) Size() (n int) {
if m.Descending {
n += 2
}
if len(m.Grouping) > 0 {
for _, s := range m.Grouping {
if len(m.GroupKeys) > 0 {
for _, s := range m.GroupKeys {
l = len(s)
n += 1 + l + sovStorage(uint64(l))
}
@ -1327,6 +1498,12 @@ func (m *ReadRequest) Size() (n int) {
n += mapEntrySize + 1 + sovStorage(uint64(mapEntrySize))
}
}
if m.Group != 0 {
n += 1 + sovStorage(uint64(m.Group))
}
if m.Hints != 0 {
n += 5
}
return n
}
@ -1428,6 +1605,33 @@ func (m *ReadResponse_Frame_StringPoints) Size() (n int) {
}
return n
}
func (m *ReadResponse_Frame_Group) Size() (n int) {
var l int
_ = l
if m.Group != nil {
l = m.Group.Size()
n += 1 + l + sovStorage(uint64(l))
}
return n
}
func (m *ReadResponse_GroupFrame) Size() (n int) {
var l int
_ = l
if len(m.TagKeys) > 0 {
for _, b := range m.TagKeys {
l = len(b)
n += 1 + l + sovStorage(uint64(l))
}
}
if len(m.PartitionKeyVals) > 0 {
for _, b := range m.PartitionKeyVals {
l = len(b)
n += 1 + l + sovStorage(uint64(l))
}
}
return n
}
func (m *ReadResponse_SeriesFrame) Size() (n int) {
var l int
_ = l
@ -1669,7 +1873,7 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
m.Descending = bool(v != 0)
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Grouping", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field GroupKeys", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -1694,7 +1898,7 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Grouping = append(m.Grouping, string(dAtA[iNdEx:postIndex]))
m.GroupKeys = append(m.GroupKeys, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 5:
if wireType != 2 {
@ -1937,6 +2141,38 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
}
m.Trace[mapkey] = mapvalue
iNdEx = postIndex
case 11:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType)
}
m.Group = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Group |= (ReadRequest_Group(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 12:
if wireType != 5 {
return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType)
}
m.Hints = 0
if (iNdEx + 4) > l {
return io.ErrUnexpectedEOF
}
iNdEx += 4
m.Hints = HintFlags(dAtA[iNdEx-4])
m.Hints |= HintFlags(dAtA[iNdEx-3]) << 8
m.Hints |= HintFlags(dAtA[iNdEx-2]) << 16
m.Hints |= HintFlags(dAtA[iNdEx-1]) << 24
default:
iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:])
@ -2441,6 +2677,146 @@ func (m *ReadResponse_Frame) Unmarshal(dAtA []byte) error {
}
m.Data = &ReadResponse_Frame_StringPoints{v}
iNdEx = postIndex
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &ReadResponse_GroupFrame{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Data = &ReadResponse_Frame_Group{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthStorage
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadResponse_GroupFrame) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GroupFrame: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GroupFrame: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TagKeys", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TagKeys = append(m.TagKeys, make([]byte, postIndex-iNdEx))
copy(m.TagKeys[len(m.TagKeys)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PartitionKeyVals", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PartitionKeyVals = append(m.PartitionKeyVals, make([]byte, postIndex-iNdEx))
copy(m.PartitionKeyVals[len(m.PartitionKeyVals)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:])
@ -3791,81 +4167,101 @@ var (
func init() { proto.RegisterFile("storage.proto", fileDescriptorStorage) }
var fileDescriptorStorage = []byte{
// 1206 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x41, 0x8f, 0xdb, 0x44,
0x14, 0xb6, 0xd7, 0x4e, 0x76, 0xf3, 0x92, 0xec, 0x7a, 0xa7, 0xdb, 0x25, 0x72, 0x69, 0xe2, 0xe6,
0x50, 0xc2, 0xa1, 0x69, 0x15, 0x40, 0x14, 0x2a, 0x24, 0x9a, 0x36, 0xed, 0x2e, 0xdd, 0x26, 0xd5,
0x24, 0x2b, 0x71, 0x40, 0x5a, 0x26, 0x9b, 0x89, 0x6b, 0x91, 0xd8, 0xc6, 0x9e, 0xa0, 0xee, 0x8d,
0x23, 0x5a, 0x71, 0xe0, 0xc0, 0x35, 0x27, 0x7e, 0x03, 0x5c, 0x90, 0x38, 0x70, 0xea, 0x91, 0x23,
0xa7, 0x08, 0xc2, 0x1f, 0x41, 0x33, 0x63, 0x3b, 0xf6, 0x6e, 0x5a, 0x69, 0x2f, 0xd1, 0xbc, 0xf7,
0xbe, 0xf7, 0xbd, 0xf7, 0x66, 0xde, 0x7b, 0x31, 0x94, 0x43, 0xe6, 0x05, 0xc4, 0xa6, 0x4d, 0x3f,
0xf0, 0x98, 0x87, 0x36, 0x23, 0xd1, 0xbc, 0x63, 0x3b, 0xec, 0xe5, 0x6c, 0xd8, 0x3c, 0xf5, 0xa6,
0x77, 0x6d, 0xcf, 0xf6, 0xee, 0x0a, 0xfb, 0x70, 0x36, 0x16, 0x92, 0x10, 0xc4, 0x49, 0xfa, 0x99,
0x37, 0x6c, 0xcf, 0xb3, 0x27, 0x74, 0x85, 0xa2, 0x53, 0x9f, 0x9d, 0x45, 0xc6, 0x56, 0x8a, 0xcb,
0x71, 0xc7, 0x93, 0xd9, 0xab, 0x11, 0x61, 0xe4, 0xee, 0x19, 0x09, 0xfc, 0x53, 0xf9, 0x2b, 0xf9,
0xc4, 0x31, 0xf2, 0xd9, 0xf1, 0x03, 0x3a, 0x72, 0x4e, 0x09, 0x8b, 0x32, 0xab, 0xff, 0xa1, 0x43,
0x11, 0x53, 0x32, 0xc2, 0xf4, 0xdb, 0x19, 0x0d, 0x19, 0x32, 0x61, 0x8b, 0xb3, 0x0c, 0x49, 0x48,
0x2b, 0xaa, 0xa5, 0x36, 0x0a, 0x38, 0x91, 0xd1, 0x97, 0xb0, 0xc3, 0x9c, 0x29, 0x0d, 0x19, 0x99,
0xfa, 0x27, 0x01, 0x71, 0x6d, 0x5a, 0xd9, 0xb0, 0xd4, 0x46, 0xb1, 0xf5, 0x4e, 0x33, 0x2e, 0x77,
0x10, 0xdb, 0x31, 0x37, 0xb7, 0xf7, 0x5f, 0x2f, 0x6a, 0xca, 0x72, 0x51, 0xdb, 0xce, 0xea, 0xf1,
0x36, 0xcb, 0xc8, 0xa8, 0x0a, 0x30, 0xa2, 0xe1, 0x29, 0x75, 0x47, 0x8e, 0x6b, 0x57, 0x34, 0x4b,
0x6d, 0x6c, 0xe1, 0x94, 0x86, 0x67, 0x65, 0x07, 0xde, 0xcc, 0xe7, 0x56, 0xdd, 0xd2, 0x78, 0x56,
0xb1, 0x8c, 0xee, 0x41, 0x21, 0x29, 0xaa, 0x92, 0x13, 0xf9, 0xa0, 0x24, 0x9f, 0x17, 0xb1, 0x05,
0xaf, 0x40, 0xa8, 0x05, 0xa5, 0x90, 0x06, 0x0e, 0x0d, 0x4f, 0x26, 0xce, 0xd4, 0x61, 0x95, 0xbc,
0xa5, 0x36, 0xb4, 0xf6, 0xce, 0x72, 0x51, 0x2b, 0xf6, 0x85, 0xfe, 0x88, 0xab, 0x71, 0x31, 0x5c,
0x09, 0xe8, 0x23, 0x28, 0x47, 0x3e, 0xde, 0x78, 0x1c, 0x52, 0x56, 0xd9, 0x14, 0x4e, 0xc6, 0x72,
0x51, 0x2b, 0x49, 0xa7, 0x9e, 0xd0, 0xe3, 0x88, 0x5a, 0x4a, 0x3c, 0x94, 0xef, 0x39, 0x2e, 0x8b,
0x43, 0x6d, 0xad, 0x42, 0xbd, 0x10, 0xfa, 0x28, 0x94, 0xbf, 0x12, 0x78, 0x41, 0xc4, 0xb6, 0x03,
0x6a, 0xf3, 0x82, 0x0a, 0x17, 0x0a, 0x7a, 0x18, 0x5b, 0xf0, 0x0a, 0x84, 0x3e, 0x87, 0x1c, 0x0b,
0xc8, 0x29, 0xad, 0x80, 0xa5, 0x35, 0x8a, 0xad, 0x5a, 0x82, 0x4e, 0xbd, 0x6c, 0x73, 0xc0, 0x11,
0x1d, 0x97, 0x05, 0x67, 0xed, 0xc2, 0x72, 0x51, 0xcb, 0x09, 0x19, 0x4b, 0x47, 0xf3, 0x3e, 0xc0,
0xca, 0x8e, 0x0c, 0xd0, 0xbe, 0xa1, 0x67, 0xd1, 0xfb, 0xf3, 0x23, 0xda, 0x83, 0xdc, 0x77, 0x64,
0x32, 0x93, 0x0f, 0x5e, 0xc0, 0x52, 0xf8, 0x74, 0xe3, 0xbe, 0x5a, 0xff, 0x5d, 0x85, 0x42, 0x92,
0x14, 0xfa, 0x10, 0x74, 0x76, 0xe6, 0xcb, 0xd6, 0xd9, 0x6e, 0x59, 0x97, 0xd3, 0x5e, 0x9d, 0x06,
0x67, 0x3e, 0xc5, 0x02, 0x5d, 0x7f, 0x05, 0xe5, 0x8c, 0x1a, 0xd5, 0x40, 0xef, 0xf6, 0xba, 0x1d,
0x43, 0x31, 0xaf, 0x9f, 0xcf, 0xad, 0xdd, 0x8c, 0xb1, 0xeb, 0xb9, 0x14, 0xdd, 0x04, 0xad, 0x7f,
0xfc, 0xdc, 0x50, 0xcd, 0xbd, 0xf3, 0xb9, 0x65, 0x64, 0xec, 0xfd, 0xd9, 0x14, 0xdd, 0x82, 0xdc,
0xa3, 0xde, 0x71, 0x77, 0x60, 0x6c, 0x98, 0xfb, 0xe7, 0x73, 0x0b, 0x65, 0x00, 0x8f, 0xbc, 0x99,
0xcb, 0x4c, 0xfd, 0x87, 0x5f, 0xaa, 0x4a, 0xfd, 0x0e, 0x68, 0x03, 0x62, 0xa7, 0x0b, 0x2e, 0xad,
0x29, 0xb8, 0x14, 0x15, 0x5c, 0xff, 0xb9, 0x08, 0x25, 0x79, 0xa7, 0xa1, 0xef, 0xb9, 0x21, 0x45,
0x9f, 0x40, 0x7e, 0x1c, 0x90, 0x29, 0x0d, 0x2b, 0xaa, 0xb8, 0xfa, 0x1b, 0x17, 0xae, 0x5e, 0xc2,
0x9a, 0x4f, 0x38, 0xa6, 0xad, 0xf3, 0x69, 0xc0, 0x91, 0x83, 0xf9, 0xa7, 0x0e, 0x39, 0xa1, 0x47,
0x0f, 0x20, 0x2f, 0x9b, 0x46, 0x24, 0x50, 0x6c, 0xdd, 0x5a, 0x4f, 0x22, 0xdb, 0x4c, 0xb8, 0x1c,
0x28, 0x38, 0x72, 0x41, 0x5f, 0x41, 0x69, 0x3c, 0xf1, 0x08, 0x3b, 0x91, 0x2d, 0x14, 0x4d, 0xe4,
0xed, 0x37, 0xe4, 0xc1, 0x91, 0xb2, 0xf1, 0x64, 0x4a, 0xa2, 0x13, 0x53, 0xda, 0x03, 0x05, 0x17,
0xc7, 0x2b, 0x11, 0x8d, 0x60, 0xdb, 0x71, 0x19, 0xb5, 0x69, 0x10, 0xf3, 0x6b, 0x82, 0xbf, 0xb1,
0x9e, 0xff, 0x50, 0x62, 0xd3, 0x11, 0x76, 0x97, 0x8b, 0x5a, 0x39, 0xa3, 0x3f, 0x50, 0x70, 0xd9,
0x49, 0x2b, 0xd0, 0x4b, 0xd8, 0x99, 0xb9, 0xa1, 0x63, 0xbb, 0x74, 0x14, 0x87, 0xd1, 0x45, 0x98,
0xf7, 0xd7, 0x87, 0x39, 0x8e, 0xc0, 0xe9, 0x38, 0x88, 0xaf, 0x99, 0xac, 0xe1, 0x40, 0xc1, 0xdb,
0xb3, 0x8c, 0x86, 0xd7, 0x33, 0xf4, 0xbc, 0x09, 0x25, 0x6e, 0x1c, 0x28, 0xf7, 0xb6, 0x7a, 0xda,
0x12, 0x7b, 0xa9, 0x9e, 0x8c, 0x9e, 0xd7, 0x33, 0x4c, 0x2b, 0xd0, 0xd7, 0x7c, 0xff, 0x07, 0x8e,
0x6b, 0xc7, 0x41, 0xf2, 0x22, 0xc8, 0x7b, 0x6f, 0x78, 0x57, 0x01, 0x4d, 0xc7, 0x90, 0x5b, 0x25,
0xa5, 0x3e, 0x50, 0x70, 0x29, 0x4c, 0xc9, 0xed, 0x3c, 0xe8, 0x7c, 0x2d, 0x9b, 0x01, 0x14, 0x53,
0x6d, 0x81, 0x6e, 0x83, 0xce, 0x88, 0x1d, 0x37, 0x63, 0x69, 0xb5, 0x96, 0x89, 0x1d, 0x75, 0x9f,
0xb0, 0xa3, 0x07, 0x50, 0xe0, 0xee, 0x27, 0x62, 0x56, 0x37, 0xc4, 0xac, 0x56, 0xd7, 0x27, 0xf7,
0x98, 0x30, 0x22, 0x26, 0x55, 0xfc, 0x0d, 0xf0, 0x93, 0xf9, 0x05, 0x18, 0x17, 0xfb, 0x88, 0x2f,
0xf0, 0x64, 0xa5, 0xcb, 0xf0, 0x06, 0x4e, 0x69, 0xd0, 0x3e, 0xe4, 0xc5, 0x04, 0xf1, 0xfe, 0xd4,
0x1a, 0x2a, 0x8e, 0x24, 0xf3, 0x08, 0xd0, 0xe5, 0x9e, 0xb9, 0x22, 0x9b, 0x96, 0xb0, 0x3d, 0x87,
0x6b, 0x6b, 0x5a, 0xe3, 0x8a, 0x74, 0x7a, 0x3a, 0xb9, 0xcb, 0x0d, 0x70, 0x45, 0xb6, 0xad, 0x84,
0xed, 0x19, 0xec, 0x5e, 0x7a, 0xe9, 0x2b, 0x92, 0x15, 0x62, 0xb2, 0x7a, 0x1f, 0x0a, 0x82, 0x20,
0xda, 0x96, 0xf9, 0x7e, 0x07, 0x1f, 0x76, 0xfa, 0x86, 0x62, 0x5e, 0x3b, 0x9f, 0x5b, 0x3b, 0x89,
0x49, 0xf6, 0x06, 0x07, 0xbc, 0xe8, 0x1d, 0x76, 0x07, 0x7d, 0x43, 0xbd, 0x00, 0x90, 0xb9, 0x44,
0xcb, 0xf0, 0x37, 0x15, 0xb6, 0xe2, 0xf7, 0x46, 0xef, 0x42, 0xee, 0xc9, 0x51, 0xef, 0xe1, 0xc0,
0x50, 0xcc, 0xdd, 0xf3, 0xb9, 0x55, 0x8e, 0x0d, 0xe2, 0xe9, 0x91, 0x05, 0x9b, 0x87, 0xdd, 0x41,
0xe7, 0x69, 0x07, 0xc7, 0x94, 0xb1, 0x3d, 0x7a, 0x4e, 0x54, 0x87, 0xad, 0xe3, 0x6e, 0xff, 0xf0,
0x69, 0xb7, 0xf3, 0xd8, 0xd8, 0x90, 0x6b, 0x3a, 0x86, 0xc4, 0x6f, 0xc4, 0x59, 0xda, 0xbd, 0xde,
0x51, 0xe7, 0x61, 0xd7, 0xd0, 0xb2, 0x2c, 0xd1, 0xbd, 0xa3, 0x2a, 0xe4, 0xfb, 0x03, 0x7c, 0xd8,
0x7d, 0x6a, 0xe8, 0x26, 0x3a, 0x9f, 0x5b, 0xdb, 0x31, 0x40, 0x5e, 0x65, 0x94, 0xf8, 0x8f, 0x2a,
0xec, 0x3d, 0x22, 0x3e, 0x19, 0x3a, 0x13, 0x87, 0x39, 0x34, 0x4c, 0xd6, 0xf3, 0x03, 0xd0, 0x4f,
0x89, 0x1f, 0xcf, 0xc3, 0x6a, 0xfe, 0xd6, 0x81, 0xb9, 0x32, 0x14, 0xff, 0x7f, 0x58, 0x38, 0x99,
0x1f, 0x43, 0x21, 0x51, 0x5d, 0xe9, 0x2f, 0x71, 0x07, 0xca, 0x07, 0xfc, 0x5a, 0x63, 0xe6, 0xfa,
0x7d, 0xb8, 0xf0, 0x01, 0xc4, 0x9d, 0x43, 0x46, 0x02, 0x26, 0x08, 0x35, 0x2c, 0x05, 0x1e, 0x84,
0xba, 0x23, 0x41, 0xa8, 0x61, 0x7e, 0x6c, 0xfd, 0xad, 0xc2, 0x66, 0x5f, 0x26, 0xcd, 0x8b, 0xe1,
0xa3, 0x89, 0xf6, 0xd6, 0xfd, 0xbd, 0x9b, 0xd7, 0xd7, 0xce, 0x6f, 0x5d, 0xff, 0xfe, 0xd7, 0x8a,
0x72, 0x4f, 0x45, 0xcf, 0xa0, 0x94, 0x2e, 0x1a, 0xed, 0x37, 0xe5, 0xa7, 0x65, 0x33, 0xfe, 0xb4,
0x6c, 0x76, 0xf8, 0xa7, 0xa5, 0x79, 0xf3, 0xad, 0x77, 0x24, 0xe8, 0x54, 0xf4, 0x19, 0xe4, 0x44,
0x81, 0x6f, 0x64, 0xd9, 0x4f, 0x58, 0xb2, 0x17, 0xc1, 0xdd, 0x37, 0x4c, 0x91, 0x53, 0x7b, 0xef,
0xf5, 0xbf, 0x55, 0xe5, 0xf5, 0xb2, 0xaa, 0xfe, 0xb5, 0xac, 0xaa, 0xff, 0x2c, 0xab, 0xea, 0x4f,
0xff, 0x55, 0x95, 0x61, 0x5e, 0x30, 0x7d, 0xf0, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8b, 0x99,
0xc4, 0xf8, 0x41, 0x0b, 0x00, 0x00,
// 1530 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x57, 0x4f, 0x8f, 0x1a, 0xc9,
0x15, 0xa7, 0xf9, 0xcf, 0xe3, 0x5f, 0x4f, 0x79, 0x32, 0x22, 0xed, 0x18, 0xda, 0x28, 0x8a, 0x38,
0xd8, 0x4c, 0x44, 0x12, 0xc5, 0xb2, 0xf2, 0x47, 0xc3, 0x98, 0x19, 0x88, 0xc7, 0x30, 0x29, 0x98,
0xc8, 0x89, 0x22, 0x91, 0x82, 0xa9, 0x69, 0xb7, 0x0c, 0xdd, 0x9d, 0xee, 0xc6, 0x1a, 0x72, 0xf2,
0x21, 0x52, 0x2c, 0xb2, 0x07, 0x5f, 0xf7, 0xc0, 0x69, 0xbf, 0xc2, 0xee, 0x7e, 0x80, 0x3d, 0xf9,
0xb8, 0x9f, 0x00, 0xed, 0xe2, 0x4f, 0xb1, 0x7b, 0xd9, 0x55, 0x55, 0x75, 0x43, 0xe3, 0xf1, 0x1e,
0x30, 0xbe, 0xb4, 0xea, 0xbd, 0x57, 0xf5, 0x7b, 0x7f, 0xea, 0xbd, 0x57, 0xaf, 0x21, 0xeb, 0xb8,
0xa6, 0x4d, 0x34, 0x5a, 0xb5, 0x6c, 0xd3, 0x35, 0xd1, 0xe1, 0xd0, 0x1c, 0x57, 0x35, 0xdd, 0x7d,
0x36, 0x19, 0x54, 0x75, 0xe3, 0x6a, 0x34, 0xb9, 0xbe, 0x24, 0x2e, 0xf1, 0x97, 0x83, 0xaa, 0x43,
0xed, 0x17, 0xfa, 0x90, 0x3a, 0x55, 0xef, 0x98, 0x72, 0xdf, 0xdb, 0x3c, 0x34, 0xc7, 0x87, 0x9a,
0xa9, 0x99, 0x87, 0x1c, 0x67, 0x30, 0xb9, 0xe2, 0x14, 0x27, 0xf8, 0x4a, 0xe0, 0x2b, 0xb7, 0x35,
0xd3, 0xd4, 0x46, 0x74, 0xbd, 0x8b, 0x8e, 0x2d, 0x77, 0xea, 0x09, 0x6b, 0x01, 0xac, 0xb5, 0xf2,
0xc3, 0x29, 0xb1, 0xad, 0xa1, 0xf8, 0x0a, 0x3c, 0xbe, 0xf4, 0xce, 0xe4, 0x2d, 0x9b, 0x5e, 0xea,
0x43, 0xe2, 0x7a, 0x1e, 0x94, 0x7f, 0x48, 0x42, 0x1a, 0x53, 0x72, 0x89, 0xe9, 0xbf, 0x27, 0xd4,
0x71, 0x91, 0x02, 0x49, 0x86, 0x32, 0x20, 0x0e, 0x2d, 0x48, 0xaa, 0x54, 0x49, 0xe1, 0x15, 0x8d,
0x5e, 0x4a, 0x90, 0x77, 0xf5, 0x31, 0x75, 0x5c, 0x32, 0xb6, 0xfa, 0x36, 0x31, 0x34, 0x5a, 0x08,
0xab, 0x52, 0x25, 0x5d, 0xfb, 0x73, 0x75, 0xcb, 0x40, 0x54, 0x7b, 0x3e, 0x0e, 0x66, 0x30, 0xf5,
0x83, 0x37, 0x8b, 0x52, 0x68, 0xb9, 0x28, 0xe5, 0x36, 0xf9, 0x38, 0xe7, 0x6e, 0xd0, 0xa8, 0x08,
0x70, 0x49, 0x9d, 0x21, 0x35, 0x2e, 0x75, 0x43, 0x2b, 0x44, 0x54, 0xa9, 0x92, 0xc4, 0x01, 0x0e,
0xba, 0x07, 0xa0, 0xd9, 0xe6, 0xc4, 0xea, 0x3f, 0xa7, 0x53, 0xa7, 0x10, 0x55, 0x23, 0x95, 0x54,
0x3d, 0xbb, 0x5c, 0x94, 0x52, 0xa7, 0x8c, 0xfb, 0x98, 0x4e, 0x1d, 0x9c, 0xd2, 0xfc, 0x25, 0x7a,
0x0a, 0xa9, 0x55, 0x3c, 0x0a, 0x31, 0xee, 0xc9, 0xc3, 0xad, 0x3d, 0x39, 0xf7, 0x11, 0xf0, 0x1a,
0x0c, 0xd5, 0x20, 0xe3, 0x50, 0x5b, 0xa7, 0x4e, 0x7f, 0xa4, 0x8f, 0x75, 0xb7, 0x10, 0x57, 0xa5,
0x4a, 0xa4, 0x9e, 0x5f, 0x2e, 0x4a, 0xe9, 0x2e, 0xe7, 0x9f, 0x31, 0x36, 0x4e, 0x3b, 0x6b, 0x02,
0xfd, 0x0e, 0xb2, 0xde, 0x19, 0xf3, 0xea, 0xca, 0xa1, 0x6e, 0x21, 0xc1, 0x0f, 0xc9, 0xcb, 0x45,
0x29, 0x23, 0x0e, 0x75, 0x38, 0x1f, 0x7b, 0xd0, 0x82, 0x62, 0xaa, 0x2c, 0x53, 0x37, 0x5c, 0x5f,
0x55, 0x72, 0xad, 0xea, 0x9c, 0xf3, 0x3d, 0x55, 0xd6, 0x9a, 0x60, 0x8e, 0x13, 0x4d, 0xb3, 0xa9,
0xc6, 0x1c, 0x4f, 0x7d, 0xa0, 0xe3, 0x47, 0x3e, 0x02, 0x5e, 0x83, 0xa1, 0x67, 0x10, 0x73, 0x6d,
0x32, 0xa4, 0x05, 0x50, 0x23, 0x95, 0x74, 0xed, 0x74, 0x6b, 0xd4, 0x40, 0x32, 0x56, 0x7b, 0x0c,
0xa9, 0x61, 0xb8, 0xf6, 0xb4, 0x9e, 0x5a, 0x2e, 0x4a, 0x31, 0x4e, 0x63, 0xa1, 0x00, 0x3d, 0x85,
0x18, 0xbf, 0xc9, 0x42, 0x5a, 0x95, 0x2a, 0xb9, 0x5a, 0x7d, 0x27, 0x4d, 0x3c, 0x3d, 0xb0, 0x00,
0x44, 0xf7, 0x20, 0xf6, 0x8c, 0xc5, 0xaa, 0x90, 0x51, 0xa5, 0x4a, 0xa2, 0x7e, 0xc0, 0x54, 0x37,
0x19, 0xe3, 0xfb, 0x45, 0x29, 0xc5, 0x16, 0x27, 0x23, 0xa2, 0x39, 0x58, 0x6c, 0x52, 0x1e, 0x00,
0xac, 0xed, 0x44, 0x32, 0x44, 0x9e, 0xd3, 0xa9, 0x57, 0x3a, 0x6c, 0x89, 0xf6, 0x21, 0xf6, 0x82,
0x8c, 0x26, 0xa2, 0x54, 0x52, 0x58, 0x10, 0x0f, 0xc3, 0x0f, 0xa4, 0xf2, 0xff, 0x24, 0x88, 0x71,
0xc5, 0xe8, 0x0e, 0xc0, 0x29, 0xee, 0x5c, 0x9c, 0xf7, 0xdb, 0x9d, 0x76, 0x43, 0x0e, 0x29, 0xd9,
0xd9, 0x5c, 0x15, 0x29, 0xdb, 0x36, 0x0d, 0x8a, 0x6e, 0x43, 0x4a, 0x88, 0x8f, 0xce, 0xce, 0x64,
0x49, 0xc9, 0xcc, 0xe6, 0x6a, 0x92, 0x4b, 0x8f, 0x46, 0x23, 0xf4, 0x73, 0x48, 0x0a, 0x61, 0xfd,
0xef, 0x72, 0x58, 0x49, 0xcf, 0xe6, 0x6a, 0x82, 0xcb, 0xea, 0x53, 0x74, 0x17, 0x32, 0x42, 0xd4,
0x78, 0x7a, 0xdc, 0x38, 0xef, 0xc9, 0x11, 0x25, 0x3f, 0x9b, 0xab, 0x69, 0x2e, 0x6e, 0x5c, 0x0f,
0xa9, 0xe5, 0x2a, 0xd1, 0x57, 0x9f, 0x15, 0x43, 0xe5, 0xff, 0xc0, 0xda, 0x2f, 0xa6, 0xad, 0xd9,
0x6a, 0xf7, 0x7c, 0x5b, 0xb8, 0x36, 0x26, 0xe5, 0xa6, 0xfc, 0x12, 0x72, 0x9e, 0xb0, 0x7f, 0xde,
0x69, 0xb5, 0x7b, 0x5d, 0x59, 0x52, 0xe4, 0xd9, 0x5c, 0xcd, 0x88, 0x1d, 0x22, 0xe3, 0x82, 0xbb,
0xba, 0x0d, 0xdc, 0x6a, 0x74, 0xe5, 0x70, 0x70, 0x97, 0xc8, 0x66, 0x4f, 0xf7, 0x5b, 0x09, 0x52,
0xab, 0x54, 0x42, 0xff, 0x84, 0xa8, 0x3b, 0xb5, 0x44, 0xef, 0xc9, 0xd5, 0x9a, 0x1f, 0x9e, 0x94,
0xeb, 0x55, 0x6f, 0x6a, 0x51, 0xcc, 0x51, 0xcb, 0xd7, 0x90, 0xdd, 0x60, 0xa3, 0x12, 0x44, 0x3d,
0x37, 0x7f, 0x36, 0x9b, 0xab, 0x7b, 0x1b, 0x42, 0xee, 0xef, 0x1d, 0x88, 0x74, 0x2f, 0x9e, 0xc8,
0x92, 0xb2, 0x3f, 0x9b, 0xab, 0xf2, 0x86, 0xbc, 0x3b, 0x19, 0xa3, 0xbb, 0x10, 0x3b, 0xee, 0x5c,
0xb4, 0x7b, 0x72, 0x58, 0x39, 0x98, 0xcd, 0x55, 0xb4, 0xb1, 0xe1, 0xd8, 0x9c, 0x18, 0x7e, 0x84,
0xef, 0x43, 0xa4, 0x47, 0xb4, 0x60, 0x7a, 0x64, 0xde, 0x93, 0x1e, 0x19, 0x2f, 0x3d, 0xca, 0x8b,
0x3c, 0x64, 0x44, 0x7e, 0x3a, 0x96, 0x69, 0x38, 0x14, 0x11, 0x88, 0x5f, 0xd9, 0x64, 0x4c, 0x9d,
0x82, 0xc4, 0x0b, 0xeb, 0xf8, 0x03, 0xd3, 0x5d, 0xc0, 0x55, 0x4f, 0x18, 0x56, 0x3d, 0xca, 0xba,
0x2e, 0xf6, 0x80, 0x95, 0x4f, 0x13, 0x10, 0xe3, 0x7c, 0x34, 0x84, 0xb8, 0x68, 0x31, 0xdc, 0xd0,
0x74, 0xad, 0xb5, 0x9b, 0x32, 0x71, 0xdd, 0x1c, 0xba, 0x19, 0xc2, 0x1e, 0x34, 0xfa, 0xaf, 0x04,
0x99, 0xab, 0x91, 0x49, 0xdc, 0xbe, 0xe8, 0x4c, 0xde, 0x53, 0xd2, 0xde, 0xd1, 0x31, 0x86, 0x28,
0xb2, 0x50, 0xf8, 0xc8, 0x1b, 0x61, 0x80, 0xdb, 0x0c, 0xe1, 0xf4, 0xd5, 0x9a, 0x44, 0x9f, 0x48,
0x90, 0xd3, 0x0d, 0x97, 0x6a, 0xd4, 0xf6, 0x0d, 0x89, 0x70, 0x43, 0xce, 0x77, 0x33, 0xa4, 0x25,
0x30, 0x83, 0xa6, 0xec, 0x2d, 0x17, 0xa5, 0xec, 0x06, 0xbf, 0x19, 0xc2, 0x59, 0x3d, 0xc8, 0x40,
0xaf, 0x25, 0xc8, 0x4f, 0x0c, 0x47, 0xd7, 0x0c, 0x7a, 0xe9, 0xdb, 0x13, 0xe5, 0xf6, 0xfc, 0x75,
0x37, 0x7b, 0x2e, 0x3c, 0xd0, 0xa0, 0x41, 0x88, 0xbd, 0xb8, 0x9b, 0x82, 0x66, 0x08, 0xe7, 0x26,
0x1b, 0x1c, 0x1e, 0xa1, 0x81, 0x69, 0x8e, 0x28, 0x31, 0x7c, 0x8b, 0x62, 0x1f, 0x23, 0x42, 0x75,
0x81, 0x79, 0x23, 0x42, 0x1b, 0x7c, 0x16, 0xa1, 0x41, 0x90, 0x81, 0x5e, 0x49, 0x6c, 0x0a, 0xb3,
0x75, 0x43, 0xf3, 0xad, 0x89, 0x73, 0x6b, 0x3a, 0x3b, 0x26, 0x29, 0x87, 0x0c, 0x1a, 0x23, 0x1e,
0xde, 0x00, 0xbb, 0x19, 0xc2, 0x19, 0x27, 0x40, 0xa3, 0x7f, 0xf9, 0x4f, 0x50, 0x82, 0x5b, 0xd0,
0xdc, 0xcd, 0x02, 0xde, 0x96, 0xfd, 0x2a, 0x11, 0xc0, 0xf5, 0x38, 0x44, 0x19, 0x84, 0x72, 0x0d,
0xb0, 0x16, 0xa3, 0x5f, 0x41, 0xd2, 0x25, 0x9a, 0x98, 0x71, 0x58, 0x3b, 0xc8, 0xd4, 0xd3, 0xcb,
0x45, 0x29, 0xd1, 0x23, 0x1a, 0x9f, 0x70, 0x12, 0xae, 0x58, 0xa0, 0x3a, 0x20, 0x8b, 0xd8, 0xae,
0xee, 0xea, 0xa6, 0xc1, 0x76, 0xf7, 0x5f, 0x90, 0x11, 0xab, 0x33, 0x76, 0x62, 0x7f, 0xb9, 0x28,
0xc9, 0xe7, 0xbe, 0xf4, 0x31, 0x9d, 0xfe, 0x8d, 0x8c, 0x1c, 0x2c, 0x5b, 0xef, 0x70, 0x94, 0xaf,
0x24, 0x48, 0x07, 0x0a, 0x18, 0xb5, 0x21, 0xea, 0x12, 0xcd, 0x6f, 0x43, 0xbf, 0xdd, 0x7e, 0xf0,
0x23, 0x9a, 0xd7, 0x77, 0x38, 0x0e, 0x1a, 0x42, 0x8a, 0x9d, 0xe8, 0xf3, 0xae, 0x1f, 0xe6, 0x5d,
0xff, 0x64, 0xb7, 0x38, 0x3e, 0x22, 0x2e, 0xe1, 0x3d, 0x9f, 0x4f, 0xae, 0x6c, 0xa5, 0xfc, 0x05,
0xe4, 0x77, 0x1b, 0x03, 0x1b, 0x25, 0x57, 0xc3, 0xa5, 0x70, 0x47, 0xc6, 0x01, 0x0e, 0x3a, 0x80,
0x38, 0xef, 0xc5, 0x22, 0x60, 0x12, 0xf6, 0x28, 0xe5, 0x0c, 0xd0, 0xcd, 0xda, 0xde, 0x12, 0x2d,
0xb2, 0x42, 0x7b, 0x02, 0xb7, 0xde, 0x53, 0x99, 0x5b, 0xc2, 0x45, 0x83, 0xc6, 0xdd, 0x2c, 0xab,
0x2d, 0xd1, 0x92, 0x2b, 0xb4, 0xc7, 0xb0, 0x77, 0xa3, 0x2c, 0xb6, 0x04, 0x4b, 0xf9, 0x60, 0xe5,
0x2e, 0xa4, 0x38, 0x80, 0xf7, 0xee, 0xc6, 0xbd, 0xc1, 0x20, 0xa4, 0xdc, 0x9a, 0xcd, 0xd5, 0xfc,
0x4a, 0x24, 0x72, 0x8d, 0x6d, 0x58, 0xcd, 0x17, 0x9b, 0x1b, 0x84, 0x2d, 0xde, 0xb3, 0xfa, 0xa5,
0x04, 0x49, 0xff, 0xbe, 0xd1, 0x2f, 0x20, 0x76, 0x72, 0xd6, 0x39, 0xea, 0xc9, 0x21, 0x65, 0x6f,
0x36, 0x57, 0xb3, 0xbe, 0x80, 0x5f, 0x3d, 0x52, 0x21, 0xd1, 0x6a, 0xf7, 0x1a, 0xa7, 0x0d, 0xec,
0x43, 0xfa, 0x72, 0xef, 0x3a, 0x51, 0x19, 0x92, 0x17, 0xed, 0x6e, 0xeb, 0xb4, 0xdd, 0x78, 0x24,
0x87, 0xc5, 0x83, 0xef, 0x6f, 0xf1, 0xef, 0x88, 0xa1, 0xd4, 0x3b, 0x9d, 0xb3, 0xc6, 0x51, 0x5b,
0x8e, 0x6c, 0xa2, 0x78, 0x71, 0x47, 0x45, 0x88, 0x77, 0x7b, 0xb8, 0xd5, 0x3e, 0x95, 0xa3, 0x0a,
0x9a, 0xcd, 0xd5, 0x9c, 0xbf, 0x41, 0x84, 0xd2, 0x33, 0xfc, 0x73, 0x09, 0xf6, 0x8f, 0x89, 0x45,
0x06, 0xfa, 0x48, 0x77, 0x75, 0xea, 0xac, 0x1e, 0xfa, 0x21, 0x44, 0x87, 0xc4, 0xf2, 0xeb, 0x6b,
0xfb, 0xa6, 0xf6, 0x3e, 0x50, 0xc6, 0x74, 0xf8, 0x7c, 0x8a, 0x39, 0xb8, 0xf2, 0x7b, 0x48, 0xad,
0x58, 0x5b, 0x8d, 0xac, 0x79, 0xc8, 0xf2, 0x49, 0xd8, 0x47, 0x2e, 0x3f, 0x80, 0x77, 0x7e, 0xd9,
0xd8, 0x61, 0xc7, 0x25, 0xb6, 0xcb, 0x01, 0x23, 0x58, 0x10, 0x4c, 0x09, 0x35, 0x2e, 0x39, 0x60,
0x04, 0xb3, 0x65, 0xed, 0xbb, 0x30, 0x24, 0xba, 0xc2, 0x68, 0xf4, 0x7f, 0x09, 0xa2, 0xac, 0x86,
0xd1, 0x1f, 0x76, 0x99, 0xe2, 0x95, 0x3f, 0xee, 0xd4, 0x38, 0xca, 0xd1, 0x97, 0x5f, 0x14, 0x42,
0xbf, 0x96, 0x90, 0x03, 0x99, 0x60, 0x14, 0xd1, 0x41, 0x55, 0xfc, 0x86, 0x57, 0xfd, 0xdf, 0xf0,
0x6a, 0x83, 0xfd, 0x86, 0x2b, 0x8d, 0x8f, 0x72, 0x39, 0x5c, 0xad, 0x84, 0x28, 0x88, 0x7f, 0x8c,
0x9f, 0xd4, 0xf6, 0xa7, 0xad, 0xb5, 0x6d, 0xde, 0x14, 0x53, 0x13, 0x56, 0xb8, 0x8f, 0xf5, 0x3b,
0x6f, 0xbe, 0x2d, 0x86, 0xde, 0x2c, 0x8b, 0xd2, 0xd7, 0xcb, 0xa2, 0xf4, 0xcd, 0xb2, 0x28, 0xbd,
0x7e, 0x5b, 0x0c, 0xfd, 0x23, 0xe1, 0x1d, 0x1c, 0xc4, 0xb9, 0xea, 0xdf, 0xfc, 0x18, 0x00, 0x00,
0xff, 0xff, 0xa9, 0x9b, 0xef, 0x45, 0xee, 0x10, 0x00, 0x00,
}

View File

@ -23,7 +23,6 @@ package pb
import (
context "context"
yarpc "github.com/influxdata/yarpc"
)
@ -164,7 +163,7 @@ func _Storage_Hints_Handler(srv interface{}, ctx context.Context, dec func(inter
}
var _Storage_serviceDesc = yarpc.ServiceDesc{
ServiceName: "storage.Storage",
ServiceName: "com.github.influxdata.influxdb.services.storage.Storage",
Index: 0,
HandlerType: (*StorageServer)(nil),
Methods: []yarpc.MethodDesc{

View File

@ -143,6 +143,21 @@ func (s *source) next(ctx context.Context, trace map[string]string) (query.Block
return bi, stop, true
}
type GroupMode int
const (
// GroupModeDefault specifies the default grouping mode, which is GroupModeAll.
GroupModeDefault GroupMode = 0
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate block for each series.
GroupModeAll
// GroupModeBy produces a block for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a block for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)
type ReadSpec struct {
OrganizationID []byte
BucketID []byte
@ -162,12 +177,13 @@ type ReadSpec struct {
// By default this is false meaning all values of time are produced for a given series,
// before any values are produced from the next series.
OrderByTime bool
// MergeAll indicates that all series should be merged into a single group
MergeAll bool
// GroupKeys is the list of dimensions along which to group
// GroupMode instructs
GroupMode GroupMode
// GroupKeys is the list of dimensions along which to group.
//
// When GroupMode is GroupModeBy, the results will be grouped by the specified keys.
// When GroupMode is GroupModeExcept, the results will be grouped by all keys, except those specified.
GroupKeys []string
// GroupExcept is the list of dimensions along which to not group
GroupExcept []string
}
type Reader interface {

View File

@ -6,8 +6,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
)
@ -422,7 +422,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{
By: []string{"host", "region"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host", "region"},
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("sum")},
@ -465,6 +466,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
},
GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host", "region"},
AggregateSet: true,
AggregateMethod: "sum",
@ -523,7 +525,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{
By: []string{"host"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -568,6 +571,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
},
GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
LimitSet: true,
PointsLimit: -1,
@ -626,7 +630,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{
By: []string{"host"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -671,6 +676,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
},
GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Parents: nil,
@ -727,7 +733,8 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
plan.ProcedureIDFromOperationID("group"): {
ID: plan.ProcedureIDFromOperationID("group"),
Spec: &functions.GroupProcedureSpec{
By: []string{"host"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Parents: []plan.ProcedureID{plan.ProcedureIDFromOperationID("range")},
Children: []plan.ProcedureID{plan.ProcedureIDFromOperationID("distinct")},
@ -772,6 +779,7 @@ func TestPhysicalPlanner_Plan(t *testing.T) {
},
},
GroupingSet: true,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"host"},
},
Parents: nil,

View File

@ -225,8 +225,8 @@ func (r *REPL) doQuery(spec *query.Spec) error {
blocks := r.Blocks()
fmt.Println("Result:", name)
err := blocks.Do(func(b query.Block) error {
execute.NewFormatter(b, nil).WriteTo(os.Stdout)
return nil
_, err := execute.NewFormatter(b, nil).WriteTo(os.Stdout)
return err
})
if err != nil {
return err