Merge pull request #15697 from influxdata/sgc/kvhints

feat(kv): Cursor API accepts hints for improving performance
pull/15727/head
Stuart Carnie 2019-11-01 16:22:36 -07:00 committed by GitHub
commit e5eac576fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 508 additions and 83 deletions

View File

@ -198,7 +198,7 @@ func (b *Bucket) Delete(key []byte) error {
// Cursor retrieves a cursor for iterating through the entries
// in the key value store.
func (b *Bucket) Cursor() (kv.Cursor, error) {
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
return &Cursor{
cursor: b.bucket.Cursor(),
}, nil

View File

@ -199,9 +199,9 @@ func (b *Bucket) Delete(key []byte) error {
}
// Cursor creates a static cursor from all entries in the database.
func (b *Bucket) Cursor() (kv.Cursor, error) {
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
// TODO we should do this by using the Ascend/Descend methods that
// the btree provides.
// the btree provides.
pairs, err := b.getAll()
if err != nil {
return nil, err

View File

@ -38,14 +38,41 @@ type Tx interface {
WithContext(ctx context.Context)
}
type (
	// CursorHints collects the optional hints that CursorHint functions
	// set. A field left nil was not set by any hint.
	CursorHints struct {
		KeyPrefix *string
		KeyStart  *string
	}

	// CursorHint is a functional option that configures CursorHints.
	CursorHint func(*CursorHints)
)

// WithCursorHintPrefix is a hint to the store that the caller is only
// interested in keys with the specified prefix.
func WithCursorHintPrefix(prefix string) CursorHint {
	return func(hints *CursorHints) {
		hints.KeyPrefix = &prefix
	}
}

// WithCursorHintKeyStart is a hint to the store that the caller is
// interested in reading keys from start onwards.
func WithCursorHintKeyStart(start string) CursorHint {
	return func(hints *CursorHints) {
		hints.KeyStart = &start
	}
}
// Bucket is the abstraction used to perform get/put/delete/get-many operations
// in a key value store.
type Bucket interface {
// TODO context?
// Get returns a key within this bucket. Errors if key does not exist.
Get(key []byte) ([]byte, error)
// Cursor returns a cursor at the beginning of this bucket.
Cursor() (Cursor, error)
// Cursor returns a cursor at the beginning of this bucket optionally
// using the provided hints to improve performance.
Cursor(hints ...CursorHint) (Cursor, error)
// Put should error if the transaction it was called in is not writable.
Put(key, value []byte) error
// Delete should error if the transaction it was called in is not writable.

View File

@ -292,10 +292,6 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
}
}
if filter.Name != nil {
ts = filterByName(ts, *filter.Name)
}
return ts, len(ts), nil
}
@ -417,6 +413,45 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
return ts, len(ts), err
}
// taskMatchFn reports whether a task satisfies a set of filter criteria.
type taskMatchFn func(*influxdb.Task) bool

// newTaskMatchFn returns a function for validating that a task matches
// the filter. A task must satisfy every criterion that is present:
// the organization ID when org is non-nil, f.Type when set and f.Name
// when set. It returns nil if the filter should match all tasks.
func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *influxdb.Task) bool {
	// One check per active criterion; values are copied now so later
	// changes to f or org do not affect the returned function.
	var checks []taskMatchFn

	if org != nil {
		orgID := org.ID
		checks = append(checks, func(t *influxdb.Task) bool {
			return t.OrganizationID == orgID
		})
	}

	if f.Type != nil {
		typ := *f.Type
		checks = append(checks, func(t *influxdb.Task) bool {
			if typ == t.Type {
				return true
			}
			// A task with an empty type also matches a system-type filter.
			return typ == influxdb.TaskSystemType && t.Type == ""
		})
	}

	if f.Name != nil {
		name := *f.Name
		checks = append(checks, func(t *influxdb.Task) bool {
			return t.Name == name
		})
	}

	if len(checks) == 0 {
		return nil
	}

	return func(t *influxdb.Task) bool {
		for _, check := range checks {
			if !check(t) {
				return false
			}
		}
		return true
	}
}
// findAllTasks is a subset of the find tasks function. Used for cleanliness.
// This function should only be executed internally because it doesn't force organization or user filtering.
// Enforcing filters should be done in a validation layer.
@ -432,9 +467,10 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
}
var k, v []byte
// we can filter by orgID
if filter.After != nil {
key, err := taskKey(*filter.After)
if err != nil {
return nil, 0, err
@ -442,73 +478,38 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
// ignore the key:val returned in this seek because we are starting "after"
// this key
c.Seek(key)
k, v = c.Next()
} else {
k, v := c.First()
if k == nil {
return ts, len(ts), nil
}
k, v = c.First()
}
matchFn := newTaskMatchFn(filter, nil)
for k != nil {
t := &influxdb.Task{}
if err := json.Unmarshal(v, t); err != nil {
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
}
latestCompleted, err := s.findLatestScheduledTime(ctx, tx, t.ID)
if err != nil {
return nil, 0, err
}
if !latestCompleted.IsZero() {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
} else {
t.LatestCompleted = t.CreatedAt
}
if t != nil {
if matchFn == nil || matchFn(t) {
latestCompleted, err := s.findLatestScheduledTimeForTask(ctx, tx, t)
if err != nil {
return nil, 0, err
}
if !latestCompleted.IsZero() {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
} else {
t.LatestCompleted = t.CreatedAt
}
if taskFilterMatch(filter.Type, t.Type) {
ts = append(ts, t)
ts = append(ts, t)
if len(ts) >= filter.Limit {
break
}
}
}
// if someone has a limit of 1
if len(ts) >= filter.Limit {
return ts, len(ts), nil
}
for {
k, v := c.Next()
if k == nil {
break
}
t := &influxdb.Task{}
if err := json.Unmarshal(v, t); err != nil {
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
}
latestCompleted, err := s.findLatestScheduledTime(ctx, tx, t.ID)
if err != nil {
return nil, 0, err
}
if !latestCompleted.IsZero() {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
} else {
t.LatestCompleted = t.CreatedAt
}
if !taskFilterMatch(filter.Type, t.Type) {
continue
}
// insert the new task into the list
ts = append(ts, t)
// Check if we are over running the limit
if len(ts) >= filter.Limit {
break
}
}
if filter.Name != nil {
ts = filterByName(ts, *filter.Name)
k, v = c.Next()
}
return ts, len(ts), err
@ -1405,7 +1406,7 @@ func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.I
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
c, err := bucket.Cursor()
c, err := bucket.Cursor(WithCursorHintPrefix(taskID.String()))
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
@ -1807,12 +1808,7 @@ func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID
return run, nil
}
func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxdb.ID) (time.Time, error) {
task, err := s.findTaskByID(ctx, tx, id)
if err != nil {
return time.Time{}, err
}
func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, task *influxdb.Task) (time.Time, error) {
// Get the latest completed time
// This can come from whichever is latest between:
@ -1820,7 +1816,10 @@ func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxd
// - LatestCompleted time of the task
// - Latest scheduled currently running task
// - or the latest completed run's ScheduleFor time
var latestCompleted time.Time
var (
latestCompleted time.Time
err error
)
if task.LatestCompleted == "" {
latestCompleted, err = time.Parse(time.RFC3339, task.CreatedAt)
@ -1835,31 +1834,25 @@ func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxd
}
// look to see if we have a "latest completed run"
lRun, err := s.findLatestCompleted(ctx, tx, id)
lRun, err := s.findLatestCompleted(ctx, tx, task.ID)
if err != nil {
return time.Time{}, err
}
if lRun != nil {
runTime := lRun.ScheduledFor
if err != nil {
return time.Time{}, err
}
if runTime.After(latestCompleted) {
latestCompleted = runTime
}
}
// find out if we have a currently running schedule that is after the latest completed
currentRunning, err := s.currentlyRunning(ctx, tx, id)
currentRunning, err := s.currentlyRunning(ctx, tx, task.ID)
if err != nil {
return time.Time{}, err
}
for _, cr := range currentRunning {
crTime := cr.ScheduledFor
if err != nil {
return time.Time{}, err
}
if crTime.After(latestCompleted) {
latestCompleted = crTime
}
@ -1868,6 +1861,15 @@ func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxd
return latestCompleted, nil
}
// findLatestScheduledTime loads the task identified by id and delegates to
// findLatestScheduledTimeForTask. If the task lookup fails, the zero time
// and the lookup error are returned.
func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxdb.ID) (time.Time, error) {
	t, lookupErr := s.findTaskByID(ctx, tx, id)
	if lookupErr != nil {
		return time.Time{}, lookupErr
	}

	return s.findLatestScheduledTimeForTask(ctx, tx, t)
}
func taskKey(taskID influxdb.ID) ([]byte, error) {
encodedID, err := taskID.Encode()
if err != nil {

214
kv/task_private_test.go Normal file
View File

@ -0,0 +1,214 @@
package kv
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
)
// Test_newTaskMatchFN verifies the predicates built by newTaskMatchFn for
// every combination of organization, type and name criteria, and checks
// that an empty filter yields a nil predicate.
func Test_newTaskMatchFN(t *testing.T) {
	// ct creates a task in organization 1 with the given type and name.
	ct := func(typ string, name string) *influxdb.Task {
		return &influxdb.Task{
			Type:           typ,
			OrganizationID: 1,
			Name:           name,
		}
	}

	// Sentinel values meaning "do not filter on this criterion".
	const (
		NoOrg = influxdb.ID(0)
		NoTyp = "-"
		NoNam = "-"
	)

	// newMatch builds a match function, leaving out any criterion that
	// was passed as its sentinel value.
	newMatch := func(orgID influxdb.ID, typ string, name string) taskMatchFn {
		var (
			org *influxdb.Organization
			fil influxdb.TaskFilter
		)

		if orgID != NoOrg {
			org = &influxdb.Organization{ID: orgID}
		}

		if typ != NoTyp {
			fil.Type = &typ
		}

		if name != NoNam {
			fil.Name = &name
		}

		return newTaskMatchFn(fil, org)
	}

	type test struct {
		name string
		task *influxdb.Task
		fn   taskMatchFn
		exp  bool
	}

	tests := []struct {
		name  string
		tests []test
	}{
		{
			"match org",
			[]test{
				{
					name: "equal",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(1, NoTyp, NoNam),
					exp:  true,
				},
				{
					name: "not org",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(2, NoTyp, NoNam),
					exp:  false,
				},
			},
		},
		{
			"match type",
			[]test{
				{
					name: "empty with system type",
					task: ct("", "Foo"),
					fn:   newMatch(NoOrg, influxdb.TaskSystemType, NoNam),
					exp:  true,
				},
				{
					name: "system with system type",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(NoOrg, influxdb.TaskSystemType, NoNam),
					exp:  true,
				},
				{
					name: "equal",
					task: ct("other type", "Foo"),
					fn:   newMatch(NoOrg, "other type", NoNam),
					exp:  true,
				},
				{
					name: "not type",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(NoOrg, "other type", NoNam),
					exp:  false,
				},
			},
		},
		{
			"match name",
			[]test{
				{
					name: "equal",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(NoOrg, NoTyp, "Foo"),
					exp:  true,
				},
				{
					name: "not name",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(NoOrg, NoTyp, "Bar"),
					exp:  false,
				},
			},
		},
		{
			"match org type",
			[]test{
				{
					name: "equal",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(1, influxdb.TaskSystemType, NoNam),
					exp:  true,
				},
				{
					name: "not type",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(1, "wrong type", NoNam),
					exp:  false,
				},
				{
					name: "not org",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(2, influxdb.TaskSystemType, NoNam),
					exp:  false,
				},
				{
					name: "not org and type",
					task: ct("check", "Foo"),
					fn:   newMatch(2, influxdb.TaskSystemType, NoNam),
					exp:  false,
				},
			},
		},
		{
			"match org name",
			[]test{
				{
					name: "equal",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(1, NoTyp, "Foo"),
					exp:  true,
				},
				{
					name: "not org",
					task: ct(influxdb.TaskSystemType, "Foo"),
					fn:   newMatch(2, NoTyp, "Foo"),
					exp:  false,
				},
			},
		},
		{
			"match org name type",
			[]test{
				{
					name: "equal",
					task: ct("check", "Foo"),
					fn:   newMatch(1, "check", "Foo"),
					exp:  true,
				},
				{
					name: "not org",
					task: ct("check", "Foo"),
					fn:   newMatch(2, "check", "Foo"),
					exp:  false,
				},
				{
					name: "not name",
					task: ct("check", "Foo"),
					fn:   newMatch(1, "check", "Bar"),
					exp:  false,
				},
				{
					name: "not type",
					task: ct("check", "Foo"),
					fn:   newMatch(1, "other", "Foo"),
					exp:  false,
				},
			},
		},
	}

	for _, group := range tests {
		t.Run(group.name, func(t *testing.T) {
			for _, test := range group.tests {
				t.Run(test.name, func(t *testing.T) {
					if got, exp := test.fn(test.task), test.exp; got != exp {
						t.Errorf("unexpected match result: -got/+exp\n%v", cmp.Diff(got, exp))
					}
				})
			}
		})
	}

	// With no organization and an empty filter there is nothing to check,
	// so newTaskMatchFn must return nil.
	t.Run("match returns nil for no filter", func(t *testing.T) {
		fn := newTaskMatchFn(influxdb.TaskFilter{}, nil)
		if fn != nil {
			t.Error("expected nil")
		}
	})
}

View File

@ -66,7 +66,7 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
}
// Cursor returns a cursor at the beginning of this bucket.
func (b *Bucket) Cursor() (kv.Cursor, error) {
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
return b.CursorFn()
}

View File

@ -56,6 +56,10 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
testTaskCRUD(t, sys)
})
t.Run("FindTasks paging", func(t *testing.T) {
testTaskFindTasksPaging(t, sys)
})
t.Run("Task Update Options Full", func(t *testing.T) {
t.Parallel()
testTaskOptionsUpdateFull(t, sys)
@ -423,6 +427,62 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
}
// testTaskFindTasksPaging creates 50 tasks, ten for each of the names
// "Task 000" through "Task 004", and checks that FindTasks honours
// filter.Limit both without and with a name filter.
func testTaskFindTasksPaging(t *testing.T, sys *System) {
	script := `option task = {
name: "Task %03d",
cron: "* * * * *",
concurrency: 100,
offset: 10s,
}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`

	cr := creds(t, sys)

	tc := influxdb.TaskCreate{
		OrganizationID: cr.OrgID,
		OwnerID:        cr.UserID,
		Type:           influxdb.TaskSystemType,
	}

	authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())

	for i := 0; i < 50; i++ {
		// i/10 yields the same name for each run of ten consecutive tasks.
		tc.Flux = fmt.Sprintf(script, i/10)
		tsk, err := sys.TaskService.CreateTask(authorizedCtx, tc)
		if err != nil {
			t.Fatal(err)
		}
		if !tsk.ID.Valid() {
			t.Fatal("no task ID set")
		}
	}

	tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Limit: 5})
	if err != nil {
		t.Fatalf("FindTasks: %v", err)
	}

	if got, exp := len(tasks), 5; got != exp {
		t.Fatalf("unexpected len(tasks), -got/+exp: %v", cmp.Diff(got, exp))
	}

	// "Task 004" is the name of the last ten tasks created above, so a full
	// page can only be returned if the name filter is applied before the
	// limit.
	name := "Task 004"
	tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Limit: 5, Name: &name})
	if err != nil {
		t.Fatalf("FindTasks: %v", err)
	}

	if got, exp := len(tasks), 5; got != exp {
		t.Fatalf("unexpected len(tasks), -got/+exp: %v", cmp.Diff(got, exp))
	}
}
//Create a new task with a Cron and Offset option
//Update the task to remove the Offset option, and change Cron to Every
//Retrieve the task again to ensure the options are now Every, without Cron or Offset

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/kv"
)
@ -45,6 +46,10 @@ func KVStore(
name: "Cursor",
fn: KVCursor,
},
{
name: "CursorWithHints",
fn: KVCursorWithHints,
},
{
name: "View",
fn: KVView,
@ -512,6 +517,123 @@ func KVCursor(
}
}
// KVCursorWithHints tests the cursor contract for the key value store when
// cursor hints are passed to Bucket.Cursor. Each case seeks to args.seek and
// collects keys until the cursor is exhausted or args.until has been read.
func KVCursorWithHints(
	init func(KVStoreFields, *testing.T) (kv.Store, func()),
	t *testing.T,
) {
	type args struct {
		seek  string
		until string
		hints []kv.CursorHint
	}

	// seed builds a fresh copy of the key set used by every case; each
	// value is the key prefixed with "val:".
	seed := func() []kv.Pair {
		keys := []string{
			"aa/00", "aa/01",
			"aaa/00", "aaa/01", "aaa/02", "aaa/03",
			"bbb/00", "bbb/01", "bbb/02",
		}
		out := make([]kv.Pair, 0, len(keys))
		for _, key := range keys {
			out = append(out, kv.Pair{
				Key:   []byte(key),
				Value: []byte("val:" + key),
			})
		}
		return out
	}

	tests := []struct {
		name   string
		fields KVStoreFields
		args   args
		exp    []string
	}{
		{
			name:   "no hints",
			fields: KVStoreFields{Bucket: []byte("bucket"), Pairs: seed()},
			args: args{
				seek:  "aaa",
				until: "bbb/00",
			},
			exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
		},
		{
			name:   "prefix hint",
			fields: KVStoreFields{Bucket: []byte("bucket"), Pairs: seed()},
			args: args{
				seek:  "aaa",
				until: "aaa/03",
				hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")},
			},
			exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
		},
		{
			name:   "start hint",
			fields: KVStoreFields{Bucket: []byte("bucket"), Pairs: seed()},
			args: args{
				seek:  "aaa",
				until: "bbb/00",
				hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")},
			},
			exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			store, done := init(tt.fields, t)
			defer done()

			viewErr := store.View(context.Background(), func(tx kv.Tx) error {
				bkt, err := tx.Bucket([]byte("bucket"))
				if err != nil {
					t.Errorf("unexpected error retrieving bucket: %v", err)
					return err
				}

				cur, err := bkt.Cursor(tt.args.hints...)
				if err != nil {
					t.Errorf("unexpected error: %v", err)
					return err
				}

				// Walk forward from the seek position; the until key is
				// included in the result before the scan stops.
				var got []string
				for k, _ := cur.Seek([]byte(tt.args.seek)); len(k) > 0; k, _ = cur.Next() {
					got = append(got, string(k))
					if string(k) == tt.args.until {
						break
					}
				}

				if exp := tt.exp; !cmp.Equal(got, exp) {
					t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp))
				}

				return nil
			})

			if viewErr != nil {
				t.Fatalf("error during view transaction: %v", viewErr)
			}
		})
	}
}
// KVView tests the view method contract for the key value store.
func KVView(
init func(KVStoreFields, *testing.T) (kv.Store, func()),