refactor(tsm1): Allow race-free access to cache
This commit adds a new API to `Cache` to address data races with the `TagKeys` and `TagValues` APIs. `Cache` and `entry` provide `AppendTimestamps`, which appends the current timestamps to the provided slice to reduce allocations. As noted in the documentation, it is the responsibility of the caller to sort and deduplicate the values, if required. The `cursors.TimestampArray` type was extended to permit use of the `sort.Sort` API.pull/17589/head
parent
31df76e1e9
commit
7de6383adf
|
@ -1,5 +1,7 @@
|
|||
package cursors
|
||||
|
||||
import "sort"
|
||||
|
||||
func (a *FloatArray) Size() int {
|
||||
// size of timestamps + values
|
||||
return len(a.Timestamps)*8 + len(a.Values)*8
|
||||
|
@ -27,3 +29,13 @@ func (a *BooleanArray) Size() int {
|
|||
// size of timestamps + values
|
||||
return len(a.Timestamps)*8 + len(a.Values)
|
||||
}
|
||||
|
||||
var _ sort.Interface = (*TimestampArray)(nil)
|
||||
|
||||
func (a *TimestampArray) Less(i, j int) bool {
|
||||
return a.Timestamps[i] < a.Timestamps[j]
|
||||
}
|
||||
|
||||
func (a *TimestampArray) Swap(i, j int) {
|
||||
a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
|
||||
}
|
||||
|
|
|
@ -328,6 +328,47 @@ func (c *Cache) Type(key []byte) (models.FieldType, error) {
|
|||
return models.Empty, errUnknownFieldType
|
||||
}
|
||||
|
||||
// BlockType returns the TSM block type for the specified
|
||||
// key or BlockUndefined if the type cannot be determined
|
||||
// either because the key does not exist or there are no
|
||||
// values for the key.
|
||||
func (c *Cache) BlockType(key []byte) byte {
|
||||
c.mu.RLock()
|
||||
e := c.store.entry(key)
|
||||
if e == nil && c.snapshot != nil {
|
||||
e = c.snapshot.store.entry(key)
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
if e != nil {
|
||||
return e.BlockType()
|
||||
}
|
||||
|
||||
return BlockUndefined
|
||||
}
|
||||
|
||||
// AppendTimestamps appends ts with the timestamps for the specified key.
|
||||
// It is the responsibility of the caller to sort and or deduplicate the slice.
|
||||
func (c *Cache) AppendTimestamps(key []byte, ts []int64) []int64 {
|
||||
var snapshotEntries *entry
|
||||
|
||||
c.mu.RLock()
|
||||
e := c.store.entry(key)
|
||||
if c.snapshot != nil {
|
||||
snapshotEntries = c.snapshot.store.entry(key)
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
if e != nil {
|
||||
ts = e.AppendTimestamps(ts)
|
||||
}
|
||||
if snapshotEntries != nil {
|
||||
ts = snapshotEntries.AppendTimestamps(ts)
|
||||
}
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
// Values returns a copy of all values, deduped and sorted, for the given key.
|
||||
func (c *Cache) Values(key []byte) Values {
|
||||
var snapshotEntries *entry
|
||||
|
@ -729,14 +770,14 @@ func valueType(v Value) byte {
|
|||
|
||||
var (
|
||||
valueTypeBlockType = [8]byte{
|
||||
valueTypeUndefined: blockUndefined,
|
||||
valueTypeUndefined: BlockUndefined,
|
||||
valueTypeFloat64: BlockFloat64,
|
||||
valueTypeInteger: BlockInteger,
|
||||
valueTypeString: BlockString,
|
||||
valueTypeBoolean: BlockBoolean,
|
||||
valueTypeUnsigned: BlockUnsigned,
|
||||
6: blockUndefined,
|
||||
7: blockUndefined,
|
||||
6: BlockUndefined,
|
||||
7: BlockUndefined,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -117,6 +117,19 @@ func (e *entry) size() int {
|
|||
return sz
|
||||
}
|
||||
|
||||
// AppendTimestamps appends ts with the timestamps from the entry.
|
||||
func (e *entry) AppendTimestamps(ts []int64) []int64 {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
n := e.values.Len()
|
||||
if n > 0 {
|
||||
for i := range e.values {
|
||||
ts = append(ts, e.values[i].UnixNano())
|
||||
}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
// InfluxQLType returns for the entry the data type of its values.
|
||||
func (e *entry) InfluxQLType() (influxql.DataType, error) {
|
||||
e.mu.RLock()
|
||||
|
|
|
@ -25,8 +25,8 @@ const (
|
|||
// BlockUnsigned designates a block encodes uint64 values.
|
||||
BlockUnsigned = byte(4)
|
||||
|
||||
// blockUndefined represents an undefined block type value.
|
||||
blockUndefined = BlockUnsigned + 1
|
||||
// BlockUndefined represents an undefined block type value.
|
||||
BlockUndefined = BlockUnsigned + 1
|
||||
|
||||
// encodedBlockHeaderSize is the size of the header for an encoded block. There is one
|
||||
// byte encoding the type of the block.
|
||||
|
@ -162,6 +162,28 @@ func (a Values) InfluxQLType() (influxql.DataType, error) {
|
|||
return influxql.Unknown, fmt.Errorf("unsupported value type %T", a[0])
|
||||
}
|
||||
|
||||
// BlockType returns the TSM block type the values map to.
|
||||
func (a Values) BlockType() byte {
|
||||
if len(a) == 0 {
|
||||
return BlockUndefined
|
||||
}
|
||||
|
||||
switch a[0].(type) {
|
||||
case FloatValue:
|
||||
return BlockFloat64
|
||||
case IntegerValue:
|
||||
return BlockInteger
|
||||
case UnsignedValue:
|
||||
return BlockUnsigned
|
||||
case BooleanValue:
|
||||
return BlockBoolean
|
||||
case StringValue:
|
||||
return BlockString
|
||||
}
|
||||
|
||||
return BlockUndefined
|
||||
}
|
||||
|
||||
// BlockType returns the type of value encoded in a block or an error
|
||||
// if the block type is unknown.
|
||||
func BlockType(block []byte) (byte, error) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/pkg/metrics"
|
||||
"github.com/influxdata/influxdb/v2/query"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/tsi1"
|
||||
"github.com/influxdata/influxql"
|
||||
|
@ -1464,7 +1465,7 @@ var (
|
|||
BlockBoolean: influxql.Boolean,
|
||||
BlockString: influxql.String,
|
||||
BlockUnsigned: influxql.Unsigned,
|
||||
blockUndefined: influxql.Unknown,
|
||||
BlockUndefined: influxql.Unknown,
|
||||
6: influxql.Unknown,
|
||||
7: influxql.Unknown,
|
||||
}
|
||||
|
@ -1472,6 +1473,21 @@ var (
|
|||
|
||||
func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType { return blockToFieldType[typ&7] }
|
||||
|
||||
var (
|
||||
blockTypeFieldType = [8]cursors.FieldType{
|
||||
BlockFloat64: cursors.Float,
|
||||
BlockInteger: cursors.Integer,
|
||||
BlockBoolean: cursors.Boolean,
|
||||
BlockString: cursors.String,
|
||||
BlockUnsigned: cursors.Unsigned,
|
||||
BlockUndefined: cursors.Undefined,
|
||||
6: cursors.Undefined,
|
||||
7: cursors.Undefined,
|
||||
}
|
||||
)
|
||||
|
||||
func BlockTypeToFieldType(typ byte) cursors.FieldType { return blockTypeFieldType[typ&7] }
|
||||
|
||||
// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
|
||||
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, []byte) {
|
||||
sep := bytes.Index(key, KeyFieldSeparatorBytes)
|
||||
|
|
|
@ -102,6 +102,8 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgID, bucketID influ
|
|||
return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
}
|
||||
|
||||
var ts cursors.TimestampArray
|
||||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
tsmKeyprefixStr := string(tsmKeyPrefix)
|
||||
|
@ -122,12 +124,18 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgID, bucketID influ
|
|||
return nil
|
||||
}
|
||||
|
||||
stats.ScannedValues += entry.values.Len()
|
||||
stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
|
||||
ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
|
||||
if ts.Len() > 0 {
|
||||
sort.Sort(&ts)
|
||||
|
||||
if entry.values.Contains(start, end) {
|
||||
tsmValues[string(curVal)] = struct{}{}
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
tsmValues[string(curVal)] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -207,6 +215,7 @@ func (e *Engine) tagValuesPredicate(ctx context.Context, orgID, bucketID influxd
|
|||
tags models.Tags
|
||||
keybuf []byte
|
||||
sfkey []byte
|
||||
ts cursors.TimestampArray
|
||||
)
|
||||
|
||||
for i := range keys {
|
||||
|
@ -230,15 +239,23 @@ func (e *Engine) tagValuesPredicate(ctx context.Context, orgID, bucketID influxd
|
|||
continue
|
||||
}
|
||||
|
||||
keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
|
||||
// orgBucketEsc is already escaped, so no need to use models.AppendMakeKey, which
|
||||
// unescapes and escapes the value again. The degenerate case is if the orgBucketEsc
|
||||
// has escaped values, causing two allocations per key
|
||||
keybuf = append(keybuf[:0], orgBucketEsc...)
|
||||
keybuf = tags.AppendHashKey(keybuf)
|
||||
sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))
|
||||
|
||||
values := e.Cache.Values(sfkey)
|
||||
stats.ScannedValues += values.Len()
|
||||
stats.ScannedBytes += values.Len() * 8 // sizeof timestamp
|
||||
ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
|
||||
if ts.Len() > 0 {
|
||||
sort.Sort(&ts)
|
||||
|
||||
if values.Contains(start, end) {
|
||||
tsmValues[string(curVal)] = struct{}{}
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
tsmValues[string(curVal)] = struct{}{}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -378,6 +395,8 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgID, bucketID influxd
|
|||
return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
}
|
||||
|
||||
var ts cursors.TimestampArray
|
||||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
tsmKeyprefixStr := string(tsmKeyPrefix)
|
||||
|
@ -393,12 +412,18 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgID, bucketID influxd
|
|||
return nil
|
||||
}
|
||||
|
||||
stats.ScannedValues += entry.values.Len()
|
||||
stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
|
||||
ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
|
||||
if ts.Len() > 0 {
|
||||
sort.Sort(&ts)
|
||||
|
||||
if entry.values.Contains(start, end) {
|
||||
keyset.UnionKeys(tags)
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
keyset.UnionKeys(tags)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -472,6 +497,7 @@ func (e *Engine) tagKeysPredicate(ctx context.Context, orgID, bucketID influxdb.
|
|||
tags models.Tags
|
||||
keybuf []byte
|
||||
sfkey []byte
|
||||
ts cursors.TimestampArray
|
||||
)
|
||||
|
||||
for i := range keys {
|
||||
|
@ -490,16 +516,24 @@ func (e *Engine) tagKeysPredicate(ctx context.Context, orgID, bucketID influxdb.
|
|||
continue
|
||||
}
|
||||
|
||||
keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
|
||||
// orgBucketEsc is already escaped, so no need to use models.AppendMakeKey, which
|
||||
// unescapes and escapes the value again. The degenerate case is if the orgBucketEsc
|
||||
// has escaped values, causing two allocations per key
|
||||
keybuf = append(keybuf[:0], orgBucketEsc...)
|
||||
keybuf = tags.AppendHashKey(keybuf)
|
||||
sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))
|
||||
|
||||
values := e.Cache.Values(sfkey)
|
||||
stats.ScannedValues += values.Len()
|
||||
stats.ScannedBytes += values.Len() * 8 // sizeof timestamp
|
||||
ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
|
||||
if ts.Len() > 0 {
|
||||
sort.Sort(&ts)
|
||||
|
||||
if values.Contains(start, end) {
|
||||
keyset.UnionKeys(tags)
|
||||
continue
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
keyset.UnionKeys(tags)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, iter := range iters {
|
||||
|
|
Loading…
Reference in New Issue