Merge pull request #17589 from influxdata/sgc/issue/17560

feat(tsm1): Implementation of MeasurementFields schema API

commit 3627954139
@@ -1,5 +1,7 @@
 package cursors

+import "sort"
+
 func (a *FloatArray) Size() int {
 	// size of timestamps + values
 	return len(a.Timestamps)*8 + len(a.Values)*8
@@ -27,3 +29,13 @@ func (a *BooleanArray) Size() int {
 	// size of timestamps + values
 	return len(a.Timestamps)*8 + len(a.Values)
 }
+
+var _ sort.Interface = (*TimestampArray)(nil)
+
+func (a *TimestampArray) Less(i, j int) bool {
+	return a.Timestamps[i] < a.Timestamps[j]
+}
+
+func (a *TimestampArray) Swap(i, j int) {
+	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
+}
@@ -4,11 +4,12 @@ package cursors
 type FieldType int

 const (
-	Float    FieldType = iota // means the data type is a float
-	Integer                   // means the data type is an integer
-	Unsigned                  // means the data type is an unsigned integer
-	Boolean                   // means the data type is a boolean
-	String                    // means the data type is a string of text
+	Float     FieldType = iota // means the data type is a float
+	Integer                    // means the data type is an integer
+	Unsigned                   // means the data type is an unsigned integer
+	Boolean                    // means the data type is a boolean
+	String                     // means the data type is a string of text
+	Undefined                  // means the data type is unknown or undefined
 )

 type MeasurementField struct {
@@ -20,8 +21,8 @@ type MeasurementFields struct {
 	Fields []MeasurementField
 }

-type MeasurementFieldsCursor interface {
-	// Next advances the MeasurementFieldsCursor to the next value. It returns false
+type MeasurementFieldsIterator interface {
+	// Next advances the iterator to the next value. It returns false
 	// when there are no more values.
 	Next() bool
@@ -30,3 +31,75 @@ type MeasurementFieldsCursor interface {

 	Stats() CursorStats
 }
+
+// EmptyMeasurementFieldsIterator is an implementation of MeasurementFieldsIterator that returns
+// no values.
+var EmptyMeasurementFieldsIterator = &measurementFieldsIterator{}
+
+type measurementFieldsIterator struct{}
+
+func (m *measurementFieldsIterator) Next() bool               { return false }
+func (m *measurementFieldsIterator) Value() MeasurementFields { return MeasurementFields{} }
+func (m *measurementFieldsIterator) Stats() CursorStats       { return CursorStats{} }
+
+type MeasurementFieldsSliceIterator struct {
+	f     []MeasurementFields
+	v     MeasurementFields
+	i     int
+	stats CursorStats
+}
+
+func NewMeasurementFieldsSliceIteratorWithStats(f []MeasurementFields, stats CursorStats) *MeasurementFieldsSliceIterator {
+	return &MeasurementFieldsSliceIterator{f: f, stats: stats}
+}
+
+func (s *MeasurementFieldsSliceIterator) Next() bool {
+	if s.i < len(s.f) {
+		s.v = s.f[s.i]
+		s.i++
+		return true
+	}
+	s.v = MeasurementFields{}
+	return false
+}
+
+func (s *MeasurementFieldsSliceIterator) Value() MeasurementFields {
+	return s.v
+}
+
+func (s *MeasurementFieldsSliceIterator) Stats() CursorStats {
+	return s.stats
+}
+
+func (s *MeasurementFieldsSliceIterator) toSlice() []MeasurementFields {
+	if s.i < len(s.f) {
+		return s.f[s.i:]
+	}
+	return nil
+}
+
+// MeasurementFieldsIteratorFlatMap reads the remainder of i, flattening the results
+// to a single slice.
+func MeasurementFieldsIteratorFlatMap(i MeasurementFieldsIterator) []MeasurementField {
+	if i == nil {
+		return nil
+	}
+
+	var res []MeasurementField
+	if si, ok := i.(*MeasurementFieldsSliceIterator); ok {
+		s := si.toSlice()
+		sz := 0
+		for i := range s {
+			sz += len(s[i].Fields)
+		}
+		res = make([]MeasurementField, 0, sz)
+		for i := range s {
+			res = append(res, s[i].Fields...)
+		}
+	} else {
+		for i.Next() {
+			res = append(res, i.Value().Fields...)
+		}
+	}
+	return res
+}
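
The iterator API above is easiest to see end to end with a small consumer. A minimal sketch, assuming only the exported identifiers introduced in this hunk (the sample field data is made up):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb/cursors"
)

func main() {
	// Hypothetical schema result: one MeasurementFields bucket with two fields.
	fields := []cursors.MeasurementFields{
		{Fields: []cursors.MeasurementField{
			{Key: "f", Type: cursors.Float},
			{Key: "i", Type: cursors.Integer},
		}},
	}
	iter := cursors.NewMeasurementFieldsSliceIteratorWithStats(fields, cursors.CursorStats{})

	// FlatMap detects the slice-backed iterator and takes the preallocating
	// fast path (toSlice plus summed capacity) instead of the Next/Value loop.
	for _, f := range cursors.MeasurementFieldsIteratorFlatMap(iter) {
		fmt.Println(f.Key, f.Type)
	}
}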
@@ -328,6 +328,47 @@ func (c *Cache) Type(key []byte) (models.FieldType, error) {
 	return models.Empty, errUnknownFieldType
 }

+// BlockType returns the TSM block type for the specified
+// key or BlockUndefined if the type cannot be determined
+// either because the key does not exist or there are no
+// values for the key.
+func (c *Cache) BlockType(key []byte) byte {
+	c.mu.RLock()
+	e := c.store.entry(key)
+	if e == nil && c.snapshot != nil {
+		e = c.snapshot.store.entry(key)
+	}
+	c.mu.RUnlock()
+
+	if e != nil {
+		return e.BlockType()
+	}
+
+	return BlockUndefined
+}
+
+// AppendTimestamps appends ts with the timestamps for the specified key.
+// It is the responsibility of the caller to sort and/or deduplicate the slice.
+func (c *Cache) AppendTimestamps(key []byte, ts []int64) []int64 {
+	var snapshotEntries *entry
+
+	c.mu.RLock()
+	e := c.store.entry(key)
+	if c.snapshot != nil {
+		snapshotEntries = c.snapshot.store.entry(key)
+	}
+	c.mu.RUnlock()
+
+	if e != nil {
+		ts = e.AppendTimestamps(ts)
+	}
+	if snapshotEntries != nil {
+		ts = snapshotEntries.AppendTimestamps(ts)
+	}
+
+	return ts
+}
+
 // Values returns a copy of all values, deduped and sorted, for the given key.
 func (c *Cache) Values(key []byte) Values {
 	var snapshotEntries *entry
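
The comment on AppendTimestamps makes sorting and deduplication the caller's job; in the engine code later in this PR that is done with sort.Sort on the cursors.TimestampArray wrapping the slice. A standalone sketch of the same contract on a plain slice:

package main

import (
	"fmt"
	"sort"
)

// sortDedupe shows the caller-side contract on a plain []int64: sort first,
// then compact duplicates in place (writes never pass the read index).
func sortDedupe(ts []int64) []int64 {
	sort.Slice(ts, func(i, j int) bool { return ts[i] < ts[j] })
	out := ts[:0]
	for i, v := range ts {
		if i == 0 || v != out[len(out)-1] {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	fmt.Println(sortDedupe([]int64{109, 101, 101, 108})) // [101 108 109]
}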
@@ -729,14 +770,14 @@ func valueType(v Value) byte {

 var (
 	valueTypeBlockType = [8]byte{
-		valueTypeUndefined: blockUndefined,
+		valueTypeUndefined: BlockUndefined,
 		valueTypeFloat64:   BlockFloat64,
 		valueTypeInteger:   BlockInteger,
 		valueTypeString:    BlockString,
 		valueTypeBoolean:   BlockBoolean,
 		valueTypeUnsigned:  BlockUnsigned,
-		6:                  blockUndefined,
-		7:                  blockUndefined,
+		6:                  BlockUndefined,
+		7:                  BlockUndefined,
 	}
 )
@@ -117,6 +117,19 @@ func (e *entry) size() int {
 	return sz
 }

+// AppendTimestamps appends ts with the timestamps from the entry.
+func (e *entry) AppendTimestamps(ts []int64) []int64 {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	n := e.values.Len()
+	if n > 0 {
+		for i := range e.values {
+			ts = append(ts, e.values[i].UnixNano())
+		}
+	}
+	return ts
+}
+
 // InfluxQLType returns for the entry the data type of its values.
 func (e *entry) InfluxQLType() (influxql.DataType, error) {
 	e.mu.RLock()
@@ -25,8 +25,8 @@ const (
 	// BlockUnsigned designates a block encodes uint64 values.
 	BlockUnsigned = byte(4)

-	// blockUndefined represents an undefined block type value.
-	blockUndefined = BlockUnsigned + 1
+	// BlockUndefined represents an undefined block type value.
+	BlockUndefined = BlockUnsigned + 1

 	// encodedBlockHeaderSize is the size of the header for an encoded block. There is one
 	// byte encoding the type of the block.
@@ -162,6 +162,28 @@ func (a Values) InfluxQLType() (influxql.DataType, error) {
 	return influxql.Unknown, fmt.Errorf("unsupported value type %T", a[0])
 }

+// BlockType returns the TSM block type the values map to.
+func (a Values) BlockType() byte {
+	if len(a) == 0 {
+		return BlockUndefined
+	}
+
+	switch a[0].(type) {
+	case FloatValue:
+		return BlockFloat64
+	case IntegerValue:
+		return BlockInteger
+	case UnsignedValue:
+		return BlockUnsigned
+	case BooleanValue:
+		return BlockBoolean
+	case StringValue:
+		return BlockString
+	}
+
+	return BlockUndefined
+}
+
 // BlockType returns the type of value encoded in a block or an error
 // if the block type is unknown.
 func BlockType(block []byte) (byte, error) {
@@ -24,6 +24,7 @@ import (
 	"github.com/influxdata/influxdb/v2/pkg/metrics"
 	"github.com/influxdata/influxdb/v2/query"
 	"github.com/influxdata/influxdb/v2/tsdb"
+	"github.com/influxdata/influxdb/v2/tsdb/cursors"
 	"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
 	"github.com/influxdata/influxdb/v2/tsdb/tsi1"
 	"github.com/influxdata/influxql"
@@ -1464,7 +1465,7 @@ var (
 		BlockBoolean:  influxql.Boolean,
 		BlockString:   influxql.String,
 		BlockUnsigned: influxql.Unsigned,
-		blockUndefined: influxql.Unknown,
+		BlockUndefined: influxql.Unknown,
 		6:             influxql.Unknown,
 		7:             influxql.Unknown,
 	}
@@ -1472,6 +1473,21 @@ var (

 func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType { return blockToFieldType[typ&7] }

+var (
+	blockTypeFieldType = [8]cursors.FieldType{
+		BlockFloat64:   cursors.Float,
+		BlockInteger:   cursors.Integer,
+		BlockBoolean:   cursors.Boolean,
+		BlockString:    cursors.String,
+		BlockUnsigned:  cursors.Unsigned,
+		BlockUndefined: cursors.Undefined,
+		6:              cursors.Undefined,
+		7:              cursors.Undefined,
+	}
+)
+
+func BlockTypeToFieldType(typ byte) cursors.FieldType { return blockTypeFieldType[typ&7] }
+
 // SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
 func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, []byte) {
 	sep := bytes.Index(key, KeyFieldSeparatorBytes)
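
The typ&7 mask keeps the table lookup in bounds, so any unexpected block type degrades to cursors.Undefined instead of panicking. A standalone sketch of the same lookup; the numeric index values 0-5 are assumptions mirroring BlockFloat64 through BlockUndefined, not imports of the real tsm1 constants:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb/cursors"
)

// Stand-in table mirroring blockTypeFieldType above.
var blockTypeFieldType = [8]cursors.FieldType{
	0: cursors.Float, 1: cursors.Integer, 2: cursors.Boolean,
	3: cursors.String, 4: cursors.Unsigned, 5: cursors.Undefined,
	6: cursors.Undefined, 7: cursors.Undefined,
}

func blockTypeToFieldType(typ byte) cursors.FieldType { return blockTypeFieldType[typ&7] }

func main() {
	fmt.Println(blockTypeToFieldType(0))  // cursors.Float
	fmt.Println(blockTypeToFieldType(42)) // 42&7 == 2, so cursors.Boolean: masked, never out of range
}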
@@ -10,6 +10,7 @@ import (
 	"github.com/influxdata/influxdb/v2/models"
 	"github.com/influxdata/influxdb/v2/tsdb"
+	"github.com/influxdata/influxdb/v2/tsdb/cursors"
 	"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
 	"github.com/influxdata/influxql"
 	"go.uber.org/zap"
 )
@@ -74,6 +75,8 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.
 		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
 	}

+	var ts cursors.TimestampArray
+
 	// With performance in mind, we explicitly do not check the context
 	// while scanning the entries in the cache.
 	prefixStr := string(prefix)
@@ -94,10 +97,17 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.
 			return nil
 		}

-		stats.ScannedValues += entry.values.Len()
-		stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
-
-		if entry.values.Contains(start, end) {
-			tsmValues[string(name)] = struct{}{}
-		}
+		ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
+		if ts.Len() == 0 {
+			return nil
+		}
+
+		sort.Sort(&ts)
+
+		stats.ScannedValues += ts.Len()
+		stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+		if ts.Contains(start, end) {
+			tsmValues[string(name)] = struct{}{}
+		}
 		return nil
@@ -149,6 +159,248 @@ func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxd
 	return e.tagKeysPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate)
 }

+func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
+	if predicate == nil {
+		return e.fieldsNoPredicate(ctx, orgID, bucketID, []byte(measurement), start, end)
+	}
+
+	predicate = AddMeasurementToExpr(measurement, predicate)
+
+	return e.fieldsPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate)
+}
+
+type fieldTypeTime struct {
+	typ cursors.FieldType
+	max int64
+}
+
+func (e *Engine) fieldsPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
+	if err := ValidateTagPredicate(predicate); err != nil {
+		return nil, err
+	}
+
+	orgBucket := tsdb.EncodeName(orgID, bucketID)
+
+	keys, err := e.findCandidateKeys(ctx, orgBucket[:], predicate)
+	if err != nil {
+		return cursors.EmptyMeasurementFieldsIterator, err
+	}
+
+	if len(keys) == 0 {
+		return cursors.EmptyMeasurementFieldsIterator, nil
+	}
+
+	var files []TSMFile
+	defer func() {
+		for _, f := range files {
+			f.Unref()
+		}
+	}()
+	var iters []*TimeRangeMaxTimeIterator
+
+	// TODO(edd): we need to clean up how we're encoding the prefix so that we
+	// don't have to remember to get it right everywhere we need to touch TSM data.
+	orgBucketEsc := models.EscapeMeasurement(orgBucket[:])
+
+	mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)}
+	tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc)
+
+	var canceled bool
+
+	e.FileStore.ForEachFile(func(f TSMFile) bool {
+		// Check the context before accessing each tsm file
+		select {
+		case <-ctx.Done():
+			canceled = true
+			return false
+		default:
+		}
+		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) {
+			f.Ref()
+			files = append(files, f)
+			iters = append(iters, f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end))
+		}
+		return true
+	})
+
+	var stats cursors.CursorStats
+
+	if canceled {
+		stats = statsFromTimeRangeMaxTimeIters(stats, iters)
+		return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
+	}
+
+	tsmValues := make(map[string]fieldTypeTime)
+
+	// reusable buffers
+	var (
+		tags   models.Tags
+		keybuf []byte
+		sfkey  []byte
+		ts     cursors.TimestampArray
+	)
+
+	for i := range keys {
+		// to keep cache scans fast, check context every 'cancelCheckInterval' iterations
+		if i%cancelCheckInterval == 0 {
+			select {
+			case <-ctx.Done():
+				stats = statsFromTimeRangeMaxTimeIters(stats, iters)
+				return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
+			default:
+			}
+		}
+
+		_, tags = seriesfile.ParseSeriesKeyInto(keys[i], tags[:0])
+		fieldKey := tags.Get(models.FieldKeyTagKeyBytes)
+		keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
+		sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, fieldKey)
+
+		cur := fieldTypeTime{max: InvalidMinNanoTime}
+
+		ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
+		if ts.Len() > 0 {
+			sort.Sort(&ts)
+
+			stats.ScannedValues += ts.Len()
+			stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+			if ts.Contains(start, end) {
+				max := ts.MaxTime()
+				if max > cur.max {
+					cur.max = max
+					cur.typ = BlockTypeToFieldType(e.Cache.BlockType(sfkey))
+				}
+			}
+		}
+
+		for _, iter := range iters {
+			if exact, _ := iter.Seek(sfkey); !exact {
+				continue
+			}
+
+			max := iter.MaxTime()
+			if max > cur.max {
+				cur.max = max
+				cur.typ = BlockTypeToFieldType(iter.Type())
+			}
+		}
+
+		if cur.max != InvalidMinNanoTime {
+			tsmValues[string(fieldKey)] = cur
+		}
+	}
+
+	vals := make([]cursors.MeasurementField, 0, len(tsmValues))
+	for key, val := range tsmValues {
+		vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ})
+	}
+
+	return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil
+}
+
+func (e *Engine) fieldsNoPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64) (cursors.MeasurementFieldsIterator, error) {
+	tsmValues := make(map[string]fieldTypeTime)
+	orgBucket := tsdb.EncodeName(orgID, bucketID)
+
+	// TODO(edd): we need to clean up how we're encoding the prefix so that we
+	// don't have to remember to get it right everywhere we need to touch TSM data.
+	orgBucketEsc := models.EscapeMeasurement(orgBucket[:])
+
+	mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)}
+	tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc)
+
+	var stats cursors.CursorStats
+	var canceled bool
+
+	e.FileStore.ForEachFile(func(f TSMFile) bool {
+		// Check the context before touching each tsm file
+		select {
+		case <-ctx.Done():
+			canceled = true
+			return false
+		default:
+		}
+		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) {
+			// TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end)
+			iter := f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end)
+			for i := 0; iter.Next(); i++ {
+				sfkey := iter.Key()
+				if !bytes.HasPrefix(sfkey, tsmKeyPrefix) {
+					// end of prefix
+					break
+				}
+
+				max := iter.MaxTime()
+				if max == InvalidMinNanoTime {
+					continue
+				}
+
+				_, fieldKey := SeriesAndFieldFromCompositeKey(sfkey)
+				v, ok := tsmValues[string(fieldKey)]
+				if !ok || v.max < max {
+					tsmValues[string(fieldKey)] = fieldTypeTime{
+						typ: BlockTypeToFieldType(iter.Type()),
+						max: max,
+					}
+				}
+			}
+			stats.Add(iter.Stats())
+		}
+		return true
+	})
+
+	if canceled {
+		return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
+	}
+
+	var ts cursors.TimestampArray
+
+	// With performance in mind, we explicitly do not check the context
+	// while scanning the entries in the cache.
+	tsmKeyPrefixStr := string(tsmKeyPrefix)
+	_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
+		if !strings.HasPrefix(sfkey, tsmKeyPrefixStr) {
+			return nil
+		}
+
+		ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
+		if ts.Len() == 0 {
+			return nil
+		}
+
+		sort.Sort(&ts)
+
+		stats.ScannedValues += ts.Len()
+		stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+		if !ts.Contains(start, end) {
+			return nil
+		}
+
+		max := ts.MaxTime()
+
+		// TODO(edd): consider the []byte() conversion here.
+		_, fieldKey := SeriesAndFieldFromCompositeKey([]byte(sfkey))
+		v, ok := tsmValues[string(fieldKey)]
+		if !ok || v.max < max {
+			tsmValues[string(fieldKey)] = fieldTypeTime{
+				typ: BlockTypeToFieldType(entry.BlockType()),
+				max: max,
+			}
+		}
+
+		return nil
+	})
+
+	vals := make([]cursors.MeasurementField, 0, len(tsmValues))
+	for key, val := range tsmValues {
+		vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ})
+	}
+
+	return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil
+}
+
 func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr {
 	// \x00 = '<measurement>'
 	expr := &influxql.BinaryExpr{
@@ -175,3 +427,10 @@ func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr

 	return expr
 }
+
+func statsFromTimeRangeMaxTimeIters(stats cursors.CursorStats, iters []*TimeRangeMaxTimeIterator) cursors.CursorStats {
+	for _, iter := range iters {
+		stats.Add(iter.Stats())
+	}
+	return stats
+}
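
Putting the pieces together, a sketch of how a caller might drive the new schema API; the engine construction is elided, the measurement, time range, and predicate are hypothetical, and the tsm1 import path follows the layout used elsewhere in this PR:

package example

import (
	"context"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/tsdb/cursors"
	"github.com/influxdata/influxdb/v2/tsdb/tsm1"
	"github.com/influxdata/influxql"
)

// fieldKeys lists the field key/type pairs of measurement "m00" that have
// data in [0, 300], restricted to series matching tag10 = 'v10'.
func fieldKeys(ctx context.Context, e *tsm1.Engine, org, bucket influxdb.ID) ([]cursors.MeasurementField, error) {
	pred := influxql.MustParseExpr(`tag10 = 'v10'`)
	iter, err := e.MeasurementFields(ctx, org, bucket, "m00", 0, 300, pred)
	if err != nil {
		return nil, err
	}
	// Drain the iterator into a single flat slice.
	return cursors.MeasurementFieldsIteratorFlatMap(iter), nil
}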
@@ -789,6 +789,277 @@ mem,mem1=v,mem2=v f=1 201`)
 	}
 }

+func TestEngine_MeasurementFields(t *testing.T) {
+	e, err := NewEngine(tsm1.NewConfig(), t)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := e.Open(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	defer e.Close()
+
+	orgs := []struct {
+		org, bucket influxdb.ID
+	}{
+		{
+			org:    0x5020,
+			bucket: 0x5100,
+		},
+		{
+			org:    0x6000,
+			bucket: 0x6100,
+		},
+	}
+
+	// this org will require escaping the 0x20 byte
+	e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
+m00,tag00=v00,tag10=v10 i=1i 101
+m00,tag00=v00,tag10=v10 i=1i 102
+m00,tag00=v00,tag10=v10 f=1 101
+m00,tag00=v00,tag10=v10 i=1i 108
+m00,tag00=v00,tag10=v10 f=1 109
+m00,tag00=v00,tag10=v10 i=1i 109
+m01,tag00=v00,tag10=v10 b=true 101
+`)
+	e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
+m10,foo=v barF=50 101
+`)
+
+	// send some points to TSM data
+	e.MustWriteSnapshot()
+
+	// delete some data from the first bucket
+	e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105)
+
+	// leave some points in the cache
+	e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
+m00,tag00=v00,tag10=v10 i=2i 201
+m00,tag00=v00,tag10=v10 i=2i 202
+m00,tag00=v00,tag10=v10 f=2 201
+m00,tag00=v00,tag10=v11 i="s" 202
+m00,tag00=v00,tag10=v11 i="s" 208
+m00,tag00=v00,tag10=v11 i="s" 209
+m01,tag00=v00,tag10=v10 b=true 201
+`)
+	e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
+m10,foo=v barS="60" 501
+`)
+
+	type args struct {
+		org      int
+		m        string
+		min, max int64
+		expr     string
+	}
+
+	makeStats := func(v int) cursors.CursorStats {
+		return cursors.CursorStats{
+			ScannedValues: v,
+			ScannedBytes:  v * 8,
+		}
+	}
+
+	var tests = []struct {
+		name     string
+		args     args
+		exp      []cursors.MeasurementField
+		expStats cursors.CursorStats
+	}{
+		// ***********************
+		// * queries for the first org, which has some deleted data
+		// ***********************
+		{
+			name: "TSM and cache",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 0,
+				max: 300,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(12),
+		},
+		{
+			name: "m00 only TSM",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 0,
+				max: 199,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(12),
+		},
+		{
+			name: "m01 all time",
+			args: args{
+				org: 0,
+				m:   "m01",
+				min: 0,
+				max: 1000,
+			},
+			exp:      []cursors.MeasurementField{{Key: "b", Type: cursors.Boolean}},
+			expStats: makeStats(1),
+		},
+		{
+			name: "m10 only TSM",
+			args: args{
+				org: 1,
+				m:   "m10",
+				min: 0,
+				max: 199,
+			},
+			exp:      []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}},
+			expStats: makeStats(1),
+		},
+		{
+			name: "only cache",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 200,
+				max: 299,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(6),
+		},
+		{
+			name: "one timestamp TSM/data",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 109,
+				max: 109,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(6),
+		},
+		{
+			name: "one timestamp cache/data",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 201,
+				max: 201,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(6),
+		},
+		{
+			name: "one timestamp change type cache/data",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 202,
+				max: 202,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.String}},
+			expStats: makeStats(6),
+		},
+		{
+			name: "one timestamp TSM/nodata",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 103,
+				max: 103,
+			},
+			exp:      nil,
+			expStats: makeStats(12),
+		},
+		{
+			name: "one timestamp cache/nodata",
+			args: args{
+				org: 0,
+				m:   "m00",
+				min: 203,
+				max: 203,
+			},
+			exp:      nil,
+			expStats: makeStats(6),
+		},
+
+		// queries with predicates
+		{
+			name: "predicate/v10",
+			args: args{
+				org:  0,
+				m:    "m00",
+				min:  0,
+				max:  300,
+				expr: `tag10 = 'v10'`,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
+			expStats: makeStats(3),
+		},
+		{
+			name: "predicate/v11",
+			args: args{
+				org:  0,
+				m:    "m00",
+				min:  0,
+				max:  300,
+				expr: `tag10 = 'v11'`,
+			},
+			exp:      []cursors.MeasurementField{{Key: "i", Type: cursors.String}},
+			expStats: makeStats(3),
+		},
+
+		// ***********************
+		// * queries for the second org, which has no deleted data
+		// ***********************
+		{
+			name: "all data",
+			args: args{
+				org: 1,
+				m:   "m10",
+				min: 0,
+				max: 1000,
+			},
+			exp:      []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}, {Key: "barS", Type: cursors.String}},
+			expStats: makeStats(1),
+		},
+
+		// ***********************
+		// * other scenarios
+		// ***********************
+		{
+			// ensure StringIterator is never nil
+			name: "predicate/no candidate series",
+			args: args{
+				org:  1,
+				m:    "m10",
+				min:  0,
+				max:  1000,
+				expr: `foo = 'nonexistent'`,
+			},
+			exp:      nil,
+			expStats: makeStats(0),
+		},
+	}
+	for _, tc := range tests {
+		t.Run(fmt.Sprintf("org%d/%s", tc.args.org, tc.name), func(t *testing.T) {
+			a := tc.args
+			var expr influxql.Expr
+			if len(a.expr) > 0 {
+				expr = influxql.MustParseExpr(a.expr)
+			}
+
+			iter, err := e.MeasurementFields(context.Background(), orgs[a.org].org, orgs[a.org].bucket, a.m, a.min, a.max, expr)
+			assert.NoError(t, err)
+
+			if got := cursors.MeasurementFieldsIteratorFlatMap(iter); !assert.ElementsMatch(t, tc.exp, got) {
+				return
+			}
+
+			if got := iter.Stats(); !assert.Equal(t, tc.expStats, got) {
+				return
+			}
+		})
+	}
+}
+
 // Verifies AddMeasurementToExpr amends the given influxql.Expr
 // with a predicate to restrict results to a single measurement
 func TestAddMeasurementToExpr(t *testing.T) {
@@ -102,6 +102,8 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgID, bucketID influ
 		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
 	}

+	var ts cursors.TimestampArray
+
 	// With performance in mind, we explicitly do not check the context
 	// while scanning the entries in the cache.
 	tsmKeyprefixStr := string(tsmKeyPrefix)
@@ -122,12 +124,18 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgID, bucketID influ
 			return nil
 		}

-		stats.ScannedValues += entry.values.Len()
-		stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
-
-		if entry.values.Contains(start, end) {
-			tsmValues[string(curVal)] = struct{}{}
-		}
+		ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
+		if ts.Len() > 0 {
+			sort.Sort(&ts)
+
+			stats.ScannedValues += ts.Len()
+			stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+			if ts.Contains(start, end) {
+				tsmValues[string(curVal)] = struct{}{}
+			}
+		}

 		return nil
 	})
@@ -207,6 +215,7 @@ func (e *Engine) tagValuesPredicate(ctx context.Context, orgID, bucketID influxd
 		tags   models.Tags
 		keybuf []byte
 		sfkey  []byte
+		ts     cursors.TimestampArray
 	)

 	for i := range keys {
@@ -230,15 +239,23 @@ func (e *Engine) tagValuesPredicate(ctx context.Context, orgID, bucketID influxd
 			continue
 		}

-		keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
+		// orgBucketEsc is already escaped, so no need to use models.AppendMakeKey, which
+		// unescapes and escapes the value again. The degenerate case is if the orgBucketEsc
+		// has escaped values, causing two allocations per key
+		keybuf = append(keybuf[:0], orgBucketEsc...)
+		keybuf = tags.AppendHashKey(keybuf)
 		sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))

-		values := e.Cache.Values(sfkey)
-		stats.ScannedValues += values.Len()
-		stats.ScannedBytes += values.Len() * 8 // sizeof timestamp
-
-		if values.Contains(start, end) {
-			tsmValues[string(curVal)] = struct{}{}
-			continue
-		}
+		ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
+		if ts.Len() > 0 {
+			sort.Sort(&ts)
+
+			stats.ScannedValues += ts.Len()
+			stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+			if ts.Contains(start, end) {
+				tsmValues[string(curVal)] = struct{}{}
+				continue
+			}
+		}
@@ -378,6 +395,8 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgID, bucketID influxd
 		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
 	}

+	var ts cursors.TimestampArray
+
 	// With performance in mind, we explicitly do not check the context
 	// while scanning the entries in the cache.
 	tsmKeyprefixStr := string(tsmKeyPrefix)
@@ -393,12 +412,18 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgID, bucketID influxd
 			return nil
 		}

-		stats.ScannedValues += entry.values.Len()
-		stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
-
-		if entry.values.Contains(start, end) {
-			keyset.UnionKeys(tags)
-		}
+		ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
+		if ts.Len() > 0 {
+			sort.Sort(&ts)
+
+			stats.ScannedValues += ts.Len()
+			stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+			if ts.Contains(start, end) {
+				keyset.UnionKeys(tags)
+			}
+		}

 		return nil
 	})
@@ -472,6 +497,7 @@ func (e *Engine) tagKeysPredicate(ctx context.Context, orgID, bucketID influxdb.
 		tags   models.Tags
 		keybuf []byte
 		sfkey  []byte
+		ts     cursors.TimestampArray
 	)

 	for i := range keys {
@@ -490,16 +516,24 @@ func (e *Engine) tagKeysPredicate(ctx context.Context, orgID, bucketID influxdb.
 			continue
 		}

-		keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
+		// orgBucketEsc is already escaped, so no need to use models.AppendMakeKey, which
+		// unescapes and escapes the value again. The degenerate case is if the orgBucketEsc
+		// has escaped values, causing two allocations per key
+		keybuf = append(keybuf[:0], orgBucketEsc...)
+		keybuf = tags.AppendHashKey(keybuf)
 		sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))

-		values := e.Cache.Values(sfkey)
-		stats.ScannedValues += values.Len()
-		stats.ScannedBytes += values.Len() * 8 // sizeof timestamp
-
-		if values.Contains(start, end) {
-			keyset.UnionKeys(tags)
-			continue
-		}
+		ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
+		if ts.Len() > 0 {
+			sort.Sort(&ts)
+
+			stats.ScannedValues += ts.Len()
+			stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
+
+			if ts.Contains(start, end) {
+				keyset.UnionKeys(tags)
+				continue
+			}
+		}

 		for _, iter := range iters {
@@ -161,6 +161,12 @@ type TSMFile interface {
 	// Next must be called before calling any of the accessors.
 	TimeRangeIterator(key []byte, min, max int64) *TimeRangeIterator

+	// TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the provided
+	// key. Calling the HasData and MaxTime accessors will be restricted to the
+	// interval [min, max] for the current key.
+	// Next must be called before calling any of the accessors.
+	TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator
+
 	// Free releases any resources held by the FileStore to free up system resources.
 	Free() error
@@ -488,11 +488,34 @@ func (t *TSMReader) TimeRangeIterator(key []byte, min, max int64) *TimeRangeIter
 	t.mu.RUnlock()

 	return &TimeRangeIterator{
-		r:    t,
-		iter: iter,
-		tr: TimeRange{
-			Min: min,
-			Max: max,
+		timeRangeBlockReader: timeRangeBlockReader{
+			r:    t,
+			iter: iter,
+			tr: TimeRange{
+				Min: min,
+				Max: max,
+			},
 		},
 	}
 }
+
+// TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the provided
+// key. Calling the HasData and MaxTime accessors will be restricted to the
+// interval [min, max] for the current key and MaxTime ≤ max.
+// Next must be called before calling any of the accessors.
+func (t *TSMReader) TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator {
+	t.mu.RLock()
+	iter := t.index.Iterator(key)
+	t.mu.RUnlock()
+
+	return &TimeRangeMaxTimeIterator{
+		timeRangeBlockReader: timeRangeBlockReader{
+			r:    t,
+			iter: iter,
+			tr: TimeRange{
+				Min: min,
+				Max: max,
+			},
+		},
+	}
+}
@@ -8,23 +8,7 @@ import (
 // the provided key. It is used to determine if each key has data which exists
 // within a specified time interval.
 type TimeRangeIterator struct {
-	r     *TSMReader
-	iter  *TSMIndexIterator
-	tr    TimeRange
-	err   error
-	stats cursors.CursorStats
-
-	// temporary storage
-	trbuf []TimeRange
-	buf   []byte
-	a     cursors.TimestampArray
-}
-
-func (b *TimeRangeIterator) Err() error {
-	if b.err != nil {
-		return b.err
-	}
-	return b.iter.Err()
+	timeRangeBlockReader
 }

 // Next advances the iterator and reports if it is still valid.
@@ -47,67 +31,98 @@ func (b *TimeRangeIterator) Seek(key []byte) (exact, ok bool) {
 	return b.iter.Seek(key)
 }

-// Key reports the current key.
-func (b *TimeRangeIterator) Key() []byte {
-	return b.iter.Key()
-}
-
 // HasData reports true if the current key has data for the time range.
 func (b *TimeRangeIterator) HasData() bool {
 	if b.Err() != nil {
 		return false
 	}

-	e := excludeEntries(b.iter.Entries(), b.tr)
+	e, ts := b.getEntriesAndTombstones()
 	if len(e) == 0 {
 		return false
 	}

-	b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0])
-	var ts []TimeRange
-	if len(b.trbuf) > 0 {
-		ts = excludeTimeRanges(b.trbuf, b.tr)
-	}
-
 	if len(ts) == 0 {
 		// no tombstones, fast path will avoid decoding blocks
 		// if queried time interval intersects with one of the entries
 		if intersectsEntry(e, b.tr) {
 			return true
 		}
-	}

-	for i := range e {
-		if !b.readBlock(&e[i]) {
-			return false
-		}
+		for i := range e {
+			if !b.readBlock(&e[i]) {
+				return false
+			}

-		// remove tombstoned timestamps
-		for i := range ts {
-			b.a.Exclude(ts[i].Min, ts[i].Max)
-		}
+			if b.a.Contains(b.tr.Min, b.tr.Max) {
+				return true
+			}
+		}
+	} else {
+		for i := range e {
+			if !b.readBlock(&e[i]) {
+				return false
+			}

-		if b.a.Contains(b.tr.Min, b.tr.Max) {
-			return true
+			// remove tombstoned timestamps
+			for i := range ts {
+				b.a.Exclude(ts[i].Min, ts[i].Max)
+			}
+
+			if b.a.Contains(b.tr.Min, b.tr.Max) {
+				return true
+			}
 		}
 	}

 	return false
 }

+// The timeRangeBlockReader provides common behavior
+// for enumerating keys over a given time range and
+// accumulating statistics.
+type timeRangeBlockReader struct {
+	r     *TSMReader
+	iter  *TSMIndexIterator
+	tr    TimeRange
+	err   error
+	stats cursors.CursorStats
+
+	// temporary storage
+	trbuf []TimeRange
+	buf   []byte
+	a     cursors.TimestampArray
+}
+
+func (b *timeRangeBlockReader) Err() error {
+	if b.err != nil {
+		return b.err
+	}
+	return b.iter.Err()
+}
+
+// Key reports the current key.
+func (b *timeRangeBlockReader) Key() []byte {
+	return b.iter.Key()
+}
+
+// Type reports the current block type.
+func (b *timeRangeBlockReader) Type() byte {
+	return b.iter.Type()
+}
+
+func (b *timeRangeBlockReader) getEntriesAndTombstones() ([]IndexEntry, []TimeRange) {
+	if b.err != nil {
+		return nil, nil
+	}
+
+	e := excludeEntries(b.iter.Entries(), b.tr)
+	if len(e) == 0 {
+		return nil, nil
+	}
+
+	b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0])
+	var ts []TimeRange
+	if len(b.trbuf) > 0 {
+		ts = excludeTimeRanges(b.trbuf, b.tr)
+	}
+
+	return e, ts
+}
+
 // readBlock reads the block identified by IndexEntry e and accumulates
 // statistics. readBlock returns true on success.
-func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool {
+func (b *timeRangeBlockReader) readBlock(e *IndexEntry) bool {
 	_, b.buf, b.err = b.r.ReadBytes(e, b.buf)
 	if b.err != nil {
 		return false
@@ -124,7 +139,7 @@ func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool {
 }

 // Stats returns statistics accumulated by the iterator for any block reads.
-func (b *TimeRangeIterator) Stats() cursors.CursorStats {
+func (b *timeRangeBlockReader) Stats() cursors.CursorStats {
 	return b.stats
 }
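
This refactor is ordinary struct embedding: timeRangeBlockReader carries the shared state (reader, index iterator, time range, stats, scratch buffers), and both iterator types promote its methods. A toy standalone sketch of the pattern, with illustrative names rather than the tsm1 types:

package main

import "fmt"

// blockReader plays the role of timeRangeBlockReader: shared state plus
// methods both iterators want to promote.
type blockReader struct{ scanned int }

func (b *blockReader) Stats() int { return b.scanned }

// Two iterator types embed the reader, as TimeRangeIterator and
// TimeRangeMaxTimeIterator embed timeRangeBlockReader above.
type hasDataIter struct{ blockReader }

type maxTimeIter struct {
	blockReader
	maxTime int64 // extra cached state, like TimeRangeMaxTimeIterator
}

func main() {
	it := maxTimeIter{blockReader: blockReader{scanned: 6}}
	fmt.Println(it.Stats()) // Stats is promoted from the embedded reader: prints 6
}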
@@ -358,6 +358,14 @@ func TestExcludeEntries(t *testing.T) {
 			},
 			exp: entries(12, 15, 19, 21),
 		},
+		{
+			args: args{
+				e:   entries(0, 10, 12, 15, 19, 21),
+				min: 13,
+				max: 20,
+			},
+			exp: entries(12, 15, 19, 21),
+		},
 		{
 			args: args{
 				e: entries(0, 10, 12, 15, 19, 21),
@@ -0,0 +1,141 @@
+package tsm1
+
+import (
+	"github.com/influxdata/influxdb/v2/models"
+)
+
+const (
+	// InvalidMinNanoTime is an invalid nano timestamp that has an ordinal
+	// value lower than models.MinNanoTime, the minimum valid timestamp
+	// that can be represented.
+	InvalidMinNanoTime = models.MinNanoTime - 1
+)
+
+// TimeRangeMaxTimeIterator will iterate over the keys of a TSM file, starting at
+// the provided key. It is used to determine if each key has data which exists
+// within a specified time interval.
+type TimeRangeMaxTimeIterator struct {
+	timeRangeBlockReader
+
+	// cached values
+	maxTime  int64
+	hasData  bool
+	isLoaded bool
+}
+
+// Next advances the iterator and reports if it is still valid.
+func (b *TimeRangeMaxTimeIterator) Next() bool {
+	if b.Err() != nil {
+		return false
+	}
+
+	b.clearIsLoaded()
+
+	return b.iter.Next()
+}
+
+// Seek points the iterator at the smallest key greater than or equal to the
+// given key, returning true if it was an exact match. It returns false for
+// ok if the key does not exist.
+func (b *TimeRangeMaxTimeIterator) Seek(key []byte) (exact, ok bool) {
+	if b.Err() != nil {
+		return false, false
+	}
+
+	b.clearIsLoaded()
+
+	return b.iter.Seek(key)
+}
+
+// HasData reports true if the current key has data for the time range.
+func (b *TimeRangeMaxTimeIterator) HasData() bool {
+	if b.Err() != nil {
+		return false
+	}
+
+	b.load()
+
+	return b.hasData
+}
+
+// MaxTime returns the maximum timestamp for the current key within the
+// requested time range. If an error occurred or there is no data,
+// InvalidMinNanoTime will be returned, which is less than models.MinNanoTime.
+// This property can be leveraged when enumerating keys to find the maximum timestamp,
+// as this value will always be lower than any valid timestamp returned.
+//
+// NOTE: If MaxTime is equal to the upper bounds of the queried time range, it
+// means data was found equal to or beyond the requested time range and
+// does not mean that data exists at that specific timestamp.
+func (b *TimeRangeMaxTimeIterator) MaxTime() int64 {
+	if b.Err() != nil {
+		return InvalidMinNanoTime
+	}
+
+	b.load()
+
+	return b.maxTime
+}
+
+func (b *TimeRangeMaxTimeIterator) clearIsLoaded() { b.isLoaded = false }
+
+// setMaxTime sets maxTime = min(b.tr.Max, max) and
+// returns true if maxTime == b.tr.Max, indicating
+// the iterator has reached the upper bound.
+func (b *TimeRangeMaxTimeIterator) setMaxTime(max int64) bool {
+	if max > b.tr.Max {
+		b.maxTime = b.tr.Max
+		return true
+	}
+	b.maxTime = max
+	return false
+}
+
+func (b *TimeRangeMaxTimeIterator) load() {
+	if b.isLoaded {
+		return
+	}
+
+	b.isLoaded = true
+	b.hasData = false
+	b.maxTime = InvalidMinNanoTime
+
+	e, ts := b.getEntriesAndTombstones()
+	if len(e) == 0 {
+		return
+	}
+
+	if len(ts) == 0 {
+		// no tombstones, fast path will avoid decoding blocks
+		// if queried time interval intersects with one of the entries
+		if intersectsEntry(e, b.tr) {
+			b.hasData = true
+			b.setMaxTime(e[len(e)-1].MaxTime)
+			return
+		}
+	}
+
+	for i := range e {
+		if !b.readBlock(&e[i]) {
+			goto ERROR
+		}
+
+		// remove tombstoned timestamps
+		for i := range ts {
+			b.a.Exclude(ts[i].Min, ts[i].Max)
+		}
+
+		if b.a.Contains(b.tr.Min, b.tr.Max) {
+			b.hasData = true
+			if b.setMaxTime(b.a.MaxTime()) {
+				return
+			}
+		}
+	}
+
+	return
+
+ERROR:
+	// ERROR ensures cached state is set to invalid values
+	b.hasData = false
+	b.maxTime = InvalidMinNanoTime
+}
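
A sketch of the intended call pattern, mirroring how fieldsNoPredicate drives this iterator earlier in the PR; r is assumed to be an open *tsm1.TSMReader, and checking that keys still carry the prefix is the caller's responsibility:

package example

import (
	"bytes"

	"github.com/influxdata/influxdb/v2/tsdb/tsm1"
)

// maxTimes records, for every series key under prefix, the maximum timestamp
// observed within [min, max].
func maxTimes(r *tsm1.TSMReader, prefix []byte, min, max int64) map[string]int64 {
	res := make(map[string]int64)
	iter := r.TimeRangeMaxTimeIterator(prefix, min, max)
	for iter.Next() {
		if !bytes.HasPrefix(iter.Key(), prefix) {
			break // keys are ordered; we have left the prefix range
		}
		// InvalidMinNanoTime signals no data for this key in [min, max].
		if mt := iter.MaxTime(); mt != tsm1.InvalidMinNanoTime {
			res[string(iter.Key())] = mt
		}
	}
	return res
}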
@@ -0,0 +1,313 @@
+package tsm1
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/influxdata/influxdb/v2"
+	"github.com/influxdata/influxdb/v2/tsdb"
+	"github.com/influxdata/influxdb/v2/tsdb/cursors"
+)
+
+func TestTimeRangeMaxTimeIterator(t *testing.T) {
+	tsm := mustWriteTSM(
+		bucket{
+			org:    0x50,
+			bucket: 0x60,
+			w: writes(
+				mw("cpu",
+					kw("tag0=val0",
+						vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
+						vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
+					),
+					kw("tag0=val1",
+						vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
+						vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
+					),
+				),
+			),
+		},
+
+		bucket{
+			org:    0x51,
+			bucket: 0x61,
+			w: writes(
+				mw("mem",
+					kw("tag0=val0",
+						vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
+						vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
+					),
+					kw("tag0=val1",
+						vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
+						vals(tvi(2000, 1)),
+					),
+					kw("tag0=val2",
+						vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
+						vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
+					),
+				),
+			),
+		},
+	)
+	defer tsm.RemoveAll()
+
+	orgBucket := func(org, bucket uint) []byte {
+		n := tsdb.EncodeName(influxdb.ID(org), influxdb.ID(bucket))
+		return n[:]
+	}
+
+	type args struct {
+		min int64
+		max int64
+	}
+
+	type res struct {
+		k       string
+		hasData bool
+		maxTime int64
+	}
+
+	EXP := func(r ...interface{}) (rr []res) {
+		for i := 0; i+2 < len(r); i += 3 {
+			rr = append(rr, res{k: r[i].(string), hasData: r[i+1].(bool), maxTime: int64(r[i+2].(int))})
+		}
+		return
+	}
+
+	type test struct {
+		name     string
+		args     args
+		exp      []res
+		expStats cursors.CursorStats
+	}
+
+	type bucketTest struct {
+		org, bucket uint
+		m           string
+		tests       []test
+	}
+
+	r := tsm.TSMReader()
+
+	runTests := func(name string, tests []bucketTest) {
+		t.Run(name, func(t *testing.T) {
+			for _, bt := range tests {
+				key := orgBucket(bt.org, bt.bucket)
+				t.Run(fmt.Sprintf("0x%x-0x%x", bt.org, bt.bucket), func(t *testing.T) {
+					for _, tt := range bt.tests {
+						t.Run(tt.name, func(t *testing.T) {
+							iter := r.TimeRangeMaxTimeIterator(key, tt.args.min, tt.args.max)
+							count := 0
+							for i, exp := range tt.exp {
+								if !iter.Next() {
+									t.Errorf("Next(%d): expected true", i)
+								}
+
+								expKey := makeKey(influxdb.ID(bt.org), influxdb.ID(bt.bucket), bt.m, exp.k)
+								if got := iter.Key(); !cmp.Equal(got, expKey) {
+									t.Errorf("Key(%d): -got/+exp\n%v", i, cmp.Diff(got, expKey))
+								}
+
+								if got := iter.HasData(); got != exp.hasData {
+									t.Errorf("HasData(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.hasData))
+								}
+
+								if got := iter.MaxTime(); got != exp.maxTime {
+									t.Errorf("MaxTime(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.maxTime))
+								}
+								count++
+							}
+							if count != len(tt.exp) {
+								t.Errorf("count: -got/+exp\n%v", cmp.Diff(count, len(tt.exp)))
+							}
+
+							if got := iter.Stats(); !cmp.Equal(got, tt.expStats) {
+								t.Errorf("Stats: -got/+exp\n%v", cmp.Diff(got, tt.expStats))
+							}
+						})
+					}
+				})
+			}
+		})
+	}
+
+	runTests("before delete", []bucketTest{
+		{
+			org:    0x50,
+			bucket: 0x60,
+			m:      "cpu",
+			tests: []test{
+				{
+					name: "cover file",
+					args: args{
+						min: 900,
+						max: 10000,
+					},
+					exp:      EXP("tag0=val0", true, 2020, "tag0=val1", true, 3020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+				{
+					name: "within block",
+					args: args{
+						min: 2001,
+						max: 2011,
+					},
+					exp:      EXP("tag0=val0", true, 2011, "tag0=val1", true, 2011),
+					expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
+				},
+				{
+					name: "to_2999",
+					args: args{
+						min: 0,
+						max: 2999,
+					},
+					exp:      EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+				{
+					name: "intersects block",
+					args: args{
+						min: 1500,
+						max: 2500,
+					},
+					exp:      EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+			},
+		},
+
+		{
+			org:    0x51,
+			bucket: 0x61,
+			m:      "mem",
+			tests: []test{
+				{
+					name: "cover file",
+					args: args{
+						min: 900,
+						max: 10000,
+					},
+					exp:      EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 3020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+				{
+					name: "within block",
+					args: args{
+						min: 2001,
+						max: 2011,
+					},
+					exp:      EXP("tag0=val0", true, 2011, "tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", true, 2011),
+					expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
+				},
+				{
+					name: "1000_2999",
+					args: args{
+						min: 1000,
+						max: 2500,
+					},
+					exp:      EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 2020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+			},
+		},
+	})
+
+	tsm.MustDeletePrefix(orgBucket(0x50, 0x60), 0, 2999)
+	tsm.MustDelete(makeKey(0x51, 0x61, "mem", "tag0=val0"))
+	tsm.MustDeleteRange(2000, 2999,
+		makeKey(0x51, 0x61, "mem", "tag0=val1"),
+		makeKey(0x51, 0x61, "mem", "tag0=val2"),
+	)
+
+	runTests("after delete", []bucketTest{
+		{
+			org:    0x50,
+			bucket: 0x60,
+			m:      "cpu",
+			tests: []test{
+				{
+					name: "cover file",
+					args: args{
+						min: 900,
+						max: 10000,
+					},
+					exp:      EXP("tag0=val1", true, 3020),
+					expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
+				},
+				{
+					name: "within block",
+					args: args{
+						min: 2001,
+						max: 2011,
+					},
+					exp:      nil,
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+				{
+					name: "to_2999",
+					args: args{
+						min: 0,
+						max: 2999,
+					},
+					exp:      EXP("tag0=val1", false, int(InvalidMinNanoTime)),
+					expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
+				},
+				{
+					name: "intersects block",
+					args: args{
+						min: 1500,
+						max: 2500,
+					},
+					exp:      EXP("tag0=val1", false, int(InvalidMinNanoTime)),
+					expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
+				},
+				{
+					name: "beyond all tombstones",
+					args: args{
+						min: 3000,
+						max: 4000,
+					},
+					exp:      EXP("tag0=val1", true, 3020),
+					expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
+				},
+			},
+		},
+
+		{
+			org:    0x51,
+			bucket: 0x61,
+			m:      "mem",
+			tests: []test{
+				{
+					name: "cover file",
+					args: args{
+						min: 900,
+						max: 10000,
+					},
+					exp:      EXP("tag0=val1", true, 1020, "tag0=val2", true, 3020),
+					expStats: cursors.CursorStats{ScannedValues: 10, ScannedBytes: 80},
+				},
+				{
+					name: "within block",
+					args: args{
+						min: 2001,
+						max: 2011,
+					},
+					exp:      EXP("tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", false, int(InvalidMinNanoTime)),
+					expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
+				},
+				{
+					name: "1000_2500",
+					args: args{
+						min: 1000,
+						max: 2500,
+					},
+					exp:      EXP("tag0=val1", true, 1020, "tag0=val2", false, int(InvalidMinNanoTime)),
+					expStats: cursors.CursorStats{ScannedValues: 7, ScannedBytes: 56},
+				},
+			},
+		},
+	})
+}