diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 0355a5194c..56aa622c8c 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -442,7 +442,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { store taskbackend.Store err error ) - store, err = taskbolt.New(m.boltClient.DB(), "tasks") + store, err = taskbolt.New(m.boltClient.DB(), "tasks", taskbolt.NoCatchUp) if err != nil { m.logger.Error("failed opening task bolt", zap.Error(err)) return err diff --git a/task/backend/bolt/bolt.go b/task/backend/bolt/bolt.go index 05650b5e7d..a938622f92 100644 --- a/task/backend/bolt/bolt.go +++ b/task/backend/bolt/bolt.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "math" "time" bolt "github.com/coreos/bbolt" @@ -46,6 +47,8 @@ type Store struct { db *bolt.DB bucket []byte idGen platform.IDGenerator + + minLatestCompleted int64 } const basePath = "/tasks/v1/" @@ -59,8 +62,14 @@ var ( runIDs = []byte(basePath + "run_ids") ) +// Option is a optional configuration for the store. +type Option func(*Store) + +// NoCatchUp allows you to skip any task that was supposed to run during down time. +func NoCatchUp(st *Store) { st.minLatestCompleted = time.Now().Unix() } + // New gives us a new Store based on "github.com/coreos/bbolt" -func New(db *bolt.DB, rootBucket string) (*Store, error) { +func New(db *bolt.DB, rootBucket string, opts ...Option) (*Store, error) { if db.IsReadOnly() { return nil, ErrDBReadOnly } @@ -87,7 +96,11 @@ func New(db *bolt.DB, rootBucket string) (*Store, error) { if err != nil { return nil, err } - return &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator()}, nil + st := &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator(), minLatestCompleted: math.MinInt64} + for _, opt := range opts { + opt(st) + } + return st, nil } // CreateTask creates a task in the boltdb task store. @@ -434,6 +447,10 @@ func (s *Store) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend. return nil, err } + if stm.LatestCompleted < s.minLatestCompleted { + stm.LatestCompleted = s.minLatestCompleted + } + return &stm, nil } @@ -472,6 +489,10 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back return nil, nil, err } + if stm.LatestCompleted < s.minLatestCompleted { + stm.LatestCompleted = s.minLatestCompleted + } + return &backend.StoreTask{ ID: id, Org: orgID, @@ -539,6 +560,10 @@ func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64 return err } + if stm.LatestCompleted < s.minLatestCompleted { + stm.LatestCompleted = s.minLatestCompleted + } + rc, err = stm.CreateNextRun(now, func() (platform.ID, error) { return s.idGen.ID(), nil }) diff --git a/task/backend/bolt/bolt_test.go b/task/backend/bolt/bolt_test.go index c86f26d93e..bb9c9ba3ca 100644 --- a/task/backend/bolt/bolt_test.go +++ b/task/backend/bolt/bolt_test.go @@ -1,11 +1,14 @@ package bolt_test import ( + "context" "io/ioutil" "os" "testing" + "time" bolt "github.com/coreos/bbolt" + "github.com/influxdata/influxdb" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/task/backend" boltstore "github.com/influxdata/influxdb/task/backend/bolt" @@ -49,3 +52,92 @@ func TestBoltStore(t *testing.T) { }, )(t) } + +func TestSkip(t *testing.T) { + f, err := ioutil.TempFile("", "influx_bolt_task_store_test") + if err != nil { + t.Fatalf("failed to create tempfile for test db %v\n", err) + } + defer f.Close() + defer os.Remove(f.Name()) + + db, err := bolt.Open(f.Name(), os.ModeTemporary, nil) + if err != nil { + t.Fatalf("failed to open bolt db for test db %v\n", err) + } + s, err := boltstore.New(db, "testbucket") + if err != nil { + t.Fatalf("failed to create new bolt store %v\n", err) + } + + schedAfter := time.Now().Add(-time.Minute) + tskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{ + Org: influxdb.ID(1), + AuthorizationID: influxdb.ID(2), + Script: `option task = {name:"x", every:1s} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`, + ScheduleAfter: schedAfter.Unix(), + Status: backend.TaskActive, + }) + if err != nil { + t.Fatalf("failed to create new task %v\n", err) + } + + rc, err := s.CreateNextRun(context.Background(), tskID, schedAfter.Add(10*time.Second).Unix()) + if err != nil { + t.Fatalf("failed to create new run %v\n", err) + } + + if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil { + t.Fatalf("failed to finish run %v\n", err) + } + + meta, err := s.FindTaskMetaByID(context.Background(), tskID) + if err != nil { + t.Fatalf("failed to pull meta %v\n", err) + } + if meta.LatestCompleted <= schedAfter.Unix() { + t.Fatal("failed to update latestCompleted") + } + + latestCompleted := meta.LatestCompleted + + s.Close() + + db, err = bolt.Open(f.Name(), os.ModeTemporary, nil) + if err != nil { + t.Fatalf("failed to open bolt db for test db %v\n", err) + } + s, err = boltstore.New(db, "testbucket", boltstore.NoCatchUp) + if err != nil { + t.Fatalf("failed to create new bolt store %v\n", err) + } + defer s.Close() + + meta, err = s.FindTaskMetaByID(context.Background(), tskID) + if err != nil { + t.Fatalf("failed to pull meta %v\n", err) + } + + if meta.LatestCompleted == latestCompleted { + t.Fatal("failed to overwrite latest completed on new meta pull") + } + latestCompleted = meta.LatestCompleted + + rc, err = s.CreateNextRun(context.Background(), tskID, time.Now().Add(10*time.Second).Unix()) + if err != nil { + t.Fatalf("failed to create new run %v\n", err) + } + + if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil { + t.Fatalf("failed to finish run %v\n", err) + } + + meta, err = s.FindTaskMetaByID(context.Background(), tskID) + if err != nil { + t.Fatalf("failed to pull meta %v\n", err) + } + + if meta.LatestCompleted == latestCompleted { + t.Fatal("failed to run after an override") + } +}