From 1b6524a7bf65742241332cfe775a712858ad1db1 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 25 Apr 2016 09:12:55 -0600 Subject: [PATCH] reduce interrupt iterator checks The interrupt iterator currently introduces a non-trivial amount of overhead to queries by checking for interrupts every 256 points. This commit adjusts that check to every 5000 points. There are also several places where nested field access has been adjusted to minimize field lookups. --- influxql/internal/internal.pb.go | 107 +++++++++++++++++++++++-------- influxql/iterator.gen.go | 80 +++++++++++------------ influxql/iterator.gen.go.tmpl | 20 +++--- 3 files changed, 130 insertions(+), 77 deletions(-) diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index cee0815b75..f3dd295077 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -30,6 +30,10 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +const _ = proto.GoGoProtoPackageIsVersion1 + type Point struct { Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"` @@ -45,9 +49,10 @@ type Point struct { XXX_unrecognized []byte `json:"-"` } -func (m *Point) Reset() { *m = Point{} } -func (m *Point) String() string { return proto.CompactTextString(m) } -func (*Point) ProtoMessage() {} +func (m *Point) Reset() { *m = Point{} } +func (m *Point) String() string { return proto.CompactTextString(m) } +func (*Point) ProtoMessage() {} +func (*Point) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} } func (m *Point) GetName() string { if m != nil && m.Name != nil { @@ -135,9 +140,10 @@ type Aux struct { XXX_unrecognized []byte `json:"-"` } -func (m *Aux) Reset() { *m = Aux{} } -func (m *Aux) String() string { return proto.CompactTextString(m) } -func (*Aux) ProtoMessage() {} +func (m *Aux) Reset() { *m = Aux{} } +func (m *Aux) String() string { return proto.CompactTextString(m) } +func (*Aux) ProtoMessage() {} +func (*Aux) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} } func (m *Aux) GetDataType() int32 { if m != nil && m.DataType != nil { @@ -194,9 +200,10 @@ type IteratorOptions struct { XXX_unrecognized []byte `json:"-"` } -func (m *IteratorOptions) Reset() { *m = IteratorOptions{} } -func (m *IteratorOptions) String() string { return proto.CompactTextString(m) } -func (*IteratorOptions) ProtoMessage() {} +func (m *IteratorOptions) Reset() { *m = IteratorOptions{} } +func (m *IteratorOptions) String() string { return proto.CompactTextString(m) } +func (*IteratorOptions) ProtoMessage() {} +func (*IteratorOptions) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} } func (m *IteratorOptions) GetExpr() string { if m != nil && m.Expr != nil { @@ -315,9 +322,10 @@ type Measurements struct { XXX_unrecognized []byte `json:"-"` } -func (m *Measurements) Reset() { *m = Measurements{} } -func (m *Measurements) String() string { return proto.CompactTextString(m) } -func (*Measurements) ProtoMessage() {} +func (m *Measurements) Reset() { *m = Measurements{} } +func (m *Measurements) String() string { return proto.CompactTextString(m) } +func (*Measurements) ProtoMessage() {} +func (*Measurements) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{3} } func (m *Measurements) GetItems() []*Measurement { if m != nil { @@ -335,9 +343,10 @@ type Measurement struct { XXX_unrecognized []byte `json:"-"` } -func (m *Measurement) Reset() { *m = Measurement{} } -func (m *Measurement) String() string { return proto.CompactTextString(m) } -func (*Measurement) ProtoMessage() {} +func (m *Measurement) Reset() { *m = Measurement{} } +func (m *Measurement) String() string { return proto.CompactTextString(m) } +func (*Measurement) ProtoMessage() {} +func (*Measurement) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{4} } func (m *Measurement) GetDatabase() string { if m != nil && m.Database != nil { @@ -380,9 +389,10 @@ type Interval struct { XXX_unrecognized []byte `json:"-"` } -func (m *Interval) Reset() { *m = Interval{} } -func (m *Interval) String() string { return proto.CompactTextString(m) } -func (*Interval) ProtoMessage() {} +func (m *Interval) Reset() { *m = Interval{} } +func (m *Interval) String() string { return proto.CompactTextString(m) } +func (*Interval) ProtoMessage() {} +func (*Interval) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} } func (m *Interval) GetDuration() int64 { if m != nil && m.Duration != nil { @@ -404,9 +414,10 @@ type IteratorStats struct { XXX_unrecognized []byte `json:"-"` } -func (m *IteratorStats) Reset() { *m = IteratorStats{} } -func (m *IteratorStats) String() string { return proto.CompactTextString(m) } -func (*IteratorStats) ProtoMessage() {} +func (m *IteratorStats) Reset() { *m = IteratorStats{} } +func (m *IteratorStats) String() string { return proto.CompactTextString(m) } +func (*IteratorStats) ProtoMessage() {} +func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} } func (m *IteratorStats) GetSeriesN() int64 { if m != nil && m.SeriesN != nil { @@ -429,9 +440,10 @@ type Series struct { XXX_unrecognized []byte `json:"-"` } -func (m *Series) Reset() { *m = Series{} } -func (m *Series) String() string { return proto.CompactTextString(m) } -func (*Series) ProtoMessage() {} +func (m *Series) Reset() { *m = Series{} } +func (m *Series) String() string { return proto.CompactTextString(m) } +func (*Series) ProtoMessage() {} +func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{7} } func (m *Series) GetName() string { if m != nil && m.Name != nil { @@ -459,9 +471,10 @@ type SeriesList struct { XXX_unrecognized []byte `json:"-"` } -func (m *SeriesList) Reset() { *m = SeriesList{} } -func (m *SeriesList) String() string { return proto.CompactTextString(m) } -func (*SeriesList) ProtoMessage() {} +func (m *SeriesList) Reset() { *m = SeriesList{} } +func (m *SeriesList) String() string { return proto.CompactTextString(m) } +func (*SeriesList) ProtoMessage() {} +func (*SeriesList) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{8} } func (m *SeriesList) GetItems() []*Series { if m != nil { @@ -481,3 +494,43 @@ func init() { proto.RegisterType((*Series)(nil), "influxql.Series") proto.RegisterType((*SeriesList)(nil), "influxql.SeriesList") } + +var fileDescriptorInternal = []byte{ + // 569 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x53, 0xdf, 0x6e, 0xda, 0x3c, + 0x14, 0x57, 0x48, 0x53, 0x92, 0x13, 0x52, 0xf8, 0xfc, 0x6d, 0x22, 0xda, 0xcd, 0x50, 0x34, 0x4d, + 0x5c, 0x6c, 0x6c, 0x43, 0x7b, 0x01, 0x36, 0x5a, 0x09, 0xa9, 0xa3, 0x55, 0x41, 0xbb, 0xf7, 0xc0, + 0x44, 0x96, 0x4c, 0xcc, 0x6c, 0x67, 0xa2, 0x8f, 0xb8, 0x67, 0xd9, 0x4b, 0xec, 0xd8, 0x49, 0x0a, + 0x54, 0xec, 0x2e, 0xe7, 0x77, 0x8e, 0x73, 0x7e, 0x7f, 0x6c, 0xe8, 0xf3, 0xc2, 0x30, 0x55, 0x50, + 0xf1, 0xa1, 0xf9, 0x18, 0xed, 0x94, 0x34, 0x92, 0x84, 0xbc, 0xd8, 0x88, 0x72, 0xff, 0x53, 0x64, + 0x7f, 0x3c, 0x08, 0xee, 0x25, 0xb6, 0x49, 0x07, 0x2e, 0xe6, 0x74, 0xcb, 0x52, 0x6f, 0xd0, 0x1a, + 0x46, 0xb6, 0x5a, 0xd2, 0x5c, 0xa7, 0xad, 0xa7, 0x8a, 0x63, 0xcf, 0xc7, 0xca, 0x27, 0x31, 0xf8, + 0x73, 0x2e, 0xd2, 0x0b, 0x2c, 0x42, 0xf2, 0x0a, 0xfc, 0x49, 0xb9, 0x4f, 0x83, 0x81, 0x3f, 0x8c, + 0xc7, 0xc9, 0xa8, 0xf9, 0xf1, 0x08, 0x41, 0x42, 0x00, 0x26, 0x79, 0xae, 0x58, 0x4e, 0x0d, 0x5b, + 0xa7, 0x97, 0x03, 0x6f, 0x98, 0x58, 0xec, 0x46, 0x48, 0x6a, 0xbe, 0x53, 0x51, 0xb2, 0xb4, 0x8d, + 0x98, 0x47, 0x5e, 0x40, 0x67, 0x86, 0x04, 0x73, 0xa6, 0x2a, 0x34, 0x44, 0xd4, 0x27, 0xff, 0x43, + 0xbc, 0x30, 0x8a, 0x17, 0x79, 0x05, 0x46, 0x08, 0x46, 0x76, 0xf4, 0x8b, 0x94, 0x82, 0xd1, 0xa2, + 0x42, 0x01, 0xd1, 0x90, 0xbc, 0x85, 0x60, 0x61, 0xa8, 0xd1, 0x69, 0x8c, 0x65, 0x3c, 0xee, 0x1f, + 0x68, 0xcc, 0x50, 0x37, 0x35, 0x52, 0xb9, 0x76, 0x26, 0x1c, 0x59, 0xd2, 0x83, 0x70, 0x4a, 0x0d, + 0x5d, 0x3e, 0xee, 0x2a, 0xb9, 0xc1, 0x33, 0x56, 0xad, 0xb3, 0xac, 0xfc, 0x73, 0xac, 0x2e, 0xce, + 0xb2, 0x0a, 0x2c, 0xab, 0xec, 0x77, 0x0b, 0xba, 0xcd, 0xfe, 0xbb, 0x9d, 0xe1, 0xb2, 0xd0, 0xd6, + 0xc9, 0xeb, 0xfd, 0x4e, 0xe1, 0x5a, 0x7b, 0x2e, 0xae, 0xcc, 0x6b, 0xa1, 0x79, 0x11, 0x8a, 0x68, + 0x2f, 0x64, 0xa9, 0x56, 0x4c, 0xe3, 0x2a, 0xeb, 0xe6, 0xcb, 0x83, 0x8c, 0x6f, 0x8c, 0xea, 0x52, + 0xb1, 0x2d, 0xc3, 0xa0, 0xde, 0x40, 0x68, 0x79, 0xa9, 0x5f, 0x54, 0xb8, 0xf5, 0xf1, 0x98, 0x1c, + 0xe9, 0xad, 0x3b, 0x56, 0xd1, 0x14, 0x23, 0x2b, 0xb4, 0x5d, 0xeb, 0xe2, 0x71, 0x31, 0xde, 0x70, + 0x21, 0x5c, 0x12, 0x01, 0xf9, 0x0f, 0x22, 0x5b, 0x1d, 0x07, 0x81, 0xd0, 0x57, 0x59, 0xac, 0xb9, + 0xe5, 0xea, 0x52, 0x88, 0x2c, 0x84, 0xde, 0x29, 0xe3, 0xf2, 0x8f, 0x9c, 0x05, 0x5d, 0x68, 0x5f, + 0x17, 0x6b, 0x07, 0x80, 0x03, 0x70, 0x66, 0xa2, 0x57, 0x0c, 0x0f, 0x16, 0xb9, 0x8b, 0x20, 0x24, + 0x09, 0x04, 0xb7, 0x7c, 0xcb, 0x4d, 0xda, 0x71, 0x13, 0x57, 0x70, 0x79, 0xb7, 0xd9, 0x68, 0x66, + 0xd2, 0xa4, 0xa9, 0x17, 0x55, 0xff, 0xaa, 0xf9, 0xe5, 0xa2, 0x1e, 0xe8, 0x36, 0x03, 0x53, 0xb6, + 0x2e, 0x31, 0xa0, 0x9e, 0xf3, 0xf2, 0x33, 0x74, 0x8e, 0x3c, 0xd0, 0x68, 0x42, 0x80, 0xd6, 0x6e, + 0x35, 0x1a, 0xf9, 0x6f, 0xab, 0xb2, 0x1c, 0xe2, 0x63, 0xe7, 0xea, 0xdc, 0x7f, 0x50, 0xcd, 0xea, + 0x00, 0xfa, 0xd0, 0x7d, 0x60, 0x06, 0x7b, 0x28, 0xf8, 0x5e, 0x0a, 0xbe, 0x7a, 0x74, 0xe1, 0x47, + 0x4f, 0xaf, 0xc1, 0x77, 0x15, 0xaa, 0x79, 0xc0, 0x8b, 0xb0, 0xaf, 0xe3, 0xc6, 0xff, 0xcc, 0xf4, + 0x92, 0xaa, 0x1c, 0xe9, 0x56, 0x51, 0xbf, 0x3b, 0x64, 0xe2, 0xb6, 0x94, 0x18, 0xba, 0xf5, 0xd0, + 0x7b, 0xa6, 0xde, 0xfe, 0xdc, 0xcf, 0x3e, 0x42, 0x72, 0x72, 0x2f, 0x9d, 0x7c, 0xa6, 0x38, 0xd3, + 0xf3, 0xc3, 0x09, 0xf7, 0x2a, 0xe7, 0xf5, 0x89, 0x4f, 0xe8, 0x97, 0x1b, 0x38, 0x7a, 0xa6, 0xde, + 0xc9, 0x33, 0xf5, 0x86, 0x9d, 0xe6, 0x3a, 0xd9, 0xdb, 0x93, 0x64, 0xef, 0x01, 0xaa, 0x23, 0xb7, + 0x5c, 0x1b, 0xf2, 0xfa, 0xd4, 0xaf, 0xde, 0xc1, 0xaf, 0x6a, 0xe8, 0x6f, 0x00, 0x00, 0x00, 0xff, + 0xff, 0xe3, 0x58, 0x08, 0xa6, 0x2c, 0x04, 0x00, 0x00, +} diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index f9e6895645..95db26b8de 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -88,8 +88,8 @@ func (itr *bufFloatIterator) peekTime() (int64, error) { // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *bufFloatIterator) Next() (*FloatPoint, error) { - if itr.buf != nil { - buf := itr.buf + buf := itr.buf + if buf != nil { itr.buf = nil return buf, nil } @@ -102,7 +102,7 @@ func (itr *bufFloatIterator) NextInWindow(startTime, endTime int64) (*FloatPoint v, err := itr.Next() if v == nil || err != nil { return nil, err - } else if v.Time < startTime || v.Time >= endTime { + } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } @@ -221,13 +221,13 @@ func (itr *floatMergeIterator) Next() (*FloatPoint, error) { // Check if the point is inside of our current window. inWindow := true - if itr.window.name != p.Name { + if window := itr.window; window.name != p.Name { inWindow = false - } else if itr.window.tags != p.Tags.ID() { + } else if window.tags != p.Tags.ID() { inWindow = false - } else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime { + } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false - } else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime { + } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } @@ -669,11 +669,11 @@ func (itr *floatInterruptIterator) Stats() IteratorStats { return itr.input.Stat func (itr *floatInterruptIterator) Close() error { return itr.input.Close() } func (itr *floatInterruptIterator) Next() (*FloatPoint, error) { - // Only check if the channel is closed every 256 points. This - // intentionally checks on both 0 and 256 so that if the iterator + // Only check if the channel is closed every N points. This + // intentionally checks on both 0 and N so that if the iterator // has been interrupted before the first point is emitted it will // not emit any points. - if itr.count&0x100 == 0 { + if itr.count&0xFF == 0xFF { select { case <-itr.closing: return nil, nil @@ -2034,8 +2034,8 @@ func (itr *bufIntegerIterator) peekTime() (int64, error) { // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *bufIntegerIterator) Next() (*IntegerPoint, error) { - if itr.buf != nil { - buf := itr.buf + buf := itr.buf + if buf != nil { itr.buf = nil return buf, nil } @@ -2048,7 +2048,7 @@ func (itr *bufIntegerIterator) NextInWindow(startTime, endTime int64) (*IntegerP v, err := itr.Next() if v == nil || err != nil { return nil, err - } else if v.Time < startTime || v.Time >= endTime { + } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } @@ -2167,13 +2167,13 @@ func (itr *integerMergeIterator) Next() (*IntegerPoint, error) { // Check if the point is inside of our current window. inWindow := true - if itr.window.name != p.Name { + if window := itr.window; window.name != p.Name { inWindow = false - } else if itr.window.tags != p.Tags.ID() { + } else if window.tags != p.Tags.ID() { inWindow = false - } else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime { + } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false - } else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime { + } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } @@ -2615,11 +2615,11 @@ func (itr *integerInterruptIterator) Stats() IteratorStats { return itr.input.St func (itr *integerInterruptIterator) Close() error { return itr.input.Close() } func (itr *integerInterruptIterator) Next() (*IntegerPoint, error) { - // Only check if the channel is closed every 256 points. This - // intentionally checks on both 0 and 256 so that if the iterator + // Only check if the channel is closed every N points. This + // intentionally checks on both 0 and N so that if the iterator // has been interrupted before the first point is emitted it will // not emit any points. - if itr.count&0x100 == 0 { + if itr.count&0xFF == 0xFF { select { case <-itr.closing: return nil, nil @@ -3977,8 +3977,8 @@ func (itr *bufStringIterator) peekTime() (int64, error) { // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *bufStringIterator) Next() (*StringPoint, error) { - if itr.buf != nil { - buf := itr.buf + buf := itr.buf + if buf != nil { itr.buf = nil return buf, nil } @@ -3991,7 +3991,7 @@ func (itr *bufStringIterator) NextInWindow(startTime, endTime int64) (*StringPoi v, err := itr.Next() if v == nil || err != nil { return nil, err - } else if v.Time < startTime || v.Time >= endTime { + } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } @@ -4110,13 +4110,13 @@ func (itr *stringMergeIterator) Next() (*StringPoint, error) { // Check if the point is inside of our current window. inWindow := true - if itr.window.name != p.Name { + if window := itr.window; window.name != p.Name { inWindow = false - } else if itr.window.tags != p.Tags.ID() { + } else if window.tags != p.Tags.ID() { inWindow = false - } else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime { + } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false - } else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime { + } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } @@ -4558,11 +4558,11 @@ func (itr *stringInterruptIterator) Stats() IteratorStats { return itr.input.Sta func (itr *stringInterruptIterator) Close() error { return itr.input.Close() } func (itr *stringInterruptIterator) Next() (*StringPoint, error) { - // Only check if the channel is closed every 256 points. This - // intentionally checks on both 0 and 256 so that if the iterator + // Only check if the channel is closed every N points. This + // intentionally checks on both 0 and N so that if the iterator // has been interrupted before the first point is emitted it will // not emit any points. - if itr.count&0x100 == 0 { + if itr.count&0xFF == 0xFF { select { case <-itr.closing: return nil, nil @@ -5920,8 +5920,8 @@ func (itr *bufBooleanIterator) peekTime() (int64, error) { // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *bufBooleanIterator) Next() (*BooleanPoint, error) { - if itr.buf != nil { - buf := itr.buf + buf := itr.buf + if buf != nil { itr.buf = nil return buf, nil } @@ -5934,7 +5934,7 @@ func (itr *bufBooleanIterator) NextInWindow(startTime, endTime int64) (*BooleanP v, err := itr.Next() if v == nil || err != nil { return nil, err - } else if v.Time < startTime || v.Time >= endTime { + } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } @@ -6053,13 +6053,13 @@ func (itr *booleanMergeIterator) Next() (*BooleanPoint, error) { // Check if the point is inside of our current window. inWindow := true - if itr.window.name != p.Name { + if window := itr.window; window.name != p.Name { inWindow = false - } else if itr.window.tags != p.Tags.ID() { + } else if window.tags != p.Tags.ID() { inWindow = false - } else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime { + } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false - } else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime { + } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } @@ -6501,11 +6501,11 @@ func (itr *booleanInterruptIterator) Stats() IteratorStats { return itr.input.St func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() } func (itr *booleanInterruptIterator) Next() (*BooleanPoint, error) { - // Only check if the channel is closed every 256 points. This - // intentionally checks on both 0 and 256 so that if the iterator + // Only check if the channel is closed every N points. This + // intentionally checks on both 0 and N so that if the iterator // has been interrupted before the first point is emitted it will // not emit any points. - if itr.count&0x100 == 0 { + if itr.count&0xFF == 0xFF { select { case <-itr.closing: return nil, nil diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index eb9001db7c..453c18c09b 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -85,8 +85,8 @@ func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) { // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) { - if itr.buf != nil { - buf := itr.buf + buf := itr.buf + if buf != nil { itr.buf = nil return buf, nil } @@ -99,7 +99,7 @@ func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k v, err := itr.Next() if v == nil || err != nil { return nil, err - } else if v.Time < startTime || v.Time >= endTime { + } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } @@ -218,13 +218,13 @@ func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) { // Check if the point is inside of our current window. inWindow := true - if itr.window.name != p.Name { + if window := itr.window; window.name != p.Name { inWindow = false - } else if itr.window.tags != p.Tags.ID() { + } else if window.tags != p.Tags.ID() { inWindow = false - } else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime { + } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false - } else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime { + } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } @@ -668,11 +668,11 @@ func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.inpu func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() } func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) { - // Only check if the channel is closed every 256 points. This - // intentionally checks on both 0 and 256 so that if the iterator + // Only check if the channel is closed every N points. This + // intentionally checks on both 0 and N so that if the iterator // has been interrupted before the first point is emitted it will // not emit any points. - if itr.count&0x100 == 0 { + if itr.count & 0xFF == 0xFF { select { case <-itr.closing: return nil, nil