From 5911b38e94fa2911c97e99dba1621cd9c5cde54d Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 11 Feb 2015 15:00:39 -0700 Subject: [PATCH 1/3] emit in batches of 1000 --- influxql/engine.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/influxql/engine.go b/influxql/engine.go index b56cb33fdc..8fd300479d 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -752,6 +752,11 @@ func MapStddev(itr Iterator, e *Emitter, tmax int64) { for k, v := itr.Next(); k != 0; k, v = itr.Next() { values = append(values, v.(float64)) + // Emit in batches of 1000 + if len(values) == 1000 { + e.Emit(Key{tmax, itr.Tags()}, values) + values = []float64{} + } } if len(values) > 0 { e.Emit(Key{tmax, itr.Tags()}, values) From 4b18ddedec2c9ae97ff9c403f62133e100100c7d Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 11 Feb 2015 15:11:45 -0700 Subject: [PATCH 2/3] magic numbers are evil --- influxql/engine.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/influxql/engine.go b/influxql/engine.go index 8fd300479d..61b715527c 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -10,6 +10,9 @@ import ( "time" ) +// how many values we will map before emitting +const emitBatchSize = 1000 + // DB represents an interface for creating transactions. type DB interface { Begin() (Tx, error) @@ -752,8 +755,8 @@ func MapStddev(itr Iterator, e *Emitter, tmax int64) { for k, v := itr.Next(); k != 0; k, v = itr.Next() { values = append(values, v.(float64)) - // Emit in batches of 1000 - if len(values) == 1000 { + // Emit in batches + if len(values) == emitBatchSize { e.Emit(Key{tmax, itr.Tags()}, values) values = []float64{} } From 79fe16fb6c1a1c1ac99add6b7c115269f9cc1be7 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 11 Feb 2015 16:03:01 -0700 Subject: [PATCH 3/3] add comment about why we batch --- influxql/engine.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/influxql/engine.go b/influxql/engine.go index 61b715527c..149f1c47ed 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -755,7 +755,9 @@ func MapStddev(itr Iterator, e *Emitter, tmax int64) { for k, v := itr.Next(); k != 0; k, v = itr.Next() { values = append(values, v.(float64)) - // Emit in batches + // Emit in batches. + // unbounded emission of data can lead to excessive memory use + // or other potential performance problems. if len(values) == emitBatchSize { e.Emit(Key{tmax, itr.Tags()}, values) values = []float64{}