From 48d4156eac0a86fd6c2129f042c3d9199983c154 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Sat, 19 Dec 2015 14:19:08 -0700 Subject: [PATCH] Fix blocks not sorted correctly when chunking --- tsdb/engine/tsm1/compact.go | 53 ++++++++++++++++++++++++++----------- tsdb/engine/tsm1/reader.go | 9 +++++++ 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 13884fdb55..edc4a9af15 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -460,7 +460,7 @@ type tsmKeyIterator struct { iterators []*BlockIterator blocks blocks - buf blocks + buf []blocks } type block struct { @@ -495,7 +495,7 @@ func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) { keys: make([]string, len(readers)), size: size, iterators: iter, - buf: make([]*block, len(iter)), + buf: make([]blocks, len(iter)), }, nil } @@ -519,11 +519,27 @@ func (k *tsmKeyIterator) Next() bool { k.err = err } - k.buf[i] = &block{ + k.buf[i] = append(k.buf[i], &block{ minTime: minTime, maxTime: maxTime, key: key, b: b, + }) + + blockKey := key + for iter.PeekNext() == blockKey { + iter.Next() + key, minTime, maxTime, b, err := iter.Read() + if err != nil { + k.err = err + } + + k.buf[i] = append(k.buf[i], &block{ + minTime: minTime, + maxTime: maxTime, + key: key, + b: b, + }) } } } @@ -534,22 +550,22 @@ func (k *tsmKeyIterator) Next() bool { var minKey string for _, b := range k.buf { // block could be nil if the iterator has been exhausted for that file - if b == nil { + if len(b) == 0 { continue } - if minKey == "" || b.key < minKey { - minKey = b.key + if minKey == "" || b[0].key < minKey { + minKey = b[0].key } } // Now we need to find all blocks that match the min key so we can combine and dedupe // the blocks if necessary for i, b := range k.buf { - if b == nil { + if len(b) == 0 { continue } - if b.key == minKey { - k.blocks = append(k.blocks, b) + if b[0].key == minKey { + k.blocks = append(k.blocks, b...) k.buf[i] = nil } } @@ -572,7 +588,6 @@ func (k *tsmKeyIterator) Next() bool { } } } - k.blocks = k.combine(dedup) return len(k.blocks) > 0 @@ -618,18 +633,26 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks { i++ } - // The remaining blocks can be combined and we know that they do not overlap and are already - // sorted so we can just append each and re-encode. + var needSort bool + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. for i < len(k.blocks) { v, err := DecodeBlock(k.blocks[i].b, nil) if err != nil { k.err = err return nil } + if len(decoded) > 0 && decoded[len(decoded)-1].Time().After(v[0].Time()) { + needSort = true + } + decoded = append(decoded, v...) i++ } + if needSort { + sort.Sort(Values(decoded)) + } return k.chunk(chunked, decoded) } } @@ -644,7 +667,7 @@ func (k *tsmKeyIterator) chunk(dst blocks, values []Value) blocks { dst = append(dst, &block{ minTime: values[0].Time(), - maxTime: values[k.size].Time(), + maxTime: values[k.size-1].Time(), key: k.blocks[0].key, b: cb, }) @@ -724,7 +747,7 @@ func (c *cacheKeyIterator) Next() bool { c.k = c.order[0] c.order = c.order[1:] c.values = c.cache.values(c.k) - return true + return len(c.values) > 0 } func (c *cacheKeyIterator) Read() (string, time.Time, time.Time, []byte, error) { @@ -732,7 +755,7 @@ func (c *cacheKeyIterator) Read() (string, time.Time, time.Time, []byte, error) var b []byte var err error if len(c.values) > c.size { - maxTime = c.values[c.size].Time() + maxTime = c.values[c.size-1].Time() b, err = Values(c.values[:c.size]).Encode(nil) } else { b, err = Values(c.values).Encode(nil) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 57d06f40ad..5fdae3b5e2 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -40,6 +40,15 @@ type BlockIterator struct { err error } +func (b *BlockIterator) PeekNext() string { + if len(b.entries) > 1 { + return b.key + } else if len(b.keys) > 1 { + return b.keys[1] + } + return "" +} + func (b *BlockIterator) Next() bool { if len(b.keys) == 0 && len(b.entries) == 0 { return false