fix(tasks): tasks look up system bucket id

pull/15241/head
Brandon Farmer 2019-10-16 14:25:07 -07:00
parent b343250390
commit ea82dc3470
14 changed files with 199 additions and 52 deletions

View File

@ -72,6 +72,23 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in
return b, nil
}
// FindBucketByName returns a bucket by name for a particular organization.
func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
b, err := s.s.FindBucketByName(ctx, orgID, n)
if err != nil {
return nil, err
}
if err := authorizeReadBucket(ctx, b.OrgID, b.ID); err != nil {
return nil, err
}
return b, nil
}
// FindBucket retrieves the bucket and checks to see if the authorizer on context has read access to the bucket.
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
@ -105,6 +122,12 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
buckets := bs[:0]
for _, b := range bs {
// HACK: remove once system buckets are migrated away from hard coded values
if b.Type == influxdb.BucketTypeSystem {
buckets = append(buckets, b)
continue
}
err := authorizeReadBucket(ctx, b.OrgID, b.ID)
if err != nil && influxdb.ErrorCode(err) != influxdb.EUnauthorized {
return nil, 0, err

View File

@ -2,15 +2,23 @@ package influxdb
import (
"context"
"fmt"
"strings"
"time"
)
const (
// TasksSystemBucketID is the fixed ID for our tasks system bucket
TasksSystemBucketID = ID(10)
// MonitoringSystemBucketID is the fixed ID for our monitoring system bucket
MonitoringSystemBucketID = ID(11)
// BucketTypeUser is a user created bucket
BucketTypeUser = BucketType(0)
// BucketTypeSystem is an internally created bucket that cannot be deleted/renamed.
BucketTypeSystem = BucketType(1)
// SystemBucketRetention is the time we should retain system bucket information
SystemBucketRetention = time.Hour * 24 * 7
)
// Bucket names constants
@ -76,6 +84,7 @@ type BucketService interface {
// DeleteBucket removes a bucket by ID.
DeleteBucket(ctx context.Context, id ID) error
FindBucketByName(ctx context.Context, orgID ID, name string) (*Bucket, error)
}
// BucketUpdate represents updates to a bucket.
@ -135,3 +144,39 @@ func (f BucketFilter) String() string {
}
return "[" + strings.Join(parts, ", ") + "]"
}
// FindSystemBucket finds the system bucket with a given name
func FindSystemBucket(ctx context.Context, bs BucketService, orgID ID, name string) (*Bucket, error) {
bucket, err := bs.FindBucketByName(ctx, orgID, name)
if err != nil {
return nil, err
}
if bucket != nil {
return bucket, nil
}
switch name {
case TasksSystemBucketName:
return &Bucket{
ID: TasksSystemBucketID,
Type: BucketTypeSystem,
Name: TasksSystemBucketName,
RetentionPeriod: SystemBucketRetention,
Description: "System bucket for task logs",
}, nil
case MonitoringSystemBucketName:
return &Bucket{
ID: MonitoringSystemBucketID,
Type: BucketTypeSystem,
Name: MonitoringSystemBucketName,
RetentionPeriod: SystemBucketRetention,
Description: "System bucket for monitoring logs",
}, nil
default:
return nil, &Error{
Code: ENotFound,
Msg: fmt.Sprintf("system bucket %q not found", name),
}
}
}

View File

@ -616,7 +616,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// validation(coordinator(analyticalstore(kv.Service)))
// define the executor and build analytical storage middleware
combinedTaskService := taskbackend.NewAnalyticalStorage(m.logger.With(zap.String("service", "task-analytical-store")), m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
combinedTaskService := taskbackend.NewAnalyticalStorage(
m.logger.With(zap.String("service", "task-analytical-store")), m.kvService, m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, combinedTaskService)
// create the scheduler

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"path"
@ -642,6 +643,12 @@ type BucketService struct {
OpPrefix string
}
// FindBucketByName returns a single bucket by name
// NOTE: Currently not implemented
func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
return nil, errors.New("not implemented")
}
// FindBucketByID returns a single bucket by ID.
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
span, _ := tracing.StartSpanFromContext(ctx)

View File

@ -14,6 +14,10 @@ type BucketService struct {
Source *platform.Source
}
func (s *BucketService) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) {
return nil, fmt.Errorf("not supported")
}
func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
return nil, fmt.Errorf("not supported")
}

View File

@ -2,6 +2,7 @@ package inmem
import (
"context"
"errors"
"fmt"
"sort"
@ -45,6 +46,12 @@ func (s *Service) FindBucketByID(ctx context.Context, id platform.ID) (*platform
return b, err
}
// FindBucketByName returns a single bucket by name
// NOTE: Currently not implemented
func (s *Service) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) {
return nil, errors.New("not implemented")
}
func (s *Service) forEachBucket(ctx context.Context, descending bool, fn func(b *platform.Bucket) bool) error {
var err error
bs := make([]*platform.Bucket, 0)

View File

@ -123,8 +123,7 @@ func (s *Service) FindBucketByName(ctx context.Context, orgID influxdb.ID, n str
err := s.kv.View(ctx, func(tx Tx) error {
bkt, pe := s.findBucketByName(ctx, tx, orgID, n)
if pe != nil {
err := pe
return err
return pe
}
b = bkt
@ -140,7 +139,7 @@ func (s *Service) createSystemBuckets(ctx context.Context, tx Tx, o *influxdb.Or
OrgID: o.ID,
Type: influxdb.BucketTypeSystem,
Name: influxdb.TasksSystemBucketName,
RetentionPeriod: time.Hour * 24 * 3,
RetentionPeriod: influxdb.SystemBucketRetention,
Description: "System bucket for task logs",
}
@ -152,15 +151,11 @@ func (s *Service) createSystemBuckets(ctx context.Context, tx Tx, o *influxdb.Or
OrgID: o.ID,
Type: influxdb.BucketTypeSystem,
Name: influxdb.MonitoringSystemBucketName,
RetentionPeriod: time.Hour * 24 * 7,
RetentionPeriod: influxdb.SystemBucketRetention,
Description: "System bucket for monitoring logs",
}
if err := s.createBucket(ctx, tx, mb); err != nil {
return err
}
return nil
return s.createBucket(ctx, tx, mb)
}
func (s *Service) findBucketByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
@ -323,10 +318,42 @@ func (s *Service) FindBuckets(ctx context.Context, filter influxdb.BucketFilter,
if err != nil {
return err
}
bs = bkts
return nil
})
needsSystemBuckets := true
for _, b := range bs {
if b.Type == influxdb.BucketTypeSystem {
needsSystemBuckets = false
break
}
}
if needsSystemBuckets {
tb := &influxdb.Bucket{
ID: influxdb.TasksSystemBucketID,
Type: influxdb.BucketTypeSystem,
Name: influxdb.TasksSystemBucketName,
RetentionPeriod: influxdb.SystemBucketRetention,
Description: "System bucket for task logs",
}
bs = append(bs, tb)
mb := &influxdb.Bucket{
ID: influxdb.MonitoringSystemBucketID,
Type: influxdb.BucketTypeSystem,
Name: influxdb.MonitoringSystemBucketName,
RetentionPeriod: influxdb.SystemBucketRetention,
Description: "System bucket for monitoring logs",
}
bs = append(bs, mb)
}
if err != nil {
return nil, 0, err
}

View File

@ -264,15 +264,10 @@ func (s *Service) CreateOrganization(ctx context.Context, o *influxdb.Organizati
// Attempt to add user as owner of organization, if that is not possible allow the
// organization to be created anyways.
if err := s.addOrgOwner(ctx, tx, o.ID); err != nil {
s.Logger.Info("failed to make user owner of organization", zap.Error(err))
s.Logger.Info("Failed to make user owner of organization", zap.Error(err))
}
if err := s.createSystemBuckets(ctx, tx, o); err != nil {
s.Logger.Info("failed to create system buckets of organization", zap.Error(err))
return err
}
return nil
return s.createSystemBuckets(ctx, tx, o)
})
}

View File

@ -2,6 +2,7 @@ package mock
import (
"context"
"time"
platform "github.com/influxdata/influxdb"
"go.uber.org/zap"
@ -16,12 +17,13 @@ type BucketService struct {
WithLoggerFn func(l *zap.Logger)
// Methods for an platform.BucketService
FindBucketByIDFn func(context.Context, platform.ID) (*platform.Bucket, error)
FindBucketFn func(context.Context, platform.BucketFilter) (*platform.Bucket, error)
FindBucketsFn func(context.Context, platform.BucketFilter, ...platform.FindOptions) ([]*platform.Bucket, int, error)
CreateBucketFn func(context.Context, *platform.Bucket) error
UpdateBucketFn func(context.Context, platform.ID, platform.BucketUpdate) (*platform.Bucket, error)
DeleteBucketFn func(context.Context, platform.ID) error
FindBucketByIDFn func(context.Context, platform.ID) (*platform.Bucket, error)
FindBucketByNameFn func(context.Context, platform.ID, string) (*platform.Bucket, error)
FindBucketFn func(context.Context, platform.BucketFilter) (*platform.Bucket, error)
FindBucketsFn func(context.Context, platform.BucketFilter, ...platform.FindOptions) ([]*platform.Bucket, int, error)
CreateBucketFn func(context.Context, *platform.Bucket) error
UpdateBucketFn func(context.Context, platform.ID, platform.BucketUpdate) (*platform.Bucket, error)
DeleteBucketFn func(context.Context, platform.ID) error
}
// NewBucketService returns a mock BucketService where its methods will return
@ -32,7 +34,16 @@ func NewBucketService() *BucketService {
CloseFn: func() error { return nil },
WithLoggerFn: func(l *zap.Logger) {},
FindBucketByIDFn: func(context.Context, platform.ID) (*platform.Bucket, error) { return nil, nil },
FindBucketFn: func(context.Context, platform.BucketFilter) (*platform.Bucket, error) { return nil, nil },
FindBucketByNameFn: func(context.Context, platform.ID, string) (*platform.Bucket, error) {
return &platform.Bucket{
ID: platform.TasksSystemBucketID,
Type: platform.BucketTypeSystem,
Name: "_tasks",
RetentionPeriod: time.Hour * 24 * 3,
Description: "System bucket for task logs",
}, nil
},
FindBucketFn: func(context.Context, platform.BucketFilter) (*platform.Bucket, error) { return nil, nil },
FindBucketsFn: func(context.Context, platform.BucketFilter, ...platform.FindOptions) ([]*platform.Bucket, int, error) {
return nil, 0, nil
},
@ -56,6 +67,11 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*pl
return s.FindBucketByIDFn(ctx, id)
}
// FindBucketByName returns a single bucket by name.
func (s *BucketService) FindBucketByName(ctx context.Context, orgID platform.ID, name string) (*platform.Bucket, error) {
return s.FindBucketByNameFn(ctx, orgID, name)
}
// FindBucket returns the first bucket that matches filter.
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
return s.FindBucketFn(ctx, filter)

View File

@ -43,6 +43,17 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*pl
return s.inner.FindBucketByID(ctx, id)
}
// FindBucketByName returns a single bucket by name.
func (s *BucketService) FindBucketByName(ctx context.Context, orgID platform.ID, name string) (*platform.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBucketByName(ctx, orgID, name)
}
// FindBucket returns the first bucket that matches filter.
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)

View File

@ -151,9 +151,9 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.
span, ctx := tracing.StartSpanFromContext(ctx)
span.LogKV(
"bucket", b.ID,
"bucket_id", b.ID,
"org_id", b.OrgID,
"system", b.Type,
"system_type", b.Type,
"retention_period", b.RetentionPeriod,
"retention_policy", b.RetentionPolicyName,
"from", time.Unix(0, min).UTC(),

View File

@ -25,10 +25,6 @@ const (
taskIDTag = "taskID"
statusTag = "status"
// Fixed system bucket ID for task and run logs.
taskSystemBucket = "_tasks"
taskSystemBucketID influxdb.ID = 10
)
// RunRecorder is a type which records runs into an influxdb
@ -38,10 +34,11 @@ type RunRecorder interface {
}
// NewAnalyticalRunStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
func NewAnalyticalRunStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskControlService, rr RunRecorder, qs query.QueryService) *AnalyticalStorage {
func NewAnalyticalRunStorage(logger *zap.Logger, ts influxdb.TaskService, bs influxdb.BucketService, tcs TaskControlService, rr RunRecorder, qs query.QueryService) *AnalyticalStorage {
return &AnalyticalStorage{
logger: logger,
TaskService: ts,
BucketService: bs,
TaskControlService: tcs,
rr: rr,
qs: qs,
@ -49,10 +46,11 @@ func NewAnalyticalRunStorage(logger *zap.Logger, ts influxdb.TaskService, tcs Ta
}
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware (deprecated)
func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, bs influxdb.BucketService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
return &AnalyticalStorage{
logger: logger,
TaskService: ts,
BucketService: bs,
TaskControlService: tcs,
rr: NewStoragePointsWriterRecorder(pw, logger),
qs: qs,
@ -61,6 +59,7 @@ func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskC
type AnalyticalStorage struct {
influxdb.TaskService
influxdb.BucketService
TaskControlService
rr RunRecorder
@ -76,7 +75,12 @@ func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID influx
return run, err
}
return run, as.rr.Record(ctx, task.OrganizationID, task.Organization, taskSystemBucketID, taskSystemBucket, run)
sb, err := influxdb.FindSystemBucket(ctx, as.BucketService, task.OrganizationID, influxdb.TasksSystemBucketName)
if err != nil {
return run, err
}
return run, as.rr.Record(ctx, task.OrganizationID, task.Organization, sb.ID, influxdb.TasksSystemBucketName, run)
}
return run, err
@ -138,13 +142,18 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
return runs, n, err
}
sb, err := influxdb.FindSystemBucket(ctx, as.BucketService, task.OrganizationID, influxdb.TasksSystemBucketName)
if err != nil {
return runs, n, err
}
filterPart := ""
if filter.After != nil {
filterPart = fmt.Sprintf(`|> filter(fn: (r) => r.runID > %q)`, filter.After.String())
}
// the data will be stored for 7 days in the system bucket so pulling 14d's is sufficient.
runsScript := fmt.Sprintf(`from(bucketID: "000000000000000a")
runsScript := fmt.Sprintf(`from(bucketID: %q)
|> range(start: -14d)
|> filter(fn: (r) => r._field != "status")
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
@ -154,14 +163,14 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
|> sort(columns:["scheduledFor"], desc: true)
|> limit(n:%d)
`, filter.Task.String(), filterPart, filter.Limit-len(runs))
`, sb.ID.String(), filter.Task.String(), filterPart, filter.Limit-len(runs))
// At this point we are behind authorization
// so we are faking a read only permission to the org's system bucket
runSystemBucketID := taskSystemBucketID
runSystemBucketID := sb.ID
runAuth := &influxdb.Authorization{
Status: influxdb.Active,
ID: taskSystemBucketID,
ID: sb.ID,
OrgID: task.OrganizationID,
Permissions: []influxdb.Permission{
influxdb.Permission{
@ -238,21 +247,26 @@ func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID infl
return run, err
}
sb, err := influxdb.FindSystemBucket(ctx, as.BucketService, task.OrganizationID, "_tasks")
if err != nil {
return run, err
}
// the data will be stored for 7 days in the system bucket so pulling 14d's is sufficient.
findRunScript := fmt.Sprintf(`from(bucketID: "000000000000000a")
findRunScript := fmt.Sprintf(`from(bucketID: %q)
|> range(start: -14d)
|> filter(fn: (r) => r._field != "status")
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["taskID"])
|> filter(fn: (r) => r.runID == %q)
`, taskID.String(), runID.String())
`, sb.ID.String(), taskID.String(), runID.String())
// At this point we are behind authorization
// so we are faking a read only permission to the org's system bucket
runSystemBucketID := taskSystemBucketID
runSystemBucketID := sb.ID
runAuth := &influxdb.Authorization{
ID: taskSystemBucketID,
ID: sb.ID,
OrgID: task.OrganizationID,
Permissions: []influxdb.Permission{
influxdb.Permission{

View File

@ -40,7 +40,7 @@ func TestAnalyticalStore(t *testing.T) {
ab = newAnalyticalBackend(t, svc, svc)
logger = zaptest.NewLogger(t)
rr = backend.NewStoragePointsWriterRecorder(ab.PointsWriter(), logger)
svcStack = backend.NewAnalyticalRunStorage(logger, svc, svc, rr, ab.QueryService())
svcStack = backend.NewAnalyticalRunStorage(logger, svc, svc, svc, rr, ab.QueryService())
)
go func() {
@ -86,8 +86,9 @@ func TestDeduplicateRuns(t *testing.T) {
return &influxdb.Run{ID: 2, TaskID: 1, Status: "success", ScheduledFor: time.Now(), StartedAt: time.Now().Add(1), FinishedAt: time.Now().Add(2)}, nil
},
}
mockBS := mock.NewBucketService()
svcStack := backend.NewAnalyticalStorage(zaptest.NewLogger(t), mockTS, mockTCS, ab.PointsWriter(), ab.QueryService())
svcStack := backend.NewAnalyticalStorage(zaptest.NewLogger(t), mockTS, mockBS, mockTCS, ab.PointsWriter(), ab.QueryService())
_, err := svcStack.FinishRun(context.Background(), 1, 2)
if err != nil {
@ -122,14 +123,14 @@ func (ab *analyticalBackend) QueryService() query.QueryService {
return query.QueryServiceBridge{AsyncQueryService: ab.queryController}
}
func (lrw *analyticalBackend) Close(t *testing.T) {
if err := lrw.queryController.Shutdown(context.Background()); err != nil {
func (ab *analyticalBackend) Close(t *testing.T) {
if err := ab.queryController.Shutdown(context.Background()); err != nil {
t.Error(err)
}
if err := lrw.storageEngine.Close(); err != nil {
if err := ab.storageEngine.Close(); err != nil {
t.Error(err)
}
if err := os.RemoveAll(lrw.rootDir); err != nil {
if err := os.RemoveAll(ab.rootDir); err != nil {
t.Error(err)
}
}

View File

@ -18,11 +18,7 @@ const (
orgTwoID = "020f755c3c083001"
)
var orgBucketsIDGenerator *mock.MockIDGenerator
func init() {
orgBucketsIDGenerator = mock.NewMockIDGenerator()
}
var orgBucketsIDGenerator = mock.NewMockIDGenerator()
var organizationCmpOptions = cmp.Options{
cmp.Comparer(func(x, y []byte) bool {