507 lines
13 KiB
Go
507 lines
13 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/query"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/storage/reads"
|
|
"github.com/influxdata/influxdb/storage/reads/datatypes"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxdb/tsdb/cursors"
|
|
"github.com/influxdata/influxql"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
)
|
|
|
|
var (
	// ErrMissingReadSource is returned when a request carries no
	// serialized ReadSource payload at all.
	ErrMissingReadSource = errors.New("missing ReadSource")
)
|
|
|
|
// GetReadSource will attempt to unmarshal a ReadSource from the ReadRequest or
|
|
// return an error if no valid resource is present.
|
|
func GetReadSource(any *anypb.Any) (*ReadSource, error) {
|
|
if any == nil {
|
|
return nil, ErrMissingReadSource
|
|
}
|
|
var source ReadSource
|
|
if err := any.UnmarshalTo(&source); err != nil {
|
|
return nil, err
|
|
}
|
|
return &source, nil
|
|
}
|
|
|
|
// MetaClient is the subset of the meta service the Store needs: database
// lookup and shard-group resolution by time range.
type MetaClient interface {
	// Database returns the database info for name, or nil if it does not exist.
	Database(name string) *meta.DatabaseInfo
	// ShardGroupsByTimeRange returns all shard groups for the given
	// database/retention policy that overlap [min, max].
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
|
|
|
|
// Store implements the storage read service on top of a local TSDB instance.
type Store struct {
	// TSDBStore is the underlying shard/series storage engine.
	TSDBStore *tsdb.Store
	// MetaClient resolves databases, retention policies, and shard groups.
	MetaClient MetaClient
	// Logger defaults to a no-op logger; replace it via WithLogger.
	Logger *zap.Logger
}
|
|
|
|
func NewStore(store *tsdb.Store, metaClient MetaClient) *Store {
|
|
return &Store{
|
|
TSDBStore: store,
|
|
MetaClient: metaClient,
|
|
Logger: zap.NewNop(),
|
|
}
|
|
}
|
|
|
|
// WithLogger sets the logger for the service.
// The provided logger is annotated with a service=store field.
func (s *Store) WithLogger(log *zap.Logger) {
	s.Logger = log.With(zap.String("service", "store"))
}
|
|
|
|
func (s *Store) findShardIDs(database, rp string, desc bool, start, end int64) ([]uint64, error) {
|
|
groups, err := s.MetaClient.ShardGroupsByTimeRange(database, rp, time.Unix(0, start), time.Unix(0, end))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(groups) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
if desc {
|
|
sort.Sort(sort.Reverse(meta.ShardGroupInfos(groups)))
|
|
} else {
|
|
sort.Sort(meta.ShardGroupInfos(groups))
|
|
}
|
|
|
|
shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups))
|
|
for _, g := range groups {
|
|
for _, si := range g.Shards {
|
|
shardIDs = append(shardIDs, si.ID)
|
|
}
|
|
}
|
|
return shardIDs, nil
|
|
}
|
|
|
|
func (s *Store) validateArgs(database, rp string, start, end int64) (string, string, int64, int64, error) {
|
|
di := s.MetaClient.Database(database)
|
|
if di == nil {
|
|
return "", "", 0, 0, errors.New("no database")
|
|
}
|
|
|
|
if rp == "" {
|
|
rp = di.DefaultRetentionPolicy
|
|
}
|
|
|
|
rpi := di.RetentionPolicy(rp)
|
|
if rpi == nil {
|
|
return "", "", 0, 0, errors.New("invalid retention policy")
|
|
}
|
|
|
|
if start <= models.MinNanoTime {
|
|
start = models.MinNanoTime
|
|
}
|
|
if end >= models.MaxNanoTime {
|
|
end = models.MaxNanoTime
|
|
}
|
|
return database, rp, start, end, nil
|
|
}
|
|
|
|
// limitToShardIDs returns the members of list that also appear in allowed,
// preserving list's order (duplicates in list are kept). The result is a new
// slice; an empty allowed set yields an empty result.
func limitToShardIDs(list []uint64, allowed []uint64) []uint64 {
	allowedSet := make(map[uint64]struct{}, len(allowed))
	for _, id := range allowed {
		allowedSet[id] = struct{}{}
	}

	filtered := make([]uint64, 0, len(list))
	for _, id := range list {
		if _, ok := allowedSet[id]; !ok {
			continue
		}
		filtered = append(filtered, id)
	}
	return filtered
}
|
|
|
|
func (s *Store) ReadFilterLimit(ctx context.Context, req *datatypes.ReadFilterRequest, limitShardIDs []uint64) (reads.ResultSet, error) {
|
|
if req.ReadSource == nil {
|
|
return nil, errors.New("missing read source")
|
|
}
|
|
|
|
var database, rp string
|
|
var start, end int64
|
|
var shardIDs []uint64
|
|
source, err := GetReadSource(req.ReadSource)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
database, rp, start, end, err = s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
shardIDs, err = s.findShardIDs(database, rp, false, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(limitShardIDs) > 0 {
|
|
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
|
|
}
|
|
if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
|
|
return nil, nil
|
|
}
|
|
|
|
var cur reads.SeriesCursor
|
|
if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
|
|
return nil, err
|
|
} else if ic == nil { // TODO(jeff): this was a typed nil
|
|
return nil, nil
|
|
} else {
|
|
cur = ic
|
|
}
|
|
|
|
req.Range = &datatypes.TimestampRange{
|
|
Start: start,
|
|
End: end,
|
|
}
|
|
|
|
return reads.NewFilteredResultSet(ctx, req.Range.GetStart(), req.Range.GetEnd(), cur), nil
|
|
}
|
|
|
|
// ReadFilter performs a filtered read with no shard-ID restriction.
func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
	return s.ReadFilterLimit(ctx, req, nil)
}
|
|
|
|
func (s *Store) ReadGroupLimit(ctx context.Context, req *datatypes.ReadGroupRequest, limitShardIDs []uint64) (reads.GroupResultSet, error) {
|
|
if req.ReadSource == nil {
|
|
return nil, errors.New("missing read source")
|
|
}
|
|
|
|
source, err := GetReadSource(req.ReadSource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Due to some optimizations around how flux's `last()` function is implemented with the
|
|
// storage engine, we need to detect if the read request requires a descending
|
|
// cursor or not.
|
|
descending := !reads.IsAscendingGroupAggregate(req)
|
|
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(limitShardIDs) > 0 {
|
|
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
|
|
}
|
|
if len(shardIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
shards := s.TSDBStore.Shards(shardIDs)
|
|
|
|
req.Range = &datatypes.TimestampRange{
|
|
Start: start,
|
|
End: end,
|
|
}
|
|
|
|
newCursor := func() (reads.SeriesCursor, error) {
|
|
cur, err := newIndexSeriesCursor(ctx, req.Predicate, shards)
|
|
if cur == nil || err != nil {
|
|
return nil, err
|
|
}
|
|
return cur, nil
|
|
}
|
|
|
|
rs := reads.NewGroupResultSet(ctx, req, newCursor)
|
|
if rs == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return rs, nil
|
|
}
|
|
|
|
// ReadGroup performs a grouped read with no shard-ID restriction.
func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) {
	return s.ReadGroupLimit(ctx, req, nil)
}
|
|
|
|
func (s *Store) WindowAggregateLimit(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, limitShardIDs []uint64) (reads.ResultSet, error) {
|
|
if req.ReadSource == nil {
|
|
return nil, errors.New("missing read source")
|
|
}
|
|
|
|
source, err := GetReadSource(req.ReadSource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Due to some optimizations around how flux's `last()` function is implemented with the
|
|
// storage engine, we need to detect if the read request requires a descending
|
|
// cursor or not.
|
|
descending := !reads.IsAscendingWindowAggregate(req)
|
|
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(limitShardIDs) > 0 {
|
|
shardIDs = limitToShardIDs(shardIDs, limitShardIDs)
|
|
}
|
|
if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
|
|
return nil, nil
|
|
}
|
|
|
|
var cur reads.SeriesCursor
|
|
if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
|
|
return nil, err
|
|
} else if ic == nil { // TODO(jeff): this was a typed nil
|
|
return nil, nil
|
|
} else {
|
|
cur = ic
|
|
}
|
|
|
|
return reads.NewWindowAggregateResultSet(ctx, req, cur)
|
|
}
|
|
|
|
// WindowAggregate performs a windowed-aggregate read with no shard-ID
// restriction.
func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) {
	return s.WindowAggregateLimit(ctx, req, nil)
}
|
|
|
|
func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) {
|
|
if req.TagsSource == nil {
|
|
return nil, errors.New("missing read source")
|
|
}
|
|
|
|
source, err := GetReadSource(req.TagsSource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
shardIDs, err := s.findShardIDs(database, rp, false, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(shardIDs) == 0 {
|
|
return cursors.NewStringSliceIterator(nil), nil
|
|
}
|
|
|
|
var expr influxql.Expr
|
|
if root := req.Predicate.GetRoot(); root != nil {
|
|
var err error
|
|
expr, err = reads.NodeToExpr(root, measurementRemap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if found := reads.HasFieldValueKey(expr); found {
|
|
return nil, errors.New("field values unsupported")
|
|
}
|
|
expr = influxql.Reduce(influxql.CloneExpr(expr), nil)
|
|
if reads.IsTrueBooleanLiteral(expr) {
|
|
expr = nil
|
|
}
|
|
}
|
|
|
|
// TODO(jsternberg): Use a real authorizer.
|
|
auth := query.OpenAuthorizer
|
|
keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m := map[string]bool{
|
|
measurementKey: true,
|
|
fieldKey: true,
|
|
}
|
|
for _, ks := range keys {
|
|
for _, k := range ks.Keys {
|
|
m[k] = true
|
|
}
|
|
}
|
|
|
|
names := make([]string, 0, len(m))
|
|
for name := range m {
|
|
names = append(names, name)
|
|
}
|
|
sort.Strings(names)
|
|
return cursors.NewStringSliceIterator(names), nil
|
|
}
|
|
|
|
// TagValues returns a sorted iterator over the distinct values of the
// requested tag key across the shards matching the request's time range and
// optional predicate.
//
// When req.TagKey remaps to "_name" (the measurement key), the request is
// answered by MeasurementNames instead, which ignores the time range.
func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) {
	if tagKey, ok := measurementRemap[req.TagKey]; ok {
		switch tagKey {
		case "_name":
			return s.MeasurementNames(ctx, &MeasurementNamesRequest{
				MeasurementsSource: req.TagsSource,
				Predicate:          req.Predicate,
			})
		}
	}

	if req.TagsSource == nil {
		return nil, errors.New("missing read source")
	}

	source, err := GetReadSource(req.TagsSource)
	if err != nil {
		return nil, err
	}

	database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.GetStart(), req.Range.GetEnd())
	if err != nil {
		return nil, err
	}

	shardIDs, err := s.findShardIDs(database, rp, false, start, end)
	if err != nil {
		return nil, err
	}
	// No overlapping shards: the value set is empty, not an error.
	if len(shardIDs) == 0 {
		return cursors.NewStringSliceIterator(nil), nil
	}

	// Convert the optional predicate into an influxql expression.
	var expr influxql.Expr
	if root := req.Predicate.GetRoot(); root != nil {
		var err error
		expr, err = reads.NodeToExpr(root, measurementRemap)
		if err != nil {
			return nil, err
		}

		// Predicates over field values cannot be answered from the index.
		if found := reads.HasFieldValueKey(expr); found {
			return nil, errors.New("field values unsupported")
		}
		expr = influxql.Reduce(influxql.CloneExpr(expr), nil)
		// A predicate that reduces to `true` filters nothing; drop it.
		if reads.IsTrueBooleanLiteral(expr) {
			expr = nil
		}
	}

	// Restrict the lookup to the requested tag key, AND-ing in any remaining
	// user predicate.
	tagKeyExpr := &influxql.BinaryExpr{
		Op: influxql.EQ,
		LHS: &influxql.VarRef{
			Val: "_tagKey",
		},
		RHS: &influxql.StringLiteral{
			Val: req.TagKey,
		},
	}
	if expr != nil {
		expr = &influxql.BinaryExpr{
			Op:  influxql.AND,
			LHS: tagKeyExpr,
			RHS: &influxql.ParenExpr{
				Expr: expr,
			},
		}
	} else {
		expr = tagKeyExpr
	}

	// TODO(jsternberg): Use a real authorizer.
	auth := query.OpenAuthorizer
	values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, expr)
	if err != nil {
		return nil, err
	}

	// Deduplicate values across shards.
	m := make(map[string]struct{})
	for _, kvs := range values {
		for _, kv := range kvs.Values {
			m[kv.Value] = struct{}{}
		}
	}

	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return cursors.NewStringSliceIterator(names), nil
}
|
|
|
|
// MeasurementNamesRequest describes a measurement-name listing: the source
// (database and retention policy) plus an optional predicate.
type MeasurementNamesRequest struct {
	// MeasurementsSource carries a serialized ReadSource.
	MeasurementsSource *anypb.Any
	// Predicate optionally filters the measurements.
	Predicate *datatypes.Predicate
}
|
|
|
|
func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesRequest) (cursors.StringIterator, error) {
|
|
if req.MeasurementsSource == nil {
|
|
return nil, errors.New("missing read source")
|
|
}
|
|
|
|
source, err := GetReadSource(req.MeasurementsSource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
database, _, _, _, err := s.validateArgs(source.Database, source.RetentionPolicy, -1, -1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var expr influxql.Expr
|
|
if root := req.Predicate.GetRoot(); root != nil {
|
|
var err error
|
|
expr, err = reads.NodeToExpr(root, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if found := reads.HasFieldValueKey(expr); found {
|
|
return nil, errors.New("field values unsupported")
|
|
}
|
|
expr = influxql.Reduce(influxql.CloneExpr(expr), nil)
|
|
if reads.IsTrueBooleanLiteral(expr) {
|
|
expr = nil
|
|
}
|
|
}
|
|
|
|
// TODO(jsternberg): Use a real authorizer.
|
|
auth := query.OpenAuthorizer
|
|
// NOTE: this preserves the existing flux behaviour of ignoring the retention policy here
|
|
values, err := s.TSDBStore.MeasurementNames(ctx, auth, database, "", expr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m := make(map[string]struct{})
|
|
for _, name := range values {
|
|
m[string(name)] = struct{}{}
|
|
}
|
|
|
|
names := make([]string, 0, len(m))
|
|
for name := range m {
|
|
names = append(names, name)
|
|
}
|
|
sort.Strings(names)
|
|
return cursors.NewStringSliceIterator(names), nil
|
|
}
|
|
|
|
// GetSource returns a ReadSource proto message identifying the given database
// and retention policy.
func (s *Store) GetSource(db, rp string) proto.Message {
	return &ReadSource{Database: db, RetentionPolicy: rp}
}
|
|
|
|
// Delete removes series matching sources and condition from the database,
// delegating directly to the underlying TSDB store.
func (s *Store) Delete(database string, sources []influxql.Source, condition influxql.Expr) error {
	return s.TSDBStore.DeleteSeries(database, sources, condition)
}
|
|
|
|
// DeleteRetentionPolicy removes the named retention policy (and its data)
// from the database, delegating directly to the underlying TSDB store.
func (s *Store) DeleteRetentionPolicy(database, name string) error {
	return s.TSDBStore.DeleteRetentionPolicy(database, name)
}
|