diff --git a/tsdb/engine/pd1/encoding.go b/tsdb/engine/pd1/encoding.go index 5d1bee9a8f..bf06fb9880 100644 --- a/tsdb/engine/pd1/encoding.go +++ b/tsdb/engine/pd1/encoding.go @@ -1,6 +1,7 @@ package pd1 import ( + "sort" "time" "github.com/dgryski/go-tsz" @@ -75,6 +76,24 @@ func (v Values) DecodeSameTypeBlock(block []byte) Values { return nil } +// Deduplicate returns a new Values slice with any values +// that have the same timestamp removed. The Value that appears +// last in the slice is the one that is kept. The returned slice is in ascending order +func (v Values) Deduplicate() Values { + m := make(map[int64]Value) + for _, val := range v { + m[val.UnixNano()] = val + } + + a := make([]Value, 0, len(m)) + for _, val := range m { + a = append(a, val) + } + sort.Sort(Values(a)) + + return a +} + // Sort methods func (a Values) Len() int { return len(a) } func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go index 40aa7f8050..5ae35e4798 100644 --- a/tsdb/engine/pd1/pd1.go +++ b/tsdb/engine/pd1/pd1.go @@ -389,6 +389,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro if len(valuesByID) == 0 { return nil } + // we need the values in sorted order so that we can merge them into the // new file as we read the old file ids := make([]uint64, 0, len(valuesByID)) @@ -506,7 +507,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro // determine if there's a block after this with the same id and get its time hasFutureBlock := false nextTime := int64(0) - if fpos < oldDF.size { + if fpos < oldDF.indexPosition() { nextID := btou64(oldDF.mmap[fpos : fpos+8]) if nextID == id { hasFutureBlock = true @@ -530,7 +531,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro currentPosition += uint32(12 + len(newBlock)) - if fpos >= oldDF.size { + if fpos >= oldDF.indexPosition() { break } } @@ -877,17 +878,15 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime }) values = append(values, newValues[:pos]...) remainingValues = newValues[pos:] - sort.Sort(values) + values = values.Deduplicate() } else { - requireSort := values.MaxTime() > newValues.MinTime() + requireSort := values.MaxTime() >= newValues.MinTime() values = append(values, newValues...) if requireSort { - sort.Sort(values) + values = values.Deduplicate() } } - // TODO: deduplicate values - if len(values) > DefaultMaxPointsPerBlock { remainingValues = values[DefaultMaxPointsPerBlock:] values = values[:DefaultMaxPointsPerBlock] @@ -986,6 +985,10 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 { return m } +func (d *dataFile) indexPosition() uint32 { + return d.size - uint32(d.SeriesCount()*12+20) +} + // StartingPositionForID returns the position in the file of the // first block for the given ID. If zero is returned the ID doesn't // have any data in this file. @@ -1123,7 +1126,7 @@ func (c *cursor) Next() (int64, interface{}) { // if we have a file set, see if the next block is for this ID if c.f != nil && c.pos < c.f.size { nextBlockID := btou64(c.f.mmap[c.pos : c.pos+8]) - if nextBlockID == c.id { + if nextBlockID == c.id && c.pos != c.f.indexPosition() { return c.decodeBlockAndGetValues(c.pos) } } diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go index a20cc665ce..3be9bc63bb 100644 --- a/tsdb/engine/pd1/pd1_test.go +++ b/tsdb/engine/pd1/pd1_test.go @@ -184,6 +184,53 @@ func TestEngine_WriteIndexQueryAcrossDataFiles(t *testing.T) { verify("cpu,host=B", []models.Point{p2, p8, p4, p6}, 0) } +func TestEngine_WriteOverwritePreviousPoint(t *testing.T) { + e := OpenDefaultEngine() + defer e.Cleanup() + + e.Shard = newFieldCodecMock(map[string]influxql.DataType{"value": influxql.Float}) + fields := []string{"value"} + var codec *tsdb.FieldCodec + + p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p2 := parsePoint("cpu,host=A value=1.2 1000000000") + p3 := parsePoint("cpu,host=A value=1.3 1000000000") + + if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + c := e.Cursor("cpu,host=A", fields, codec, true) + k, v := c.Next() + if k != p2.UnixNano() { + t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k) + } + if 1.2 != v { + t.Fatalf("data wrong:\n\texp:%f\n\tgot:%f", 1.2, v.(float64)) + } + k, v = c.Next() + if k != tsdb.EOF { + t.Fatal("expected EOF") + } + + if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + c = e.Cursor("cpu,host=A", fields, codec, true) + k, v = c.Next() + if k != p3.UnixNano() { + t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k) + } + if 1.3 != v { + t.Fatalf("data wrong:\n\texp:%f\n\tgot:%f", 1.3, v.(float64)) + } + k, v = c.Next() + if k != tsdb.EOF { + t.Fatal("expected EOF") + } +} + func TestEngine_WriteIndexBenchmarkNames(t *testing.T) { t.Skip("whatevs") diff --git a/tsdb/engine/pd1/wal.go b/tsdb/engine/pd1/wal.go index e7fb7fb39a..063e0d884b 100644 --- a/tsdb/engine/pd1/wal.go +++ b/tsdb/engine/pd1/wal.go @@ -282,7 +282,7 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem // only mark it as dirty if it isn't already if _, ok := l.cacheDirtySort[k]; !ok && len(cacheValues) > 0 { - dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() > v.Time().UnixNano() + dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() >= v.Time().UnixNano() if dirty { l.cacheDirtySort[k] = true } @@ -522,7 +522,7 @@ func (l *Log) flush(flush flushType) error { l.flushCache = l.cache l.cache = make(map[string]Values) for k, _ := range l.cacheDirtySort { - sort.Sort(l.flushCache[k]) + l.flushCache[k] = l.flushCache[k].Deduplicate() } l.cacheDirtySort = make(map[string]bool) valuesByKey := make(map[string]Values)