refactor: remove dead iterator code (#23887)

* fix: codegen without needing goimports

* refactor: remove dead code
pull/23913/head
Sam Arnold 2022-11-09 20:26:12 -04:00 committed by GitHub
parent 46464f409c
commit 4de89afd37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 22 additions and 1529 deletions

File diff suppressed because it is too large Load Diff

View File

@ -6,214 +6,6 @@ import (
"github.com/influxdata/influxdb/v2/tsdb"
)
{{range .}}
// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge{{.Name}}() {
// No blocks left, or pending merged values, we're done
if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.merged{{.Name}}Values) == 0 {
return
}
sort.Stable(k.blocks)
dedup := len(k.merged{{.Name}}Values) != 0
if len(k.blocks) > 0 && !dedup {
// If we have more than one block or any partially tombstoned blocks, we many need to dedup
dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
dedup = k.blocks[i].partiallyRead() ||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
len(k.blocks[i].tombstones) > 0
}
}
k.merged = k.combine{{.Name}}(dedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
if dedup {
for len(k.merged{{.Name}}Values) < k.size && len(k.blocks) > 0 {
for len(k.blocks) > 0 && k.blocks[0].read() {
k.blocks = k.blocks[1:]
}
if len(k.blocks) == 0 {
break
}
first := k.blocks[0]
minTime := first.minTime
maxTime := first.maxTime
// Adjust the min time to the start of any overlapping blocks.
for i := 0; i < len(k.blocks); i++ {
if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
if k.blocks[i].minTime < minTime {
minTime = k.blocks[i].minTime
}
if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
maxTime = k.blocks[i].maxTime
}
}
}
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
continue
}
v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
if err != nil {
k.AppendError(err)
return nil
}
// Remove values we already read
v = {{.Name}}Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
// Filter out only the values for overlapping block
v = {{.Name}}Values(v).Include(minTime, maxTime)
if len(v) > 0 {
// Record that we read a subset of the block
k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
}
k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
}
}
// Since we combined multiple blocks, we could have more values than we should put into
// a single block. We need to chunk them up into groups and re-encode them.
return k.chunk{{.Name}}(nil)
} else {
var i int
for ; i < len(k.blocks); i++ {
// skip this block if it's values were already read
if k.blocks[i].read() {
continue
}
// If this block is already full, just add it as is
count, err := BlockCount(k.blocks[i].b)
if err != nil {
// accumulate all errors to tsmKeyIterator.err
k.AppendError(err)
continue
}
if count < k.size {
break
}
k.merged = append(k.merged, k.blocks[i])
}
if k.fast {
for i < len(k.blocks) {
// skip this block if it's values were already read
if k.blocks[i].read() {
i++
continue
}
k.merged = append(k.merged, k.blocks[i])
i++
}
}
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
if i == len(k.blocks)-1 {
if !k.blocks[i].read() {
k.merged = append(k.merged, k.blocks[i])
}
i++
}
// The remaining blocks can be combined and we know that they do not overlap and
// so we can just append each, sort and re-encode.
for i < len(k.blocks) && len(k.merged{{.Name}}Values) < k.size {
if k.blocks[i].read() {
i++
continue
}
v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
if err != nil {
k.AppendError(err)
return nil
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
}
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
i++
}
k.blocks = k.blocks[i:]
return k.chunk{{.Name}}(k.merged)
}
}
func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks {
if len(k.merged{{.Name}}Values) > k.size {
values := k.merged{{.Name}}Values[:k.size]
cb, err := {{.Name}}Values(values).Encode(nil)
if err != nil {
k.AppendError(err)
return nil
}
dst = append(dst, &block{
minTime: values[0].UnixNano(),
maxTime: values[len(values)-1].UnixNano(),
key: k.key,
b: cb,
})
k.merged{{.Name}}Values = k.merged{{.Name}}Values[k.size:]
return dst
}
// Re-encode the remaining values into the last block
if len(k.merged{{.Name}}Values) > 0 {
cb, err := {{.Name}}Values(k.merged{{.Name}}Values).Encode(nil)
if err != nil {
k.AppendError(err)
return nil
}
dst = append(dst, &block{
minTime: k.merged{{.Name}}Values[0].UnixNano(),
maxTime: k.merged{{.Name}}Values[len(k.merged{{.Name}}Values)-1].UnixNano(),
key: k.key,
b: cb,
})
k.merged{{.Name}}Values = k.merged{{.Name}}Values[:0]
}
return dst
}
{{ end }}
{{range .}}
// merge combines the next set of blocks into merged blocks.
func (k *tsmBatchKeyIterator) merge{{.Name}}() {

View File

@ -1266,61 +1266,6 @@ func (t TSMErrors) Error() string {
return strings.Join(e, ", ")
}
// tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
// keys in sorted order and the values between the keys sorted and deduped. If any of
// the readers have associated tombstone entries, they are returned as part of iteration.
type tsmKeyIterator struct {
// readers is the set of readers it produce a sorted key run with
readers []*TSMReader
// values is the temporary buffers for each key that is returned by a reader
values map[string][]Value
// pos is the current key position within the corresponding readers slice. A value of
// pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index.
pos []int
// TSMError wraps any error we received while iterating values.
errs TSMErrors
// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
// fast is true to prevent overlapping blocks of time for the same key.
// If false, the blocks will be decoded and duplicated (if needed) and
// then chunked into the maximally sized blocks.
fast bool
// size is the maximum number of values to encode in a single block
size int
// key is the current key lowest key across all readers that has not be fully exhausted
// of values.
key []byte
typ byte
iterators []*BlockIterator
blocks blocks
buf []blocks
// mergeValues are decoded blocks that have been combined
mergedFloatValues FloatValues
mergedIntegerValues IntegerValues
mergedUnsignedValues UnsignedValues
mergedBooleanValues BooleanValues
mergedStringValues StringValues
// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
interrupt chan struct{}
}
func (t *tsmKeyIterator) AppendError(err error) {
t.errs = append(t.errs, err)
}
type block struct {
key []byte
minTime, maxTime int64
@ -1373,242 +1318,6 @@ func (a blocks) Less(i, j int) bool {
func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// NewTSMKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
}
return &tsmKeyIterator{
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
size: size,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
interrupt: interrupt,
}, nil
}
func (k *tsmKeyIterator) hasMergedValues() bool {
return len(k.mergedFloatValues) > 0 ||
len(k.mergedIntegerValues) > 0 ||
len(k.mergedUnsignedValues) > 0 ||
len(k.mergedStringValues) > 0 ||
len(k.mergedBooleanValues) > 0
}
func (k *tsmKeyIterator) EstimatedIndexSize() int {
var size uint32
for _, r := range k.readers {
size += r.IndexSize()
}
return int(size) / len(k.readers)
}
// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
RETRY:
// Any merged blocks pending?
if len(k.merged) > 0 {
k.merged = k.merged[1:]
if len(k.merged) > 0 {
return true
}
}
// Any merged values pending?
if k.hasMergedValues() {
k.merge()
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
// If we still have blocks from the last read, merge them
if len(k.blocks) > 0 {
k.merge()
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
// Read the next block from each TSM iterator
for i, v := range k.buf {
if len(v) == 0 {
iter := k.iterators[i]
if iter.Next() {
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.AppendError(err)
}
// This block may have ranges of time removed from it that would
// reduce the block min and max time.
tombstones := iter.r.TombstoneRange(key)
var blk *block
if cap(k.buf[i]) > len(k.buf[i]) {
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
blk = k.buf[i][len(k.buf[i])-1]
if blk == nil {
blk = &block{}
k.buf[i][len(k.buf[i])-1] = blk
}
} else {
blk = &block{}
k.buf[i] = append(k.buf[i], blk)
}
blk.minTime = minTime
blk.maxTime = maxTime
blk.key = key
blk.typ = typ
blk.b = b
blk.tombstones = tombstones
blk.readMin = math.MaxInt64
blk.readMax = math.MinInt64
blockKey := key
for bytes.Equal(iter.PeekNext(), blockKey) {
iter.Next()
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.AppendError(err)
}
tombstones := iter.r.TombstoneRange(key)
var blk *block
if cap(k.buf[i]) > len(k.buf[i]) {
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
blk = k.buf[i][len(k.buf[i])-1]
if blk == nil {
blk = &block{}
k.buf[i][len(k.buf[i])-1] = blk
}
} else {
blk = &block{}
k.buf[i] = append(k.buf[i], blk)
}
blk.minTime = minTime
blk.maxTime = maxTime
blk.key = key
blk.typ = typ
blk.b = b
blk.tombstones = tombstones
blk.readMin = math.MaxInt64
blk.readMax = math.MinInt64
}
}
if iter.Err() != nil {
k.AppendError(iter.Err())
}
}
}
// Each reader could have a different key that it's currently at, need to find
// the next smallest one to keep the sort ordering.
var minKey []byte
var minType byte
for _, b := range k.buf {
// block could be nil if the iterator has been exhausted for that file
if len(b) == 0 {
continue
}
if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
minKey = b[0].key
minType = b[0].typ
}
}
k.key = minKey
k.typ = minType
// Now we need to find all blocks that match the min key so we can combine and dedupe
// the blocks if necessary
for i, b := range k.buf {
if len(b) == 0 {
continue
}
if bytes.Equal(b[0].key, k.key) {
k.blocks = append(k.blocks, b...)
k.buf[i] = k.buf[i][:0]
}
}
if len(k.blocks) == 0 {
return false
}
k.merge()
// After merging all the values for this key, we might not have any. (e.g. they were all deleted
// through many tombstones). In this case, move on to the next key instead of ending iteration.
if len(k.merged) == 0 {
goto RETRY
}
return len(k.merged) > 0
}
// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge() {
switch k.typ {
case BlockFloat64:
k.mergeFloat()
case BlockInteger:
k.mergeInteger()
case BlockUnsigned:
k.mergeUnsigned()
case BlockBoolean:
k.mergeBoolean()
case BlockString:
k.mergeString()
default:
k.AppendError(fmt.Errorf("unknown block type: %v", k.typ))
}
}
func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
// See if compactions were disabled while we were running.
select {
case <-k.interrupt:
return nil, 0, 0, nil, errCompactionAborted{}
default:
}
if len(k.merged) == 0 {
return nil, 0, 0, nil, k.Err()
}
block := k.merged[0]
return block.key, block.minTime, block.maxTime, block.b, k.Err()
}
func (k *tsmKeyIterator) Close() error {
k.values = nil
k.pos = nil
k.iterators = nil
for _, r := range k.readers {
if err := r.Close(); err != nil {
return err
}
}
return nil
}
// Error returns any errors encountered during iteration.
func (k *tsmKeyIterator) Err() error {
if len(k.errs) == 0 {
return nil
}
return k.errs
}
// tsmBatchKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
// keys in sorted order and the values between the keys sorted and deduped. If any of
// the readers have associated tombstone entries, they are returned as part of iteration.

View File

@ -15,7 +15,7 @@ import (
"go.uber.org/zap"
)
// Tests compacting a Cache snapshot into a single TSM file
// Tests compacting a Cache snapshot into a single TSM file
func TestCompactor_Snapshot(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
@ -1098,7 +1098,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
iter, err := newTSMKeyIterator(1, false, nil, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -1134,6 +1134,14 @@ func TestTSMKeyIterator_Single(t *testing.T) {
}
}
func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) {
files := []string{}
for _, r := range readers {
files = append(files, r.Path())
}
return tsm1.NewTSMBatchKeyIterator(size, fast, 0, interrupt, files, readers...)
}
// Tests that duplicate point values are merged. There is only one case
// where this could happen and that is when a compaction completed and we replace
// the old TSM file with a new one and we crash just before deleting the old file.
@ -1158,7 +1166,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {
r2 := MustTSMReader(dir, 2, writes2)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
iter, err := newTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -1219,7 +1227,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
r2 := MustTSMReader(dir, 2, points2)
r2.Delete([][]byte{[]byte("cpu,host=A#!~#count")})
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
iter, err := newTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -1300,7 +1308,7 @@ func TestTSMKeyIterator_SingleDeletes(t *testing.T) {
t.Fatal(e)
}
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1)
iter, err := newTSMKeyIterator(1, false, nil, r1)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -1357,7 +1365,7 @@ func TestTSMKeyIterator_Abort(t *testing.T) {
r := MustTSMReader(dir, 1, writes)
intC := make(chan struct{})
iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
iter, err := newTSMKeyIterator(1, false, intC, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}

View File

@ -44,8 +44,8 @@ import (
// to support adding templated data from the command line.
// This can probably be worked into the upstream tmpl
// but isn't at the moment.
//go:generate go run ../../../tools/tmpl -i -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go
//go:generate go run ../../../tools/tmpl -i -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go
//go:generate go run ../../../tools/tmpl -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go
//go:generate go run ../../../tools/tmpl -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
//go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl

View File

@ -1,10 +1,14 @@
package tsm1
{{$isArray := .D.isArray}}
{{$isNotArray := not $isArray}}
{{if $isArray -}}
import (
"github.com/influxdata/influxdb/v2/tsdb"
)
{{$isArray := .D.isArray}}
{{$isNotArray := not $isArray}}
{{end}}
{{range .In}}
{{if $isArray -}}
// Read{{.Name}}ArrayBlock reads the next block as a set of {{.name}} values.