diff --git a/CHANGELOG.md b/CHANGELOG.md index 19a66dc3cb..1ee7612bc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#1752](https://github.com/influxdb/influxdb/pull/1752): remove debug log output from collectd. - [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits. - [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761. +- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval ### Features diff --git a/influxql/engine.go b/influxql/engine.go index 5092fec78a..216ffe9b69 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -69,7 +69,7 @@ func NewPlanner(db DB) *Planner { // Plan creates an execution plan for the given SelectStatement and returns an Executor. func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) { - now := p.Now() + now := p.Now().UTC() // Clone the statement to be planned. // Replace instances of "now()" with the current time. @@ -698,7 +698,9 @@ func MapMean(itr Iterator, e *Emitter, tmin int64) { out.Count++ out.Sum += v.(float64) } - e.Emit(Key{tmin, itr.Tags()}, out) + if out.Count > 0 { + e.Emit(Key{tmin, itr.Tags()}, out) + } } type meanMapOutput struct { @@ -714,7 +716,9 @@ func ReduceMean(key Key, values []interface{}, e *Emitter) { out.Count += val.Count out.Sum += val.Sum } - e.Emit(key, out.Sum/float64(out.Count)) + if out.Count > 0 { + e.Emit(key, out.Sum/float64(out.Count)) + } } // MapMin collects the values to pass to the reducer diff --git a/server_test.go b/server_test.go index d4ab76dbf2..421619a16a 100644 --- a/server_test.go +++ b/server_test.go @@ -1138,6 +1138,37 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) { } } +// Ensure that when merging many series together and some of them have a different number of points than others +// in a group by interval the results are correct +func TestServer_MergeManySeries(t *testing.T) { + c := NewMessagingClient() + s := OpenServer(c) + defer s.Close() + s.CreateDatabase("foo") + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.SetDefaultRetentionPolicy("foo", "raw") + + for i := 1; i < 11; i++ { + for j := 1; j < 5+i%3; j++ { + tags := map[string]string{"host": fmt.Sprintf("server_%d", i)} + if index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: time.Unix(int64(j), int64(0)), Fields: map[string]interface{}{"value": float64(22)}}}); err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + } + } + + results := s.ExecuteQuery(MustParseQuery(`select count(value) from cpu group by time(1s)`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Series) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Series)) + } else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } +} + // Ensure Drop Series can: // write to measurement cpu with tags region=uswest host=serverA // write to measurement cpu with tags region=uswest host=serverB diff --git a/tx.go b/tx.go index 6e168b2159..d2ec729e65 100644 --- a/tx.go +++ b/tx.go @@ -2,6 +2,7 @@ package influxdb import ( "fmt" + "math" "sort" "sync" "time" @@ -309,10 +310,12 @@ func (i *shardIterator) Tags() string { return i.tags } func (i *shardIterator) Next() (key int64, data []byte, value interface{}) { min := -1 + minKey := int64(math.MaxInt64) for ind, kv := range i.keyValues { - if kv.key != 0 && kv.key < i.tmax { + if kv.key != 0 && kv.key < i.tmax && kv.key < minKey { min = ind + minKey = kv.key } }