Ensure we don't have duplicate values. Fix panic in compaction.
parent 0770ccc87d
commit 7baba84a21
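This commit does two related things. First, it adds a Values.Deduplicate method that collapses points sharing a timestamp, keeping the last one written, and switches the compaction and WAL-flush paths from plain sorting to deduplication. Second, it fixes a compaction panic: block iteration in rewriteFile previously compared file positions against the file's total size, which let it read into the trailing index section; a new dataFile.indexPosition helper bounds iteration at the end of the data blocks, and the cursor gets the same guard. As a standalone illustration of the map-based, last-write-wins technique behind Deduplicate (a minimal sketch, using a hypothetical `sample` struct in place of the engine's Value interface):

package main

import (
	"fmt"
	"sort"
)

// sample is a hypothetical stand-in for the engine's Value interface;
// only the timestamp and a float field matter for the illustration.
type sample struct {
	unixNano int64
	val      float64
}

// deduplicate mirrors the technique Values.Deduplicate uses: write
// every value into a map keyed by timestamp so the last occurrence
// wins, then sort the survivors back into ascending time order.
func deduplicate(in []sample) []sample {
	m := make(map[int64]sample, len(in))
	for _, s := range in {
		m[s.unixNano] = s // a later duplicate overwrites the earlier one
	}
	out := make([]sample, 0, len(m))
	for _, s := range m {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].unixNano < out[j].unixNano })
	return out
}

func main() {
	in := []sample{{1e9, 1.1}, {1e9, 1.2}, {2e9, 2.0}}
	fmt.Println(deduplicate(in)) // [{1000000000 1.2} {2000000000 2}]
}

Writing every value into a map keyed by timestamp makes the final occurrence win in one pass; the closing sort restores ascending time order, since Go map iteration order is unspecified.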
@@ -1,6 +1,7 @@
 package pd1
 
 import (
+	"sort"
 	"time"
 
 	"github.com/dgryski/go-tsz"
@@ -75,6 +76,24 @@ func (v Values) DecodeSameTypeBlock(block []byte) Values {
 	return nil
 }
 
+// Deduplicate returns a new Values slice with any values
+// that have the same timestamp removed. The Value that appears
+// last in the slice is the one that is kept. The returned slice is in ascending order
+func (v Values) Deduplicate() Values {
+	m := make(map[int64]Value)
+	for _, val := range v {
+		m[val.UnixNano()] = val
+	}
+
+	a := make([]Value, 0, len(m))
+	for _, val := range m {
+		a = append(a, val)
+	}
+	sort.Sort(Values(a))
+
+	return a
+}
+
 // Sort methods
 func (a Values) Len() int { return len(a) }
 func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
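The final sort.Sort(Values(a)) relies on Values satisfying sort.Interface. Len and Swap appear in the context lines above; the Less method is outside this hunk, so the following self-contained sketch shows its assumed shape (ordering by timestamp), with trivial stand-in types that are not part of the diff:

package main

import (
	"fmt"
	"sort"
)

// Minimal stand-ins for the engine's types, for illustration only.
type Value interface{ UnixNano() int64 }

type Values []Value

func (a Values) Len() int      { return len(a) }
func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Assumed shape, not shown in this hunk: Less orders by timestamp,
// which is what makes sort.Sort(Values(a)) in Deduplicate return
// values in ascending time order.
func (a Values) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }

// ts is a trivial Value whose timestamp is its own int64.
type ts int64

func (t ts) UnixNano() int64 { return int64(t) }

func main() {
	vals := Values{ts(3), ts(1), ts(2)}
	sort.Sort(vals)
	fmt.Println(vals) // [1 2 3]
}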
@@ -389,6 +389,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 	if len(valuesByID) == 0 {
 		return nil
 	}
+
 	// we need the values in sorted order so that we can merge them into the
 	// new file as we read the old file
 	ids := make([]uint64, 0, len(valuesByID))
@@ -506,7 +507,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 		// determine if there's a block after this with the same id and get its time
 		hasFutureBlock := false
 		nextTime := int64(0)
-		if fpos < oldDF.size {
+		if fpos < oldDF.indexPosition() {
 			nextID := btou64(oldDF.mmap[fpos : fpos+8])
 			if nextID == id {
 				hasFutureBlock = true
@@ -530,7 +531,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 
 			currentPosition += uint32(12 + len(newBlock))
 
-			if fpos >= oldDF.size {
+			if fpos >= oldDF.indexPosition() {
 				break
 			}
 		}
@@ -877,17 +878,15 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime
 		})
 		values = append(values, newValues[:pos]...)
 		remainingValues = newValues[pos:]
-		sort.Sort(values)
+		values = values.Deduplicate()
 	} else {
-		requireSort := values.MaxTime() > newValues.MinTime()
+		requireSort := values.MaxTime() >= newValues.MinTime()
 		values = append(values, newValues...)
 		if requireSort {
-			sort.Sort(values)
+			values = values.Deduplicate()
 		}
 	}
 
-	// TODO: deduplicate values
-
 	if len(values) > DefaultMaxPointsPerBlock {
 		remainingValues = values[DefaultMaxPointsPerBlock:]
 		values = values[:DefaultMaxPointsPerBlock]
@@ -986,6 +985,10 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 {
 	return m
 }
 
+func (d *dataFile) indexPosition() uint32 {
+	return d.size - uint32(d.SeriesCount()*12+20)
+}
+
 // StartingPositionForID returns the position in the file of the
 // first block for the given ID. If zero is returned the ID doesn't
 // have any data in this file.
@@ -1123,7 +1126,7 @@ func (c *cursor) Next() (int64, interface{}) {
 	// if we have a file set, see if the next block is for this ID
 	if c.f != nil && c.pos < c.f.size {
 		nextBlockID := btou64(c.f.mmap[c.pos : c.pos+8])
-		if nextBlockID == c.id {
+		if nextBlockID == c.id && c.pos != c.f.indexPosition() {
 			return c.decodeBlockAndGetValues(c.pos)
 		}
 	}
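The panic fix hinges on indexPosition: a data file ends with per-series index entries plus trailing metadata, and the old comparisons against oldDF.size let the rewrite loop walk past the last data block and misread index bytes as a block header. The constants 12 and 20 come straight from the diff; the breakdown below (8-byte series ID plus 4-byte position per entry, 20-byte trailer) is an assumption for illustration:

package main

import "fmt"

// indexPosition sketches dataFile.indexPosition's arithmetic. The 12
// and 20 come from the diff; the 8+4 split (series ID + block
// position) and the trailer's contents are assumptions.
func indexPosition(fileSize uint32, seriesCount int) uint32 {
	const entrySize = 8 + 4 // assumed: 8-byte series ID + 4-byte position
	const trailerSize = 20  // trailing metadata, size as given in the diff
	return fileSize - uint32(seriesCount*entrySize+trailerSize)
}

func main() {
	// A 1 KiB file holding 3 series: data blocks end at byte 968,
	// the remaining 56 bytes are index entries plus the trailer.
	fmt.Println(indexPosition(1024, 3)) // 968
}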
@@ -184,6 +184,53 @@ func TestEngine_WriteIndexQueryAcrossDataFiles(t *testing.T) {
 	verify("cpu,host=B", []models.Point{p2, p8, p4, p6}, 0)
 }
 
+func TestEngine_WriteOverwritePreviousPoint(t *testing.T) {
+	e := OpenDefaultEngine()
+	defer e.Cleanup()
+
+	e.Shard = newFieldCodecMock(map[string]influxql.DataType{"value": influxql.Float})
+	fields := []string{"value"}
+	var codec *tsdb.FieldCodec
+
+	p1 := parsePoint("cpu,host=A value=1.1 1000000000")
+	p2 := parsePoint("cpu,host=A value=1.2 1000000000")
+	p3 := parsePoint("cpu,host=A value=1.3 1000000000")
+
+	if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	c := e.Cursor("cpu,host=A", fields, codec, true)
+	k, v := c.Next()
+	if k != p2.UnixNano() {
+		t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p2.UnixNano(), k)
+	}
+	if 1.2 != v {
+		t.Fatalf("data wrong:\n\texp:%f\n\tgot:%f", 1.2, v.(float64))
+	}
+	k, v = c.Next()
+	if k != tsdb.EOF {
+		t.Fatal("expected EOF")
+	}
+
+	if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	c = e.Cursor("cpu,host=A", fields, codec, true)
+	k, v = c.Next()
+	if k != p3.UnixNano() {
+		t.Fatalf("time wrong:\n\texp:%d\n\tgot:%d\n", p3.UnixNano(), k)
+	}
+	if 1.3 != v {
+		t.Fatalf("data wrong:\n\texp:%f\n\tgot:%f", 1.3, v.(float64))
+	}
+	k, v = c.Next()
+	if k != tsdb.EOF {
+		t.Fatal("expected EOF")
+	}
+}
+
 func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
 	t.Skip("whatevs")
 
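The new test covers both write paths: p1 and p2 land in the same write batch, so the cursor must already resolve the duplicate timestamp from the WAL cache, while p3 arrives in a later write and exercises the merge with previously written data. Assuming a standard checkout, something like `go test -run TestEngine_WriteOverwritePreviousPoint` from the pd1 engine package directory (path assumed) should run it in isolation.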
@@ -282,7 +282,7 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem
 
 			// only mark it as dirty if it isn't already
 			if _, ok := l.cacheDirtySort[k]; !ok && len(cacheValues) > 0 {
-				dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() > v.Time().UnixNano()
+				dirty := cacheValues[len(cacheValues)-1].Time().UnixNano() >= v.Time().UnixNano()
 				if dirty {
 					l.cacheDirtySort[k] = true
 				}
@@ -522,7 +522,7 @@ func (l *Log) flush(flush flushType) error {
 	l.flushCache = l.cache
 	l.cache = make(map[string]Values)
 	for k, _ := range l.cacheDirtySort {
-		sort.Sort(l.flushCache[k])
+		l.flushCache[k] = l.flushCache[k].Deduplicate()
 	}
 	l.cacheDirtySort = make(map[string]bool)
 	valuesByKey := make(map[string]Values)
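The one-character change from > to >= in addToCache is what makes the WAL path correct: an incoming point whose timestamp exactly equals the newest cached timestamp previously left the key unmarked as dirty, so flush never deduplicated it and the duplicate leaked into the data file. A minimal sketch of the predicate, with hypothetical helper and variable names:

package main

import "fmt"

// needsSortDedup mirrors the fixed dirty check: a key must be cleaned
// up at flush when the incoming point fails to strictly advance the
// newest cached timestamp, i.e. it is out of order or a duplicate.
func needsSortDedup(lastCachedNano, incomingNano int64) bool {
	return lastCachedNano >= incomingNano
}

func main() {
	fmt.Println(needsSortDedup(1000, 2000)) // false: strictly newer, cache stays clean
	fmt.Println(needsSortDedup(2000, 1000)) // true: out of order, caught by the old ">" too
	fmt.Println(needsSortDedup(1000, 1000)) // true: duplicate timestamp, the case this commit fixes
}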