From 6c7d56d4bc634170d2abd11104423bda1b554e0e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 4 Aug 2016 11:04:33 -0600 Subject: [PATCH] limit shard concurrency This commit limits queries to only process one shard at a time. However, within a shard, multiple series can still be processed in parallel. Shard iterators are lazily instantiated during query execution to limit the amount of memory a given query uses. --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 6 + influxql/internal/internal.pb.go | 104 ++++++-- influxql/iterator.gen.go | 388 ++++++++++++++++++++++++++++++ influxql/iterator.gen.go.tmpl | 98 ++++++++ influxql/iterator.go | 85 +++++++ services/meta/client.go | 17 +- services/meta/data.go | 11 + tsdb/store.go | 2 +- 9 files changed, 679 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 759b6e4b6d..e999b4bb2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,6 +114,7 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#7084](https://github.com/influxdata/influxdb/pull/7084): Tombstone memory improvements - [#6543](https://github.com/influxdata/influxdb/issues/6543): Fix parseFill to check for fill ident before attempting to parse an expression. - [#7032](https://github.com/influxdata/influxdb/pull/7032): Copy tags in influx_stress to avoid a concurrent write panic on a map. +- [#7107](https://github.com/influxdata/influxdb/pull/7107): Limit shard concurrency ## v0.13.0 [2016-05-12] diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 74bdbd9e33..554d4a03aa 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -603,6 +603,12 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt if err != nil { return nil, err } + + // Reverse shards if in descending order. + if !stmt.TimeAscending() { + shards = meta.ShardInfos(shards).Reverse() + } + return e.TSDBStore.IteratorCreator(shards, opt) } diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index a6f2c4b305..51e0a7f3e7 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -29,6 +29,12 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + type Point struct { Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"` @@ -44,9 +50,10 @@ type Point struct { XXX_unrecognized []byte `json:"-"` } -func (m *Point) Reset() { *m = Point{} } -func (m *Point) String() string { return proto.CompactTextString(m) } -func (*Point) ProtoMessage() {} +func (m *Point) Reset() { *m = Point{} } +func (m *Point) String() string { return proto.CompactTextString(m) } +func (*Point) ProtoMessage() {} +func (*Point) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} } func (m *Point) GetName() string { if m != nil && m.Name != nil { @@ -134,9 +141,10 @@ type Aux struct { XXX_unrecognized []byte `json:"-"` } -func (m *Aux) Reset() { *m = Aux{} } -func (m *Aux) String() string { return proto.CompactTextString(m) } -func (*Aux) ProtoMessage() {} +func (m *Aux) Reset() { *m = Aux{} } +func (m *Aux) String() string { return proto.CompactTextString(m) } +func (*Aux) ProtoMessage() {} +func (*Aux) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} } func (m *Aux) GetDataType() int32 { if m != nil && m.DataType != nil { @@ -194,9 +202,10 @@ type IteratorOptions struct { XXX_unrecognized []byte `json:"-"` } -func (m *IteratorOptions) Reset() { *m = IteratorOptions{} } -func (m *IteratorOptions) String() string { return proto.CompactTextString(m) } -func (*IteratorOptions) ProtoMessage() {} +func (m *IteratorOptions) Reset() { *m = IteratorOptions{} } +func (m *IteratorOptions) String() string { return proto.CompactTextString(m) } +func (*IteratorOptions) ProtoMessage() {} +func (*IteratorOptions) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} } func (m *IteratorOptions) GetExpr() string { if m != nil && m.Expr != nil { @@ -322,9 +331,10 @@ type Measurements struct { XXX_unrecognized []byte `json:"-"` } -func (m *Measurements) Reset() { *m = Measurements{} } -func (m *Measurements) String() string { return proto.CompactTextString(m) } -func (*Measurements) ProtoMessage() {} +func (m *Measurements) Reset() { *m = Measurements{} } +func (m *Measurements) String() string { return proto.CompactTextString(m) } +func (*Measurements) ProtoMessage() {} +func (*Measurements) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{3} } func (m *Measurements) GetItems() []*Measurement { if m != nil { @@ -342,9 +352,10 @@ type Measurement struct { XXX_unrecognized []byte `json:"-"` } -func (m *Measurement) Reset() { *m = Measurement{} } -func (m *Measurement) String() string { return proto.CompactTextString(m) } -func (*Measurement) ProtoMessage() {} +func (m *Measurement) Reset() { *m = Measurement{} } +func (m *Measurement) String() string { return proto.CompactTextString(m) } +func (*Measurement) ProtoMessage() {} +func (*Measurement) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{4} } func (m *Measurement) GetDatabase() string { if m != nil && m.Database != nil { @@ -387,9 +398,10 @@ type Interval struct { XXX_unrecognized []byte `json:"-"` } -func (m *Interval) Reset() { *m = Interval{} } -func (m *Interval) String() string { return proto.CompactTextString(m) } -func (*Interval) ProtoMessage() {} +func (m *Interval) Reset() { *m = Interval{} } +func (m *Interval) String() string { return proto.CompactTextString(m) } +func (*Interval) ProtoMessage() {} +func (*Interval) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} } func (m *Interval) GetDuration() int64 { if m != nil && m.Duration != nil { @@ -411,9 +423,10 @@ type IteratorStats struct { XXX_unrecognized []byte `json:"-"` } -func (m *IteratorStats) Reset() { *m = IteratorStats{} } -func (m *IteratorStats) String() string { return proto.CompactTextString(m) } -func (*IteratorStats) ProtoMessage() {} +func (m *IteratorStats) Reset() { *m = IteratorStats{} } +func (m *IteratorStats) String() string { return proto.CompactTextString(m) } +func (*IteratorStats) ProtoMessage() {} +func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} } func (m *IteratorStats) GetSeriesN() int64 { if m != nil && m.SeriesN != nil { @@ -435,9 +448,10 @@ type VarRef struct { XXX_unrecognized []byte `json:"-"` } -func (m *VarRef) Reset() { *m = VarRef{} } -func (m *VarRef) String() string { return proto.CompactTextString(m) } -func (*VarRef) ProtoMessage() {} +func (m *VarRef) Reset() { *m = VarRef{} } +func (m *VarRef) String() string { return proto.CompactTextString(m) } +func (*VarRef) ProtoMessage() {} +func (*VarRef) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{7} } func (m *VarRef) GetVal() string { if m != nil && m.Val != nil { @@ -463,3 +477,45 @@ func init() { proto.RegisterType((*IteratorStats)(nil), "influxql.IteratorStats") proto.RegisterType((*VarRef)(nil), "influxql.VarRef") } + +func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) } + +var fileDescriptorInternal = []byte{ + // 575 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x53, 0xdf, 0x6a, 0xdb, 0x3c, + 0x14, 0x47, 0x56, 0x9d, 0xda, 0xc7, 0x49, 0x93, 0xe8, 0xfb, 0x46, 0xc5, 0xae, 0x44, 0x56, 0x8a, + 0x2f, 0x46, 0x37, 0xca, 0x5e, 0x20, 0x5b, 0x5b, 0x08, 0x6c, 0x69, 0x69, 0x4a, 0xef, 0xb5, 0xe6, + 0xc4, 0x08, 0x14, 0x39, 0x93, 0xe4, 0x91, 0x3e, 0x73, 0x5f, 0x62, 0x48, 0x71, 0x9a, 0xac, 0x84, + 0xdd, 0xf9, 0x1c, 0x1d, 0xc9, 0xbf, 0x7f, 0x07, 0x4e, 0x95, 0xf1, 0x68, 0x8d, 0xd4, 0x9f, 0xb6, + 0x1f, 0x17, 0x2b, 0x5b, 0xfb, 0x9a, 0x65, 0xca, 0x2c, 0x74, 0xb3, 0xfe, 0xa5, 0x47, 0x2f, 0x04, + 0xd2, 0xbb, 0x5a, 0x19, 0xcf, 0xba, 0x70, 0x34, 0x95, 0x4b, 0xe4, 0x44, 0x24, 0x65, 0x1e, 0xaa, + 0x07, 0x59, 0x39, 0x9e, 0xbc, 0x56, 0x6a, 0x89, 0x9c, 0x8a, 0xa4, 0xa4, 0xac, 0x00, 0x3a, 0x55, + 0x9a, 0x1f, 0x89, 0xa4, 0xcc, 0xd8, 0x7b, 0xa0, 0xe3, 0x66, 0xcd, 0x53, 0x41, 0xcb, 0xe2, 0xb2, + 0x77, 0xb1, 0x7d, 0xf8, 0x62, 0xdc, 0xac, 0x19, 0x03, 0x18, 0x57, 0x95, 0xc5, 0x4a, 0x7a, 0x9c, + 0xf3, 0x8e, 0x20, 0x65, 0x2f, 0xf4, 0x6e, 0x74, 0x2d, 0xfd, 0xa3, 0xd4, 0x0d, 0xf2, 0x63, 0x41, + 0x4a, 0xc2, 0xfe, 0x87, 0xee, 0xc4, 0x78, 0xac, 0xd0, 0x6e, 0xba, 0x99, 0x20, 0x25, 0x65, 0xff, + 0x41, 0x31, 0xf3, 0x56, 0x99, 0x6a, 0xd3, 0xcc, 0x05, 0x29, 0xf3, 0x30, 0xfa, 0xb5, 0xae, 0x35, + 0x4a, 0xb3, 0xe9, 0x82, 0x20, 0x65, 0xc6, 0xce, 0x21, 0x9d, 0x79, 0xe9, 0x1d, 0x2f, 0x04, 0x29, + 0x8b, 0xcb, 0xd3, 0x1d, 0x8c, 0x89, 0x47, 0x2b, 0x7d, 0x6d, 0xe3, 0xf1, 0x48, 0x47, 0xb0, 0x6c, + 0x00, 0xd9, 0x95, 0xf4, 0xf2, 0xe1, 0x79, 0xb5, 0xa1, 0x9b, 0xbe, 0x41, 0x95, 0x1c, 0x44, 0x45, + 0x0f, 0xa1, 0x3a, 0x3a, 0x88, 0x2a, 0x0d, 0xa8, 0x46, 0x2f, 0x09, 0xf4, 0xb7, 0xff, 0xbf, 0x5d, + 0x79, 0x55, 0x1b, 0x17, 0x94, 0xbc, 0x5e, 0xaf, 0x2c, 0x27, 0xf1, 0x5e, 0xb1, 0x11, 0x2f, 0x11, + 0xb4, 0xcc, 0x99, 0x80, 0xce, 0x8d, 0x42, 0x3d, 0x77, 0x7c, 0x18, 0xc5, 0x1c, 0xec, 0x58, 0x3c, + 0x4a, 0x7b, 0x8f, 0x0b, 0x76, 0x0e, 0xc7, 0xb3, 0xba, 0xb1, 0x4f, 0xe8, 0x38, 0x8d, 0x23, 0xef, + 0x76, 0x23, 0x3f, 0x50, 0xba, 0xc6, 0xe2, 0x12, 0x8d, 0x67, 0x67, 0x90, 0x05, 0xe4, 0xf6, 0xb7, + 0xd4, 0x11, 0x60, 0x71, 0xc9, 0xf6, 0x14, 0x69, 0x4f, 0x02, 0xe7, 0x2b, 0xb5, 0x44, 0xe3, 0x02, + 0xb0, 0x68, 0x60, 0x34, 0xfa, 0x46, 0x69, 0x1d, 0xbd, 0x4a, 0xd9, 0x10, 0xf2, 0x50, 0xed, 0x5b, + 0x35, 0x84, 0xfc, 0x5b, 0x6d, 0xe6, 0x2a, 0xb0, 0x89, 0x3e, 0xe5, 0xa1, 0x35, 0xf3, 0xd2, 0xfa, + 0x98, 0x90, 0x3c, 0x8a, 0xd4, 0x87, 0xe3, 0x6b, 0x33, 0x8f, 0x0d, 0x88, 0x8d, 0x21, 0xe4, 0x63, + 0xf7, 0x84, 0x66, 0xae, 0x4c, 0x15, 0x4d, 0xca, 0x58, 0x0f, 0xd2, 0xef, 0x6a, 0xa9, 0x3c, 0xef, + 0xc6, 0x89, 0x13, 0xe8, 0xdc, 0x2e, 0x16, 0x0e, 0x3d, 0xef, 0x6d, 0xeb, 0xd9, 0xe6, 0xfc, 0x64, + 0xfb, 0xe4, 0xac, 0x1d, 0xe8, 0x6f, 0x07, 0xae, 0x70, 0xde, 0xac, 0x90, 0x0f, 0xa2, 0xda, 0x5f, + 0xa0, 0xbb, 0xa7, 0x81, 0x63, 0x67, 0x90, 0x4e, 0x3c, 0x2e, 0x1d, 0x27, 0xff, 0x90, 0x6a, 0x54, + 0x41, 0xb1, 0xaf, 0x5c, 0x9b, 0x8c, 0x9f, 0xd2, 0x61, 0x6b, 0xd1, 0x29, 0xf4, 0xef, 0xd1, 0xa3, + 0x09, 0x84, 0xef, 0x6a, 0xad, 0x9e, 0x9e, 0x63, 0x3c, 0xf2, 0xd7, 0x7d, 0xa1, 0xb1, 0xea, 0x41, + 0x7a, 0x8f, 0x15, 0xae, 0xdb, 0x40, 0x0c, 0x20, 0x9b, 0xb8, 0x07, 0x69, 0x2b, 0xf4, 0x6d, 0x18, + 0x3e, 0xee, 0x3c, 0x89, 0x7f, 0x69, 0xac, 0x8c, 0x1a, 0x92, 0x37, 0xec, 0xc3, 0xe3, 0x74, 0xf4, + 0x19, 0x7a, 0x7f, 0x25, 0x37, 0xd2, 0x47, 0xab, 0xd0, 0x4d, 0x77, 0x37, 0xe2, 0xde, 0x4e, 0xdb, + 0x1b, 0x1f, 0xa0, 0xd3, 0xa6, 0xa4, 0x00, 0xfa, 0x28, 0xf5, 0xde, 0x1e, 0x87, 0x98, 0x87, 0xa1, + 0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1a, 0x06, 0x51, 0x0d, 0x11, 0x04, 0x00, 0x00, +} diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index b222251224..cad54c1d26 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -494,6 +494,103 @@ func (itr *floatParallelIterator) monitor() { } } +type lazyFloatIterator struct { + itr FloatIterator + fn func() (Iterator, error) +} + +// init instantiates the underlying iterator. +func (itr *lazyFloatIterator) init() error { + if itr.itr != nil { + return nil + } + + i, err := itr.fn() + if err != nil { + return err + } else if i == nil { + return nil + } + + if it, ok := i.(FloatIterator); ok { + itr.itr = it + } else { + return fmt.Errorf("invalid lazy float iterator type: %T", i) + } + + return nil +} + +func (itr *lazyFloatIterator) Close() error { + if itr.itr == nil { + return nil + } + return itr.itr.Close() +} + +func (itr *lazyFloatIterator) Next() (*FloatPoint, error) { + if err := itr.init(); err != nil { + return nil, err + } else if itr.itr == nil { + return nil, nil + } + return itr.itr.Next() +} + +func (itr *lazyFloatIterator) Stats() IteratorStats { + if itr.itr == nil { + return IteratorStats{} + } + return itr.itr.Stats() +} + +// MultiFloatIterator represents an iterator that concatenates a list of iterators. +type MultiFloatIterator []FloatIterator + +// NewMultiFloatIterator returns a pointer to a MultiFloatIterator. +func NewMultiFloatIterator(a []FloatIterator) *MultiFloatIterator { + itr := MultiFloatIterator(a) + return &itr +} + +// Close closes all iterators. +func (a *MultiFloatIterator) Close() error { + for _, itr := range *a { + itr.Close() + } + return nil +} + +// Next iterates over all points in all iterators sequentially. +func (a *MultiFloatIterator) Next() (*FloatPoint, error) { + for { + // Return no point if + if len(*a) == 0 { + return nil, nil + } + + // Read next point off the first iterator until EOF. + p, err := (*a)[0].Next() + if p == nil && err == nil { + if err := (*a)[0].Close(); err != nil { + return nil, err + } + *a = (*a)[1:] + continue + } + return p, err + } +} + +// Stats returns the aggregation of all iterator stats. +func (a *MultiFloatIterator) Stats() IteratorStats { + var stats IteratorStats + for _, itr := range *a { + stats.Add(itr.Stats()) + } + return stats +} + type floatPointError struct { point *FloatPoint err error @@ -2558,6 +2655,103 @@ func (itr *integerParallelIterator) monitor() { } } +type lazyIntegerIterator struct { + itr IntegerIterator + fn func() (Iterator, error) +} + +// init instantiates the underlying iterator. +func (itr *lazyIntegerIterator) init() error { + if itr.itr != nil { + return nil + } + + i, err := itr.fn() + if err != nil { + return err + } else if i == nil { + return nil + } + + if it, ok := i.(IntegerIterator); ok { + itr.itr = it + } else { + return fmt.Errorf("invalid lazy integer iterator type: %T", i) + } + + return nil +} + +func (itr *lazyIntegerIterator) Close() error { + if itr.itr == nil { + return nil + } + return itr.itr.Close() +} + +func (itr *lazyIntegerIterator) Next() (*IntegerPoint, error) { + if err := itr.init(); err != nil { + return nil, err + } else if itr.itr == nil { + return nil, nil + } + return itr.itr.Next() +} + +func (itr *lazyIntegerIterator) Stats() IteratorStats { + if itr.itr == nil { + return IteratorStats{} + } + return itr.itr.Stats() +} + +// MultiIntegerIterator represents an iterator that concatenates a list of iterators. +type MultiIntegerIterator []IntegerIterator + +// NewMultiIntegerIterator returns a pointer to a MultiIntegerIterator. +func NewMultiIntegerIterator(a []IntegerIterator) *MultiIntegerIterator { + itr := MultiIntegerIterator(a) + return &itr +} + +// Close closes all iterators. +func (a *MultiIntegerIterator) Close() error { + for _, itr := range *a { + itr.Close() + } + return nil +} + +// Next iterates over all points in all iterators sequentially. +func (a *MultiIntegerIterator) Next() (*IntegerPoint, error) { + for { + // Return no point if + if len(*a) == 0 { + return nil, nil + } + + // Read next point off the first iterator until EOF. + p, err := (*a)[0].Next() + if p == nil && err == nil { + if err := (*a)[0].Close(); err != nil { + return nil, err + } + *a = (*a)[1:] + continue + } + return p, err + } +} + +// Stats returns the aggregation of all iterator stats. +func (a *MultiIntegerIterator) Stats() IteratorStats { + var stats IteratorStats + for _, itr := range *a { + stats.Add(itr.Stats()) + } + return stats +} + type integerPointError struct { point *IntegerPoint err error @@ -4619,6 +4813,103 @@ func (itr *stringParallelIterator) monitor() { } } +type lazyStringIterator struct { + itr StringIterator + fn func() (Iterator, error) +} + +// init instantiates the underlying iterator. +func (itr *lazyStringIterator) init() error { + if itr.itr != nil { + return nil + } + + i, err := itr.fn() + if err != nil { + return err + } else if i == nil { + return nil + } + + if it, ok := i.(StringIterator); ok { + itr.itr = it + } else { + return fmt.Errorf("invalid lazy string iterator type: %T", i) + } + + return nil +} + +func (itr *lazyStringIterator) Close() error { + if itr.itr == nil { + return nil + } + return itr.itr.Close() +} + +func (itr *lazyStringIterator) Next() (*StringPoint, error) { + if err := itr.init(); err != nil { + return nil, err + } else if itr.itr == nil { + return nil, nil + } + return itr.itr.Next() +} + +func (itr *lazyStringIterator) Stats() IteratorStats { + if itr.itr == nil { + return IteratorStats{} + } + return itr.itr.Stats() +} + +// MultiStringIterator represents an iterator that concatenates a list of iterators. +type MultiStringIterator []StringIterator + +// NewMultiStringIterator returns a pointer to a MultiStringIterator. +func NewMultiStringIterator(a []StringIterator) *MultiStringIterator { + itr := MultiStringIterator(a) + return &itr +} + +// Close closes all iterators. +func (a *MultiStringIterator) Close() error { + for _, itr := range *a { + itr.Close() + } + return nil +} + +// Next iterates over all points in all iterators sequentially. +func (a *MultiStringIterator) Next() (*StringPoint, error) { + for { + // Return no point if + if len(*a) == 0 { + return nil, nil + } + + // Read next point off the first iterator until EOF. + p, err := (*a)[0].Next() + if p == nil && err == nil { + if err := (*a)[0].Close(); err != nil { + return nil, err + } + *a = (*a)[1:] + continue + } + return p, err + } +} + +// Stats returns the aggregation of all iterator stats. +func (a *MultiStringIterator) Stats() IteratorStats { + var stats IteratorStats + for _, itr := range *a { + stats.Add(itr.Stats()) + } + return stats +} + type stringPointError struct { point *StringPoint err error @@ -6680,6 +6971,103 @@ func (itr *booleanParallelIterator) monitor() { } } +type lazyBooleanIterator struct { + itr BooleanIterator + fn func() (Iterator, error) +} + +// init instantiates the underlying iterator. +func (itr *lazyBooleanIterator) init() error { + if itr.itr != nil { + return nil + } + + i, err := itr.fn() + if err != nil { + return err + } else if i == nil { + return nil + } + + if it, ok := i.(BooleanIterator); ok { + itr.itr = it + } else { + return fmt.Errorf("invalid lazy boolean iterator type: %T", i) + } + + return nil +} + +func (itr *lazyBooleanIterator) Close() error { + if itr.itr == nil { + return nil + } + return itr.itr.Close() +} + +func (itr *lazyBooleanIterator) Next() (*BooleanPoint, error) { + if err := itr.init(); err != nil { + return nil, err + } else if itr.itr == nil { + return nil, nil + } + return itr.itr.Next() +} + +func (itr *lazyBooleanIterator) Stats() IteratorStats { + if itr.itr == nil { + return IteratorStats{} + } + return itr.itr.Stats() +} + +// MultiBooleanIterator represents an iterator that concatenates a list of iterators. +type MultiBooleanIterator []BooleanIterator + +// NewMultiBooleanIterator returns a pointer to a MultiBooleanIterator. +func NewMultiBooleanIterator(a []BooleanIterator) *MultiBooleanIterator { + itr := MultiBooleanIterator(a) + return &itr +} + +// Close closes all iterators. +func (a *MultiBooleanIterator) Close() error { + for _, itr := range *a { + itr.Close() + } + return nil +} + +// Next iterates over all points in all iterators sequentially. +func (a *MultiBooleanIterator) Next() (*BooleanPoint, error) { + for { + // Return no point if + if len(*a) == 0 { + return nil, nil + } + + // Read next point off the first iterator until EOF. + p, err := (*a)[0].Next() + if p == nil && err == nil { + if err := (*a)[0].Close(); err != nil { + return nil, err + } + *a = (*a)[1:] + continue + } + return p, err + } +} + +// Stats returns the aggregation of all iterator stats. +func (a *MultiBooleanIterator) Stats() IteratorStats { + var stats IteratorStats + for _, itr := range *a { + stats.Add(itr.Stats()) + } + return stats +} + type booleanPointError struct { point *BooleanPoint err error diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index c7d33625f9..2d10c82367 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -492,6 +492,104 @@ func (itr *{{$k.name}}ParallelIterator) monitor() { } } +type lazy{{$k.Name}}Iterator struct { + itr {{$k.Name}}Iterator + fn func() (Iterator, error) +} + +// init instantiates the underlying iterator. +func (itr *lazy{{$k.Name}}Iterator) init() error { + if itr.itr != nil { + return nil + } + + i, err := itr.fn() + if err != nil { + return err + } else if i == nil { + return nil + } + + if it, ok := i.({{$k.Name}}Iterator); ok { + itr.itr = it + } else { + return fmt.Errorf("invalid lazy {{$k.name}} iterator type: %T", i) + } + + return nil +} + +func (itr *lazy{{$k.Name}}Iterator) Close() error { + if itr.itr == nil { + return nil + } + return itr.itr.Close() +} + +func (itr *lazy{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) { + if err := itr.init(); err != nil { + return nil, err + } else if itr.itr == nil { + return nil, nil + } + return itr.itr.Next() +} + +func (itr *lazy{{$k.Name}}Iterator) Stats() IteratorStats { + if itr.itr == nil { + return IteratorStats{} + } + return itr.itr.Stats() +} + +// Multi{{$k.Name}}Iterator represents an iterator that concatenates a list of iterators. +type Multi{{$k.Name}}Iterator []{{$k.Name}}Iterator + +// NewMulti{{$k.Name}}Iterator returns a pointer to a Multi{{$k.Name}}Iterator. +func NewMulti{{$k.Name}}Iterator(a []{{$k.Name}}Iterator) *Multi{{$k.Name}}Iterator { + itr := Multi{{$k.Name}}Iterator(a) + return &itr +} + +// Close closes all iterators. +func (a *Multi{{$k.Name}}Iterator) Close() error { + for _, itr := range *a { + itr.Close() + } + return nil +} + +// Next iterates over all points in all iterators sequentially. +func (a *Multi{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) { + for { + // Return no point if + if len(*a) == 0 { + return nil, nil + } + + // Read next point off the first iterator until EOF. + p, err := (*a)[0].Next() + if p == nil && err == nil { + if err := (*a)[0].Close(); err != nil { + return nil, err + } + *a = (*a)[1:] + continue + } + return p, err + } +} + +// Stats returns the aggregation of all iterator stats. +func (a *Multi{{$k.Name}}Iterator) Stats() IteratorStats { + var stats IteratorStats + for _, itr := range *a { + stats.Add(itr.Stats()) + } + return stats +} + + type {{$k.name}}PointError struct { point *{{$k.Name}}Point err error diff --git a/influxql/iterator.go b/influxql/iterator.go index 827529ff17..c71528ffd3 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -696,6 +696,91 @@ func (a IteratorCreators) ExpandSources(sources Sources) (Sources, error) { return sorted, nil } +// lazyIteratorCreators represents a list of iterator creators that are lazily created. +type lazyIteratorCreators IteratorCreators + +// NewLazyIteratorCreator returns an iterator creator for that creates iterators lazily. +func NewLazyIteratorCreator(a []IteratorCreator) IteratorCreator { + return lazyIteratorCreators(a) +} + +func (a lazyIteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error) { + for { + if len(a) == 0 { + return nil, nil + } + + // Create first iterator to determine data type. + itr, err := a[0].CreateIterator(opt) + if err != nil { + return nil, err + } else if itr == nil { + a = a[1:] + continue + } else if len(a) == 1 { + return Iterators{itr}.Merge(opt) + } + + // All additional iterators need to be the same type. + switch itr := itr.(type) { + case FloatIterator: + itrs := make([]FloatIterator, len(a)) + itrs[0] = itr + for i := 1; i < len(itrs); i++ { + ic := a[i] + itrs[i] = &lazyFloatIterator{ + fn: func() (Iterator, error) { return ic.CreateIterator(opt) }, + } + } + return Iterators{NewMultiFloatIterator(itrs)}.Merge(opt) + + case IntegerIterator: + itrs := make([]IntegerIterator, len(a)) + itrs[0] = itr + for i := 1; i < len(itrs); i++ { + ic := a[i] + itrs[i] = &lazyIntegerIterator{ + fn: func() (Iterator, error) { return ic.CreateIterator(opt) }, + } + } + return Iterators{NewMultiIntegerIterator(itrs)}.Merge(opt) + + case StringIterator: + itrs := make([]StringIterator, len(a)) + itrs[0] = itr + for i := 1; i < len(itrs); i++ { + ic := a[i] + itrs[i] = &lazyStringIterator{ + fn: func() (Iterator, error) { return ic.CreateIterator(opt) }, + } + } + return Iterators{NewMultiStringIterator(itrs)}.Merge(opt) + + case BooleanIterator: + itrs := make([]BooleanIterator, len(a)) + itrs[0] = itr + for i := 1; i < len(itrs); i++ { + ic := a[i] + itrs[i] = &lazyBooleanIterator{ + fn: func() (Iterator, error) { return ic.CreateIterator(opt) }, + } + } + return Iterators{NewMultiBooleanIterator(itrs)}.Merge(opt) + + default: + panic(fmt.Sprintf("unsupported iterator type for lazy iteration: %T", itr)) + } + } +} + +func (a lazyIteratorCreators) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) { + return IteratorCreators(a).FieldDimensions(sources) +} + +func (a lazyIteratorCreators) ExpandSources(sources Sources) (Sources, error) { + return IteratorCreators(a).ExpandSources(sources) +} + // IteratorOptions is an object passed to CreateIterator to specify creation options. type IteratorOptions struct { // Expression to iterate for. diff --git a/services/meta/client.go b/services/meta/client.go index e5976a86df..4da4186f46 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -647,8 +647,9 @@ func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.T } // ShardsByTimeRange returns a slice of shards that may contain data in the time range. +// Shards are returned in ascending time order. func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) { - m := make(map[*ShardInfo]struct{}) + m := make(map[uint64]struct{}) for _, src := range sources { mm, ok := src.(*influxql.Measurement) if !ok { @@ -660,17 +661,17 @@ func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Tim return nil, err } for _, g := range groups { - for i := range g.Shards { - m[&g.Shards[i]] = struct{}{} + for _, sh := range g.Shards { + if _, ok := m[sh.ID]; ok { + continue + } + + a = append(a, sh) + m[sh.ID] = struct{}{} } } } - a = make([]ShardInfo, 0, len(m)) - for sh := range m { - a = append(a, *sh) - } - return a, nil } diff --git a/services/meta/data.go b/services/meta/data.go index 6871d08df6..3ced75b3d8 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1164,6 +1164,17 @@ func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) { } } +type ShardInfos []ShardInfo + +// Reverse returns a reversed list of shard infos. +func (a ShardInfos) Reverse() []ShardInfo { + other := make([]ShardInfo, len(a)) + for i := range a { + other[len(other)-i-1] = a[i] + } + return other +} + // SubscriptionInfo hold the subscription information type SubscriptionInfo struct { Name string diff --git a/tsdb/store.go b/tsdb/store.go index 97211aa076..cc721afb39 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -796,7 +796,7 @@ func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (i return nil, err } - return influxql.IteratorCreators(ics), nil + return influxql.NewLazyIteratorCreator(ics), nil } // WriteToShard writes a list of points to a shard identified by its ID.