fix(storage): meta.Client persists to kv and updates via BucketService

Includes initial migration to add metadata bucket
pull/19446/head
Stuart Carnie 2020-07-29 18:08:04 -07:00
parent e772333b6b
commit 21c3733464
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
12 changed files with 266 additions and 171 deletions

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"sync"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/http"
@ -25,7 +26,7 @@ import (
type Engine interface {
influxdb.DeleteService
storage.PointsWriter
storage.BucketDeleter
storage.EngineSchema
prom.PrometheusCollector
influxdb.BackupService
@ -120,6 +121,14 @@ func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID,
}
func (t *TemporaryEngine) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
return t.engine.CreateBucket(ctx, b)
}
func (t *TemporaryEngine) UpdateBucketRetentionPeriod(ctx context.Context, bucketID influxdb.ID, d time.Duration) error {
return t.engine.UpdateBucketRetentionPeriod(ctx, bucketID, d)
}
// DeleteBucket deletes a bucket from the time-series data.
func (t *TemporaryEngine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) error {
return t.engine.DeleteBucket(ctx, orgID, bucketID)

View File

@ -63,6 +63,7 @@ import (
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/telemetry"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/influxdata/influxdb/v2/v1/services/meta"
storage2 "github.com/influxdata/influxdb/v2/v1/services/storage"
_ "github.com/influxdata/influxdb/v2/v1/tsdb/engine/tsm1" // needed for tsm1
_ "github.com/influxdata/influxdb/v2/v1/tsdb/index/tsi1" // needed for tsi1
@ -483,9 +484,9 @@ func (m *Launcher) NatsURL() string {
// Engine returns a reference to the storage engine. It should only be called
// for end-to-end testing purposes.
// func (m *Launcher) Engine() Engine {
// return m.engine
// }
func (m *Launcher) Engine() Engine {
return m.engine
}
// Shutdown shuts down the HTTP server and waits for all services to clean up.
func (m *Launcher) Shutdown(ctx context.Context) {
@ -717,10 +718,17 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// flushers = append(flushers, engine)
// m.engine = engine
// } else {
metaClient := meta.NewClient(meta.NewConfig(), m.kvStore)
if err := metaClient.Open(); err != nil {
m.log.Error("Failed to open meta client", zap.Error(err))
return err
}
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
// }
m.engine.WithLogger(m.log)

View File

@ -0,0 +1,10 @@
package all
import (
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/v1/services/meta"
)
var Migration0007_CreateMetaDataBucket = migration.CreateBuckets(
"Create TSM metadata buckets",
meta.BucketName)

View File

@ -19,5 +19,7 @@ var Migrations = [...]migration.Spec{
Migration0005_AddPkgerBuckets,
// delete bucket sessionsv1
Migration0006_DeleteBucketSessionsv1,
// CreateMetaDataBucket
Migration0007_CreateMetaDataBucket,
// {{ do_not_edit . }}
}

View File

@ -2,14 +2,16 @@ package storage
import (
"context"
"errors"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"go.uber.org/zap"
)
// BucketDeleter defines the behaviour of deleting a bucket.
type BucketDeleter interface {
type EngineSchema interface {
CreateBucket(context.Context, *influxdb.Bucket) error
UpdateBucketRetentionPeriod(context.Context, influxdb.ID, time.Duration) error
DeleteBucket(context.Context, influxdb.ID, influxdb.ID) error
}
@ -19,85 +21,58 @@ type BucketDeleter interface {
// associated with the bucket is either removed, or marked to be removed via a
// future compaction.
type BucketService struct {
inner influxdb.BucketService
engine BucketDeleter
influxdb.BucketService
log *zap.Logger
engine EngineSchema
}
// NewBucketService returns a new BucketService for the provided BucketDeleter,
// NewBucketService returns a new BucketService for the provided EngineSchema,
// which typically will be an Engine.
func NewBucketService(s influxdb.BucketService, engine BucketDeleter) *BucketService {
func NewBucketService(s influxdb.BucketService, engine EngineSchema) *BucketService {
return &BucketService{
inner: s,
engine: engine,
BucketService: s,
engine: engine,
}
}
// FindBucketByID returns a single bucket by ID.
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) (err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
defer func() {
if err == nil {
return
}
if b.ID.Valid() {
if err := s.BucketService.DeleteBucket(ctx, b.ID); err != nil {
s.log.Error("Unable to cleanup bucket after create failed", zap.Error(err))
}
}
}()
if err = s.BucketService.CreateBucket(ctx, b); err != nil {
return err
}
return s.inner.FindBucketByID(ctx, id)
if err = s.engine.CreateBucket(ctx, b); err != nil {
return err
}
return nil
}
// FindBucketByName returns a single bucket by name.
func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (b *influxdb.Bucket, err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
if upd.RetentionPeriod != nil {
if err = s.engine.UpdateBucketRetentionPeriod(ctx, id, *upd.RetentionPeriod); err != nil {
return nil, err
}
}
return s.inner.FindBucketByName(ctx, orgID, name)
}
// FindBucket returns the first bucket that matches filter.
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBucket(ctx, filter)
}
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
// Additional options provide pagination & sorting.
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, 0, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBuckets(ctx, filter, opt...)
}
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return errors.New("nil inner BucketService or Engine")
}
return s.inner.CreateBucket(ctx, b)
}
// UpdateBucket updates a single bucket with changeset.
// Returns the new bucket state after update.
func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.UpdateBucket(ctx, id, upd)
return s.BucketService.UpdateBucket(ctx, id, upd)
}
// DeleteBucket removes a bucket by ID.
@ -116,5 +91,5 @@ func (s *BucketService) DeleteBucket(ctx context.Context, bucketID influxdb.ID)
if err := s.engine.DeleteBucket(ctx, bucket.OrgID, bucketID); err != nil {
return err
}
return s.inner.DeleteBucket(ctx, bucketID)
return s.BucketService.DeleteBucket(ctx, bucketID)
}

View File

@ -4,28 +4,29 @@ import (
"context"
"testing"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/storage"
"github.com/influxdata/influxdb/v2/storage/mocks"
"go.uber.org/zap/zaptest"
)
func TestBucketService(t *testing.T) {
service := storage.NewBucketService(nil, nil)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
i, err := influxdb.IDFromString("2222222222222222")
if err != nil {
panic(err)
}
if err := service.DeleteBucket(context.TODO(), *i); err == nil {
t.Fatal("expected error, got nil")
}
engine := mocks.NewMockEngineSchema(ctrl)
inmemService := newInMemKVSVC(t)
service = storage.NewBucketService(inmemService, nil)
service := storage.NewBucketService(inmemService, engine)
if err := service.DeleteBucket(context.TODO(), *i); err == nil {
t.Fatal("expected error, got nil")
@ -41,28 +42,14 @@ func TestBucketService(t *testing.T) {
panic(err)
}
engine.EXPECT().DeleteBucket(gomock.Any(), org.ID, bucket.ID)
// Test deleting a bucket calls into the deleter.
deleter := &MockDeleter{}
service = storage.NewBucketService(inmemService, deleter)
service = storage.NewBucketService(inmemService, engine)
if err := service.DeleteBucket(context.TODO(), bucket.ID); err != nil {
t.Fatal(err)
}
if deleter.orgID != org.ID {
t.Errorf("got org ID: %s, expected %s", deleter.orgID, org.ID)
} else if deleter.bucketID != bucket.ID {
t.Errorf("got bucket ID: %s, expected %s", deleter.bucketID, bucket.ID)
}
}
type MockDeleter struct {
orgID, bucketID influxdb.ID
}
func (m *MockDeleter) DeleteBucket(_ context.Context, orgID, bucketID influxdb.ID) error {
m.orgID, m.bucketID = orgID, bucketID
return nil
}
func newInMemKVSVC(t *testing.T) *kv.Service {

View File

@ -100,61 +100,21 @@ func WithPageFaultLimiter(limiter *rate.Limiter) Option {
}
}
func WithMetaClient(c MetaClient) Option {
return func(e *Engine) {
e.MetaClient = c
}
}
type MetaClient interface {
Database(name string) (di *meta.DatabaseInfo)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
type DummyMetaClient struct {
}
func (m *DummyMetaClient) Database(name string) (di *meta.DatabaseInfo) {
return &meta.DatabaseInfo{
DefaultRetentionPolicy: "1234",
Name: name,
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Name: "0440e2fda1957000",
Duration: 0,
ShardGroupDuration: time.Hour * 24,
},
},
}
}
func (m *DummyMetaClient) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) {
return &meta.RetentionPolicyInfo{
Name: "1234",
Duration: 0,
ShardGroupDuration: time.Hour * 24,
}, nil
}
func (m *DummyMetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return &meta.ShardGroupInfo{
ID: 1,
StartTime: time.Now().Add(-time.Hour * 24),
EndTime: time.Now().Add(time.Hour * 24),
Shards: []meta.ShardInfo{
{ID: 1},
},
}, nil
}
func (m *DummyMetaClient) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
return []meta.ShardGroupInfo{
{
ID: 1,
StartTime: time.Now().Add(-time.Hour * 24),
EndTime: time.Now().Add(time.Hour * 24),
Shards: []meta.ShardInfo{
{ID: 1},
},
},
}, nil
}
// NewEngine initialises a new storage engine, including a series file, index and
// TSM engine.
func NewEngine(path string, c Config, options ...Option) *Engine {
@ -163,12 +123,15 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
path: path,
defaultMetricLabels: prometheus.Labels{},
TSDBStore: tsdb.NewStore(path),
MetaClient: &DummyMetaClient{},
logger: zap.NewNop(),
writePointsValidationEnabled: true,
}
for _, opt := range options {
opt(e)
}
pw := coordinator.NewPointsWriter()
pw.TSDBStore = e.TSDBStore
pw.MetaClient = e.MetaClient
@ -358,14 +321,40 @@ func (e *Engine) WritePoints(ctx context.Context, orgID influxdb.ID, bucketID in
return ErrEngineClosed
}
return e.pointsWriter.WritePoints(orgID.String(), bucketID.String(), models.ConsistencyLevelAll, &meta.UserInfo{}, points)
return e.pointsWriter.WritePoints(bucketID.String(), meta.DefaultRetentionPolicyName, models.ConsistencyLevelAll, &meta.UserInfo{}, points)
}
func (e *Engine) CreateBucket(ctx context.Context, b *influxdb.Bucket) (err error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
spec := meta.RetentionPolicySpec{
Name: meta.DefaultRetentionPolicyName,
Duration: &b.RetentionPeriod,
}
if _, err = e.MetaClient.CreateDatabaseWithRetentionPolicy(b.ID.String(), &spec); err != nil {
return err
}
return nil
}
func (e *Engine) UpdateBucketRetentionPeriod(ctx context.Context, bucketID influxdb.ID, d time.Duration) (err error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
rpu := meta.RetentionPolicyUpdate{
Duration: &d,
}
return e.MetaClient.UpdateRetentionPolicy(bucketID.String(), meta.DefaultRetentionPolicyName, &rpu, true)
}
// DeleteBucket deletes an entire bucket from the storage engine.
func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
return e.TSDBStore.DeleteRetentionPolicy(orgID.String(), bucketID.String())
return e.TSDBStore.DeleteRetentionPolicy(bucketID.String(), meta.DefaultRetentionPolicyName)
}
// DeleteBucketRange deletes an entire range of data from the storage engine.
@ -380,7 +369,7 @@ func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb
}
// TODO(edd): create an influxql.Expr that represents the min and max time...
return e.TSDBStore.DeleteSeries(orgID.String(), nil, nil)
return e.TSDBStore.DeleteSeries(bucketID.String(), nil, nil)
}
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
@ -407,7 +396,7 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
_ = predData
// TODO - edd convert the predicate into an influxql.Expr
return e.TSDBStore.DeleteSeries(orgID.String(), nil, nil)
return e.TSDBStore.DeleteSeries(bucketID.String(), nil, nil)
}
// CreateBackup creates a "snapshot" of all TSM data in the Engine.

View File

@ -134,8 +134,8 @@ func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }
func (fi *filterIterator) Do(f func(flux.Table) error) error {
src := fi.s.GetSource(
fi.spec.OrganizationID.String(),
fi.spec.BucketID.String(),
uint64(fi.spec.OrganizationID),
uint64(fi.spec.BucketID),
)
// Setup read request
@ -258,8 +258,8 @@ func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }
func (gi *groupIterator) Do(f func(flux.Table) error) error {
src := gi.s.GetSource(
gi.spec.OrganizationID.String(),
gi.spec.BucketID.String(),
uint64(gi.spec.OrganizationID),
uint64(gi.spec.BucketID),
)
// Setup read request
@ -538,8 +538,8 @@ type tagKeysIterator struct {
func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
src := ti.s.GetSource(
ti.readSpec.OrganizationID.String(),
ti.readSpec.BucketID.String(),
uint64(ti.readSpec.OrganizationID),
uint64(ti.readSpec.BucketID),
)
var req datatypes.TagKeysRequest
@ -621,8 +621,8 @@ type tagValuesIterator struct {
func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
src := ti.s.GetSource(
ti.readSpec.OrganizationID.String(),
ti.readSpec.BucketID.String(),
uint64(ti.readSpec.OrganizationID),
uint64(ti.readSpec.BucketID),
)
var req datatypes.TagValuesRequest

View File

@ -0,0 +1,78 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/influxdata/influxdb/v2/storage (interfaces: EngineSchema)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
gomock "github.com/golang/mock/gomock"
influxdb "github.com/influxdata/influxdb/v2"
reflect "reflect"
time "time"
)
// MockEngineSchema is a mock of EngineSchema interface
type MockEngineSchema struct {
ctrl *gomock.Controller
recorder *MockEngineSchemaMockRecorder
}
// MockEngineSchemaMockRecorder is the mock recorder for MockEngineSchema
type MockEngineSchemaMockRecorder struct {
mock *MockEngineSchema
}
// NewMockEngineSchema creates a new mock instance
func NewMockEngineSchema(ctrl *gomock.Controller) *MockEngineSchema {
mock := &MockEngineSchema{ctrl: ctrl}
mock.recorder = &MockEngineSchemaMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockEngineSchema) EXPECT() *MockEngineSchemaMockRecorder {
return m.recorder
}
// CreateBucket mocks base method
func (m *MockEngineSchema) CreateBucket(arg0 context.Context, arg1 *influxdb.Bucket) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateBucket", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// CreateBucket indicates an expected call of CreateBucket
func (mr *MockEngineSchemaMockRecorder) CreateBucket(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBucket", reflect.TypeOf((*MockEngineSchema)(nil).CreateBucket), arg0, arg1)
}
// DeleteBucket mocks base method
func (m *MockEngineSchema) DeleteBucket(arg0 context.Context, arg1, arg2 influxdb.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteBucket", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteBucket indicates an expected call of DeleteBucket
func (mr *MockEngineSchemaMockRecorder) DeleteBucket(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteBucket", reflect.TypeOf((*MockEngineSchema)(nil).DeleteBucket), arg0, arg1, arg2)
}
// UpdateBucketRetentionPeriod mocks base method
func (m *MockEngineSchema) UpdateBucketRetentionPeriod(arg0 context.Context, arg1 influxdb.ID, arg2 time.Duration) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateBucketRetentionPeriod", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateBucketRetentionPeriod indicates an expected call of UpdateBucketRetentionPeriod
func (mr *MockEngineSchemaMockRecorder) UpdateBucketRetentionPeriod(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateBucketRetentionPeriod", reflect.TypeOf((*MockEngineSchema)(nil).UpdateBucketRetentionPeriod), arg0, arg1, arg2)
}

View File

@ -82,5 +82,5 @@ type Store interface {
TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error)
TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error)
GetSource(db, rp string) proto.Message
GetSource(orgID, bucketID uint64) proto.Message
}

View File

@ -0,0 +1,34 @@
package storage
import (
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/v2"
)
// this is easier than fooling around with .proto files.
type readSource struct {
BucketID uint64 `protobuf:"varint,1,opt,name=bucket_id,proto3"`
OrganizationID uint64 `protobuf:"varint,2,opt,name=organization_id,proto3"`
}
func (r *readSource) XXX_MessageName() string { return "readSource" }
func (r *readSource) Reset() { *r = readSource{} }
func (r *readSource) String() string { return "readSource{}" }
func (r *readSource) ProtoMessage() {}
func getReadSource(any types.Any) (readSource, error) {
var source readSource
if err := types.UnmarshalAny(&any, &source); err != nil {
return source, err
}
return source, nil
}
func (r *readSource) GetOrgID() influxdb.ID {
return influxdb.ID(r.OrganizationID)
}
func (r *readSource) GetBucketID() influxdb.ID {
return influxdb.ID(r.BucketID)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/storage/reads"
@ -23,7 +24,7 @@ var (
ErrMissingReadSource = errors.New("missing ReadSource")
)
// GetReadSource will attempt to unmarshal a ReadSource from the ReadRequest or
// getReadSource will attempt to unmarshal a ReadSource from the ReadRequest or
// return an error if no valid resource is present.
func GetReadSource(any types.Any) (*ReadSource, error) {
var source ReadSource
@ -82,16 +83,15 @@ func (s *Store) findShardIDs(database, rp string, desc bool, start, end int64) (
return shardIDs, nil
}
func (s *Store) validateArgs(database, rp string, start, end int64) (string, string, int64, int64, error) {
func (s *Store) validateArgs(orgID, bucketID uint64, start, end int64) (string, string, int64, int64, error) {
database := influxdb.ID(bucketID).String()
rp := meta.DefaultRetentionPolicyName
di := s.MetaClient.Database(database)
if di == nil {
return "", "", 0, 0, errors.New("no database")
}
if rp == "" {
rp = di.DefaultRetentionPolicy
}
rpi := di.RetentionPolicy(rp)
if rpi == nil {
return "", "", 0, 0, errors.New("invalid retention policy")
@ -111,12 +111,12 @@ func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest
return nil, errors.New("missing read source")
}
source, err := GetReadSource(*req.ReadSource)
source, err := getReadSource(*req.ReadSource)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.Start, req.Range.End)
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
if err != nil {
return nil, err
}
@ -149,12 +149,12 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return nil, errors.New("missing read source")
}
source, err := GetReadSource(*req.ReadSource)
source, err := getReadSource(*req.ReadSource)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.Start, req.Range.End)
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
if err != nil {
return nil, err
}
@ -193,12 +193,12 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur
return nil, errors.New("missing read source")
}
source, err := GetReadSource(*req.TagsSource)
source, err := getReadSource(*req.TagsSource)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.Start, req.Range.End)
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
if err != nil {
return nil, err
}
@ -268,12 +268,12 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest)
return nil, errors.New("missing read source")
}
source, err := GetReadSource(*req.TagsSource)
source, err := getReadSource(*req.TagsSource)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.Range.Start, req.Range.End)
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
if err != nil {
return nil, err
}
@ -356,12 +356,12 @@ func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesReque
return nil, errors.New("missing read source")
}
source, err := GetReadSource(*req.MeasurementsSource)
source, err := getReadSource(*req.MeasurementsSource)
if err != nil {
return nil, err
}
database, _, _, _, err := s.validateArgs(source.Database, source.RetentionPolicy, -1, -1)
database, _, _, _, err := s.validateArgs(source.OrganizationID, source.BucketID, -1, -1)
if err != nil {
return nil, err
}
@ -403,6 +403,9 @@ func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesReque
return cursors.NewStringSliceIterator(names), nil
}
func (s *Store) GetSource(db, rp string) proto.Message {
return &ReadSource{Database: db, RetentionPolicy: rp}
func (s *Store) GetSource(orgID, bucketID uint64) proto.Message {
return &readSource{
BucketID: bucketID,
OrganizationID: orgID,
}
}