Merge pull request #6463 from benbjohnson/optimize

Reduce interrupt iterator checks & field access
pull/5720/head
Ben Johnson 2016-04-26 13:16:28 -06:00
commit fd840f242c
3 changed files with 130 additions and 77 deletions

View File

@ -30,6 +30,10 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.GoGoProtoPackageIsVersion1
type Point struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"`
@ -48,6 +52,7 @@ type Point struct {
func (m *Point) Reset() { *m = Point{} }
func (m *Point) String() string { return proto.CompactTextString(m) }
func (*Point) ProtoMessage() {}
func (*Point) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} }
func (m *Point) GetName() string {
if m != nil && m.Name != nil {
@ -138,6 +143,7 @@ type Aux struct {
func (m *Aux) Reset() { *m = Aux{} }
func (m *Aux) String() string { return proto.CompactTextString(m) }
func (*Aux) ProtoMessage() {}
func (*Aux) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} }
func (m *Aux) GetDataType() int32 {
if m != nil && m.DataType != nil {
@ -197,6 +203,7 @@ type IteratorOptions struct {
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
func (*IteratorOptions) ProtoMessage() {}
func (*IteratorOptions) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} }
func (m *IteratorOptions) GetExpr() string {
if m != nil && m.Expr != nil {
@ -318,6 +325,7 @@ type Measurements struct {
func (m *Measurements) Reset() { *m = Measurements{} }
func (m *Measurements) String() string { return proto.CompactTextString(m) }
func (*Measurements) ProtoMessage() {}
func (*Measurements) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{3} }
func (m *Measurements) GetItems() []*Measurement {
if m != nil {
@ -338,6 +346,7 @@ type Measurement struct {
func (m *Measurement) Reset() { *m = Measurement{} }
func (m *Measurement) String() string { return proto.CompactTextString(m) }
func (*Measurement) ProtoMessage() {}
func (*Measurement) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{4} }
func (m *Measurement) GetDatabase() string {
if m != nil && m.Database != nil {
@ -383,6 +392,7 @@ type Interval struct {
func (m *Interval) Reset() { *m = Interval{} }
func (m *Interval) String() string { return proto.CompactTextString(m) }
func (*Interval) ProtoMessage() {}
func (*Interval) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} }
func (m *Interval) GetDuration() int64 {
if m != nil && m.Duration != nil {
@ -407,6 +417,7 @@ type IteratorStats struct {
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} }
func (m *IteratorStats) GetSeriesN() int64 {
if m != nil && m.SeriesN != nil {
@ -432,6 +443,7 @@ type Series struct {
func (m *Series) Reset() { *m = Series{} }
func (m *Series) String() string { return proto.CompactTextString(m) }
func (*Series) ProtoMessage() {}
func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{7} }
func (m *Series) GetName() string {
if m != nil && m.Name != nil {
@ -462,6 +474,7 @@ type SeriesList struct {
func (m *SeriesList) Reset() { *m = SeriesList{} }
func (m *SeriesList) String() string { return proto.CompactTextString(m) }
func (*SeriesList) ProtoMessage() {}
func (*SeriesList) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{8} }
func (m *SeriesList) GetItems() []*Series {
if m != nil {
@ -481,3 +494,43 @@ func init() {
proto.RegisterType((*Series)(nil), "influxql.Series")
proto.RegisterType((*SeriesList)(nil), "influxql.SeriesList")
}
var fileDescriptorInternal = []byte{
// 569 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x53, 0xdf, 0x6e, 0xda, 0x3c,
0x14, 0x57, 0x48, 0x53, 0x92, 0x13, 0x52, 0xf8, 0xfc, 0x6d, 0x22, 0xda, 0xcd, 0x50, 0x34, 0x4d,
0x5c, 0x6c, 0x6c, 0x43, 0x7b, 0x01, 0x36, 0x5a, 0x09, 0xa9, 0xa3, 0x55, 0x41, 0xbb, 0xf7, 0xc0,
0x44, 0x96, 0x4c, 0xcc, 0x6c, 0x67, 0xa2, 0x8f, 0xb8, 0x67, 0xd9, 0x4b, 0xec, 0xd8, 0x49, 0x0a,
0x54, 0xec, 0x2e, 0xe7, 0x77, 0x8e, 0x73, 0x7e, 0x7f, 0x6c, 0xe8, 0xf3, 0xc2, 0x30, 0x55, 0x50,
0xf1, 0xa1, 0xf9, 0x18, 0xed, 0x94, 0x34, 0x92, 0x84, 0xbc, 0xd8, 0x88, 0x72, 0xff, 0x53, 0x64,
0x7f, 0x3c, 0x08, 0xee, 0x25, 0xb6, 0x49, 0x07, 0x2e, 0xe6, 0x74, 0xcb, 0x52, 0x6f, 0xd0, 0x1a,
0x46, 0xb6, 0x5a, 0xd2, 0x5c, 0xa7, 0xad, 0xa7, 0x8a, 0x63, 0xcf, 0xc7, 0xca, 0x27, 0x31, 0xf8,
0x73, 0x2e, 0xd2, 0x0b, 0x2c, 0x42, 0xf2, 0x0a, 0xfc, 0x49, 0xb9, 0x4f, 0x83, 0x81, 0x3f, 0x8c,
0xc7, 0xc9, 0xa8, 0xf9, 0xf1, 0x08, 0x41, 0x42, 0x00, 0x26, 0x79, 0xae, 0x58, 0x4e, 0x0d, 0x5b,
0xa7, 0x97, 0x03, 0x6f, 0x98, 0x58, 0xec, 0x46, 0x48, 0x6a, 0xbe, 0x53, 0x51, 0xb2, 0xb4, 0x8d,
0x98, 0x47, 0x5e, 0x40, 0x67, 0x86, 0x04, 0x73, 0xa6, 0x2a, 0x34, 0x44, 0xd4, 0x27, 0xff, 0x43,
0xbc, 0x30, 0x8a, 0x17, 0x79, 0x05, 0x46, 0x08, 0x46, 0x76, 0xf4, 0x8b, 0x94, 0x82, 0xd1, 0xa2,
0x42, 0x01, 0xd1, 0x90, 0xbc, 0x85, 0x60, 0x61, 0xa8, 0xd1, 0x69, 0x8c, 0x65, 0x3c, 0xee, 0x1f,
0x68, 0xcc, 0x50, 0x37, 0x35, 0x52, 0xb9, 0x76, 0x26, 0x1c, 0x59, 0xd2, 0x83, 0x70, 0x4a, 0x0d,
0x5d, 0x3e, 0xee, 0x2a, 0xb9, 0xc1, 0x33, 0x56, 0xad, 0xb3, 0xac, 0xfc, 0x73, 0xac, 0x2e, 0xce,
0xb2, 0x0a, 0x2c, 0xab, 0xec, 0x77, 0x0b, 0xba, 0xcd, 0xfe, 0xbb, 0x9d, 0xe1, 0xb2, 0xd0, 0xd6,
0xc9, 0xeb, 0xfd, 0x4e, 0xe1, 0x5a, 0x7b, 0x2e, 0xae, 0xcc, 0x6b, 0xa1, 0x79, 0x11, 0x8a, 0x68,
0x2f, 0x64, 0xa9, 0x56, 0x4c, 0xe3, 0x2a, 0xeb, 0xe6, 0xcb, 0x83, 0x8c, 0x6f, 0x8c, 0xea, 0x52,
0xb1, 0x2d, 0xc3, 0xa0, 0xde, 0x40, 0x68, 0x79, 0xa9, 0x5f, 0x54, 0xb8, 0xf5, 0xf1, 0x98, 0x1c,
0xe9, 0xad, 0x3b, 0x56, 0xd1, 0x14, 0x23, 0x2b, 0xb4, 0x5d, 0xeb, 0xe2, 0x71, 0x31, 0xde, 0x70,
0x21, 0x5c, 0x12, 0x01, 0xf9, 0x0f, 0x22, 0x5b, 0x1d, 0x07, 0x81, 0xd0, 0x57, 0x59, 0xac, 0xb9,
0xe5, 0xea, 0x52, 0x88, 0x2c, 0x84, 0xde, 0x29, 0xe3, 0xf2, 0x8f, 0x9c, 0x05, 0x5d, 0x68, 0x5f,
0x17, 0x6b, 0x07, 0x80, 0x03, 0x70, 0x66, 0xa2, 0x57, 0x0c, 0x0f, 0x16, 0xb9, 0x8b, 0x20, 0x24,
0x09, 0x04, 0xb7, 0x7c, 0xcb, 0x4d, 0xda, 0x71, 0x13, 0x57, 0x70, 0x79, 0xb7, 0xd9, 0x68, 0x66,
0xd2, 0xa4, 0xa9, 0x17, 0x55, 0xff, 0xaa, 0xf9, 0xe5, 0xa2, 0x1e, 0xe8, 0x36, 0x03, 0x53, 0xb6,
0x2e, 0x31, 0xa0, 0x9e, 0xf3, 0xf2, 0x33, 0x74, 0x8e, 0x3c, 0xd0, 0x68, 0x42, 0x80, 0xd6, 0x6e,
0x35, 0x1a, 0xf9, 0x6f, 0xab, 0xb2, 0x1c, 0xe2, 0x63, 0xe7, 0xea, 0xdc, 0x7f, 0x50, 0xcd, 0xea,
0x00, 0xfa, 0xd0, 0x7d, 0x60, 0x06, 0x7b, 0x28, 0xf8, 0x5e, 0x0a, 0xbe, 0x7a, 0x74, 0xe1, 0x47,
0x4f, 0xaf, 0xc1, 0x77, 0x15, 0xaa, 0x79, 0xc0, 0x8b, 0xb0, 0xaf, 0xe3, 0xc6, 0xff, 0xcc, 0xf4,
0x92, 0xaa, 0x1c, 0xe9, 0x56, 0x51, 0xbf, 0x3b, 0x64, 0xe2, 0xb6, 0x94, 0x18, 0xba, 0xf5, 0xd0,
0x7b, 0xa6, 0xde, 0xfe, 0xdc, 0xcf, 0x3e, 0x42, 0x72, 0x72, 0x2f, 0x9d, 0x7c, 0xa6, 0x38, 0xd3,
0xf3, 0xc3, 0x09, 0xf7, 0x2a, 0xe7, 0xf5, 0x89, 0x4f, 0xe8, 0x97, 0x1b, 0x38, 0x7a, 0xa6, 0xde,
0xc9, 0x33, 0xf5, 0x86, 0x9d, 0xe6, 0x3a, 0xd9, 0xdb, 0x93, 0x64, 0xef, 0x01, 0xaa, 0x23, 0xb7,
0x5c, 0x1b, 0xf2, 0xfa, 0xd4, 0xaf, 0xde, 0xc1, 0xaf, 0x6a, 0xe8, 0x6f, 0x00, 0x00, 0x00, 0xff,
0xff, 0xe3, 0x58, 0x08, 0xa6, 0x2c, 0x04, 0x00, 0x00,
}

View File

@ -88,8 +88,8 @@ func (itr *bufFloatIterator) peekTime() (int64, error) {
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufFloatIterator) Next() (*FloatPoint, error) {
if itr.buf != nil {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
@ -102,7 +102,7 @@ func (itr *bufFloatIterator) NextInWindow(startTime, endTime int64) (*FloatPoint
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if v.Time < startTime || v.Time >= endTime {
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
@ -221,13 +221,13 @@ func (itr *floatMergeIterator) Next() (*FloatPoint, error) {
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
if window := itr.window; window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
} else if window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
@ -669,11 +669,11 @@ func (itr *floatInterruptIterator) Stats() IteratorStats { return itr.input.Stat
func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }
func (itr *floatInterruptIterator) Next() (*FloatPoint, error) {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
@ -2034,8 +2034,8 @@ func (itr *bufIntegerIterator) peekTime() (int64, error) {
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufIntegerIterator) Next() (*IntegerPoint, error) {
if itr.buf != nil {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
@ -2048,7 +2048,7 @@ func (itr *bufIntegerIterator) NextInWindow(startTime, endTime int64) (*IntegerP
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if v.Time < startTime || v.Time >= endTime {
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
@ -2167,13 +2167,13 @@ func (itr *integerMergeIterator) Next() (*IntegerPoint, error) {
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
if window := itr.window; window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
} else if window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
@ -2615,11 +2615,11 @@ func (itr *integerInterruptIterator) Stats() IteratorStats { return itr.input.St
func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }
func (itr *integerInterruptIterator) Next() (*IntegerPoint, error) {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
@ -3977,8 +3977,8 @@ func (itr *bufStringIterator) peekTime() (int64, error) {
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufStringIterator) Next() (*StringPoint, error) {
if itr.buf != nil {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
@ -3991,7 +3991,7 @@ func (itr *bufStringIterator) NextInWindow(startTime, endTime int64) (*StringPoi
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if v.Time < startTime || v.Time >= endTime {
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
@ -4110,13 +4110,13 @@ func (itr *stringMergeIterator) Next() (*StringPoint, error) {
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
if window := itr.window; window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
} else if window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
@ -4558,11 +4558,11 @@ func (itr *stringInterruptIterator) Stats() IteratorStats { return itr.input.Sta
func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }
func (itr *stringInterruptIterator) Next() (*StringPoint, error) {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
@ -5920,8 +5920,8 @@ func (itr *bufBooleanIterator) peekTime() (int64, error) {
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufBooleanIterator) Next() (*BooleanPoint, error) {
if itr.buf != nil {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
@ -5934,7 +5934,7 @@ func (itr *bufBooleanIterator) NextInWindow(startTime, endTime int64) (*BooleanP
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if v.Time < startTime || v.Time >= endTime {
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
@ -6053,13 +6053,13 @@ func (itr *booleanMergeIterator) Next() (*BooleanPoint, error) {
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
if window := itr.window; window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
} else if window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
@ -6501,11 +6501,11 @@ func (itr *booleanInterruptIterator) Stats() IteratorStats { return itr.input.St
func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }
func (itr *booleanInterruptIterator) Next() (*BooleanPoint, error) {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil

View File

@ -85,8 +85,8 @@ func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) {
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
if itr.buf != nil {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
@ -99,7 +99,7 @@ func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if v.Time < startTime || v.Time >= endTime {
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
@ -218,13 +218,13 @@ func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
if window := itr.window; window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
} else if window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
@ -668,11 +668,11 @@ func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.inpu
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
if itr.count & 0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil