Deduplicate supports requesting sort order
parent
f8040269d9
commit
bad0f657de
|
@ -86,7 +86,7 @@ func (a *entries) clone() Values {
|
||||||
|
|
||||||
// dedupe returns a copy of all underlying Values. Values are deduped and sorted.
|
// dedupe returns a copy of all underlying Values. Values are deduped and sorted.
|
||||||
func (a *entries) dedupe() Values {
|
func (a *entries) dedupe() Values {
|
||||||
return a.clone().Deduplicate()
|
return a.clone().Deduplicate(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache maintains an in-memory store of Values for a set of keys. As data is added to the cache
|
// Cache maintains an in-memory store of Values for a set of keys. As data is added to the cache
|
||||||
|
@ -204,7 +204,7 @@ func (c *Cache) Values(key string) Values {
|
||||||
if values == nil {
|
if values == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return values.Deduplicate()
|
return values.Deduplicate(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// evict instructs the cache to evict data up to and including the current checkpoint.
|
// evict instructs the cache to evict data up to and including the current checkpoint.
|
||||||
|
|
|
@ -287,7 +287,7 @@ func NewWALKeyIterator(readers ...*WALSegmentReader) (KeyIterator, error) {
|
||||||
// sort and dedup all the points for each key.
|
// sort and dedup all the points for each key.
|
||||||
for k, v := range series {
|
for k, v := range series {
|
||||||
order = append(order, k)
|
order = append(order, k)
|
||||||
series[k] = v.Deduplicate()
|
series[k] = v.Deduplicate(true)
|
||||||
}
|
}
|
||||||
sort.Strings(order)
|
sort.Strings(order)
|
||||||
|
|
||||||
|
|
|
@ -168,10 +168,10 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deduplicate returns a new Values slice with any values
|
// Deduplicate returns a new Values slice with any values that have the same timestamp removed.
|
||||||
// that have the same timestamp removed. The Value that appears
|
// The Value that appears last in the slice is the one that is kept. The returned slice is then
|
||||||
// last in the slice is the one that is kept. The returned slice is in ascending order
|
// sorted in the requested order.
|
||||||
func (a Values) Deduplicate() Values {
|
func (a Values) Deduplicate(ascending bool) Values {
|
||||||
m := make(map[int64]Value)
|
m := make(map[int64]Value)
|
||||||
for _, val := range a {
|
for _, val := range a {
|
||||||
m[val.UnixNano()] = val
|
m[val.UnixNano()] = val
|
||||||
|
@ -181,8 +181,12 @@ func (a Values) Deduplicate() Values {
|
||||||
for _, val := range m {
|
for _, val := range m {
|
||||||
other = append(other, val)
|
other = append(other, val)
|
||||||
}
|
}
|
||||||
sort.Sort(Values(other))
|
|
||||||
|
|
||||||
|
if ascending {
|
||||||
|
sort.Sort(Values(other))
|
||||||
|
} else {
|
||||||
|
sort.Sort(sort.Reverse(Values(other)))
|
||||||
|
}
|
||||||
return other
|
return other
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -166,12 +166,12 @@ func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascen
|
||||||
copy(c, fc)
|
copy(c, fc)
|
||||||
c = append(c, values...)
|
c = append(c, values...)
|
||||||
|
|
||||||
return newWALCursor(Values(c).Deduplicate(), ascending)
|
return newWALCursor(Values(c).Deduplicate(true), ascending)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.cacheDirtySort[ck] {
|
if l.cacheDirtySort[ck] {
|
||||||
values = Values(values).Deduplicate()
|
values = Values(values).Deduplicate(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// build a copy so writes afterwards don't change the result set
|
// build a copy so writes afterwards don't change the result set
|
||||||
|
@ -586,7 +586,7 @@ func (l *Log) flush(flush flushType) error {
|
||||||
}
|
}
|
||||||
l.cache = make(map[string]Values)
|
l.cache = make(map[string]Values)
|
||||||
for k := range l.cacheDirtySort {
|
for k := range l.cacheDirtySort {
|
||||||
l.flushCache[k] = l.flushCache[k].Deduplicate()
|
l.flushCache[k] = l.flushCache[k].Deduplicate(true)
|
||||||
}
|
}
|
||||||
l.cacheDirtySort = make(map[string]bool)
|
l.cacheDirtySort = make(map[string]bool)
|
||||||
|
|
||||||
|
|
|
@ -1656,12 +1656,12 @@ func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime
|
||||||
})
|
})
|
||||||
values = append(values, newValues[:pos]...)
|
values = append(values, newValues[:pos]...)
|
||||||
remainingValues = newValues[pos:]
|
remainingValues = newValues[pos:]
|
||||||
values = Values(values).Deduplicate()
|
values = Values(values).Deduplicate(true)
|
||||||
} else {
|
} else {
|
||||||
requireSort := Values(values).MaxTime() >= newValues.MinTime()
|
requireSort := Values(values).MaxTime() >= newValues.MinTime()
|
||||||
values = append(values, newValues...)
|
values = append(values, newValues...)
|
||||||
if requireSort {
|
if requireSort {
|
||||||
values = Values(values).Deduplicate()
|
values = Values(values).Deduplicate(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue