feat(tasks): added functionality to filter runns by time

pull/19979/head
Arnav Aggarwal 2020-11-10 10:30:34 -10:00
parent 6ef0e03b33
commit 64a562db42
6 changed files with 161 additions and 9 deletions

View File

@ -119,3 +119,10 @@
key: enforceOrgDashboardLimits
default: false
contact: Compute Team
- name: Time Filter Flags
description: Filter task run list based on before and after flags
key: timeFilterFlags
contact: Compute Team
default: false
expose: true

View File

@ -212,6 +212,20 @@ func EnforceOrganizationDashboardLimits() BoolFlag {
return enforceOrgDashboardLimits
}
var timeFilterFlags = MakeBoolFlag(
"Time Filter Flags",
"timeFilterFlags",
"Compute Team",
false,
Temporary,
true,
)
// TimeFilterFlags - Filter task run list based on before and after flags
func TimeFilterFlags() BoolFlag {
return timeFilterFlags
}
var all = []Flag{
appMetrics,
backendExample,
@ -228,6 +242,7 @@ var all = []Flag{
notebooks,
injectLatestSuccessTime,
enforceOrgDashboardLimits,
timeFilterFlags,
}
var byKey = map[string]Flag{
@ -246,4 +261,5 @@ var byKey = map[string]Flag{
"notebooks": notebooks,
"injectLatestSuccessTime": injectLatestSuccessTime,
"enforceOrgDashboardLimits": enforceOrgDashboardLimits,
"timeFilterFlags": timeFilterFlags,
}

View File

@ -951,6 +951,21 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
parsedFilterAfterTime := time.Time{}
parsedFilterBeforeTime := time.Now().UTC()
var err error
if filter.AfterTime != "" {
parsedFilterAfterTime, err = time.Parse(time.RFC3339, filter.AfterTime)
if err != nil {
return nil, 0, err
}
}
if filter.BeforeTime != "" {
parsedFilterBeforeTime, err = time.Parse(time.RFC3339, filter.BeforeTime)
if err != nil {
return nil, 0, err
}
}
var runs []*influxdb.Run
// manual runs
@ -959,7 +974,9 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
return nil, 0, err
}
for _, run := range manualRuns {
runs = append(runs, run)
if run.ScheduledFor.After(parsedFilterAfterTime) && run.ScheduledFor.Before(parsedFilterBeforeTime) {
runs = append(runs, run)
}
if len(runs) >= filter.Limit {
return runs, len(runs), nil
}
@ -971,13 +988,16 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
return nil, 0, err
}
for _, run := range currentlyRunning {
runs = append(runs, run)
if run.ScheduledFor.After(parsedFilterAfterTime) && run.ScheduledFor.Before(parsedFilterBeforeTime) {
runs = append(runs, run)
}
if len(runs) >= filter.Limit {
return runs, len(runs), nil
}
}
return runs, len(runs), nil
}
// FindRunByID returns a single run.

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
"go.uber.org/zap"
@ -152,6 +153,32 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
filterPart = fmt.Sprintf(`|> filter(fn: (r) => r.runID > %q)`, filter.After.String())
}
parsedAfterTime := time.Time{}
parsedBeforeTime := time.Now()
constructedTimeFilter := ""
if feature.TimeFilterFlags().Enabled(ctx) {
if filter.AfterTime != "" {
tmpParsedAfter, err := time.Parse(time.RFC3339, filter.AfterTime)
if err != nil {
return nil, 0, err
}
parsedAfterTime = tmpParsedAfter
}
if filter.BeforeTime != "" {
tmpParsedBefore, err := time.Parse(time.RFC3339, filter.BeforeTime)
if err != nil {
return nil, 0, err
}
parsedBeforeTime = tmpParsedBefore
}
constructedTimeFilter = fmt.Sprintf(
`|> filter(fn: (r) =>time(v: r["scheduledFor"]) > %s and time(v: r["scheduledFor"]) < %s)`,
parsedAfterTime.Format(time.RFC3339),
parsedBeforeTime.Format(time.RFC3339))
}
// the data will be stored for 7 days in the system bucket so pulling 14d's is sufficient.
runsScript := fmt.Sprintf(`from(bucketID: %q)
|> range(start: -14d)
@ -159,11 +186,12 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
%s
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
%s
|> group(columns: ["taskID"])
|> sort(columns:["scheduledFor"], desc: true)
|> limit(n:%d)
`, sb.ID.String(), filter.Task.String(), filterPart, filter.Limit-len(runs))
`, sb.ID.String(), filter.Task.String(), filterPart, constructedTimeFilter, filter.Limit-len(runs))
// At this point we are behind authorization
// so we are faking a read only permission to the org's system bucket
@ -205,6 +233,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
runs = as.combineRuns(runs, re.runs)
return runs, len(runs), err
}
// remove any kv runs that exist in the list of completed runs

View File

@ -12,12 +12,12 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
icontext "github.com/influxdata/influxdb/v2/context"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/query/control"
"github.com/influxdata/influxdb/v2/query/fluxlang"
stdlib "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
@ -78,6 +78,7 @@ func TestAnalyticalStore(t *testing.T) {
UserResourceMappingService: ts.UserResourceMappingService,
AuthorizationService: authSvc,
Ctx: authCtx,
CallFinishRun: true,
}, func() {
cancelFunc()
ab.Close(t)

View File

@ -19,8 +19,12 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
influxdbmock "github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/task/backend"
"github.com/influxdata/influxdb/v2/task/options"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// BackendComponentFactory is supplied by consumers of the adaptertest package,
@ -175,6 +179,9 @@ type System struct {
// the caller should set this value and return valid IDs and a valid token.
// It is safe if this returns the same values every time it is called.
CredsFunc func(*testing.T) (TestCreds, error)
// Toggles behavior between KV and archive storage because FinishRun() deletes runs after completion
CallFinishRun bool
}
func testTaskCRUD(t *testing.T, sys *System) {
@ -902,7 +909,7 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}
requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.
rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
@ -994,6 +1001,78 @@ func testTaskRuns(t *testing.T, sys *System) {
}
})
t.Run("FindRunsByTime", func(t *testing.T) {
t.Parallel()
ctx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
ctx, err := feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
feature.TimeFilterFlags(): true,
}))
require.NoError(t, err)
// Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now",
// which makes timing of runs somewhat difficult.
ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
OwnerID: cr.UserID,
}
task, err := sys.TaskService.CreateTask(ctx, ct)
if err != nil {
t.Fatal(err)
}
// set to one hour before now because of bucket retention policy
scheduledFor := time.Now().Add(time.Hour * -1).UTC()
runs := make([]*influxdb.Run, 0, 5)
// create runs to put into Context
for i := 5; i > 0; i-- {
run, err := sys.TaskControlService.CreateRun(ctx, task.ID, scheduledFor.Add(time.Second*time.Duration(i)), scheduledFor.Add(time.Second*time.Duration(i)))
if err != nil {
t.Fatal(err)
}
err = sys.TaskControlService.UpdateRunState(ctx, task.ID, run.ID, scheduledFor.Add(time.Second*time.Duration(i+1)), influxdb.RunStarted)
if err != nil {
t.Fatal(err)
}
err = sys.TaskControlService.UpdateRunState(ctx, task.ID, run.ID, scheduledFor.Add(time.Second*time.Duration(i+2)), influxdb.RunSuccess)
if err != nil {
t.Fatal(err)
}
// setting run in memory to match the fields in Context
run.StartedAt = scheduledFor.Add(time.Second * time.Duration(i+1))
run.FinishedAt = scheduledFor.Add(time.Second * time.Duration(i+2))
run.RunAt = scheduledFor.Add(time.Second * time.Duration(i))
run.Status = influxdb.RunSuccess.String()
run.Log = nil
if sys.CallFinishRun {
run, err = sys.TaskControlService.FinishRun(ctx, task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
// Analytical storage does not store run at
run.RunAt = time.Time{}
}
runs = append(runs, run)
}
found, _, err := sys.TaskService.FindRuns(ctx,
influxdb.RunFilter{
Task: task.ID,
Limit: 2,
AfterTime: scheduledFor.Add(time.Second * time.Duration(1)).Format(time.RFC3339),
BeforeTime: scheduledFor.Add(time.Second * time.Duration(4)).Format(time.RFC3339),
})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, runs[2:4], found)
})
t.Run("ForceRun", func(t *testing.T) {
t.Parallel()
@ -1036,7 +1115,7 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatal(err)
}
requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make a run.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make a run.
// Create two runs.
rc1, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
@ -1345,7 +1424,7 @@ func testRunStorage(t *testing.T, sys *System) {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}
requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.
rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
@ -1519,7 +1598,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", influxdb.ErrRunNotFound, err)
}
requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make a run.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make a run.
rc, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
@ -1581,7 +1660,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) {
t.Fatal(err)
}
requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.
rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {