// influxdb/v1/services/storage/store.go

package storage

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/influxdata/influxdb/v2/influxql/query"
	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/slices"
	"github.com/influxdata/influxdb/v2/storage/reads"
	"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
	"github.com/influxdata/influxdb/v2/tsdb"
	"github.com/influxdata/influxdb/v2/tsdb/cursors"
	"github.com/influxdata/influxdb/v2/v1/services/meta"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)
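
// ErrMissingReadSource is returned when a read request arrives without a
// ReadSource identifying the org and bucket to read from.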
var (
	ErrMissingReadSource = errors.New("missing ReadSource")
)
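
// TSDBStore is the subset of the underlying TSDB store's API that the
// storage service needs to answer read and metadata requests.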
type TSDBStore interface {
	MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
	ShardGroup(ids []uint64) tsdb.ShardGroup
	Shards(ids []uint64) []*tsdb.Shard
	TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
	SeriesCardinality(ctx context.Context, database string) (int64, error)
	SeriesCardinalityFromShards(ctx context.Context, shards []*tsdb.Shard) (*tsdb.SeriesIDSet, error)
	SeriesFile(database string) *tsdb.SeriesFile
}
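
// MetaClient is the subset of the meta client API used to look up databases
// and locate the shard groups that overlap a time range.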
type MetaClient interface {
	Database(name string) *meta.DatabaseInfo
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
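
// Store services storage read requests (filter, group, window aggregate and
// metadata queries) against a 1.x TSDB store and meta client.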
type Store struct {
	TSDBStore  TSDBStore
	MetaClient MetaClient
	Logger     *zap.Logger
}
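
// WindowAggregate evaluates a ReadWindowAggregateRequest, returning a result
// set that computes the requested aggregate over each window of the matching
// series.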
func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) {
	if req.ReadSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.ReadSource)
	if err != nil {
		return nil, err
	}
	database, rp, start, end, err := s.validateArgs(source.GetOrgID(), source.GetBucketID(), req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}

	// Due to some optimizations around how flux's `last()` function is implemented with the
	// storage engine, we need to detect if the read request requires a descending
	// cursor or not.
	descending := reads.IsLastDescendingAggregateOptimization(req)
	shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
		return nil, nil
	}

	var cur reads.SeriesCursor
	if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
		return nil, err
	} else if ic == nil { // TODO(jeff): this was a typed nil
		return nil, nil
	} else {
		cur = ic
	}

	return reads.NewWindowAggregateResultSet(ctx, req, cur)
}
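
// NewStore returns a new Store that reads from the given TSDBStore and
// resolves shard groups with the given MetaClient. The logger defaults to a
// no-op logger; use WithLogger to replace it.
//
// A minimal usage sketch (tsdbStore, metaClient and logger are assumed to be
// constructed elsewhere):
//
//	store := NewStore(tsdbStore, metaClient)
//	store.WithLogger(logger)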
func NewStore(store TSDBStore, metaClient MetaClient) *Store {
	return &Store{
		TSDBStore:  store,
		MetaClient: metaClient,
		Logger:     zap.NewNop(),
	}
}

// WithLogger sets the logger for the service.
func (s *Store) WithLogger(log *zap.Logger) {
	s.Logger = log.With(zap.String("service", "store"))
}
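
// findShardIDs returns the IDs of every shard in the shard groups that
// overlap the time range [start, end], with shard groups sorted by time
// (reversed when desc is true).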
func (s *Store) findShardIDs(database, rp string, desc bool, start, end int64) ([]uint64, error) {
	groups, err := s.MetaClient.ShardGroupsByTimeRange(database, rp, time.Unix(0, start), time.Unix(0, end))
	if err != nil {
		return nil, err
	}
	if len(groups) == 0 {
		return nil, nil
	}
	if desc {
		sort.Sort(sort.Reverse(meta.ShardGroupInfos(groups)))
	} else {
		sort.Sort(meta.ShardGroupInfos(groups))
	}
	shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups))
	for _, g := range groups {
		for _, si := range g.Shards {
			shardIDs = append(shardIDs, si.ID)
		}
	}
	return shardIDs, nil
}
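
// validateArgs resolves the bucket ID to its 1.x database name, verifies that
// the database and the default retention policy exist, and widens
// non-positive start and end times to the minimum and maximum valid
// timestamps.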
func (s *Store) validateArgs(orgID, bucketID uint64, start, end int64) (string, string, int64, int64, error) {
	database := platform.ID(bucketID).String()
	rp := meta.DefaultRetentionPolicyName

	di := s.MetaClient.Database(database)
	if di == nil {
		return "", "", 0, 0, errors.New("no database")
	}
	rpi := di.RetentionPolicy(rp)
	if rpi == nil {
		return "", "", 0, 0, errors.New("invalid retention policy")
	}
	if start <= 0 {
		start = models.MinNanoTime
	}
	if end <= 0 {
		end = models.MaxNanoTime
	}
	return database, rp, start, end, nil
}
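
// ReadFilter evaluates a ReadFilterRequest, returning a result set over the
// series matching the request's predicate within its time range.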
func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
	if req.ReadSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.ReadSource)
	if err != nil {
		return nil, err
	}
	database, rp, start, end, err := s.validateArgs(source.OrgID, source.BucketID, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}
	shardIDs, err := s.findShardIDs(database, rp, false, start, end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
		return nil, nil
	}

	var cur reads.SeriesCursor
	if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
		return nil, err
	} else if ic == nil { // TODO(jeff): this was a typed nil
		return nil, nil
	} else {
		cur = ic
	}

	req.Range = &datatypes.TimestampRange{
		Start: start,
		End:   end,
	}

	return reads.NewFilteredResultSet(ctx, req.Range.GetStart(), req.Range.GetEnd(), cur), nil
}
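
// ReadGroup evaluates a ReadGroupRequest, returning a result set that groups
// the matching series by the requested group keys.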
func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) {
	if req.ReadSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.ReadSource)
	if err != nil {
		return nil, err
	}
	database, rp, start, end, err := s.validateArgs(source.OrgID, source.BucketID, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}

	// Due to some optimizations around how flux's `last()` function is implemented with the
	// storage engine, we need to detect if the read request requires a descending
	// cursor or not.
	descending := reads.IsLastDescendingGroupOptimization(req)
	shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 {
		return nil, nil
	}

	shards := s.TSDBStore.Shards(shardIDs)
	req.Range = &datatypes.TimestampRange{
		Start: start,
		End:   end,
	}

	newCursor := func() (reads.SeriesCursor, error) {
		cur, err := newIndexSeriesCursor(ctx, req.Predicate, shards)
		if cur == nil || err != nil {
			return nil, err
		}
		return cur, nil
	}

	rs := reads.NewGroupResultSet(ctx, req, newCursor)
	if rs == nil {
		return nil, nil
	}
	return rs, nil
}
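
// metaqueryAttributes carries the arguments shared by the metadata queries
// (tag keys, tag values, measurement names and measurement fields).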
type metaqueryAttributes struct {
	orgID      platform.ID
	db, rp     string
	start, end int64
	pred       influxql.Expr
}
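
// tagKeysWithFieldPredicate returns the tag keys for series matching a
// predicate that references _field. Because fields are not indexed, each
// series must be scanned to confirm it has data within the time range.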
func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaqueryAttributes, shardIDs []uint64) (cursors.StringIterator, error) {
	var cur reads.SeriesCursor
	if ic, err := newIndexSeriesCursorInfluxQLPred(ctx, mqAttrs.pred, s.TSDBStore.Shards(shardIDs)); err != nil {
		return nil, err
	} else if ic == nil {
		return cursors.EmptyStringIterator, nil
	} else {
		cur = ic
	}

	m := make(map[string]struct{})
	rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur)
	for rs.Next() {
		func() {
			c := rs.Cursor()
			if c == nil {
				// no data for series key + field combination
				return
			}
			defer c.Close()
			if cursorHasData(c) {
				tags := rs.Tags()
				for i := range tags {
					m[string(tags[i].Key)] = struct{}{}
				}
			}
		}()
	}

	arr := make([]string, 0, len(m))
	for tag := range m {
		arr = append(arr, tag)
	}
	sort.Strings(arr)
	return cursors.NewStringSliceIterator(arr), nil
}
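
// TagKeys returns an iterator over the distinct tag keys of the series
// matching the request's predicate and time range.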
func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) {
	if req.TagsSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.TagsSource)
	if err != nil {
		return nil, err
	}
	db, rp, start, end, err := s.validateArgs(source.OrgID, source.BucketID, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}
	shardIDs, err := s.findShardIDs(db, rp, false, start, end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	var expr influxql.Expr
	if root := req.Predicate.GetRoot(); root != nil {
		var err error
		expr, err = reads.NodeToExpr(root, measurementRemap)
		if err != nil {
			return nil, err
		}
		if found := reads.HasFieldValueKey(expr); found {
			return nil, errors.New("field values unsupported")
		}
		if found := reads.ExprHasKey(expr, fieldKey); found {
			mqAttrs := &metaqueryAttributes{
				orgID: platform.ID(source.GetOrgID()),
				db:    db,
				rp:    rp,
				start: start,
				end:   end,
				pred:  expr,
			}
			return s.tagKeysWithFieldPredicate(ctx, mqAttrs, shardIDs)
		}
		expr = influxql.Reduce(influxql.CloneExpr(expr), nil)
		if reads.IsTrueBooleanLiteral(expr) {
			expr = nil
		}
	}

	// TODO(jsternberg): Use a real authorizer.
	auth := query.OpenAuthorizer
	keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr)
	if err != nil {
		return cursors.EmptyStringIterator, err
	}

	m := map[string]bool{
		measurementKey: true,
		fieldKey:       true,
	}
	for _, ks := range keys {
		for _, k := range ks.Keys {
			m[k] = true
		}
	}

	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return cursors.NewStringSliceIterator(names), nil
}
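
// TagValues returns an iterator over the distinct values of a single tag key.
// Requests for _measurement and _field are delegated to MeasurementNames and
// measurementFields respectively.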
func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) {
	if req.TagsSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.TagsSource)
	if err != nil {
		return nil, err
	}
	db, rp, start, end, err := s.validateArgs(source.OrgID, source.BucketID, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}

	var influxqlPred influxql.Expr
	if root := req.Predicate.GetRoot(); root != nil {
		var err error
		influxqlPred, err = reads.NodeToExpr(root, measurementRemap)
		if err != nil {
			return nil, err
		}
		if found := reads.HasFieldValueKey(influxqlPred); found {
			return nil, errors.New("field values unsupported")
		}
		influxqlPred = influxql.Reduce(influxql.CloneExpr(influxqlPred), nil)
		if reads.IsTrueBooleanLiteral(influxqlPred) {
			influxqlPred = nil
		}
	}

	mqAttrs := &metaqueryAttributes{
		orgID: platform.ID(source.GetOrgID()),
		db:    db,
		rp:    rp,
		start: start,
		end:   end,
		pred:  influxqlPred,
	}

	tagKey, ok := measurementRemap[req.TagKey]
	if !ok {
		tagKey = req.TagKey
	}

	// Getting the values of _measurement or _field is handled specially.
	switch tagKey {
	case "_name":
		return s.MeasurementNames(ctx, mqAttrs)
	case "_field":
		return s.measurementFields(ctx, mqAttrs)
	}

	return s.tagValues(ctx, mqAttrs, tagKey)
}
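
// tagValues answers a tag values query from the index where possible, falling
// back to tagValuesSlow when the predicate references _field.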
func (s *Store) tagValues(ctx context.Context, mqAttrs *metaqueryAttributes, tagKey string) (cursors.StringIterator, error) {
	// If there are any references to _field, we need to use the slow path
	// since we cannot rely on the index alone.
	if mqAttrs.pred != nil {
		if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey {
			return s.tagValuesSlow(ctx, mqAttrs, tagKey)
		}
	}

	shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	tagKeyExpr := &influxql.BinaryExpr{
		Op: influxql.EQ,
		LHS: &influxql.VarRef{
			Val: "_tagKey",
		},
		RHS: &influxql.StringLiteral{
			Val: tagKey,
		},
	}
	if mqAttrs.pred != nil {
		mqAttrs.pred = &influxql.BinaryExpr{
			Op:  influxql.AND,
			LHS: tagKeyExpr,
			RHS: &influxql.ParenExpr{
				Expr: mqAttrs.pred,
			},
		}
	} else {
		mqAttrs.pred = tagKeyExpr
	}

	// TODO(jsternberg): Use a real authorizer.
	auth := query.OpenAuthorizer
	values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, mqAttrs.pred)
	if err != nil {
		return nil, err
	}

	m := make(map[string]struct{})
	for _, kvs := range values {
		for _, kv := range kvs.Values {
			m[kv.Value] = struct{}{}
		}
	}

	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return cursors.NewStringSliceIterator(names), nil
}
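
// MeasurementNames returns an iterator over the distinct measurement names
// matching the predicate, using the slower block-scan path when the predicate
// references _field.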
func (s *Store) MeasurementNames(ctx context.Context, mqAttrs *metaqueryAttributes) (cursors.StringIterator, error) {
	if mqAttrs.pred != nil {
		if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey {
			// If there is a predicate on _field, we cannot use the index
			// to filter out unwanted measurement names. Use a slower
			// block scan instead.
			return s.tagValuesSlow(ctx, mqAttrs, measurementKey)
		}
	}

	// TODO(jsternberg): Use a real authorizer.
	auth := query.OpenAuthorizer
	values, err := s.TSDBStore.MeasurementNames(ctx, auth, mqAttrs.db, mqAttrs.pred)
	if err != nil {
		return nil, err
	}

	m := make(map[string]struct{})
	for _, name := range values {
		m[string(name)] = struct{}{}
	}

	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return cursors.NewStringSliceIterator(names), nil
}
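
// GetSource returns a ReadSource message for the given org and bucket IDs.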
func (s *Store) GetSource(orgID, bucketID uint64) proto.Message {
	return &ReadSource{
		BucketID: bucketID,
		OrgID:    orgID,
	}
}
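
// measurementFields returns the distinct field keys of the measurements
// matching the predicate, via the _fieldKeys system iterator when the
// predicate can be answered by the index alone.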
func (s *Store) measurementFields(ctx context.Context, mqAttrs *metaqueryAttributes) (cursors.StringIterator, error) {
	if mqAttrs.pred != nil {
		if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey {
			return s.tagValuesSlow(ctx, mqAttrs, fieldKey)
		}

		// If there are predicates on anything besides _measurement, we can't
		// use the index and need to use the slow path.
		if hasTagKey(mqAttrs.pred) {
			return s.tagValuesSlow(ctx, mqAttrs, fieldKey)
		}
	}

	shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	sg := s.TSDBStore.ShardGroup(shardIDs)
	ms := &influxql.Measurement{
		Database:        mqAttrs.db,
		RetentionPolicy: mqAttrs.rp,
		SystemIterator:  "_fieldKeys",
	}
	opts := query.IteratorOptions{
		OrgID:      mqAttrs.orgID,
		Condition:  mqAttrs.pred,
		Authorizer: query.OpenAuthorizer,
	}
	iter, err := sg.CreateIterator(ctx, ms, opts)
	if err != nil {
		return nil, err
	}
	defer func() {
		if iter != nil {
			_ = iter.Close()
		}
	}()

	var fieldNames []string
	fitr, ok := iter.(query.FloatIterator)
	if !ok {
		return cursors.NewStringSliceIterator(fieldNames), nil
	}
	for p, _ := fitr.Next(); p != nil; p, _ = fitr.Next() {
		if len(p.Aux) >= 1 {
			fieldNames = append(fieldNames, p.Aux[0].(string))
		}
	}
	sort.Strings(fieldNames)
	fieldNames = slices.MergeSortedStrings(fieldNames)
	return cursors.NewStringSliceIterator(fieldNames), nil
}
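
// cursorHasData reports whether the cursor yields at least one value. It
// consumes the cursor's first batch to find out.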
func cursorHasData(c cursors.Cursor) bool {
	var l int
	switch typedCur := c.(type) {
	case cursors.IntegerArrayCursor:
		ia := typedCur.Next()
		l = ia.Len()
	case cursors.FloatArrayCursor:
		ia := typedCur.Next()
		l = ia.Len()
	case cursors.UnsignedArrayCursor:
		ia := typedCur.Next()
		l = ia.Len()
	case cursors.BooleanArrayCursor:
		ia := typedCur.Next()
		l = ia.Len()
	case cursors.StringArrayCursor:
		ia := typedCur.Next()
		l = ia.Len()
	default:
		panic(fmt.Sprintf("unreachable: %T", typedCur))
	}
	return l != 0
}

// tagValuesSlow will determine the tag values for the given tagKey.
// It's generally faster to use tagValues, measurementFields or
// MeasurementNames, but those methods will only use the index and metadata
// stored in the shard. Because fields are not themselves indexed, we have no way
// of correlating fields to tag values, so we sometimes need to consult tsm to
// provide an accurate answer.
func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, tagKey string) (cursors.StringIterator, error) {
	shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	var cur reads.SeriesCursor
	if ic, err := newIndexSeriesCursorInfluxQLPred(ctx, mqAttrs.pred, s.TSDBStore.Shards(shardIDs)); err != nil {
		return nil, err
	} else if ic == nil {
		return cursors.EmptyStringIterator, nil
	} else {
		cur = ic
	}

	m := make(map[string]struct{})
	rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur)
	for rs.Next() {
		func() {
			c := rs.Cursor()
			if c == nil {
				// No data for this series key + field combination. Note that
				// even when there is no data for the combination, the cursor
				// may be non-nil; we have to consume an array cursor to be
				// sure, which is the reason for the call to cursorHasData
				// below.
				return
			}
			defer c.Close()
			if cursorHasData(c) {
				f := rs.Tags().Get([]byte(tagKey))
				m[string(f)] = struct{}{}
			}
		}()
	}

	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return cursors.NewStringSliceIterator(names), nil
}
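
// ReadSeriesCardinality computes the number of distinct series that match the
// request's predicate and have data within its time range.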
func (s *Store) ReadSeriesCardinality(ctx context.Context, req *datatypes.ReadSeriesCardinalityRequest) (cursors.Int64Iterator, error) {
	if req.ReadSource == nil {
		return nil, ErrMissingReadSource
	}
	source, err := GetReadSource(req.ReadSource)
	if err != nil {
		return nil, err
	}
	db, rp, start, end, err := s.validateArgs(source.OrgID, source.BucketID, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}

	sgs, err := s.MetaClient.ShardGroupsByTimeRange(db, rp, time.Unix(0, start), time.Unix(0, end))
	if err != nil {
		return nil, err
	}
	if len(sgs) == 0 {
		return cursors.NewInt64SliceIterator([]int64{0}), nil
	}

	var expr influxql.Expr
	if root := req.Predicate.GetRoot(); root != nil {
		expr, err = reads.NodeToExpr(root, measurementRemap)
		if err != nil {
			return nil, err
		}
		if found := reads.HasFieldValueKey(expr); found {
			return nil, errors.New("filtering on field values is not supported in cardinality predicates")
		}
		expr = influxql.Reduce(influxql.CloneExpr(expr), nil)

		// Single boolean literals are not handled well by the cursor that will be
		// generated to solve the query, so specifically check and handle those
		// cases here. A true boolean is equivalent to not having any predicate, and
		// a false boolean will return no results.
		if reads.IsTrueBooleanLiteral(expr) {
			expr = nil
		}
		if reads.IsFalseBooleanLiteral(expr) {
			return cursors.NewInt64SliceIterator([]int64{0}), nil
		}
	}

	shardsEntirelyInTimeRange, shardsPartiallyInTimeRange := groupShardsByTime(sgs, start, end)
	sfile := s.TSDBStore.SeriesFile(db)

	// Get the cardinality for the set of shards that are completely within the
	// provided time range. This can be done much faster than verifying that the
	// series have data in the time range, so it is done separately.
	c1, err := s.seriesCardinalityWithPredicate(ctx, s.TSDBStore.Shards(shardsEntirelyInTimeRange), expr, sfile)
	if err != nil {
		return nil, err
	}

	// Shards that only partially overlap the time range need the slower check
	// that verifies each series actually has data within the range.
	c2, err := s.seriesCardinalityWithPredicateAndTime(ctx, s.TSDBStore.Shards(shardsPartiallyInTimeRange), expr, sfile, start, end)
	if err != nil {
		return nil, err
	}

	ss := tsdb.NewSeriesIDSet()
	ss.Merge(c1, c2)
	return cursors.NewInt64SliceIterator([]int64{int64(ss.Cardinality())}), nil
}
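
// seriesCardinalityWithPredicate returns the set of series IDs in shards that
// match expr, without checking whether those series have data in any
// particular time range.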
func (s *Store) seriesCardinalityWithPredicate(ctx context.Context, shards []*tsdb.Shard, expr influxql.Expr, sfile *tsdb.SeriesFile) (*tsdb.SeriesIDSet, error) {
	if expr == nil {
		return s.TSDBStore.SeriesCardinalityFromShards(ctx, shards)
	}

	ss := tsdb.NewSeriesIDSet()
	if len(shards) == 0 {
		return ss, nil
	}
	cur, err := newIndexSeriesCursorInfluxQLPred(ctx, expr, shards)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, 1024)
	for {
		r := cur.Next()
		if r == nil {
			break
		}
		skey := sfile.SeriesID(r.Name, r.SeriesTags, buf)
		ss.Add(skey)
	}
	return ss, nil
}
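
// seriesCardinalityWithPredicateAndTime returns the set of series IDs in
// shards that match expr and have at least one value within the time range
// [start, end].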
func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shards []*tsdb.Shard, expr influxql.Expr, sfile *tsdb.SeriesFile, start, end int64) (*tsdb.SeriesIDSet, error) {
	ss := tsdb.NewSeriesIDSet()
	if len(shards) == 0 {
		return ss, nil
	}
	cur, err := newIndexSeriesCursorInfluxQLPred(ctx, expr, shards)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, 1024)
	rs := reads.NewFilteredResultSet(ctx, start, end, cur)
	for rs.Next() {
		func() {
			c := rs.Cursor()
			if c == nil {
				// no data for series key + field combination
				return
			}
			defer c.Close()
			if cursorHasData(c) {
				r := cur.row
				skey := sfile.SeriesID(r.Name, r.SeriesTags, buf)
				ss.Add(skey)
			}
		}()
	}
	return ss, nil
}
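
// SupportReadSeriesCardinality reports whether the store supports
// ReadSeriesCardinality requests.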
func (s *Store) SupportReadSeriesCardinality(ctx context.Context) bool {
	return true
}

// groupShardsByTime returns two slices of shard IDs: the first contains the
// shards that are entirely within the provided time range; the second
// contains the shards that are not entirely within it.
func groupShardsByTime(sgs []meta.ShardGroupInfo, start, end int64) ([]uint64, []uint64) {
	entirelyInRange := []uint64{}
	partiallyInRange := []uint64{}
	for _, sg := range sgs {
		shards := make([]uint64, 0, len(sg.Shards))
		for _, si := range sg.Shards {
			shards = append(shards, si.ID)
		}
		if timesWithinRangeInclusive(sg.StartTime, sg.EndTime, time.Unix(0, start), time.Unix(0, end)) {
			entirelyInRange = append(entirelyInRange, shards...)
			continue
		}
		partiallyInRange = append(partiallyInRange, shards...)
	}
	return entirelyInRange, partiallyInRange
}

// timesWithinRangeInclusive checks whether the provided start and end times
// are within the start and end times of the range, inclusive of the
// boundaries.
func timesWithinRangeInclusive(start, end, rangeStart, rangeEnd time.Time) bool {
	return (start.After(rangeStart) || start.Equal(rangeStart)) &&
		(end.Before(rangeEnd) || end.Equal(rangeEnd))
}