Add TSM batch key iterator

The batch-focussed TSM key iterator iterates over TSM blocks, decoding and
merging blocks where appropriate using the batch-focussed approaches.
pull/10300/head
Edd Robinson 2018-09-21 10:23:01 -07:00
parent 51233b71a5
commit 09da18c08e
5 changed files with 2772 additions and 3 deletions

tsdb/arrayvalues.gen.go (1027 additions; diff suppressed because it is too large)

@@ -0,0 +1,211 @@
package tsdb
{{range .}}
{{ $typename := print .Name "Array" }}
type {{ $typename }} struct {
Timestamps []int64
Values []{{.Type}}
}
func New{{$typename}}Len(sz int) *{{$typename}} {
return &{{$typename}}{
Timestamps: make([]int64, sz),
Values: make([]{{.Type}}, sz),
}
}
func (a *{{ $typename }}) MinTime() int64 {
return a.Timestamps[0]
}
func (a *{{ $typename }}) MaxTime() int64 {
return a.Timestamps[len(a.Timestamps)-1]
}
func (a *{{ $typename }}) Size() int {
panic("not implemented")
}
func (a *{{ $typename }}) Len() int {
if a == nil {
return 0
}
return len(a.Timestamps)
}
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *{{ $typename }}) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
return
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if rmax < a.Len() {
if a.Timestamps[rmax] == max {
rmax++
}
rest := a.Len() - rmax
if rest > 0 {
ts := a.Timestamps[:rmin+rest]
copy(ts[rmin:], a.Timestamps[rmax:])
a.Timestamps = ts
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include retains, in place, only the subset of values between min and max inclusive. The values
// must be deduplicated and sorted before calling Include or the results are undefined.
func (a *{{ $typename }}) Include(min, max int64) {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
a.Timestamps = a.Timestamps[:0]
a.Values = a.Values[:0]
return
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if rmax < a.Len() && a.Timestamps[rmax] == max {
rmax++
}
if rmin > -1 {
ts := a.Timestamps[:rmax-rmin]
copy(ts, a.Timestamps[rmin:rmax])
a.Timestamps = ts
vs := a.Values[:rmax-rmin]
copy(vs, a.Values[rmin:rmax])
a.Values = vs
} else {
a.Timestamps = a.Timestamps[:rmax]
a.Values = a.Values[:rmax]
}
}
// search performs a binary search for UnixNano() v in a
// and returns the position, i, where v would be inserted.
// An additional check of a.Timestamps[i] == v is necessary
// to determine if the value v exists.
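// For example, given Timestamps = [10, 20, 30], search(20) returns 1
// (and a.Timestamps[1] == 20, so 20 exists in a), while search(25)
// returns 2, the position where 25 would be inserted.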
func (a *{{ $typename }}) search(v int64) int {
// Define: f(x) → a.Timestamps[x] < v
// Define: f(-1) == true, f(n) == false
// Invariant: f(lo-1) == true, f(hi) == false
lo := 0
hi := a.Len()
for lo < hi {
mid := int(uint(lo+hi) >> 1)
if a.Timestamps[mid] < v {
lo = mid + 1 // preserves f(lo-1) == true
} else {
hi = mid // preserves f(hi) == false
}
}
// lo == hi
return lo
}
// FindRange returns the positions where min and max would be
// inserted into the array. If a[0].UnixNano() > max or
// a[len-1].UnixNano() < min then FindRange returns (-1, -1),
// indicating the array lies entirely outside the range [min, max].
// The values must be deduplicated and sorted before calling FindRange
// or the results are undefined.
func (a *{{ $typename }}) FindRange(min, max int64) (int, int) {
if a.Len() == 0 || min > max {
return -1, -1
}
minVal := a.MinTime()
maxVal := a.MaxTime()
if maxVal < min || minVal > max {
return -1, -1
}
return a.search(min), a.search(max)
}
// Merge overlays b on top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a *{{ $typename }}) Merge(b *{{ $typename }}) {
if a.Len() == 0 {
*a = *b
return
}
if b.Len() == 0 {
return
}
// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exist before
// merging.
// a = a.Deduplicate()
// b = b.Deduplicate()
if a.MaxTime() < b.MinTime() {
a.Timestamps = append(a.Timestamps, b.Timestamps...)
a.Values = append(a.Values, b.Values...)
return
}
if b.MaxTime() < a.MinTime() {
var tmp {{$typename}}
tmp.Timestamps = append(b.Timestamps, a.Timestamps...)
tmp.Values = append(b.Values, a.Values...)
*a = tmp
return
}
out := New{{$typename}}Len(a.Len()+b.Len())
i, j, k := 0, 0, 0
for i < len(a.Timestamps) && j < len(b.Timestamps) {
if a.Timestamps[i] < b.Timestamps[j] {
out.Timestamps[k] = a.Timestamps[i]
out.Values[k] = a.Values[i]
i++
} else if a.Timestamps[i] == b.Timestamps[j] {
out.Timestamps[k] = b.Timestamps[j]
out.Values[k] = b.Values[j]
i++
j++
} else {
out.Timestamps[k] = b.Timestamps[j]
out.Values[k] = b.Values[j]
j++
}
k++
}
if i < len(a.Timestamps) {
n := copy(out.Timestamps[k:], a.Timestamps[i:])
copy(out.Values[k:], a.Values[i:])
k += n
} else if j < len(b.Timestamps) {
n := copy(out.Timestamps[k:], b.Timestamps[j:])
copy(out.Values[k:], b.Values[j:])
k += n
}
a.Timestamps = out.Timestamps[:k]
a.Values = out.Values[:k]
}
{{ end }}
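To make the array semantics above concrete, here is a minimal usage sketch against tsdb.FloatArray, one of the concrete types this template expands to. The expected outputs follow from the Merge, Exclude and FindRange implementations shown above:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	a := &tsdb.FloatArray{
		Timestamps: []int64{10, 20, 30, 40},
		Values:     []float64{1, 2, 3, 4},
	}
	b := &tsdb.FloatArray{
		Timestamps: []int64{20, 25},
		Values:     []float64{2.5, 2.6},
	}

	// FindRange reports where min and max would be inserted.
	rmin, rmax := a.FindRange(20, 30)
	fmt.Println(rmin, rmax) // 1 2

	// Merge overlays b on top of a; b wins the conflict at timestamp 20.
	a.Merge(b)
	fmt.Println(a.Timestamps) // [10 20 25 30 40]
	fmt.Println(a.Values)     // [1 2.5 2.6 3 4]

	// Exclude removes the inclusive range [20, 30] in place.
	a.Exclude(20, 30)
	fmt.Println(a.Timestamps) // [10 40]
}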

File diff suppressed because it is too large

@@ -1,6 +1,10 @@
package tsm1
import "sort"
import (
"sort"
"github.com/influxdata/influxdb/tsdb"
)
{{range .}}
@@ -203,4 +207,213 @@ func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks {
return dst
}
{{ end }}
{{range .}}
// merge combines the next set of blocks into merged blocks.
func (k *tsmBatchKeyIterator) merge{{.Name}}() {
// No blocks left and no pending merged values; we're done
if len(k.blocks) == 0 && len(k.merged) == 0 && k.merged{{.Name}}Values.Len() == 0 {
return
}
sort.Stable(k.blocks)
dedup := k.merged{{.Name}}Values.Len() != 0
if len(k.blocks) > 0 && !dedup {
// If we have more than one block or any partially tombstoned blocks, we may need to dedup
dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
// Quickly scan each block to see if any overlap with the prior block; if they overlap,
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
dedup = k.blocks[i].partiallyRead() ||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
len(k.blocks[i].tombstones) > 0
}
}
k.merged = k.combine{{.Name}}(dedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
if dedup {
for k.merged{{.Name}}Values.Len() < k.size && len(k.blocks) > 0 {
for len(k.blocks) > 0 && k.blocks[0].read() {
k.blocks = k.blocks[1:]
}
if len(k.blocks) == 0 {
break
}
first := k.blocks[0]
minTime := first.minTime
maxTime := first.maxTime
// Adjust the time window for overlapping blocks: widen the min, narrow the max.
for i := 0; i < len(k.blocks); i++ {
if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
if k.blocks[i].minTime < minTime {
minTime = k.blocks[i].minTime
}
if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
maxTime = k.blocks[i].maxTime
}
}
}
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
continue
}
var v tsdb.{{.Name}}Array
var err error
if err = Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
k.err = err
return nil
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
// Keep only the values that fall within the overlapping window
v.Include(minTime, maxTime)
if v.Len() > 0 {
// Record that we read a subset of the block
k.blocks[i].markRead(v.MinTime(), v.MaxTime())
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
}
k.merged{{.Name}}Values.Merge(&v)
}
}
// Since we combined multiple blocks, we could have more values than we should put into
// a single block. We need to chunk them up into groups and re-encode them.
return k.chunk{{.Name}}(nil)
} else {
var i int
for i < len(k.blocks) {
// skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
// If this block is already full, just add it as is
if BlockCount(k.blocks[i].b) >= k.size {
k.merged = append(k.merged, k.blocks[i])
} else {
break
}
i++
}
if k.fast {
for i < len(k.blocks) {
// skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
k.merged = append(k.merged, k.blocks[i])
i++
}
}
// If we only have 1 block left, just append it as is and avoid decoding/re-encoding
if i == len(k.blocks)-1 {
if !k.blocks[i].read() {
k.merged = append(k.merged, k.blocks[i])
}
i++
}
// The remaining blocks can be combined, and we know that they do not overlap,
// so we can just append each, sort and re-encode.
for i < len(k.blocks) && k.merged{{.Name}}Values.Len() < k.size {
if k.blocks[i].read() {
i++
continue
}
var v tsdb.{{.Name}}Array
if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
k.err = err
return nil
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
}
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
k.merged{{.Name}}Values.Merge(&v)
i++
}
k.blocks = k.blocks[i:]
return k.chunk{{.Name}}(k.merged)
}
}
func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
if k.merged{{.Name}}Values.Len() > k.size {
var values tsdb.{{.Name}}Array
values.Timestamps = k.merged{{.Name}}Values.Timestamps[:k.size]
values.Values = k.merged{{.Name}}Values.Values[:k.size]
cb, err := Encode{{.Name}}ArrayBlock(&values, nil) // TODO(edd): pool this buffer
if err != nil {
k.err = err
return nil
}
dst = append(dst, &block{
minTime: values.MinTime(),
maxTime: values.MaxTime(),
key: k.key,
b: cb,
})
k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[k.size:]
k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[k.size:]
return dst
}
// Re-encode the remaining values into the last block
if k.merged{{.Name}}Values.Len() > 0 {
cb, err := Encode{{.Name}}ArrayBlock(k.merged{{.Name}}Values, nil) // TODO(edd): pool this buffer
if err != nil {
k.err = err
return nil
}
dst = append(dst, &block{
minTime: k.merged{{.Name}}Values.MinTime(),
maxTime: k.merged{{.Name}}Values.MaxTime(),
key: k.key,
b: cb,
})
k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[:0]
k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[:0]
}
return dst
}
{{ end }}
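The dedup decision above is the heart of the batch merge path. Below is a self-contained sketch reproducing that quick overlap scan in isolation; the block type and the needsDedup helper are simplified stand-ins for illustration, not tsm1's real types:

package main

import "fmt"

// block is a simplified stand-in for tsm1's internal block type, keeping
// only the fields the dedup scan inspects.
type block struct {
	minTime, maxTime int64
	tombstones       int // number of tombstone ranges for the block
	partiallyRead    bool
}

func (b *block) overlapsTimeRange(min, max int64) bool {
	return b.minTime <= max && b.maxTime >= min
}

// needsDedup mirrors the scan in merge{{.Name}}: dedup is required when any
// block is partially read, carries tombstones, or overlaps its predecessor.
// It assumes blocks are already sorted by min time, as sort.Stable ensures above.
func needsDedup(blocks []*block) bool {
	if len(blocks) == 0 {
		return false
	}
	dedup := blocks[0].tombstones > 0 || blocks[0].partiallyRead
	for i := 1; !dedup && i < len(blocks); i++ {
		dedup = blocks[i].partiallyRead ||
			blocks[i].overlapsTimeRange(blocks[i-1].minTime, blocks[i-1].maxTime) ||
			blocks[i].tombstones > 0
	}
	return dedup
}

func main() {
	disjoint := []*block{{minTime: 0, maxTime: 9}, {minTime: 10, maxTime: 19}}
	overlapping := []*block{{minTime: 0, maxTime: 9}, {minTime: 5, maxTime: 19}}
	fmt.Println(needsDedup(disjoint))    // false: blocks can be passed through as is
	fmt.Println(needsDedup(overlapping)) // true: decode, merge and re-encode
}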

@@ -932,7 +932,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, nil
}
-tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
+tsm, err := NewTSMBatchKeyIterator(size, fast, intC, trs...)
if err != nil {
return nil, err
}
@@ -1556,6 +1556,295 @@ func (k *tsmKeyIterator) Err() error {
return k.err
}
// tsmBatchKeyIterator implements the KeyIterator for a set of TSMReaders. Iteration produces
// keys in sorted order, and the values between the keys sorted and deduped. If any of
// the readers have associated tombstone entries, they are returned as part of iteration.
type tsmBatchKeyIterator struct {
// readers is the set of readers used to produce a sorted key run
readers []*TSMReader
// values holds the temporary buffers for each key that is returned by a reader
values map[string][]Value
// pos is the current key position within the corresponding readers slice. A value of
// pos[0] = 1 means that reader[0] is currently at key 1 in its ordered index.
pos []int
// err is any error we received while iterating values.
err error
// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
// fast is true to prevent overlapping blocks of time for the same key.
// If false, the blocks will be decoded and deduplicated (if needed) and
// then chunked into the maximally sized blocks.
fast bool
// size is the maximum number of values to encode in a single block
size int
// key is the lowest key across all readers that has not been fully exhausted
// of values.
key []byte
typ byte
iterators []*BlockIterator
blocks blocks
buf []blocks
// merged*Values are decoded blocks that have been combined
mergedFloatValues *tsdb.FloatArray
mergedIntegerValues *tsdb.IntegerArray
mergedUnsignedValues *tsdb.UnsignedArray
mergedBooleanValues *tsdb.BooleanArray
mergedStringValues *tsdb.StringArray
// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
interrupt chan struct{}
}
// NewTSMBatchKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
}
return &tsmBatchKeyIterator{
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
size: size,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
mergedFloatValues: &tsdb.FloatArray{},
mergedIntegerValues: &tsdb.IntegerArray{},
mergedUnsignedValues: &tsdb.UnsignedArray{},
mergedBooleanValues: &tsdb.BooleanArray{},
mergedStringValues: &tsdb.StringArray{},
interrupt: interrupt,
}, nil
}
func (k *tsmBatchKeyIterator) hasMergedValues() bool {
return k.mergedFloatValues.Len() > 0 ||
k.mergedIntegerValues.Len() > 0 ||
k.mergedUnsignedValues.Len() > 0 ||
k.mergedStringValues.Len() > 0 ||
k.mergedBooleanValues.Len() > 0
}
func (k *tsmBatchKeyIterator) EstimatedIndexSize() int {
var size uint32
for _, r := range k.readers {
size += r.IndexSize()
}
return int(size) / len(k.readers)
}
// Next returns true if there are any values remaining in the iterator.
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
// Any merged blocks pending?
if len(k.merged) > 0 {
k.merged = k.merged[1:]
if len(k.merged) > 0 {
return true
}
}
// Any merged values pending?
if k.hasMergedValues() {
k.merge()
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
// If we still have blocks from the last read, merge them
if len(k.blocks) > 0 {
k.merge()
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
// Read the next block from each TSM iterator
for i, v := range k.buf {
if len(v) == 0 {
iter := k.iterators[i]
if iter.Next() {
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.err = err
}
// This block may have ranges of time removed from it that would
// reduce the block min and max time.
tombstones := iter.r.TombstoneRange(key)
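// Reuse a block from the buffer if one is available, to avoid
// allocating a new block per iteration.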
var blk *block
if cap(k.buf[i]) > len(k.buf[i]) {
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
blk = k.buf[i][len(k.buf[i])-1]
if blk == nil {
blk = &block{}
k.buf[i][len(k.buf[i])-1] = blk
}
} else {
blk = &block{}
k.buf[i] = append(k.buf[i], blk)
}
blk.minTime = minTime
blk.maxTime = maxTime
blk.key = key
blk.typ = typ
blk.b = b
blk.tombstones = tombstones
blk.readMin = math.MaxInt64
blk.readMax = math.MinInt64
blockKey := key
for bytes.Equal(iter.PeekNext(), blockKey) {
iter.Next()
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.err = err
}
tombstones := iter.r.TombstoneRange(key)
var blk *block
if cap(k.buf[i]) > len(k.buf[i]) {
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
blk = k.buf[i][len(k.buf[i])-1]
if blk == nil {
blk = &block{}
k.buf[i][len(k.buf[i])-1] = blk
}
} else {
blk = &block{}
k.buf[i] = append(k.buf[i], blk)
}
blk.minTime = minTime
blk.maxTime = maxTime
blk.key = key
blk.typ = typ
blk.b = b
blk.tombstones = tombstones
blk.readMin = math.MaxInt64
blk.readMax = math.MinInt64
}
}
if iter.Err() != nil {
k.err = iter.Err()
}
}
}
// Each reader could be at a different key; we need to find
// the next smallest one to keep the sort ordering.
var minKey []byte
var minType byte
for _, b := range k.buf {
// buffer could be empty if the iterator has been exhausted for that file
if len(b) == 0 {
continue
}
if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
minKey = b[0].key
minType = b[0].typ
}
}
k.key = minKey
k.typ = minType
// Now we need to find all blocks that match the min key so we can combine and dedupe
// the blocks if necessary
for i, b := range k.buf {
if len(b) == 0 {
continue
}
if bytes.Equal(b[0].key, k.key) {
k.blocks = append(k.blocks, b...)
k.buf[i] = k.buf[i][:0]
}
}
if len(k.blocks) == 0 {
return false
}
k.merge()
// After merging all the values for this key, we might not have any. (e.g. they were all deleted
// through many tombstones). In this case, move on to the next key instead of ending iteration.
if len(k.merged) == 0 {
goto RETRY
}
return len(k.merged) > 0
}
// merge combines the next set of blocks into merged blocks.
func (k *tsmBatchKeyIterator) merge() {
switch k.typ {
case BlockFloat64:
k.mergeFloat()
case BlockInteger:
k.mergeInteger()
case BlockUnsigned:
k.mergeUnsigned()
case BlockBoolean:
k.mergeBoolean()
case BlockString:
k.mergeString()
default:
k.err = fmt.Errorf("unknown block type: %v", k.typ)
}
}
func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
// See if compactions were disabled while we were running.
select {
case <-k.interrupt:
return nil, 0, 0, nil, errCompactionAborted{}
default:
}
if len(k.merged) == 0 {
return nil, 0, 0, nil, k.err
}
block := k.merged[0]
return block.key, block.minTime, block.maxTime, block.b, k.err
}
func (k *tsmBatchKeyIterator) Close() error {
k.values = nil
k.pos = nil
k.iterators = nil
for _, r := range k.readers {
if err := r.Close(); err != nil {
return err
}
}
return nil
}
// Err returns any errors encountered during iteration.
func (k *tsmBatchKeyIterator) Err() error {
return k.err
}
type cacheKeyIterator struct {
cache *Cache
size int
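To show how the new iterator is driven end to end, here is a hypothetical sketch that walks the merged key stream across TSM files named on the command line. The block size of 1000 (the engine's usual max points per encoded block) and the printing loop are illustrative assumptions; NewTSMReader is an existing tsm1 API, and NewTSMBatchKeyIterator, Next, Read and Err are the ones added or used in this commit:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

// Opens the TSM files named on the command line and walks the merged,
// sorted key stream the batch iterator produces.
func main() {
	var readers []*tsm1.TSMReader
	for _, path := range os.Args[1:] {
		f, err := os.Open(path)
		if err != nil {
			log.Fatal(err)
		}
		r, err := tsm1.NewTSMReader(f)
		if err != nil {
			log.Fatal(err)
		}
		defer r.Close()
		readers = append(readers, r)
	}

	intC := make(chan struct{}) // close to abort, mirroring the compactor
	iter, err := tsm1.NewTSMBatchKeyIterator(1000, false, intC, readers...)
	if err != nil {
		log.Fatal(err)
	}
	for iter.Next() {
		key, minTime, maxTime, b, err := iter.Read()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("key=%q range=[%d,%d] block=%d bytes\n", key, minTime, maxTime, len(b))
	}
	if err := iter.Err(); err != nil {
		log.Fatal(err)
	}
}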