Allow tasks to skip catchup (#12068)
parent
6fdcaf83b4
commit
c78477314a
|
@ -442,7 +442,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
store taskbackend.Store
|
||||
err error
|
||||
)
|
||||
store, err = taskbolt.New(m.boltClient.DB(), "tasks")
|
||||
store, err = taskbolt.New(m.boltClient.DB(), "tasks", taskbolt.NoCatchUp)
|
||||
if err != nil {
|
||||
m.logger.Error("failed opening task bolt", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
|
@ -46,6 +47,8 @@ type Store struct {
|
|||
db *bolt.DB
|
||||
bucket []byte
|
||||
idGen platform.IDGenerator
|
||||
|
||||
minLatestCompleted int64
|
||||
}
|
||||
|
||||
const basePath = "/tasks/v1/"
|
||||
|
@ -59,8 +62,14 @@ var (
|
|||
runIDs = []byte(basePath + "run_ids")
|
||||
)
|
||||
|
||||
// Option is a optional configuration for the store.
|
||||
type Option func(*Store)
|
||||
|
||||
// NoCatchUp allows you to skip any task that was supposed to run during down time.
|
||||
func NoCatchUp(st *Store) { st.minLatestCompleted = time.Now().Unix() }
|
||||
|
||||
// New gives us a new Store based on "github.com/coreos/bbolt"
|
||||
func New(db *bolt.DB, rootBucket string) (*Store, error) {
|
||||
func New(db *bolt.DB, rootBucket string, opts ...Option) (*Store, error) {
|
||||
if db.IsReadOnly() {
|
||||
return nil, ErrDBReadOnly
|
||||
}
|
||||
|
@ -87,7 +96,11 @@ func New(db *bolt.DB, rootBucket string) (*Store, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator()}, nil
|
||||
st := &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator(), minLatestCompleted: math.MinInt64}
|
||||
for _, opt := range opts {
|
||||
opt(st)
|
||||
}
|
||||
return st, nil
|
||||
}
|
||||
|
||||
// CreateTask creates a task in the boltdb task store.
|
||||
|
@ -434,6 +447,10 @@ func (s *Store) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend.
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if stm.LatestCompleted < s.minLatestCompleted {
|
||||
stm.LatestCompleted = s.minLatestCompleted
|
||||
}
|
||||
|
||||
return &stm, nil
|
||||
}
|
||||
|
||||
|
@ -472,6 +489,10 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
if stm.LatestCompleted < s.minLatestCompleted {
|
||||
stm.LatestCompleted = s.minLatestCompleted
|
||||
}
|
||||
|
||||
return &backend.StoreTask{
|
||||
ID: id,
|
||||
Org: orgID,
|
||||
|
@ -539,6 +560,10 @@ func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64
|
|||
return err
|
||||
}
|
||||
|
||||
if stm.LatestCompleted < s.minLatestCompleted {
|
||||
stm.LatestCompleted = s.minLatestCompleted
|
||||
}
|
||||
|
||||
rc, err = stm.CreateNextRun(now, func() (platform.ID, error) {
|
||||
return s.idGen.ID(), nil
|
||||
})
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package bolt_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
"github.com/influxdata/influxdb"
|
||||
_ "github.com/influxdata/influxdb/query/builtin"
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
boltstore "github.com/influxdata/influxdb/task/backend/bolt"
|
||||
|
@ -49,3 +52,92 @@ func TestBoltStore(t *testing.T) {
|
|||
},
|
||||
)(t)
|
||||
}
|
||||
|
||||
func TestSkip(t *testing.T) {
|
||||
f, err := ioutil.TempFile("", "influx_bolt_task_store_test")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create tempfile for test db %v\n", err)
|
||||
}
|
||||
defer f.Close()
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
db, err := bolt.Open(f.Name(), os.ModeTemporary, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open bolt db for test db %v\n", err)
|
||||
}
|
||||
s, err := boltstore.New(db, "testbucket")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new bolt store %v\n", err)
|
||||
}
|
||||
|
||||
schedAfter := time.Now().Add(-time.Minute)
|
||||
tskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{
|
||||
Org: influxdb.ID(1),
|
||||
AuthorizationID: influxdb.ID(2),
|
||||
Script: `option task = {name:"x", every:1s} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`,
|
||||
ScheduleAfter: schedAfter.Unix(),
|
||||
Status: backend.TaskActive,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new task %v\n", err)
|
||||
}
|
||||
|
||||
rc, err := s.CreateNextRun(context.Background(), tskID, schedAfter.Add(10*time.Second).Unix())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new run %v\n", err)
|
||||
}
|
||||
|
||||
if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil {
|
||||
t.Fatalf("failed to finish run %v\n", err)
|
||||
}
|
||||
|
||||
meta, err := s.FindTaskMetaByID(context.Background(), tskID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to pull meta %v\n", err)
|
||||
}
|
||||
if meta.LatestCompleted <= schedAfter.Unix() {
|
||||
t.Fatal("failed to update latestCompleted")
|
||||
}
|
||||
|
||||
latestCompleted := meta.LatestCompleted
|
||||
|
||||
s.Close()
|
||||
|
||||
db, err = bolt.Open(f.Name(), os.ModeTemporary, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open bolt db for test db %v\n", err)
|
||||
}
|
||||
s, err = boltstore.New(db, "testbucket", boltstore.NoCatchUp)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new bolt store %v\n", err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
meta, err = s.FindTaskMetaByID(context.Background(), tskID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to pull meta %v\n", err)
|
||||
}
|
||||
|
||||
if meta.LatestCompleted == latestCompleted {
|
||||
t.Fatal("failed to overwrite latest completed on new meta pull")
|
||||
}
|
||||
latestCompleted = meta.LatestCompleted
|
||||
|
||||
rc, err = s.CreateNextRun(context.Background(), tskID, time.Now().Add(10*time.Second).Unix())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new run %v\n", err)
|
||||
}
|
||||
|
||||
if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil {
|
||||
t.Fatalf("failed to finish run %v\n", err)
|
||||
}
|
||||
|
||||
meta, err = s.FindTaskMetaByID(context.Background(), tskID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to pull meta %v\n", err)
|
||||
}
|
||||
|
||||
if meta.LatestCompleted == latestCompleted {
|
||||
t.Fatal("failed to run after an override")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue