Merge pull request #7107 from benbjohnson/concurrent-shard-limiter

Limit shard concurrency
pull/7118/head
Ben Johnson 2016-08-05 12:49:20 -06:00 committed by GitHub
commit 22e1a55f97
9 changed files with 679 additions and 33 deletions

View File

@ -114,6 +114,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7084](https://github.com/influxdata/influxdb/pull/7084): Tombstone memory improvements
- [#6543](https://github.com/influxdata/influxdb/issues/6543): Fix parseFill to check for fill ident before attempting to parse an expression.
- [#7032](https://github.com/influxdata/influxdb/pull/7032): Copy tags in influx_stress to avoid a concurrent write panic on a map.
- [#7107](https://github.com/influxdata/influxdb/pull/7107): Limit shard concurrency
## v0.13.0 [2016-05-12]

View File

@ -603,6 +603,12 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
if err != nil {
return nil, err
}
// Reverse shards if in descending order.
if !stmt.TimeAscending() {
shards = meta.ShardInfos(shards).Reverse()
}
return e.TSDBStore.IteratorCreator(shards, opt)
}

View File

@ -29,6 +29,12 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Point struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"`
@ -44,9 +50,10 @@ type Point struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *Point) Reset() { *m = Point{} }
func (m *Point) String() string { return proto.CompactTextString(m) }
func (*Point) ProtoMessage() {}
func (m *Point) Reset() { *m = Point{} }
func (m *Point) String() string { return proto.CompactTextString(m) }
func (*Point) ProtoMessage() {}
func (*Point) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} }
func (m *Point) GetName() string {
if m != nil && m.Name != nil {
@ -134,9 +141,10 @@ type Aux struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *Aux) Reset() { *m = Aux{} }
func (m *Aux) String() string { return proto.CompactTextString(m) }
func (*Aux) ProtoMessage() {}
func (m *Aux) Reset() { *m = Aux{} }
func (m *Aux) String() string { return proto.CompactTextString(m) }
func (*Aux) ProtoMessage() {}
func (*Aux) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} }
func (m *Aux) GetDataType() int32 {
if m != nil && m.DataType != nil {
@ -194,9 +202,10 @@ type IteratorOptions struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
func (*IteratorOptions) ProtoMessage() {}
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
func (*IteratorOptions) ProtoMessage() {}
func (*IteratorOptions) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} }
func (m *IteratorOptions) GetExpr() string {
if m != nil && m.Expr != nil {
@ -322,9 +331,10 @@ type Measurements struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *Measurements) Reset() { *m = Measurements{} }
func (m *Measurements) String() string { return proto.CompactTextString(m) }
func (*Measurements) ProtoMessage() {}
func (m *Measurements) Reset() { *m = Measurements{} }
func (m *Measurements) String() string { return proto.CompactTextString(m) }
func (*Measurements) ProtoMessage() {}
func (*Measurements) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{3} }
func (m *Measurements) GetItems() []*Measurement {
if m != nil {
@ -342,9 +352,10 @@ type Measurement struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *Measurement) Reset() { *m = Measurement{} }
func (m *Measurement) String() string { return proto.CompactTextString(m) }
func (*Measurement) ProtoMessage() {}
func (m *Measurement) Reset() { *m = Measurement{} }
func (m *Measurement) String() string { return proto.CompactTextString(m) }
func (*Measurement) ProtoMessage() {}
func (*Measurement) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{4} }
func (m *Measurement) GetDatabase() string {
if m != nil && m.Database != nil {
@ -387,9 +398,10 @@ type Interval struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *Interval) Reset() { *m = Interval{} }
func (m *Interval) String() string { return proto.CompactTextString(m) }
func (*Interval) ProtoMessage() {}
func (m *Interval) Reset() { *m = Interval{} }
func (m *Interval) String() string { return proto.CompactTextString(m) }
func (*Interval) ProtoMessage() {}
func (*Interval) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} }
func (m *Interval) GetDuration() int64 {
if m != nil && m.Duration != nil {
@ -411,9 +423,10 @@ type IteratorStats struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} }
func (m *IteratorStats) GetSeriesN() int64 {
if m != nil && m.SeriesN != nil {
@ -435,9 +448,10 @@ type VarRef struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *VarRef) Reset() { *m = VarRef{} }
func (m *VarRef) String() string { return proto.CompactTextString(m) }
func (*VarRef) ProtoMessage() {}
func (m *VarRef) Reset() { *m = VarRef{} }
func (m *VarRef) String() string { return proto.CompactTextString(m) }
func (*VarRef) ProtoMessage() {}
func (*VarRef) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{7} }
func (m *VarRef) GetVal() string {
if m != nil && m.Val != nil {
@ -463,3 +477,45 @@ func init() {
proto.RegisterType((*IteratorStats)(nil), "influxql.IteratorStats")
proto.RegisterType((*VarRef)(nil), "influxql.VarRef")
}
func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) }
var fileDescriptorInternal = []byte{
// 575 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x53, 0xdf, 0x6a, 0xdb, 0x3c,
0x14, 0x47, 0x56, 0x9d, 0xda, 0xc7, 0x49, 0x93, 0xe8, 0xfb, 0x46, 0xc5, 0xae, 0x44, 0x56, 0x8a,
0x2f, 0x46, 0x37, 0xca, 0x5e, 0x20, 0x5b, 0x5b, 0x08, 0x6c, 0x69, 0x69, 0x4a, 0xef, 0xb5, 0xe6,
0xc4, 0x08, 0x14, 0x39, 0x93, 0xe4, 0x91, 0x3e, 0x73, 0x5f, 0x62, 0x48, 0x71, 0x9a, 0xac, 0x84,
0xdd, 0xf9, 0x1c, 0x1d, 0xc9, 0xbf, 0x7f, 0x07, 0x4e, 0x95, 0xf1, 0x68, 0x8d, 0xd4, 0x9f, 0xb6,
0x1f, 0x17, 0x2b, 0x5b, 0xfb, 0x9a, 0x65, 0xca, 0x2c, 0x74, 0xb3, 0xfe, 0xa5, 0x47, 0x2f, 0x04,
0xd2, 0xbb, 0x5a, 0x19, 0xcf, 0xba, 0x70, 0x34, 0x95, 0x4b, 0xe4, 0x44, 0x24, 0x65, 0x1e, 0xaa,
0x07, 0x59, 0x39, 0x9e, 0xbc, 0x56, 0x6a, 0x89, 0x9c, 0x8a, 0xa4, 0xa4, 0xac, 0x00, 0x3a, 0x55,
0x9a, 0x1f, 0x89, 0xa4, 0xcc, 0xd8, 0x7b, 0xa0, 0xe3, 0x66, 0xcd, 0x53, 0x41, 0xcb, 0xe2, 0xb2,
0x77, 0xb1, 0x7d, 0xf8, 0x62, 0xdc, 0xac, 0x19, 0x03, 0x18, 0x57, 0x95, 0xc5, 0x4a, 0x7a, 0x9c,
0xf3, 0x8e, 0x20, 0x65, 0x2f, 0xf4, 0x6e, 0x74, 0x2d, 0xfd, 0xa3, 0xd4, 0x0d, 0xf2, 0x63, 0x41,
0x4a, 0xc2, 0xfe, 0x87, 0xee, 0xc4, 0x78, 0xac, 0xd0, 0x6e, 0xba, 0x99, 0x20, 0x25, 0x65, 0xff,
0x41, 0x31, 0xf3, 0x56, 0x99, 0x6a, 0xd3, 0xcc, 0x05, 0x29, 0xf3, 0x30, 0xfa, 0xb5, 0xae, 0x35,
0x4a, 0xb3, 0xe9, 0x82, 0x20, 0x65, 0xc6, 0xce, 0x21, 0x9d, 0x79, 0xe9, 0x1d, 0x2f, 0x04, 0x29,
0x8b, 0xcb, 0xd3, 0x1d, 0x8c, 0x89, 0x47, 0x2b, 0x7d, 0x6d, 0xe3, 0xf1, 0x48, 0x47, 0xb0, 0x6c,
0x00, 0xd9, 0x95, 0xf4, 0xf2, 0xe1, 0x79, 0xb5, 0xa1, 0x9b, 0xbe, 0x41, 0x95, 0x1c, 0x44, 0x45,
0x0f, 0xa1, 0x3a, 0x3a, 0x88, 0x2a, 0x0d, 0xa8, 0x46, 0x2f, 0x09, 0xf4, 0xb7, 0xff, 0xbf, 0x5d,
0x79, 0x55, 0x1b, 0x17, 0x94, 0xbc, 0x5e, 0xaf, 0x2c, 0x27, 0xf1, 0x5e, 0xb1, 0x11, 0x2f, 0x11,
0xb4, 0xcc, 0x99, 0x80, 0xce, 0x8d, 0x42, 0x3d, 0x77, 0x7c, 0x18, 0xc5, 0x1c, 0xec, 0x58, 0x3c,
0x4a, 0x7b, 0x8f, 0x0b, 0x76, 0x0e, 0xc7, 0xb3, 0xba, 0xb1, 0x4f, 0xe8, 0x38, 0x8d, 0x23, 0xef,
0x76, 0x23, 0x3f, 0x50, 0xba, 0xc6, 0xe2, 0x12, 0x8d, 0x67, 0x67, 0x90, 0x05, 0xe4, 0xf6, 0xb7,
0xd4, 0x11, 0x60, 0x71, 0xc9, 0xf6, 0x14, 0x69, 0x4f, 0x02, 0xe7, 0x2b, 0xb5, 0x44, 0xe3, 0x02,
0xb0, 0x68, 0x60, 0x34, 0xfa, 0x46, 0x69, 0x1d, 0xbd, 0x4a, 0xd9, 0x10, 0xf2, 0x50, 0xed, 0x5b,
0x35, 0x84, 0xfc, 0x5b, 0x6d, 0xe6, 0x2a, 0xb0, 0x89, 0x3e, 0xe5, 0xa1, 0x35, 0xf3, 0xd2, 0xfa,
0x98, 0x90, 0x3c, 0x8a, 0xd4, 0x87, 0xe3, 0x6b, 0x33, 0x8f, 0x0d, 0x88, 0x8d, 0x21, 0xe4, 0x63,
0xf7, 0x84, 0x66, 0xae, 0x4c, 0x15, 0x4d, 0xca, 0x58, 0x0f, 0xd2, 0xef, 0x6a, 0xa9, 0x3c, 0xef,
0xc6, 0x89, 0x13, 0xe8, 0xdc, 0x2e, 0x16, 0x0e, 0x3d, 0xef, 0x6d, 0xeb, 0xd9, 0xe6, 0xfc, 0x64,
0xfb, 0xe4, 0xac, 0x1d, 0xe8, 0x6f, 0x07, 0xae, 0x70, 0xde, 0xac, 0x90, 0x0f, 0xa2, 0xda, 0x5f,
0xa0, 0xbb, 0xa7, 0x81, 0x63, 0x67, 0x90, 0x4e, 0x3c, 0x2e, 0x1d, 0x27, 0xff, 0x90, 0x6a, 0x54,
0x41, 0xb1, 0xaf, 0x5c, 0x9b, 0x8c, 0x9f, 0xd2, 0x61, 0x6b, 0xd1, 0x29, 0xf4, 0xef, 0xd1, 0xa3,
0x09, 0x84, 0xef, 0x6a, 0xad, 0x9e, 0x9e, 0x63, 0x3c, 0xf2, 0xd7, 0x7d, 0xa1, 0xb1, 0xea, 0x41,
0x7a, 0x8f, 0x15, 0xae, 0xdb, 0x40, 0x0c, 0x20, 0x9b, 0xb8, 0x07, 0x69, 0x2b, 0xf4, 0x6d, 0x18,
0x3e, 0xee, 0x3c, 0x89, 0x7f, 0x69, 0xac, 0x8c, 0x1a, 0x92, 0x37, 0xec, 0xc3, 0xe3, 0x74, 0xf4,
0x19, 0x7a, 0x7f, 0x25, 0x37, 0xd2, 0x47, 0xab, 0xd0, 0x4d, 0x77, 0x37, 0xe2, 0xde, 0x4e, 0xdb,
0x1b, 0x1f, 0xa0, 0xd3, 0xa6, 0xa4, 0x00, 0xfa, 0x28, 0xf5, 0xde, 0x1e, 0x87, 0x98, 0x87, 0xa1,
0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1a, 0x06, 0x51, 0x0d, 0x11, 0x04, 0x00, 0x00,
}

View File

@ -494,6 +494,103 @@ func (itr *floatParallelIterator) monitor() {
}
}
type lazyFloatIterator struct {
itr FloatIterator
fn func() (Iterator, error)
}
// init instantiates the underlying iterator.
func (itr *lazyFloatIterator) init() error {
if itr.itr != nil {
return nil
}
i, err := itr.fn()
if err != nil {
return err
} else if i == nil {
return nil
}
if it, ok := i.(FloatIterator); ok {
itr.itr = it
} else {
return fmt.Errorf("invalid lazy float iterator type: %T", i)
}
return nil
}
func (itr *lazyFloatIterator) Close() error {
if itr.itr == nil {
return nil
}
return itr.itr.Close()
}
func (itr *lazyFloatIterator) Next() (*FloatPoint, error) {
if err := itr.init(); err != nil {
return nil, err
} else if itr.itr == nil {
return nil, nil
}
return itr.itr.Next()
}
func (itr *lazyFloatIterator) Stats() IteratorStats {
if itr.itr == nil {
return IteratorStats{}
}
return itr.itr.Stats()
}
// MultiFloatIterator represents an iterator that concatenates a list of iterators.
type MultiFloatIterator []FloatIterator
// NewMultiFloatIterator returns a pointer to a MultiFloatIterator.
func NewMultiFloatIterator(a []FloatIterator) *MultiFloatIterator {
itr := MultiFloatIterator(a)
return &itr
}
// Close closes all iterators.
func (a *MultiFloatIterator) Close() error {
for _, itr := range *a {
itr.Close()
}
return nil
}
// Next iterates over all points in all iterators sequentially.
func (a *MultiFloatIterator) Next() (*FloatPoint, error) {
for {
// Return no point if
if len(*a) == 0 {
return nil, nil
}
// Read next point off the first iterator until EOF.
p, err := (*a)[0].Next()
if p == nil && err == nil {
if err := (*a)[0].Close(); err != nil {
return nil, err
}
*a = (*a)[1:]
continue
}
return p, err
}
}
// Stats returns the aggregation of all iterator stats.
func (a *MultiFloatIterator) Stats() IteratorStats {
var stats IteratorStats
for _, itr := range *a {
stats.Add(itr.Stats())
}
return stats
}
type floatPointError struct {
point *FloatPoint
err error
@ -2558,6 +2655,103 @@ func (itr *integerParallelIterator) monitor() {
}
}
type lazyIntegerIterator struct {
itr IntegerIterator
fn func() (Iterator, error)
}
// init instantiates the underlying iterator.
func (itr *lazyIntegerIterator) init() error {
if itr.itr != nil {
return nil
}
i, err := itr.fn()
if err != nil {
return err
} else if i == nil {
return nil
}
if it, ok := i.(IntegerIterator); ok {
itr.itr = it
} else {
return fmt.Errorf("invalid lazy integer iterator type: %T", i)
}
return nil
}
func (itr *lazyIntegerIterator) Close() error {
if itr.itr == nil {
return nil
}
return itr.itr.Close()
}
func (itr *lazyIntegerIterator) Next() (*IntegerPoint, error) {
if err := itr.init(); err != nil {
return nil, err
} else if itr.itr == nil {
return nil, nil
}
return itr.itr.Next()
}
func (itr *lazyIntegerIterator) Stats() IteratorStats {
if itr.itr == nil {
return IteratorStats{}
}
return itr.itr.Stats()
}
// MultiIntegerIterator represents an iterator that concatenates a list of iterators.
type MultiIntegerIterator []IntegerIterator
// NewMultiIntegerIterator returns a pointer to a MultiIntegerIterator.
func NewMultiIntegerIterator(a []IntegerIterator) *MultiIntegerIterator {
itr := MultiIntegerIterator(a)
return &itr
}
// Close closes all iterators.
func (a *MultiIntegerIterator) Close() error {
for _, itr := range *a {
itr.Close()
}
return nil
}
// Next iterates over all points in all iterators sequentially.
func (a *MultiIntegerIterator) Next() (*IntegerPoint, error) {
for {
// Return no point if
if len(*a) == 0 {
return nil, nil
}
// Read next point off the first iterator until EOF.
p, err := (*a)[0].Next()
if p == nil && err == nil {
if err := (*a)[0].Close(); err != nil {
return nil, err
}
*a = (*a)[1:]
continue
}
return p, err
}
}
// Stats returns the aggregation of all iterator stats.
func (a *MultiIntegerIterator) Stats() IteratorStats {
var stats IteratorStats
for _, itr := range *a {
stats.Add(itr.Stats())
}
return stats
}
type integerPointError struct {
point *IntegerPoint
err error
@ -4619,6 +4813,103 @@ func (itr *stringParallelIterator) monitor() {
}
}
type lazyStringIterator struct {
itr StringIterator
fn func() (Iterator, error)
}
// init instantiates the underlying iterator.
func (itr *lazyStringIterator) init() error {
if itr.itr != nil {
return nil
}
i, err := itr.fn()
if err != nil {
return err
} else if i == nil {
return nil
}
if it, ok := i.(StringIterator); ok {
itr.itr = it
} else {
return fmt.Errorf("invalid lazy string iterator type: %T", i)
}
return nil
}
func (itr *lazyStringIterator) Close() error {
if itr.itr == nil {
return nil
}
return itr.itr.Close()
}
func (itr *lazyStringIterator) Next() (*StringPoint, error) {
if err := itr.init(); err != nil {
return nil, err
} else if itr.itr == nil {
return nil, nil
}
return itr.itr.Next()
}
func (itr *lazyStringIterator) Stats() IteratorStats {
if itr.itr == nil {
return IteratorStats{}
}
return itr.itr.Stats()
}
// MultiStringIterator represents an iterator that concatenates a list of iterators.
type MultiStringIterator []StringIterator
// NewMultiStringIterator returns a pointer to a MultiStringIterator.
func NewMultiStringIterator(a []StringIterator) *MultiStringIterator {
itr := MultiStringIterator(a)
return &itr
}
// Close closes all iterators.
func (a *MultiStringIterator) Close() error {
for _, itr := range *a {
itr.Close()
}
return nil
}
// Next iterates over all points in all iterators sequentially.
func (a *MultiStringIterator) Next() (*StringPoint, error) {
for {
// Return no point if
if len(*a) == 0 {
return nil, nil
}
// Read next point off the first iterator until EOF.
p, err := (*a)[0].Next()
if p == nil && err == nil {
if err := (*a)[0].Close(); err != nil {
return nil, err
}
*a = (*a)[1:]
continue
}
return p, err
}
}
// Stats returns the aggregation of all iterator stats.
func (a *MultiStringIterator) Stats() IteratorStats {
var stats IteratorStats
for _, itr := range *a {
stats.Add(itr.Stats())
}
return stats
}
type stringPointError struct {
point *StringPoint
err error
@ -6680,6 +6971,103 @@ func (itr *booleanParallelIterator) monitor() {
}
}
type lazyBooleanIterator struct {
itr BooleanIterator
fn func() (Iterator, error)
}
// init instantiates the underlying iterator.
func (itr *lazyBooleanIterator) init() error {
if itr.itr != nil {
return nil
}
i, err := itr.fn()
if err != nil {
return err
} else if i == nil {
return nil
}
if it, ok := i.(BooleanIterator); ok {
itr.itr = it
} else {
return fmt.Errorf("invalid lazy boolean iterator type: %T", i)
}
return nil
}
func (itr *lazyBooleanIterator) Close() error {
if itr.itr == nil {
return nil
}
return itr.itr.Close()
}
func (itr *lazyBooleanIterator) Next() (*BooleanPoint, error) {
if err := itr.init(); err != nil {
return nil, err
} else if itr.itr == nil {
return nil, nil
}
return itr.itr.Next()
}
func (itr *lazyBooleanIterator) Stats() IteratorStats {
if itr.itr == nil {
return IteratorStats{}
}
return itr.itr.Stats()
}
// MultiBooleanIterator represents an iterator that concatenates a list of iterators.
type MultiBooleanIterator []BooleanIterator
// NewMultiBooleanIterator returns a pointer to a MultiBooleanIterator.
func NewMultiBooleanIterator(a []BooleanIterator) *MultiBooleanIterator {
itr := MultiBooleanIterator(a)
return &itr
}
// Close closes all iterators.
func (a *MultiBooleanIterator) Close() error {
for _, itr := range *a {
itr.Close()
}
return nil
}
// Next iterates over all points in all iterators sequentially.
func (a *MultiBooleanIterator) Next() (*BooleanPoint, error) {
for {
// Return no point if
if len(*a) == 0 {
return nil, nil
}
// Read next point off the first iterator until EOF.
p, err := (*a)[0].Next()
if p == nil && err == nil {
if err := (*a)[0].Close(); err != nil {
return nil, err
}
*a = (*a)[1:]
continue
}
return p, err
}
}
// Stats returns the aggregation of all iterator stats.
func (a *MultiBooleanIterator) Stats() IteratorStats {
var stats IteratorStats
for _, itr := range *a {
stats.Add(itr.Stats())
}
return stats
}
type booleanPointError struct {
point *BooleanPoint
err error

View File

@ -492,6 +492,104 @@ func (itr *{{$k.name}}ParallelIterator) monitor() {
}
}
type lazy{{$k.Name}}Iterator struct {
itr {{$k.Name}}Iterator
fn func() (Iterator, error)
}
// init instantiates the underlying iterator.
func (itr *lazy{{$k.Name}}Iterator) init() error {
if itr.itr != nil {
return nil
}
i, err := itr.fn()
if err != nil {
return err
} else if i == nil {
return nil
}
if it, ok := i.({{$k.Name}}Iterator); ok {
itr.itr = it
} else {
return fmt.Errorf("invalid lazy {{$k.name}} iterator type: %T", i)
}
return nil
}
func (itr *lazy{{$k.Name}}Iterator) Close() error {
if itr.itr == nil {
return nil
}
return itr.itr.Close()
}
func (itr *lazy{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
if err := itr.init(); err != nil {
return nil, err
} else if itr.itr == nil {
return nil, nil
}
return itr.itr.Next()
}
func (itr *lazy{{$k.Name}}Iterator) Stats() IteratorStats {
if itr.itr == nil {
return IteratorStats{}
}
return itr.itr.Stats()
}
// Multi{{$k.Name}}Iterator represents an iterator that concatenates a list of iterators.
type Multi{{$k.Name}}Iterator []{{$k.Name}}Iterator
// NewMulti{{$k.Name}}Iterator returns a pointer to a Multi{{$k.Name}}Iterator.
func NewMulti{{$k.Name}}Iterator(a []{{$k.Name}}Iterator) *Multi{{$k.Name}}Iterator {
itr := Multi{{$k.Name}}Iterator(a)
return &itr
}
// Close closes all iterators.
func (a *Multi{{$k.Name}}Iterator) Close() error {
for _, itr := range *a {
itr.Close()
}
return nil
}
// Next iterates over all points in all iterators sequentially.
func (a *Multi{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
for {
// Return no point if
if len(*a) == 0 {
return nil, nil
}
// Read next point off the first iterator until EOF.
p, err := (*a)[0].Next()
if p == nil && err == nil {
if err := (*a)[0].Close(); err != nil {
return nil, err
}
*a = (*a)[1:]
continue
}
return p, err
}
}
// Stats returns the aggregation of all iterator stats.
func (a *Multi{{$k.Name}}Iterator) Stats() IteratorStats {
var stats IteratorStats
for _, itr := range *a {
stats.Add(itr.Stats())
}
return stats
}
type {{$k.name}}PointError struct {
point *{{$k.Name}}Point
err error

View File

@ -696,6 +696,91 @@ func (a IteratorCreators) ExpandSources(sources Sources) (Sources, error) {
return sorted, nil
}
// lazyIteratorCreators represents a list of iterator creators that are lazily created.
type lazyIteratorCreators IteratorCreators
// NewLazyIteratorCreator returns an iterator creator for that creates iterators lazily.
func NewLazyIteratorCreator(a []IteratorCreator) IteratorCreator {
return lazyIteratorCreators(a)
}
func (a lazyIteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error) {
for {
if len(a) == 0 {
return nil, nil
}
// Create first iterator to determine data type.
itr, err := a[0].CreateIterator(opt)
if err != nil {
return nil, err
} else if itr == nil {
a = a[1:]
continue
} else if len(a) == 1 {
return Iterators{itr}.Merge(opt)
}
// All additional iterators need to be the same type.
switch itr := itr.(type) {
case FloatIterator:
itrs := make([]FloatIterator, len(a))
itrs[0] = itr
for i := 1; i < len(itrs); i++ {
ic := a[i]
itrs[i] = &lazyFloatIterator{
fn: func() (Iterator, error) { return ic.CreateIterator(opt) },
}
}
return Iterators{NewMultiFloatIterator(itrs)}.Merge(opt)
case IntegerIterator:
itrs := make([]IntegerIterator, len(a))
itrs[0] = itr
for i := 1; i < len(itrs); i++ {
ic := a[i]
itrs[i] = &lazyIntegerIterator{
fn: func() (Iterator, error) { return ic.CreateIterator(opt) },
}
}
return Iterators{NewMultiIntegerIterator(itrs)}.Merge(opt)
case StringIterator:
itrs := make([]StringIterator, len(a))
itrs[0] = itr
for i := 1; i < len(itrs); i++ {
ic := a[i]
itrs[i] = &lazyStringIterator{
fn: func() (Iterator, error) { return ic.CreateIterator(opt) },
}
}
return Iterators{NewMultiStringIterator(itrs)}.Merge(opt)
case BooleanIterator:
itrs := make([]BooleanIterator, len(a))
itrs[0] = itr
for i := 1; i < len(itrs); i++ {
ic := a[i]
itrs[i] = &lazyBooleanIterator{
fn: func() (Iterator, error) { return ic.CreateIterator(opt) },
}
}
return Iterators{NewMultiBooleanIterator(itrs)}.Merge(opt)
default:
panic(fmt.Sprintf("unsupported iterator type for lazy iteration: %T", itr))
}
}
}
func (a lazyIteratorCreators) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
return IteratorCreators(a).FieldDimensions(sources)
}
func (a lazyIteratorCreators) ExpandSources(sources Sources) (Sources, error) {
return IteratorCreators(a).ExpandSources(sources)
}
// IteratorOptions is an object passed to CreateIterator to specify creation options.
type IteratorOptions struct {
// Expression to iterate for.

View File

@ -647,8 +647,9 @@ func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.T
}
// ShardsByTimeRange returns a slice of shards that may contain data in the time range.
// Shards are returned in ascending time order.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) {
m := make(map[*ShardInfo]struct{})
m := make(map[uint64]struct{})
for _, src := range sources {
mm, ok := src.(*influxql.Measurement)
if !ok {
@ -660,17 +661,17 @@ func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Tim
return nil, err
}
for _, g := range groups {
for i := range g.Shards {
m[&g.Shards[i]] = struct{}{}
for _, sh := range g.Shards {
if _, ok := m[sh.ID]; ok {
continue
}
a = append(a, sh)
m[sh.ID] = struct{}{}
}
}
}
a = make([]ShardInfo, 0, len(m))
for sh := range m {
a = append(a, *sh)
}
return a, nil
}

View File

@ -1164,6 +1164,17 @@ func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) {
}
}
type ShardInfos []ShardInfo
// Reverse returns a reversed list of shard infos.
func (a ShardInfos) Reverse() []ShardInfo {
other := make([]ShardInfo, len(a))
for i := range a {
other[len(other)-i-1] = a[i]
}
return other
}
// SubscriptionInfo hold the subscription information
type SubscriptionInfo struct {
Name string

View File

@ -796,7 +796,7 @@ func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (i
return nil, err
}
return influxql.IteratorCreators(ics), nil
return influxql.NewLazyIteratorCreator(ics), nil
}
// WriteToShard writes a list of points to a shard identified by its ID.