Merge pull request #16150 from influxdata/sgc/issue/5482

fix(task): LatestScheduled is set when task is updated to active
pull/16160/head
Stuart Carnie 2019-12-06 13:07:43 -07:00 committed by GitHub
commit 2ae10bb83d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 137 additions and 48 deletions

View File

@ -4,11 +4,11 @@ import (
"context"
"time"
"go.uber.org/zap"
"github.com/benbjohnson/clock"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/rand"
"github.com/influxdata/influxdb/snowflake"
"go.uber.org/zap"
)
var (
@ -22,6 +22,7 @@ const OpPrefix = "kv/"
type Service struct {
kv Store
log *zap.Logger
clock clock.Clock
Config ServiceConfig
IDGenerator influxdb.IDGenerator
@ -56,12 +57,18 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
s.Config.SessionLength = influxdb.DefaultSessionLength
}
s.clock = s.Config.Clock
if s.clock == nil {
s.clock = clock.New()
}
return s
}
// ServiceConfig allows us to configure Services
type ServiceConfig struct {
SessionLength time.Duration
Clock clock.Clock
}
// Initialize creates Buckets needed.

View File

@ -577,7 +577,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
tc.Status = string(backend.TaskActive)
}
createdAt := time.Now().Truncate(time.Second).UTC()
createdAt := s.clock.Now().Truncate(time.Second).UTC()
task := &influxdb.Task{
ID: s.IDGenerator.ID(),
Type: tc.Type,
@ -593,6 +593,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
Cron: opt.Cron,
CreatedAt: createdAt,
LatestCompleted: createdAt,
LatestScheduled: createdAt,
}
if opt.Offset != nil {
@ -696,7 +697,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
return nil, err
}
updatedAt := time.Now().UTC()
updatedAt := s.clock.Now().UTC()
// update the flux script
if !upd.Options.IsZero() || upd.Flux != nil {
@ -727,13 +728,18 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
if upd.Description != nil {
task.Description = *upd.Description
task.UpdatedAt = updatedAt
}
if upd.Status != nil {
if upd.Status != nil && task.Status != *upd.Status {
task.Status = *upd.Status
task.UpdatedAt = updatedAt
// task is transitioning from inactive to active, ensure scheduled and completed are updated
if task.Status == influxdb.TaskStatusActive {
updatedAtTrunc := updatedAt.Truncate(time.Second).UTC()
task.LatestCompleted = updatedAtTrunc
task.LatestScheduled = updatedAtTrunc
}
}
if upd.Metadata != nil {

View File

@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kv"
@ -179,52 +181,84 @@ func TestNextRunDue(t *testing.T) {
}
func TestRetrieveTaskWithBadAuth(t *testing.T) {
store, close, err := NewTestInmemStore(t)
type testService struct {
Store kv.Store
Service *kv.Service
Org influxdb.Organization
User influxdb.User
Auth influxdb.Authorization
Clock clock.Clock
storeCloseFn func()
}
func (s *testService) Close() {
s.storeCloseFn()
}
func newService(t *testing.T, ctx context.Context, c clock.Clock) *testService {
t.Helper()
if c == nil {
c = clock.New()
}
ts := &testService{}
var err error
ts.Store, ts.storeCloseFn, err = NewTestInmemStore(t)
if err != nil {
t.Fatal(err)
t.Fatal("failed to create InmemStore", err)
}
defer close()
service := kv.NewService(zaptest.NewLogger(t), store)
ctx, cancelFunc := context.WithCancel(context.Background())
if err := service.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
ts.Service = kv.NewService(zaptest.NewLogger(t), ts.Store, kv.ServiceConfig{Clock: c})
err = ts.Service.Initialize(ctx)
if err != nil {
t.Fatal("Service.Initialize", err)
}
defer cancelFunc()
u := &influxdb.User{Name: t.Name() + "-user"}
if err := service.CreateUser(ctx, u); err != nil {
ts.User = influxdb.User{Name: t.Name() + "-user"}
if err := ts.Service.CreateUser(ctx, &ts.User); err != nil {
t.Fatal(err)
}
o := &influxdb.Organization{Name: t.Name() + "-org"}
if err := service.CreateOrganization(ctx, o); err != nil {
ts.Org = influxdb.Organization{Name: t.Name() + "-org"}
if err := ts.Service.CreateOrganization(ctx, &ts.Org); err != nil {
t.Fatal(err)
}
if err := service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
if err := ts.Service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
ResourceType: influxdb.OrgsResourceType,
ResourceID: o.ID,
UserID: u.ID,
ResourceID: ts.Org.ID,
UserID: ts.User.ID,
UserType: influxdb.Owner,
}); err != nil {
t.Fatal(err)
}
authz := influxdb.Authorization{
OrgID: o.ID,
UserID: u.ID,
ts.Auth = influxdb.Authorization{
OrgID: ts.Org.ID,
UserID: ts.User.ID,
Permissions: influxdb.OperPermissions(),
}
if err := service.CreateAuthorization(context.Background(), &authz); err != nil {
if err := ts.Service.CreateAuthorization(context.Background(), &ts.Auth); err != nil {
t.Fatal(err)
}
ctx = icontext.SetAuthorizer(ctx, &authz)
return ts
}
task, err := service.CreateTask(ctx, influxdb.TaskCreate{
func TestRetrieveTaskWithBadAuth(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ts := newService(t, ctx, nil)
defer ts.Close()
ctx = icontext.SetAuthorizer(ctx, &ts.Auth)
task, err := ts.Service.CreateTask(ctx, influxdb.TaskCreate{
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: o.ID,
OwnerID: u.ID,
OrganizationID: ts.Org.ID,
OwnerID: ts.User.ID,
Status: string(backend.TaskActive),
})
if err != nil {
@ -232,7 +266,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
}
// convert task to old one with a bad auth
err = store.Update(ctx, func(tx kv.Tx) error {
err = ts.Store.Update(ctx, func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("tasksv1"))
if err != nil {
return err
@ -260,7 +294,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
}
// lets see if we can list and find the task
newTask, err := service.FindTaskByID(ctx, task.ID)
newTask, err := ts.Service.FindTaskByID(ctx, task.ID)
if err != nil {
t.Fatal(err)
}
@ -268,7 +302,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
t.Fatal("miss matching taskID's")
}
tasks, _, err := service.FindTasks(ctx, influxdb.TaskFilter{})
tasks, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{})
if err != nil {
t.Fatal(err)
}
@ -278,7 +312,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
// test status filter
active := string(backend.TaskActive)
tasksWithActiveFilter, _, err := service.FindTasks(ctx, influxdb.TaskFilter{Status: &active})
tasksWithActiveFilter, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{Status: &active})
if err != nil {
t.Fatal("could not find tasks")
}
@ -286,3 +320,53 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
t.Fatal("failed to find active task with filter")
}
}
func TestService_UpdateTask_InactiveToActive(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
c := clock.NewMock()
c.Set(time.Unix(1000, 0))
ts := newService(t, ctx, c)
defer ts.Close()
ctx = icontext.SetAuthorizer(ctx, &ts.Auth)
originalTask, err := ts.Service.CreateTask(ctx, influxdb.TaskCreate{
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: ts.Org.ID,
OwnerID: ts.User.ID,
Status: string(backend.TaskActive),
})
if err != nil {
t.Fatal("CreateTask", err)
}
v := influxdb.TaskStatusInactive
c.Add(1 * time.Second)
exp := c.Now()
updatedTask, err := ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{Status: &v, LatestCompleted: &exp, LatestScheduled: &exp})
if err != nil {
t.Fatal("UpdateTask", err)
}
if got := updatedTask.LatestScheduled; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestCompleted; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
c.Add(10 * time.Second)
exp = c.Now()
v = influxdb.TaskStatusActive
updatedTask, err = ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{Status: &v})
if err != nil {
t.Fatal("UpdateTask", err)
}
if got := updatedTask.LatestScheduled; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
}

View File

@ -42,6 +42,7 @@ func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskSe
task, err := ts.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{
LatestCompleted: &latestCompleted,
LatestScheduled: &latestCompleted,
})
if err != nil {
log.Error("Failed to set latestCompleted", zap.Error(err))
@ -83,6 +84,7 @@ func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs Ta
task, err := ts.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{
LatestCompleted: &latestCompleted,
LatestScheduled: &latestCompleted,
})
if err != nil {
log.Error("Failed to set latestCompleted", zap.Error(err))

View File

@ -55,7 +55,7 @@ func Test_NotifyCoordinatorOfCreated(t *testing.T) {
}
if diff := cmp.Diff([]update{
{two, influxdb.TaskUpdate{LatestCompleted: &aTime}},
{two, influxdb.TaskUpdate{LatestCompleted: &aTime, LatestScheduled: &aTime}},
}, tasks.updates); diff != "" {
t.Errorf("unexpected updates to task service %v", diff)
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
)
// Coordinator is a type which is used to react to
@ -71,16 +70,6 @@ func (s *CoordinatingTaskService) UpdateTask(ctx context.Context, id influxdb.ID
return nil, err
}
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if upd.Status != nil && *upd.Status == string(backend.TaskActive) {
// confirm that it was inactive and this is an attempt to activate
if from.Status == string(backend.TaskInactive) {
lc := s.now()
upd.LatestCompleted = &lc
}
}
to, err := s.TaskService.UpdateTask(ctx, id, upd)
if err != nil {
return to, err

View File

@ -262,6 +262,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
ID: tsk.ID,
CreatedAt: tsk.CreatedAt,
LatestCompleted: tsk.LatestCompleted,
LatestScheduled: tsk.LatestScheduled,
OrganizationID: cr.OrgID,
Organization: cr.Org,
AuthorizationID: tsk.AuthorizationID,
@ -632,8 +633,8 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}
if !task.LatestScheduled.IsZero() {
t.Fatal("expected a zero LatestScheduled on created task")
if task.LatestScheduled.IsZero() {
t.Fatal("expected a non-zero LatestScheduled on created task")
}
st, err := sys.TaskService.FindTaskByID(sys.Ctx, task.ID)