Fix blocks not sorted correctly when chunking
parent
bb2562b2ab
commit
48d4156eac
|
@ -460,7 +460,7 @@ type tsmKeyIterator struct {
|
||||||
iterators []*BlockIterator
|
iterators []*BlockIterator
|
||||||
blocks blocks
|
blocks blocks
|
||||||
|
|
||||||
buf blocks
|
buf []blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
type block struct {
|
type block struct {
|
||||||
|
@ -495,7 +495,7 @@ func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) {
|
||||||
keys: make([]string, len(readers)),
|
keys: make([]string, len(readers)),
|
||||||
size: size,
|
size: size,
|
||||||
iterators: iter,
|
iterators: iter,
|
||||||
buf: make([]*block, len(iter)),
|
buf: make([]blocks, len(iter)),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -519,11 +519,27 @@ func (k *tsmKeyIterator) Next() bool {
|
||||||
k.err = err
|
k.err = err
|
||||||
}
|
}
|
||||||
|
|
||||||
k.buf[i] = &block{
|
k.buf[i] = append(k.buf[i], &block{
|
||||||
minTime: minTime,
|
minTime: minTime,
|
||||||
maxTime: maxTime,
|
maxTime: maxTime,
|
||||||
key: key,
|
key: key,
|
||||||
b: b,
|
b: b,
|
||||||
|
})
|
||||||
|
|
||||||
|
blockKey := key
|
||||||
|
for iter.PeekNext() == blockKey {
|
||||||
|
iter.Next()
|
||||||
|
key, minTime, maxTime, b, err := iter.Read()
|
||||||
|
if err != nil {
|
||||||
|
k.err = err
|
||||||
|
}
|
||||||
|
|
||||||
|
k.buf[i] = append(k.buf[i], &block{
|
||||||
|
minTime: minTime,
|
||||||
|
maxTime: maxTime,
|
||||||
|
key: key,
|
||||||
|
b: b,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -534,22 +550,22 @@ func (k *tsmKeyIterator) Next() bool {
|
||||||
var minKey string
|
var minKey string
|
||||||
for _, b := range k.buf {
|
for _, b := range k.buf {
|
||||||
// block could be nil if the iterator has been exhausted for that file
|
// block could be nil if the iterator has been exhausted for that file
|
||||||
if b == nil {
|
if len(b) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if minKey == "" || b.key < minKey {
|
if minKey == "" || b[0].key < minKey {
|
||||||
minKey = b.key
|
minKey = b[0].key
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now we need to find all blocks that match the min key so we can combine and dedupe
|
// Now we need to find all blocks that match the min key so we can combine and dedupe
|
||||||
// the blocks if necessary
|
// the blocks if necessary
|
||||||
for i, b := range k.buf {
|
for i, b := range k.buf {
|
||||||
if b == nil {
|
if len(b) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if b.key == minKey {
|
if b[0].key == minKey {
|
||||||
k.blocks = append(k.blocks, b)
|
k.blocks = append(k.blocks, b...)
|
||||||
k.buf[i] = nil
|
k.buf[i] = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -572,7 +588,6 @@ func (k *tsmKeyIterator) Next() bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
k.blocks = k.combine(dedup)
|
k.blocks = k.combine(dedup)
|
||||||
|
|
||||||
return len(k.blocks) > 0
|
return len(k.blocks) > 0
|
||||||
|
@ -618,18 +633,26 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
// The remaining blocks can be combined and we know that they do not overlap and are already
|
var needSort bool
|
||||||
// sorted so we can just append each and re-encode.
|
// The remaining blocks can be combined and we know that they do not overlap and
|
||||||
|
// so we can just append each, sort and re-encode.
|
||||||
for i < len(k.blocks) {
|
for i < len(k.blocks) {
|
||||||
v, err := DecodeBlock(k.blocks[i].b, nil)
|
v, err := DecodeBlock(k.blocks[i].b, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.err = err
|
k.err = err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if len(decoded) > 0 && decoded[len(decoded)-1].Time().After(v[0].Time()) {
|
||||||
|
needSort = true
|
||||||
|
}
|
||||||
|
|
||||||
decoded = append(decoded, v...)
|
decoded = append(decoded, v...)
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if needSort {
|
||||||
|
sort.Sort(Values(decoded))
|
||||||
|
}
|
||||||
return k.chunk(chunked, decoded)
|
return k.chunk(chunked, decoded)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -644,7 +667,7 @@ func (k *tsmKeyIterator) chunk(dst blocks, values []Value) blocks {
|
||||||
|
|
||||||
dst = append(dst, &block{
|
dst = append(dst, &block{
|
||||||
minTime: values[0].Time(),
|
minTime: values[0].Time(),
|
||||||
maxTime: values[k.size].Time(),
|
maxTime: values[k.size-1].Time(),
|
||||||
key: k.blocks[0].key,
|
key: k.blocks[0].key,
|
||||||
b: cb,
|
b: cb,
|
||||||
})
|
})
|
||||||
|
@ -724,7 +747,7 @@ func (c *cacheKeyIterator) Next() bool {
|
||||||
c.k = c.order[0]
|
c.k = c.order[0]
|
||||||
c.order = c.order[1:]
|
c.order = c.order[1:]
|
||||||
c.values = c.cache.values(c.k)
|
c.values = c.cache.values(c.k)
|
||||||
return true
|
return len(c.values) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheKeyIterator) Read() (string, time.Time, time.Time, []byte, error) {
|
func (c *cacheKeyIterator) Read() (string, time.Time, time.Time, []byte, error) {
|
||||||
|
@ -732,7 +755,7 @@ func (c *cacheKeyIterator) Read() (string, time.Time, time.Time, []byte, error)
|
||||||
var b []byte
|
var b []byte
|
||||||
var err error
|
var err error
|
||||||
if len(c.values) > c.size {
|
if len(c.values) > c.size {
|
||||||
maxTime = c.values[c.size].Time()
|
maxTime = c.values[c.size-1].Time()
|
||||||
b, err = Values(c.values[:c.size]).Encode(nil)
|
b, err = Values(c.values[:c.size]).Encode(nil)
|
||||||
} else {
|
} else {
|
||||||
b, err = Values(c.values).Encode(nil)
|
b, err = Values(c.values).Encode(nil)
|
||||||
|
|
|
@ -40,6 +40,15 @@ type BlockIterator struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BlockIterator) PeekNext() string {
|
||||||
|
if len(b.entries) > 1 {
|
||||||
|
return b.key
|
||||||
|
} else if len(b.keys) > 1 {
|
||||||
|
return b.keys[1]
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func (b *BlockIterator) Next() bool {
|
func (b *BlockIterator) Next() bool {
|
||||||
if len(b.keys) == 0 && len(b.entries) == 0 {
|
if len(b.keys) == 0 && len(b.entries) == 0 {
|
||||||
return false
|
return false
|
||||||
|
|
Loading…
Reference in New Issue