Merge pull request #8896 from influxdata/lh-batcher-leak

Fix a minor memory leak when batching points for some services.
pull/8901/head
Lyon Hill 2017-10-02 12:26:58 -06:00 committed by GitHub
commit 9cf517bcdf
2 changed files with 24 additions and 15 deletions

View File

@ -68,6 +68,7 @@
- [#8678](https://github.com/influxdata/influxdb/issues/8678): Ensure time and tag-based condition can be used with tsi1 index when deleting. - [#8678](https://github.com/influxdata/influxdb/issues/8678): Ensure time and tag-based condition can be used with tsi1 index when deleting.
- [#8848](https://github.com/influxdata/influxdb/issues/8848): Prevent deadlock when doing math on the result of a subquery. - [#8848](https://github.com/influxdata/influxdb/issues/8848): Prevent deadlock when doing math on the result of a subquery.
- [#8813](https://github.com/influxdata/influxdb/issues/8813): Fix retention policy duration format - [#8813](https://github.com/influxdata/influxdb/issues/8813): Fix retention policy duration format
- [#8895](https://github.com/influxdata/influxdb/issues/8895): Fix a minor memory leak in batching points in tsdb.
## v1.3.4 [unreleased] ## v1.3.4 [unreleased]

View File

@ -55,14 +55,27 @@ func (b *PointBatcher) Start() {
return return
} }
var timer *time.Timer // initialize the timer variable
timer := time.NewTimer(time.Hour)
timer.Stop()
var batch []models.Point var batch []models.Point
var timerCh <-chan time.Time
emit := func() { emit := func() {
timer.Stop()
select {
case <-timer.C:
default:
}
// Nothing batched?
if len(batch) == 0 {
return
}
b.out <- batch b.out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1) atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil batch = nil
} }
b.wg = &sync.WaitGroup{} b.wg = &sync.WaitGroup{}
@ -73,18 +86,17 @@ func (b *PointBatcher) Start() {
for { for {
select { select {
case <-b.stop: case <-b.stop:
if len(batch) > 0 { emit()
emit()
timerCh = nil
}
return return
case p := <-b.in: case p := <-b.in:
atomic.AddUint64(&b.stats.PointTotal, 1) atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil { if batch == nil {
batch = make([]models.Point, 0, b.size) if b.size > 0 {
batch = make([]models.Point, 0, b.size)
}
if b.duration > 0 { if b.duration > 0 {
timer = time.NewTimer(b.duration) timer.Reset(b.duration)
timerCh = timer.C
} }
} }
@ -92,16 +104,12 @@ func (b *PointBatcher) Start() {
if len(batch) >= b.size { // 0 means send immediately. if len(batch) >= b.size { // 0 means send immediately.
atomic.AddUint64(&b.stats.SizeTotal, 1) atomic.AddUint64(&b.stats.SizeTotal, 1)
emit() emit()
timerCh = nil
} }
case <-b.flush: case <-b.flush:
if len(batch) > 0 { emit()
emit()
timerCh = nil
}
case <-timerCh: case <-timer.C:
atomic.AddUint64(&b.stats.TimeoutTotal, 1) atomic.AddUint64(&b.stats.TimeoutTotal, 1)
emit() emit()
} }