diff --git a/tsdb/engine/tsm1/compact.gen.go b/tsdb/engine/tsm1/compact.gen.go new file mode 100644 index 0000000000..9eaec346c6 --- /dev/null +++ b/tsdb/engine/tsm1/compact.gen.go @@ -0,0 +1,867 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: compact.gen.go.tmpl + +package tsm1 + +import ( + "runtime" +) + +// merge combines the next set of blocks into merged blocks. +func (k *tsmKeyIterator) mergeFloat() { + // No blocks left, or pending merged values, we're done + if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedFloatValues) == 0 { + return + } + + dedup := len(k.mergedFloatValues) != 0 + if len(k.blocks) > 0 && !dedup { + // If we have more than one block or any partially tombstoned blocks, we many need to dedup + dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() + + // Quickly scan each block to see if any overlap with the prior block, if they overlap then + // we need to dedup as there may be duplicate points now + for i := 1; !dedup && i < len(k.blocks); i++ { + if k.blocks[i].partiallyRead() { + dedup = true + break + } + + if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { + dedup = true + break + } + } + + } + + k.merged = k.combineFloat(dedup) +} + +// combine returns a new set of blocks using the current blocks in the buffers. If dedup +// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, +// only blocks that are smaller than the chunk size will be decoded and combined. +func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { + if dedup { + for len(k.mergedFloatValues) < k.size && len(k.blocks) > 0 { + for len(k.blocks) > 0 && k.blocks[0].read() { + k.blocks = k.blocks[1:] + } + + if len(k.blocks) == 0 { + break + } + first := k.blocks[0] + minTime := first.minTime + maxTime := first.maxTime + + // Adjust the min time to the start of any overlapping blocks. + for i := 0; i < len(k.blocks); i++ { + if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { + if k.blocks[i].minTime < minTime { + minTime = k.blocks[i].minTime + } + if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { + maxTime = k.blocks[i].maxTime + } + } + } + + // We have some overlapping blocks so decode all, append in order and then dedup + for i := 0; i < len(k.blocks); i++ { + if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { + continue + } + + v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{}) + if err != nil { + k.err = err + return nil + } + + // Remove values we already read + v = FloatValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) + + // Filter out only the values for overlapping block + v = FloatValues(v).Include(minTime, maxTime) + if len(v) > 0 { + // Record that we read a subset of the block + k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = FloatValues(v).Exclude(ts.Min, ts.Max) + } + + k.mergedFloatValues = k.mergedFloatValues.Merge(v) + + // Allow other goroutines to run + runtime.Gosched() + + } + } + + // Since we combined multiple blocks, we could have more values than we should put into + // a single block. We need to chunk them up into groups and re-encode them. + return k.chunkFloat(nil) + } else { + var chunked blocks + var i int + + for i < len(k.blocks) { + + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + // If we this block is already full, just add it as is + if BlockCount(k.blocks[i].b) >= k.size { + chunked = append(chunked, k.blocks[i]) + } else { + break + } + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + if k.fast { + for i < len(k.blocks) { + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + + chunked = append(chunked, k.blocks[i]) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + } + + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding + if i == len(k.blocks)-1 { + if !k.blocks[i].read() { + chunked = append(chunked, k.blocks[i]) + } + i++ + } + + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. + for i < len(k.blocks) && len(k.mergedFloatValues) < k.size { + if k.blocks[i].read() { + i++ + continue + } + + v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{}) + if err != nil { + k.err = err + return nil + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = FloatValues(v).Exclude(ts.Min, ts.Max) + } + + k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) + + k.mergedFloatValues = k.mergedFloatValues.Merge(v) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + k.blocks = k.blocks[i:] + + return k.chunkFloat(chunked) + } +} + +func (k *tsmKeyIterator) chunkFloat(dst blocks) blocks { + if len(k.mergedFloatValues) > k.size { + values := k.mergedFloatValues[:k.size] + cb, err := FloatValues(values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: values[0].UnixNano(), + maxTime: values[len(values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedFloatValues = k.mergedFloatValues[k.size:] + return dst + } + + // Re-encode the remaining values into the last block + if len(k.mergedFloatValues) > 0 { + cb, err := FloatValues(k.mergedFloatValues).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: k.mergedFloatValues[0].UnixNano(), + maxTime: k.mergedFloatValues[len(k.mergedFloatValues)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedFloatValues = k.mergedFloatValues[:0] + } + return dst +} + +// merge combines the next set of blocks into merged blocks. +func (k *tsmKeyIterator) mergeInteger() { + // No blocks left, or pending merged values, we're done + if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedIntegerValues) == 0 { + return + } + + dedup := len(k.mergedIntegerValues) != 0 + if len(k.blocks) > 0 && !dedup { + // If we have more than one block or any partially tombstoned blocks, we many need to dedup + dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() + + // Quickly scan each block to see if any overlap with the prior block, if they overlap then + // we need to dedup as there may be duplicate points now + for i := 1; !dedup && i < len(k.blocks); i++ { + if k.blocks[i].partiallyRead() { + dedup = true + break + } + + if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { + dedup = true + break + } + } + + } + + k.merged = k.combineInteger(dedup) +} + +// combine returns a new set of blocks using the current blocks in the buffers. If dedup +// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, +// only blocks that are smaller than the chunk size will be decoded and combined. +func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { + if dedup { + for len(k.mergedIntegerValues) < k.size && len(k.blocks) > 0 { + for len(k.blocks) > 0 && k.blocks[0].read() { + k.blocks = k.blocks[1:] + } + + if len(k.blocks) == 0 { + break + } + first := k.blocks[0] + minTime := first.minTime + maxTime := first.maxTime + + // Adjust the min time to the start of any overlapping blocks. + for i := 0; i < len(k.blocks); i++ { + if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { + if k.blocks[i].minTime < minTime { + minTime = k.blocks[i].minTime + } + if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { + maxTime = k.blocks[i].maxTime + } + } + } + + // We have some overlapping blocks so decode all, append in order and then dedup + for i := 0; i < len(k.blocks); i++ { + if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { + continue + } + + v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{}) + if err != nil { + k.err = err + return nil + } + + // Remove values we already read + v = IntegerValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) + + // Filter out only the values for overlapping block + v = IntegerValues(v).Include(minTime, maxTime) + if len(v) > 0 { + // Record that we read a subset of the block + k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = IntegerValues(v).Exclude(ts.Min, ts.Max) + } + + k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) + + // Allow other goroutines to run + runtime.Gosched() + + } + } + + // Since we combined multiple blocks, we could have more values than we should put into + // a single block. We need to chunk them up into groups and re-encode them. + return k.chunkInteger(nil) + } else { + var chunked blocks + var i int + + for i < len(k.blocks) { + + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + // If we this block is already full, just add it as is + if BlockCount(k.blocks[i].b) >= k.size { + chunked = append(chunked, k.blocks[i]) + } else { + break + } + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + if k.fast { + for i < len(k.blocks) { + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + + chunked = append(chunked, k.blocks[i]) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + } + + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding + if i == len(k.blocks)-1 { + if !k.blocks[i].read() { + chunked = append(chunked, k.blocks[i]) + } + i++ + } + + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. + for i < len(k.blocks) && len(k.mergedIntegerValues) < k.size { + if k.blocks[i].read() { + i++ + continue + } + + v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{}) + if err != nil { + k.err = err + return nil + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = IntegerValues(v).Exclude(ts.Min, ts.Max) + } + + k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) + + k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + k.blocks = k.blocks[i:] + + return k.chunkInteger(chunked) + } +} + +func (k *tsmKeyIterator) chunkInteger(dst blocks) blocks { + if len(k.mergedIntegerValues) > k.size { + values := k.mergedIntegerValues[:k.size] + cb, err := IntegerValues(values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: values[0].UnixNano(), + maxTime: values[len(values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedIntegerValues = k.mergedIntegerValues[k.size:] + return dst + } + + // Re-encode the remaining values into the last block + if len(k.mergedIntegerValues) > 0 { + cb, err := IntegerValues(k.mergedIntegerValues).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: k.mergedIntegerValues[0].UnixNano(), + maxTime: k.mergedIntegerValues[len(k.mergedIntegerValues)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedIntegerValues = k.mergedIntegerValues[:0] + } + return dst +} + +// merge combines the next set of blocks into merged blocks. +func (k *tsmKeyIterator) mergeString() { + // No blocks left, or pending merged values, we're done + if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedStringValues) == 0 { + return + } + + dedup := len(k.mergedStringValues) != 0 + if len(k.blocks) > 0 && !dedup { + // If we have more than one block or any partially tombstoned blocks, we many need to dedup + dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() + + // Quickly scan each block to see if any overlap with the prior block, if they overlap then + // we need to dedup as there may be duplicate points now + for i := 1; !dedup && i < len(k.blocks); i++ { + if k.blocks[i].partiallyRead() { + dedup = true + break + } + + if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { + dedup = true + break + } + } + + } + + k.merged = k.combineString(dedup) +} + +// combine returns a new set of blocks using the current blocks in the buffers. If dedup +// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, +// only blocks that are smaller than the chunk size will be decoded and combined. +func (k *tsmKeyIterator) combineString(dedup bool) blocks { + if dedup { + for len(k.mergedStringValues) < k.size && len(k.blocks) > 0 { + for len(k.blocks) > 0 && k.blocks[0].read() { + k.blocks = k.blocks[1:] + } + + if len(k.blocks) == 0 { + break + } + first := k.blocks[0] + minTime := first.minTime + maxTime := first.maxTime + + // Adjust the min time to the start of any overlapping blocks. + for i := 0; i < len(k.blocks); i++ { + if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { + if k.blocks[i].minTime < minTime { + minTime = k.blocks[i].minTime + } + if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { + maxTime = k.blocks[i].maxTime + } + } + } + + // We have some overlapping blocks so decode all, append in order and then dedup + for i := 0; i < len(k.blocks); i++ { + if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { + continue + } + + v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{}) + if err != nil { + k.err = err + return nil + } + + // Remove values we already read + v = StringValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) + + // Filter out only the values for overlapping block + v = StringValues(v).Include(minTime, maxTime) + if len(v) > 0 { + // Record that we read a subset of the block + k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = StringValues(v).Exclude(ts.Min, ts.Max) + } + + k.mergedStringValues = k.mergedStringValues.Merge(v) + + // Allow other goroutines to run + runtime.Gosched() + + } + } + + // Since we combined multiple blocks, we could have more values than we should put into + // a single block. We need to chunk them up into groups and re-encode them. + return k.chunkString(nil) + } else { + var chunked blocks + var i int + + for i < len(k.blocks) { + + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + // If we this block is already full, just add it as is + if BlockCount(k.blocks[i].b) >= k.size { + chunked = append(chunked, k.blocks[i]) + } else { + break + } + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + if k.fast { + for i < len(k.blocks) { + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + + chunked = append(chunked, k.blocks[i]) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + } + + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding + if i == len(k.blocks)-1 { + if !k.blocks[i].read() { + chunked = append(chunked, k.blocks[i]) + } + i++ + } + + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. + for i < len(k.blocks) && len(k.mergedStringValues) < k.size { + if k.blocks[i].read() { + i++ + continue + } + + v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{}) + if err != nil { + k.err = err + return nil + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = StringValues(v).Exclude(ts.Min, ts.Max) + } + + k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) + + k.mergedStringValues = k.mergedStringValues.Merge(v) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + k.blocks = k.blocks[i:] + + return k.chunkString(chunked) + } +} + +func (k *tsmKeyIterator) chunkString(dst blocks) blocks { + if len(k.mergedStringValues) > k.size { + values := k.mergedStringValues[:k.size] + cb, err := StringValues(values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: values[0].UnixNano(), + maxTime: values[len(values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedStringValues = k.mergedStringValues[k.size:] + return dst + } + + // Re-encode the remaining values into the last block + if len(k.mergedStringValues) > 0 { + cb, err := StringValues(k.mergedStringValues).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: k.mergedStringValues[0].UnixNano(), + maxTime: k.mergedStringValues[len(k.mergedStringValues)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedStringValues = k.mergedStringValues[:0] + } + return dst +} + +// merge combines the next set of blocks into merged blocks. +func (k *tsmKeyIterator) mergeBoolean() { + // No blocks left, or pending merged values, we're done + if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedBooleanValues) == 0 { + return + } + + dedup := len(k.mergedBooleanValues) != 0 + if len(k.blocks) > 0 && !dedup { + // If we have more than one block or any partially tombstoned blocks, we many need to dedup + dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() + + // Quickly scan each block to see if any overlap with the prior block, if they overlap then + // we need to dedup as there may be duplicate points now + for i := 1; !dedup && i < len(k.blocks); i++ { + if k.blocks[i].partiallyRead() { + dedup = true + break + } + + if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { + dedup = true + break + } + } + + } + + k.merged = k.combineBoolean(dedup) +} + +// combine returns a new set of blocks using the current blocks in the buffers. If dedup +// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, +// only blocks that are smaller than the chunk size will be decoded and combined. +func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { + if dedup { + for len(k.mergedBooleanValues) < k.size && len(k.blocks) > 0 { + for len(k.blocks) > 0 && k.blocks[0].read() { + k.blocks = k.blocks[1:] + } + + if len(k.blocks) == 0 { + break + } + first := k.blocks[0] + minTime := first.minTime + maxTime := first.maxTime + + // Adjust the min time to the start of any overlapping blocks. + for i := 0; i < len(k.blocks); i++ { + if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { + if k.blocks[i].minTime < minTime { + minTime = k.blocks[i].minTime + } + if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { + maxTime = k.blocks[i].maxTime + } + } + } + + // We have some overlapping blocks so decode all, append in order and then dedup + for i := 0; i < len(k.blocks); i++ { + if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { + continue + } + + v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{}) + if err != nil { + k.err = err + return nil + } + + // Remove values we already read + v = BooleanValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) + + // Filter out only the values for overlapping block + v = BooleanValues(v).Include(minTime, maxTime) + if len(v) > 0 { + // Record that we read a subset of the block + k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = BooleanValues(v).Exclude(ts.Min, ts.Max) + } + + k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) + + // Allow other goroutines to run + runtime.Gosched() + + } + } + + // Since we combined multiple blocks, we could have more values than we should put into + // a single block. We need to chunk them up into groups and re-encode them. + return k.chunkBoolean(nil) + } else { + var chunked blocks + var i int + + for i < len(k.blocks) { + + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + // If we this block is already full, just add it as is + if BlockCount(k.blocks[i].b) >= k.size { + chunked = append(chunked, k.blocks[i]) + } else { + break + } + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + if k.fast { + for i < len(k.blocks) { + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + + chunked = append(chunked, k.blocks[i]) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + } + + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding + if i == len(k.blocks)-1 { + if !k.blocks[i].read() { + chunked = append(chunked, k.blocks[i]) + } + i++ + } + + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. + for i < len(k.blocks) && len(k.mergedBooleanValues) < k.size { + if k.blocks[i].read() { + i++ + continue + } + + v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{}) + if err != nil { + k.err = err + return nil + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = BooleanValues(v).Exclude(ts.Min, ts.Max) + } + + k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) + + k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + k.blocks = k.blocks[i:] + + return k.chunkBoolean(chunked) + } +} + +func (k *tsmKeyIterator) chunkBoolean(dst blocks) blocks { + if len(k.mergedBooleanValues) > k.size { + values := k.mergedBooleanValues[:k.size] + cb, err := BooleanValues(values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: values[0].UnixNano(), + maxTime: values[len(values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedBooleanValues = k.mergedBooleanValues[k.size:] + return dst + } + + // Re-encode the remaining values into the last block + if len(k.mergedBooleanValues) > 0 { + cb, err := BooleanValues(k.mergedBooleanValues).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: k.mergedBooleanValues[0].UnixNano(), + maxTime: k.mergedBooleanValues[len(k.mergedBooleanValues)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.mergedBooleanValues = k.mergedBooleanValues[:0] + } + return dst +} diff --git a/tsdb/engine/tsm1/compact.gen.go.tmpl b/tsdb/engine/tsm1/compact.gen.go.tmpl new file mode 100644 index 0000000000..1beab547f9 --- /dev/null +++ b/tsdb/engine/tsm1/compact.gen.go.tmpl @@ -0,0 +1,223 @@ +package tsm1 + +import ( + "runtime" +) + +{{range .}} + +// merge combines the next set of blocks into merged blocks. +func (k *tsmKeyIterator) merge{{.Name}}() { + // No blocks left, or pending merged values, we're done + if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.merged{{.Name}}Values) == 0 { + return + } + + dedup := len(k.merged{{.Name}}Values) != 0 + if len(k.blocks) > 0 && !dedup { + // If we have more than one block or any partially tombstoned blocks, we many need to dedup + dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() + + // Quickly scan each block to see if any overlap with the prior block, if they overlap then + // we need to dedup as there may be duplicate points now + for i := 1; !dedup && i < len(k.blocks); i++ { + if k.blocks[i].partiallyRead() { + dedup = true + break + } + + if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { + dedup = true + break + } + } + + } + + k.merged = k.combine{{.Name}}(dedup) +} + +// combine returns a new set of blocks using the current blocks in the buffers. If dedup +// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, +// only blocks that are smaller than the chunk size will be decoded and combined. +func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks { + if dedup { + for len(k.merged{{.Name}}Values) < k.size && len(k.blocks) > 0 { + for len(k.blocks) > 0 && k.blocks[0].read() { + k.blocks = k.blocks[1:] + } + + if len(k.blocks) == 0 { + break + } + first := k.blocks[0] + minTime := first.minTime + maxTime := first.maxTime + + // Adjust the min time to the start of any overlapping blocks. + for i := 0; i < len(k.blocks); i++ { + if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { + if k.blocks[i].minTime < minTime { + minTime = k.blocks[i].minTime + } + if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { + maxTime = k.blocks[i].maxTime + } + } + } + + // We have some overlapping blocks so decode all, append in order and then dedup + for i := 0; i < len(k.blocks); i++ { + if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { + continue + } + + v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{}) + if err != nil { + k.err = err + return nil + } + + // Remove values we already read + v = {{.Name}}Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) + + // Filter out only the values for overlapping block + v = {{.Name}}Values(v).Include(minTime, maxTime) + if len(v) > 0 { + // Record that we read a subset of the block + k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max) + } + + k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v) + + // Allow other goroutines to run + runtime.Gosched() + + } + } + + // Since we combined multiple blocks, we could have more values than we should put into + // a single block. We need to chunk them up into groups and re-encode them. + return k.chunk{{.Name}}(nil) + } else { + var chunked blocks + var i int + + for i < len(k.blocks) { + + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + // If we this block is already full, just add it as is + if BlockCount(k.blocks[i].b) >= k.size { + chunked = append(chunked, k.blocks[i]) + } else { + break + } + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + if k.fast { + for i < len(k.blocks) { + // skip this block if it's values were already read + if k.blocks[i].read() { + i++ + continue + } + + chunked = append(chunked, k.blocks[i]) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + } + + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding + if i == len(k.blocks)-1 { + if !k.blocks[i].read() { + chunked = append(chunked, k.blocks[i]) + } + i++ + } + + // The remaining blocks can be combined and we know that they do not overlap and + // so we can just append each, sort and re-encode. + for i < len(k.blocks) && len(k.merged{{.Name}}Values) < k.size { + if k.blocks[i].read() { + i++ + continue + } + + v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{}) + if err != nil { + k.err = err + return nil + } + + // Apply each tombstone to the block + for _, ts := range k.blocks[i].tombstones { + v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max) + } + + k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) + + k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v) + i++ + // Allow other goroutines to run + runtime.Gosched() + } + + k.blocks = k.blocks[i:] + + return k.chunk{{.Name}}(chunked) + } +} + +func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks { + if len(k.merged{{.Name}}Values) > k.size { + values := k.merged{{.Name}}Values[:k.size] + cb, err := {{.Name}}Values(values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: values[0].UnixNano(), + maxTime: values[len(values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.merged{{.Name}}Values = k.merged{{.Name}}Values[k.size:] + return dst + } + + // Re-encode the remaining values into the last block + if len(k.merged{{.Name}}Values) > 0 { + cb, err := {{.Name}}Values(k.merged{{.Name}}Values).Encode(nil) + if err != nil { + k.err = err + return nil + } + + dst = append(dst, &block{ + minTime: k.merged{{.Name}}Values[0].UnixNano(), + maxTime: k.merged{{.Name}}Values[len(k.merged{{.Name}}Values)-1].UnixNano(), + key: k.key, + b: cb, + }) + k.merged{{.Name}}Values = k.merged{{.Name}}Values[:0] + } + return dst +} + +{{ end }} diff --git a/tsdb/engine/tsm1/compact.gen.go.tmpldata b/tsdb/engine/tsm1/compact.gen.go.tmpldata new file mode 100644 index 0000000000..3f3fb8f18e --- /dev/null +++ b/tsdb/engine/tsm1/compact.gen.go.tmpldata @@ -0,0 +1,18 @@ +[ + { + "Name":"Float", + "name":"float" + }, + { + "Name":"Integer", + "name":"integer" + }, + { + "Name":"String", + "name":"string" + }, + { + "Name":"Boolean", + "name":"boolean" + } +] diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index bb3e63cde0..e78b34c08a 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -887,6 +887,7 @@ type tsmKeyIterator struct { // key is the current key lowest key across all readers that has not be fully exhausted // of values. key string + typ byte iterators []*BlockIterator blocks blocks @@ -894,7 +895,10 @@ type tsmKeyIterator struct { buf []blocks // mergeValues are decoded blocks that have been combined - mergedValues Values + mergedFloatValues FloatValues + mergedIntegerValues IntegerValues + mergedBooleanValues BooleanValues + mergedStringValues StringValues // merged are encoded blocks that have been combined or used as is // without decode @@ -904,6 +908,7 @@ type tsmKeyIterator struct { type block struct { key string minTime, maxTime int64 + typ byte b []byte tombstones []TimeRange @@ -966,6 +971,13 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, }, nil } +func (k *tsmKeyIterator) hasMergedValues() bool { + return len(k.mergedFloatValues) > 0 || + len(k.mergedIntegerValues) > 0 || + len(k.mergedStringValues) > 0 || + len(k.mergedBooleanValues) > 0 +} + // Next returns true if there are any values remaining in the iterator. func (k *tsmKeyIterator) Next() bool { // Any merged blocks pending? @@ -977,9 +989,9 @@ func (k *tsmKeyIterator) Next() bool { } // Any merged values pending? - if len(k.mergedValues) > 0 { + if k.hasMergedValues() { k.merge() - if len(k.merged) > 0 || len(k.mergedValues) > 0 { + if len(k.merged) > 0 || k.hasMergedValues() { return true } } @@ -987,7 +999,7 @@ func (k *tsmKeyIterator) Next() bool { // If we still have blocks from the last read, merge them if len(k.blocks) > 0 { k.merge() - if len(k.merged) > 0 || len(k.mergedValues) > 0 { + if len(k.merged) > 0 || k.hasMergedValues() { return true } } @@ -997,7 +1009,7 @@ func (k *tsmKeyIterator) Next() bool { if v == nil { iter := k.iterators[i] if iter.Next() { - key, minTime, maxTime, _, _, b, err := iter.Read() + key, minTime, maxTime, typ, _, b, err := iter.Read() if err != nil { k.err = err } @@ -1009,6 +1021,7 @@ func (k *tsmKeyIterator) Next() bool { minTime: minTime, maxTime: maxTime, key: key, + typ: typ, b: b, tombstones: tombstones, readMin: math.MaxInt64, @@ -1018,7 +1031,7 @@ func (k *tsmKeyIterator) Next() bool { blockKey := key for iter.PeekNext() == blockKey { iter.Next() - key, minTime, maxTime, _, _, b, err := iter.Read() + key, minTime, maxTime, typ, _, b, err := iter.Read() if err != nil { k.err = err } @@ -1029,6 +1042,7 @@ func (k *tsmKeyIterator) Next() bool { minTime: minTime, maxTime: maxTime, key: key, + typ: typ, b: b, tombstones: tombstones, readMin: math.MaxInt64, @@ -1042,6 +1056,7 @@ func (k *tsmKeyIterator) Next() bool { // Each reader could have a different key that it's currently at, need to find // the next smallest one to keep the sort ordering. var minKey string + var minType byte for _, b := range k.buf { // block could be nil if the iterator has been exhausted for that file if len(b) == 0 { @@ -1049,9 +1064,11 @@ func (k *tsmKeyIterator) Next() bool { } if minKey == "" || b[0].key < minKey { minKey = b[0].key + minType = b[0].typ } } k.key = minKey + k.typ = minType // Now we need to find all blocks that match the min key so we can combine and dedupe // the blocks if necessary @@ -1076,203 +1093,18 @@ func (k *tsmKeyIterator) Next() bool { // merge combines the next set of blocks into merged blocks. func (k *tsmKeyIterator) merge() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedValues) == 0 { - return + switch k.typ { + case BlockFloat64: + k.mergeFloat() + case BlockInteger: + k.mergeInteger() + case BlockBoolean: + k.mergeBoolean() + case BlockString: + k.mergeString() + default: + k.err = fmt.Errorf("unknown block type: %v", k.typ) } - - dedup := len(k.mergedValues) > 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - if k.blocks[i].partiallyRead() { - dedup = true - break - } - - if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 { - dedup = true - break - } - } - - } - - k.merged = k.combine(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combine(dedup bool) blocks { - if dedup { - for len(k.mergedValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeBlock(k.blocks[i].b, nil) - if err != nil { - k.err = err - return nil - } - - // Remove values we already read - v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = Values(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = Values(v).Exclude(ts.Min, ts.Max) - } - - k.mergedValues = k.mergedValues.Merge(v) - } - k.blocks = k.blocks[1:] - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunk(nil) - } else { - var chunked blocks - var i int - - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - // If we this block is already full, just add it as is - if BlockCount(k.blocks[i].b) >= k.size { - chunked = append(chunked, k.blocks[i]) - } else { - break - } - i++ - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - chunked = append(chunked, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - chunked = append(chunked, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeBlock(k.blocks[i].b, nil) - if err != nil { - k.err = err - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = Values(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedValues = k.mergedValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunk(chunked) - } -} - -func (k *tsmKeyIterator) chunk(dst blocks) blocks { - if len(k.mergedValues) > k.size { - values := k.mergedValues[:k.size] - cb, err := Values(values).Encode(nil) - if err != nil { - k.err = err - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedValues = k.mergedValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedValues) > 0 { - cb, err := Values(k.mergedValues).Encode(nil) - if err != nil { - k.err = err - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedValues[0].UnixNano(), - maxTime: k.mergedValues[len(k.mergedValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedValues = k.mergedValues[:0] - } - return dst } func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) { diff --git a/tsdb/engine/tsm1/encoding.gen.go b/tsdb/engine/tsm1/encoding.gen.go index dbb42c1bd1..63261c6392 100644 --- a/tsdb/engine/tsm1/encoding.gen.go +++ b/tsdb/engine/tsm1/encoding.gen.go @@ -370,6 +370,10 @@ func (a FloatValues) Merge(b FloatValues) FloatValues { return a } +func (a FloatValues) Encode(buf []byte) ([]byte, error) { + return encodeFloatValuesBlock(buf, a) +} + // Sort methods func (a FloatValues) Len() int { return len(a) } func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -552,6 +556,10 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues { return a } +func (a IntegerValues) Encode(buf []byte) ([]byte, error) { + return encodeIntegerValuesBlock(buf, a) +} + // Sort methods func (a IntegerValues) Len() int { return len(a) } func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -734,6 +742,10 @@ func (a StringValues) Merge(b StringValues) StringValues { return a } +func (a StringValues) Encode(buf []byte) ([]byte, error) { + return encodeStringValuesBlock(buf, a) +} + // Sort methods func (a StringValues) Len() int { return len(a) } func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -916,6 +928,10 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues { return a } +func (a BooleanValues) Encode(buf []byte) ([]byte, error) { + return encodeBooleanValuesBlock(buf, a) +} + // Sort methods func (a BooleanValues) Len() int { return len(a) } func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/engine/tsm1/encoding.gen.go.tmpl b/tsdb/engine/tsm1/encoding.gen.go.tmpl index 4d3fb50142..6258f5d506 100644 --- a/tsdb/engine/tsm1/encoding.gen.go.tmpl +++ b/tsdb/engine/tsm1/encoding.gen.go.tmpl @@ -185,6 +185,12 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values { return a } +{{ if ne .Name "" }} +func (a {{.Name}}Values) Encode(buf []byte) ([]byte, error) { + return encode{{.Name}}ValuesBlock(buf, a) +} +{{ end }} + // Sort methods func (a {{.Name}}Values) Len() int { return len(a) } func (a {{.Name}}Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 56266432ef..e2dbcfa58b 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -366,6 +366,53 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) { + if len(values) == 0 { + return nil, nil + } + + // A float block is encoded using different compression strategies + // for timestamps and values. + + // Encode values using Gorilla float compression + venc := getFloatEncoder() + + // Encode timestamps using an adaptive encoder that uses delta-encoding, + // frame-or-reference and run length encoding. + tsenc := getTimeEncoder(len(values)) + + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.unixnano) + venc.Push(v.value) + } + venc.Finish() + + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } + + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockFloat64, tb, vb) + + return nil + }() + + putTimeEncoder(tsenc) + putFloatEncoder(venc) + + return b, err +} + // DecodeFloatBlock decodes the float block from the byte slice // and appends the float values to a. func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { @@ -496,6 +543,48 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeBooleanValuesBlock(buf []byte, values []BooleanValue) ([]byte, error) { + if len(values) == 0 { + return nil, nil + } + + // A boolean block is encoded using different compression strategies + // for timestamps and values. + venc := getBooleanEncoder(len(values)) + + // Encode timestamps using an adaptive encoder + tsenc := getTimeEncoder(len(values)) + + var b []byte + err := func() error { + for _, v := range values { + tsenc.Write(v.unixnano) + venc.Write(v.value) + } + + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return err + } + + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + b = packBlock(buf, BlockBoolean, tb, vb) + return nil + }() + + putTimeEncoder(tsenc) + putBooleanEncoder(venc) + + return b, err +} + // DecodeBooleanBlock decodes the boolean block from the byte slice // and appends the boolean values to a. func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) { @@ -613,6 +702,39 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeIntegerValuesBlock(buf []byte, values []IntegerValue) ([]byte, error) { + tsEnc := getTimeEncoder(len(values)) + vEnc := getIntegerEncoder(len(values)) + + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.unixnano) + vEnc.Write(v.value) + } + + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded int64 values + vb, err := vEnc.Bytes() + if err != nil { + return err + } + + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockInteger, tb, vb) + return nil + }() + + putTimeEncoder(tsEnc) + putIntegerEncoder(vEnc) + + return b, err +} + // DecodeIntegerBlock decodes the integer block from the byte slice // and appends the integer values to a. func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) { @@ -732,6 +854,40 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeStringValuesBlock(buf []byte, values []StringValue) ([]byte, error) { + tsEnc := getTimeEncoder(len(values)) + vEnc := getStringEncoder(len(values) * len(values[0].value)) + + var b []byte + err := func() error { + for _, v := range values { + tsEnc.Write(v.unixnano) + vEnc.Write(v.value) + } + + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + return err + } + // Encoded string values + vb, err := vEnc.Bytes() + if err != nil { + return err + } + + // Prepend the first timestamp of the block in the first 8 bytes + b = packBlock(buf, BlockString, tb, vb) + + return nil + }() + + putTimeEncoder(tsEnc) + putStringEncoder(vEnc) + + return b, err +} + // DecodeStringBlock decodes the string block from the byte slice // and appends the string values to a. func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index d58df48686..6b2d69dd88 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -26,6 +26,7 @@ import ( //go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl //go:generate tmpl -data=@file_store.gen.go.tmpldata file_store.gen.go.tmpl //go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl +//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl func init() { tsdb.RegisterEngine("tsm1", NewEngine)