From bad0f657de2215947a3dbe4da203969f6c9a97aa Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 30 Nov 2015 16:21:44 -0800 Subject: [PATCH] Deduplicate supports requesting sort order --- tsdb/engine/tsm1/cache.go | 4 ++-- tsdb/engine/tsm1/compact.go | 2 +- tsdb/engine/tsm1/encoding.go | 14 +++++++++----- tsdb/engine/tsm1/log.go | 6 +++--- tsdb/engine/tsm1/tsm1.go | 4 ++-- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index f7b5ac04db..bb8e3da2a0 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -86,7 +86,7 @@ func (a *entries) clone() Values { // dedupe returns a copy of all underlying Values. Values are deduped and sorted. func (a *entries) dedupe() Values { - return a.clone().Deduplicate() + return a.clone().Deduplicate(true) } // Cache maintains an in-memory store of Values for a set of keys. As data is added to the cache @@ -204,7 +204,7 @@ func (c *Cache) Values(key string) Values { if values == nil { return nil } - return values.Deduplicate() + return values.Deduplicate(true) } // evict instructs the cache to evict data up to and including the current checkpoint. diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index cebe493cd0..447608887c 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -287,7 +287,7 @@ func NewWALKeyIterator(readers ...*WALSegmentReader) (KeyIterator, error) { // sort and dedup all the points for each key. for k, v := range series { order = append(order, k) - series[k] = v.Deduplicate() + series[k] = v.Deduplicate(true) } sort.Strings(order) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index a819c2ebd9..8cb0a0d838 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -168,10 +168,10 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { } } -// Deduplicate returns a new Values slice with any values -// that have the same timestamp removed. The Value that appears -// last in the slice is the one that is kept. The returned slice is in ascending order -func (a Values) Deduplicate() Values { +// Deduplicate returns a new Values slice with any values that have the same timestamp removed. +// The Value that appears last in the slice is the one that is kept. The returned slice is then +// sorted in the requested order. +func (a Values) Deduplicate(ascending bool) Values { m := make(map[int64]Value) for _, val := range a { m[val.UnixNano()] = val @@ -181,8 +181,12 @@ func (a Values) Deduplicate() Values { for _, val := range m { other = append(other, val) } - sort.Sort(Values(other)) + if ascending { + sort.Sort(Values(other)) + } else { + sort.Sort(sort.Reverse(Values(other))) + } return other } diff --git a/tsdb/engine/tsm1/log.go b/tsdb/engine/tsm1/log.go index 46b059a50f..d19f3695c0 100644 --- a/tsdb/engine/tsm1/log.go +++ b/tsdb/engine/tsm1/log.go @@ -166,12 +166,12 @@ func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascen copy(c, fc) c = append(c, values...) 
- return newWALCursor(Values(c).Deduplicate(), ascending) + return newWALCursor(Values(c).Deduplicate(true), ascending) } } if l.cacheDirtySort[ck] { - values = Values(values).Deduplicate() + values = Values(values).Deduplicate(true) } // build a copy so writes afterwards don't change the result set @@ -586,7 +586,7 @@ func (l *Log) flush(flush flushType) error { } l.cache = make(map[string]Values) for k := range l.cacheDirtySort { - l.flushCache[k] = l.flushCache[k].Deduplicate() + l.flushCache[k] = l.flushCache[k].Deduplicate(true) } l.cacheDirtySort = make(map[string]bool) diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go index 12fbd81e6d..d16b656326 100644 --- a/tsdb/engine/tsm1/tsm1.go +++ b/tsdb/engine/tsm1/tsm1.go @@ -1656,12 +1656,12 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime }) values = append(values, newValues[:pos]...) remainingValues = newValues[pos:] - values = Values(values).Deduplicate() + values = Values(values).Deduplicate(true) } else { requireSort := Values(values).MaxTime() >= newValues.MinTime() values = append(values, newValues...) if requireSort { - values = Values(values).Deduplicate() + values = Values(values).Deduplicate(true) } }
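
Note: every call site touched by this patch passes true, so existing ascending behavior is preserved; the descending path is what the new parameter makes possible for callers that already know the requested cursor direction. The snippet below is a minimal, standalone sketch of the dedupe-then-sort pattern the patch gives Deduplicate, not the actual tsm1 code: the Value struct, Values slice, and main function here are simplified stand-ins (in tsm1, Value is an interface with a UnixNano() method).

package main

import (
	"fmt"
	"sort"
)

// Value is a simplified stand-in for tsm1's Value: a timestamp plus a payload.
type Value struct {
	UnixNano int64
	V        float64
}

// Values is a timestamp-sortable slice, mirroring tsm1's Values type.
type Values []Value

func (a Values) Len() int           { return len(a) }
func (a Values) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a Values) Less(i, j int) bool { return a[i].UnixNano < a[j].UnixNano }

// Deduplicate keeps the last value written for each timestamp, then sorts the
// result in the requested order, following the signature this patch introduces.
func (a Values) Deduplicate(ascending bool) Values {
	m := make(map[int64]Value, len(a))
	for _, v := range a {
		m[v.UnixNano] = v // later entries overwrite earlier ones
	}

	other := make(Values, 0, len(m))
	for _, v := range m {
		other = append(other, v)
	}

	if ascending {
		sort.Sort(other)
	} else {
		sort.Sort(sort.Reverse(other))
	}
	return other
}

func main() {
	vals := Values{
		{UnixNano: 2, V: 2.0},
		{UnixNano: 1, V: 1.0},
		{UnixNano: 2, V: 2.5}, // duplicate timestamp: the last write wins
	}

	fmt.Println(vals.Deduplicate(true))  // ascending:  [{1 1} {2 2.5}]
	fmt.Println(vals.Deduplicate(false)) // descending: [{2 2.5} {1 1}]
}

The design choice is to sort once, in the direction the caller asks for, rather than always sorting ascending and reversing later; a descending cursor can request the order it needs directly from Deduplicate.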