From 38dc837910b08f5863c1c467e031834c627dee12 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Thu, 28 Sep 2017 14:21:58 -0600 Subject: [PATCH 1/3] Fix a minor memory leak when batching points for some services. fixes #8895 --- CHANGELOG.md | 1 + tsdb/batcher.go | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b73b52edb..ca6dd1a416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ - [#8678](https://github.com/influxdata/influxdb/issues/8678): Ensure time and tag-based condition can be used with tsi1 index when deleting. - [#8848](https://github.com/influxdata/influxdb/issues/8848): Prevent deadlock when doing math on the result of a subquery. - [#8813](https://github.com/influxdata/influxdb/issues/8813): Fix retention policy duration format +- [#8895](https://github.com/influxdata/influxdb/issues/8895): Fix a minor memory leak in batching points in tsdb. ## v1.3.4 [unreleased] diff --git a/tsdb/batcher.go b/tsdb/batcher.go index 05f7e270b2..c45e45d000 100644 --- a/tsdb/batcher.go +++ b/tsdb/batcher.go @@ -55,14 +55,23 @@ func (b *PointBatcher) Start() { return } - var timer *time.Timer + // initialize the timer variable + timer := &time.NewTimer(b.duration) + timer.Stop() var batch []models.Point - var timerCh <-chan time.Time emit := func() { + + timer.Stop() + + // Nothing batched? + if len(batch) == 0 { + return + } b.out <- batch atomic.AddUint64(&b.stats.BatchTotal, 1) batch = nil + } b.wg = &sync.WaitGroup{} @@ -75,16 +84,15 @@ func (b *PointBatcher) Start() { case <-b.stop: if len(batch) > 0 { emit() - timerCh = nil } return case p := <-b.in: atomic.AddUint64(&b.stats.PointTotal, 1) if batch == nil { batch = make([]models.Point, 0, b.size) + if b.duration > 0 { - timer = time.NewTimer(b.duration) - timerCh = timer.C + timer.Reset(b.duration) } } @@ -92,16 +100,14 @@ func (b *PointBatcher) Start() { if len(batch) >= b.size { // 0 means send immediately. atomic.AddUint64(&b.stats.SizeTotal, 1) emit() - timerCh = nil } case <-b.flush: if len(batch) > 0 { emit() - timerCh = nil } - case <-timerCh: + case <-timer.C: atomic.AddUint64(&b.stats.TimeoutTotal, 1) emit() } From a6cbce0d3ee6a90d42817c42ec3ce90be43f2d43 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Mon, 2 Oct 2017 11:41:03 -0600 Subject: [PATCH 2/3] fix issues brought up by joe --- tsdb/batcher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tsdb/batcher.go b/tsdb/batcher.go index c45e45d000..45e6fd03ea 100644 --- a/tsdb/batcher.go +++ b/tsdb/batcher.go @@ -56,13 +56,17 @@ func (b *PointBatcher) Start() { } // initialize the timer variable - timer := &time.NewTimer(b.duration) + timer := time.NewTimer(time.Hour) timer.Stop() var batch []models.Point emit := func() { timer.Stop() + select { + case <-timer.C: + default: + } // Nothing batched? if len(batch) == 0 { From 7e5fd14e8ac537953f67810c167271245c99f08e Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Mon, 2 Oct 2017 12:02:38 -0600 Subject: [PATCH 3/3] add in some optimization --- tsdb/batcher.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tsdb/batcher.go b/tsdb/batcher.go index 45e6fd03ea..0ae3b407df 100644 --- a/tsdb/batcher.go +++ b/tsdb/batcher.go @@ -86,14 +86,14 @@ func (b *PointBatcher) Start() { for { select { case <-b.stop: - if len(batch) > 0 { - emit() - } + emit() return case p := <-b.in: atomic.AddUint64(&b.stats.PointTotal, 1) if batch == nil { - batch = make([]models.Point, 0, b.size) + if b.size > 0 { + batch = make([]models.Point, 0, b.size) + } if b.duration > 0 { timer.Reset(b.duration) @@ -107,9 +107,7 @@ func (b *PointBatcher) Start() { } case <-b.flush: - if len(batch) > 0 { - emit() - } + emit() case <-timer.C: atomic.AddUint64(&b.stats.TimeoutTotal, 1)