Merge pull request #1773 from influxdb/race-condition-when-merging-multiple-series

Fix problem with merging series that have unequal number of points in gr...
pull/1771/head
Paul Dix 2015-02-27 01:07:16 -05:00
commit 48294ceb96
4 changed files with 43 additions and 4 deletions

View File

@ -5,6 +5,7 @@
- [#1752](https://github.com/influxdb/influxdb/pull/1752): remove debug log output from collectd.
- [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits.
- [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761.
- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval
### Features

View File

@ -69,7 +69,7 @@ func NewPlanner(db DB) *Planner {
// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
now := p.Now()
now := p.Now().UTC()
// Clone the statement to be planned.
// Replace instances of "now()" with the current time.
@ -698,7 +698,9 @@ func MapMean(itr Iterator, e *Emitter, tmin int64) {
out.Count++
out.Sum += v.(float64)
}
e.Emit(Key{tmin, itr.Tags()}, out)
if out.Count > 0 {
e.Emit(Key{tmin, itr.Tags()}, out)
}
}
type meanMapOutput struct {
@ -714,7 +716,9 @@ func ReduceMean(key Key, values []interface{}, e *Emitter) {
out.Count += val.Count
out.Sum += val.Sum
}
e.Emit(key, out.Sum/float64(out.Count))
if out.Count > 0 {
e.Emit(key, out.Sum/float64(out.Count))
}
}
// MapMin collects the values to pass to the reducer

View File

@ -1138,6 +1138,37 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
}
}
// Ensure that when merging many series together and some of them have a different number of points than others
// in a group by interval the results are correct
func TestServer_MergeManySeries(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
for i := 1; i < 11; i++ {
for j := 1; j < 5+i%3; j++ {
tags := map[string]string{"host": fmt.Sprintf("server_%d", i)}
if index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: time.Unix(int64(j), int64(0)), Fields: map[string]interface{}{"value": float64(22)}}}); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
}
}
results := s.ExecuteQuery(MustParseQuery(`select count(value) from cpu group by time(1s)`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
// Ensure Drop Series can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement cpu with tags region=uswest host=serverB

5
tx.go
View File

@ -2,6 +2,7 @@ package influxdb
import (
"fmt"
"math"
"sort"
"sync"
"time"
@ -309,10 +310,12 @@ func (i *shardIterator) Tags() string { return i.tags }
func (i *shardIterator) Next() (key int64, data []byte, value interface{}) {
min := -1
minKey := int64(math.MaxInt64)
for ind, kv := range i.keyValues {
if kv.key != 0 && kv.key < i.tmax {
if kv.key != 0 && kv.key < i.tmax && kv.key < minKey {
min = ind
minKey = kv.key
}
}