fix(task): align task every with whole units
Previously, using every=1m would run every minute from when the task was created. This change restores the original intent, which is that "every 1m" happens on the minute, "every 1h" on the hour, etc.pull/10616/head
parent
c51d92363e
commit
47d04f198d
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
"github.com/influxdata/platform"
|
||||
|
@ -169,17 +168,7 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (
|
|||
return err
|
||||
}
|
||||
|
||||
stm := backend.StoreTaskMeta{
|
||||
MaxConcurrency: int32(o.Concurrency),
|
||||
Status: string(req.Status),
|
||||
LatestCompleted: req.ScheduleAfter,
|
||||
EffectiveCron: o.EffectiveCronString(),
|
||||
Delay: int32(o.Delay / time.Second),
|
||||
}
|
||||
if stm.Status == "" {
|
||||
stm.Status = string(backend.DefaultTaskStatus)
|
||||
}
|
||||
|
||||
stm := backend.NewStoreTaskMeta(req, o)
|
||||
stmBytes, err := stm.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/snowflake"
|
||||
|
@ -59,18 +58,7 @@ func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.I
|
|||
defer s.mu.Unlock()
|
||||
|
||||
s.tasks = append(s.tasks, task)
|
||||
|
||||
stm := StoreTaskMeta{
|
||||
MaxConcurrency: int32(o.Concurrency),
|
||||
Status: string(req.Status),
|
||||
LatestCompleted: req.ScheduleAfter,
|
||||
EffectiveCron: o.EffectiveCronString(),
|
||||
Delay: int32(o.Delay / time.Second),
|
||||
}
|
||||
if stm.Status == "" {
|
||||
stm.Status = string(DefaultTaskStatus)
|
||||
}
|
||||
s.meta[id] = stm
|
||||
s.meta[id] = NewStoreTaskMeta(req, o)
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
|
|
@ -6,11 +6,40 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/task/options"
|
||||
cron "gopkg.in/robfig/cron.v2"
|
||||
)
|
||||
|
||||
// This file contains helper methods for the StoreTaskMeta type defined in protobuf.
|
||||
|
||||
// NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options.
|
||||
func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta {
|
||||
stm := StoreTaskMeta{
|
||||
MaxConcurrency: int32(o.Concurrency),
|
||||
Status: string(req.Status),
|
||||
LatestCompleted: req.ScheduleAfter,
|
||||
EffectiveCron: o.EffectiveCronString(),
|
||||
Delay: int32(o.Delay / time.Second),
|
||||
}
|
||||
|
||||
if stm.Status == "" {
|
||||
stm.Status = string(DefaultTaskStatus)
|
||||
}
|
||||
|
||||
if o.Every != 0 {
|
||||
t := time.Unix(stm.LatestCompleted, 0).Truncate(o.Every).Unix()
|
||||
if t == stm.LatestCompleted {
|
||||
// For example, every 1m truncates to exactly on the minute.
|
||||
// But the input request is schedule after, not "on or after".
|
||||
// Add one interval.
|
||||
t += int64(o.Every / time.Second)
|
||||
}
|
||||
stm.LatestCompleted = t
|
||||
}
|
||||
|
||||
return stm
|
||||
}
|
||||
|
||||
// FinishRun removes the run matching runID from m's CurrentlyRunning slice,
|
||||
// and if that run's Now value is greater than m's LatestCompleted value,
|
||||
// updates the value of LatestCompleted to the run's Now value.
|
||||
|
|
|
@ -601,6 +601,43 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("schedule alignment with 'every' option", func(t *testing.T) {
|
||||
s := create(t)
|
||||
defer destroy(t, s)
|
||||
|
||||
const secondsPerDay = 60 * 60 * 24
|
||||
const scriptFmt = `option task = {
|
||||
name: "task with every",
|
||||
every: %s,
|
||||
}
|
||||
from(bucket: "b") |> range(start:-1h) |> toHTTP(url: "http://example.com")`
|
||||
|
||||
for _, tc := range []struct {
|
||||
e string // every option in task
|
||||
sa int64 // ScheduleAfter when creating
|
||||
lc int64 // expected meta.LatestCompleted
|
||||
}{
|
||||
{e: "1m", sa: 65, lc: 60},
|
||||
{e: "1m", sa: 60, lc: 120},
|
||||
{e: "10s", sa: 27, lc: 20},
|
||||
{e: "2d", sa: (2 * secondsPerDay) + 1, lc: 2 * secondsPerDay},
|
||||
} {
|
||||
script := fmt.Sprintf(scriptFmt, tc.e)
|
||||
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script, ScheduleAfter: tc.sa})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
meta, err := s.FindTaskMetaByID(context.Background(), id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if meta.LatestCompleted != tc.lc {
|
||||
t.Errorf("with every = %q, Create.ScheduleAfter = %d: got LatestCompleted %d, expected %d", tc.e, tc.sa, meta.LatestCompleted, tc.lc)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func testStoreFindByIDWithMeta(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
|
||||
|
|
Loading…
Reference in New Issue