Merge pull request #1564 from influxdata/addrunreturntorerun

add run return to rerun
pull/10616/head
Jorge Landivar 2018-11-27 12:00:13 -06:00 committed by GitHub
commit ab39981ac4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 212 additions and 98 deletions

View File

@ -617,10 +617,11 @@ func runRetryF(cmd *cobra.Command, args []string) {
}
ctx := context.TODO()
if err := s.RetryRun(ctx, taskID, runID, time.Now().Unix()); err != nil {
newRun, err := s.RetryRun(ctx, taskID, runID, time.Now().Unix())
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Retry for task %s's run %s queued.\n", taskID, runID)
fmt.Printf("Retry for task %s's run %s queued as run %s.\n", taskID, runID, newRun.ID)
}

View File

@ -2706,8 +2706,12 @@ paths:
required: true
description: run ID
responses:
'204':
description: retry has been queued
'200':
description: run that has been queued
content:
application/json:
schema:
$ref: "#/components/schemas/Run"
default:
description: unexpected error
content:

View File

@ -141,7 +141,7 @@ func newTasksResponse(ts []*platform.Task) tasksResponse {
}
type runResponse struct {
Links map[string]string `json:"links"`
Links map[string]string `json:"links,omitempty"`
Run platform.Run `json:"run"`
}
@ -684,12 +684,15 @@ func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) {
req.RequestedAt = &now
}
if err := h.TaskService.RetryRun(ctx, req.TaskID, req.RunID, *req.RequestedAt); err != nil {
run, err := h.TaskService.RetryRun(ctx, req.TaskID, req.RunID, *req.RequestedAt)
if err != nil {
EncodeError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, newRunResponse(*run)); err != nil {
EncodeError(ctx, err, w)
return
}
w.WriteHeader(http.StatusNoContent)
}
type retryRunRequest struct {
@ -1061,20 +1064,19 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID)
return nil, err
}
var r runResponse
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
var rs = &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return &r.Run, nil
return &rs.Run, nil
}
// RetryRun creates and returns a new run (which is a retry of another run).
func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, requestedAt int64) error {
func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, requestedAt int64) (*platform.Run, error) {
p := path.Join(taskIDRunIDPath(taskID, runID), "retry")
u, err := newURL(t.Addr, p)
if err != nil {
return err
return nil, err
}
val := url.Values{}
@ -1083,7 +1085,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, re
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return err
return nil, err
}
SetToken(t.Token, req)
@ -1092,7 +1094,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, re
resp, err := hc.Do(req)
if err != nil {
return err
return nil, err
}
defer resp.Body.Close()
@ -1100,18 +1102,22 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, re
if err.Error() == backend.ErrRunNotFound.Error() {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
return backend.ErrRunNotFound
return nil, backend.ErrRunNotFound
}
// RetryAlreadyQueuedError is also part of the contract.
if e := backend.ParseRetryAlreadyQueuedError(err.Error()); e != nil {
return *e
return nil, *e
}
return err
return nil, err
}
return nil
rs := &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return &rs.Run, nil
}
func cancelPath(taskID, runID platform.ID) string {

View File

@ -18,7 +18,7 @@ type TaskService struct {
FindRunsFn func(context.Context, platform.RunFilter) ([]*platform.Run, int, error)
FindRunByIDFn func(context.Context, platform.ID, platform.ID) (*platform.Run, error)
CancelRunFn func(context.Context, platform.ID, platform.ID) error
RetryRunFn func(context.Context, platform.ID, platform.ID, int64) error
RetryRunFn func(context.Context, platform.ID, platform.ID, int64) (*platform.Run, error)
}
func (s *TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
@ -58,6 +58,6 @@ func (s *TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID)
return s.CancelRunFn(ctx, taskID, runID)
}
func (s *TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, requestedAt int64) error {
func (s *TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, requestedAt int64) (*platform.Run, error) {
return s.RetryRunFn(ctx, taskID, runID, requestedAt)
}

View File

@ -1,6 +1,8 @@
package platform
import "context"
import (
"context"
)
// Task is a task. 🎊
type Task struct {
@ -62,7 +64,7 @@ type TaskService interface {
// RetryRun creates and returns a new run (which is a retry of another run).
// The requestedAt parameter is the Unix timestamp that will be recorded for the retry.
RetryRun(ctx context.Context, taskID, runID ID, requestedAt int64) error
RetryRun(ctx context.Context, taskID, runID ID, requestedAt int64) (*Run, error)
}
// TaskUpdate represents updates to a task

View File

@ -682,20 +682,22 @@ func (s *Store) FinishRun(ctx context.Context, taskID, runID platform.ID) error
})
}
func (s *Store) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) error {
func (s *Store) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) (*backend.StoreTaskMetaManualRun, error) {
encodedID, err := taskID.Encode()
if err != nil {
return err
return nil, err
}
var mRun *backend.StoreTaskMetaManualRun
return s.db.Update(func(tx *bolt.Tx) error {
if err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(stmBytes); err != nil {
return err
}
if err := stm.ManuallyRunTimeRange(start, end, requestedAt); err != nil {
makeID := func() (platform.ID, error) { return s.idGen.ID(), nil }
if err := stm.ManuallyRunTimeRange(start, end, requestedAt, makeID); err != nil {
return err
}
@ -703,9 +705,13 @@ func (s *Store) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, star
if err != nil {
return err
}
mRun = stm.ManualRuns[len(stm.ManualRuns)-1]
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(encodedID, stmBytes)
})
}); err != nil {
return nil, err
}
return mRun, nil
}
// Close closes the store

View File

@ -120,7 +120,6 @@ func (r *runReaderWriter) ListRuns(ctx context.Context, runFilter platform.RunFi
func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
r.mu.RLock()
defer r.mu.RUnlock()
run, ok := r.byRunID[runID.String()]
if !ok {
return nil, ErrRunNotFound

View File

@ -313,7 +313,7 @@ func (s *inmem) FinishRun(ctx context.Context, taskID, runID platform.ID) error
return nil
}
func (s *inmem) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) error {
func (s *inmem) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error) {
tid := taskID.String()
s.mu.Lock()
@ -321,15 +321,16 @@ func (s *inmem) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, star
stm, ok := s.runners[tid]
if !ok {
return errors.New("task not found")
return nil, errors.New("task not found")
}
if err := stm.ManuallyRunTimeRange(start, end, requestedAt); err != nil {
return err
if err := stm.ManuallyRunTimeRange(start, end, requestedAt, func() (platform.ID, error) { return s.idgen.ID(), nil }); err != nil {
return nil, err
}
s.runners[tid] = stm
return nil
mr := stm.ManualRuns[len(stm.ManualRuns)-1]
return mr, nil
}
func (s *inmem) delete(ctx context.Context, id platform.ID, f func(StoreTask) platform.ID) error {

View File

@ -127,10 +127,16 @@ func (stm *StoreTaskMeta) createNextRunFromQueue(now, nextDue int64, sch cron.Sc
runNow := sch.Next(time.Unix(latest, 0)).Unix()
// Already validated that we have room to create another run, in CreateNextRun.
id, err := makeID()
if err != nil {
return RunCreation{}, err
id := platform.ID(q.RunID)
if !id.Valid() {
var err error
id, err = makeID()
if err != nil {
return RunCreation{}, err
}
}
stm.CurrentlyRunning = append(stm.CurrentlyRunning, &StoreTaskMetaRun{
Now: runNow,
Try: 1,
@ -183,7 +189,7 @@ func (stm *StoreTaskMeta) NextDueRun() (int64, error) {
// requestedAt is the Unix timestamp indicating when this run range was requested.
//
// If adding the range would exceed the queue size, ManuallyRunTimeRange returns ErrManualQueueFull.
func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64) error {
func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64, makeID func() (platform.ID, error)) error {
// Arbitrarily chosen upper limit that seems unlikely to be reached except in pathological cases.
const maxQueueSize = 32
if len(stm.ManualRuns) >= maxQueueSize {
@ -206,8 +212,14 @@ func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64) er
LatestCompleted: lc,
RequestedAt: requestedAt,
}
if start == end && makeID != nil {
id, err := makeID()
if err != nil {
return err
}
run.RunID = uint64(id)
}
stm.ManualRuns = append(stm.ManualRuns, run)
return nil
}

View File

@ -45,7 +45,7 @@ func (m *StoreTaskMeta) Reset() { *m = StoreTaskMeta{} }
func (m *StoreTaskMeta) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMeta) ProtoMessage() {}
func (*StoreTaskMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_9d9298f22bde0681, []int{0}
return fileDescriptor_meta_93b4c98e367ac754, []int{0}
}
func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -143,7 +143,7 @@ func (m *StoreTaskMetaRun) Reset() { *m = StoreTaskMetaRun{} }
func (m *StoreTaskMetaRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaRun) ProtoMessage() {}
func (*StoreTaskMetaRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_9d9298f22bde0681, []int{1}
return fileDescriptor_meta_93b4c98e367ac754, []int{1}
}
func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -224,7 +224,9 @@ type StoreTaskMetaManualRun struct {
// latest_completed is the timestamp of the latest completed run from this queue.
LatestCompleted int64 `protobuf:"varint,3,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"`
// requested_at is the unix timestamp indicating when this run was requested.
RequestedAt int64 `protobuf:"varint,4,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
RequestedAt int64 `protobuf:"varint,4,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
// run_id is set ahead of time for retries of individual runs. Manually run time ranges do not receive an ID.
RunID uint64 `protobuf:"varint,5,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
@ -233,7 +235,7 @@ func (m *StoreTaskMetaManualRun) Reset() { *m = StoreTaskMetaManualRun{}
func (m *StoreTaskMetaManualRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaManualRun) ProtoMessage() {}
func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_9d9298f22bde0681, []int{2}
return fileDescriptor_meta_93b4c98e367ac754, []int{2}
}
func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -290,6 +292,13 @@ func (m *StoreTaskMetaManualRun) GetRequestedAt() int64 {
return 0
}
func (m *StoreTaskMetaManualRun) GetRunID() uint64 {
if m != nil {
return m.RunID
}
return 0
}
func init() {
proto.RegisterType((*StoreTaskMeta)(nil), "com.influxdata.platform.task.backend.StoreTaskMeta")
proto.RegisterType((*StoreTaskMetaRun)(nil), "com.influxdata.platform.task.backend.StoreTaskMetaRun")
@ -449,6 +458,11 @@ func (m *StoreTaskMetaManualRun) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintMeta(dAtA, i, uint64(m.RequestedAt))
}
if m.RunID != 0 {
dAtA[i] = 0x28
i++
i = encodeVarintMeta(dAtA, i, uint64(m.RunID))
}
return i, nil
}
@ -535,6 +549,9 @@ func (m *StoreTaskMetaManualRun) Size() (n int) {
if m.RequestedAt != 0 {
n += 1 + sovMeta(uint64(m.RequestedAt))
}
if m.RunID != 0 {
n += 1 + sovMeta(uint64(m.RunID))
}
return n
}
@ -1047,6 +1064,25 @@ func (m *StoreTaskMetaManualRun) Unmarshal(dAtA []byte) error {
break
}
}
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RunID", wireType)
}
m.RunID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMeta
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.RunID |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipMeta(dAtA[iNdEx:])
@ -1173,38 +1209,38 @@ var (
ErrIntOverflowMeta = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_9d9298f22bde0681) }
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_93b4c98e367ac754) }
var fileDescriptor_meta_9d9298f22bde0681 = []byte{
// 468 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x41, 0x6e, 0xd3, 0x40,
0x14, 0x86, 0x71, 0x1d, 0x07, 0xf2, 0x42, 0x5a, 0x33, 0xaa, 0x2a, 0x03, 0x52, 0x6a, 0x22, 0x10,
0x61, 0x63, 0x24, 0x90, 0x58, 0xb1, 0xa1, 0x81, 0x45, 0x17, 0xdd, 0x4c, 0x59, 0x21, 0x21, 0x6b,
0x6a, 0x4f, 0xa2, 0x28, 0xf6, 0x9b, 0x32, 0xf3, 0x06, 0x92, 0x4b, 0x20, 0xae, 0xc3, 0x0d, 0x58,
0x72, 0x02, 0x84, 0xc2, 0x35, 0x58, 0xa0, 0x99, 0x69, 0x83, 0x28, 0x59, 0x20, 0x76, 0x6f, 0x3e,
0x79, 0xfe, 0xf9, 0xff, 0xdf, 0x0f, 0xa0, 0x95, 0x24, 0x8a, 0x73, 0xad, 0x48, 0xb1, 0xfb, 0x95,
0x6a, 0x8b, 0x39, 0x4e, 0x1b, 0xbb, 0xac, 0x85, 0xa3, 0x8d, 0xa0, 0xa9, 0xd2, 0x6d, 0x41, 0xc2,
0x2c, 0x8a, 0x33, 0x51, 0x2d, 0x24, 0xd6, 0x77, 0xf6, 0x67, 0x6a, 0xa6, 0xfc, 0x85, 0xc7, 0x6e,
0x0a, 0x77, 0x47, 0x3f, 0x77, 0x60, 0x70, 0x4a, 0x4a, 0xcb, 0xd7, 0xc2, 0x2c, 0x4e, 0x24, 0x09,
0xf6, 0x10, 0xf6, 0x5a, 0xb1, 0x2c, 0x2b, 0x85, 0x95, 0xd5, 0x5a, 0x62, 0xb5, 0xca, 0xa2, 0x3c,
0x1a, 0x27, 0x7c, 0xb7, 0x15, 0xcb, 0xc9, 0x6f, 0xca, 0x1e, 0x41, 0xda, 0x08, 0x92, 0x86, 0xca,
0x4a, 0xb5, 0xe7, 0x8d, 0x24, 0x59, 0x67, 0x3b, 0x79, 0x34, 0x8e, 0xf9, 0x5e, 0xe0, 0x93, 0x4b,
0xcc, 0x0e, 0xa0, 0x6b, 0x48, 0x90, 0x35, 0x59, 0x9c, 0x47, 0xe3, 0x1e, 0xbf, 0x38, 0xb1, 0x0a,
0x6e, 0x05, 0x39, 0x6a, 0x56, 0xa5, 0xb6, 0x88, 0x73, 0x9c, 0x65, 0x9d, 0x3c, 0x1e, 0xf7, 0x9f,
0x3c, 0x2b, 0xfe, 0x25, 0x55, 0xf1, 0x87, 0x77, 0x6e, 0x91, 0xa7, 0x1b, 0x41, 0x1e, 0xf4, 0xd8,
0x03, 0xd8, 0x95, 0xd3, 0xa9, 0xac, 0x68, 0xfe, 0x5e, 0x96, 0x95, 0x56, 0x98, 0x25, 0xde, 0xc4,
0x60, 0x43, 0x27, 0x5a, 0x21, 0xdb, 0x87, 0xa4, 0x96, 0x8d, 0x58, 0x65, 0x5d, 0x9f, 0x36, 0x1c,
0xd8, 0x5b, 0xe8, 0xb7, 0x02, 0xad, 0x68, 0x9c, 0x3d, 0x93, 0xa5, 0xde, 0xdb, 0xf3, 0xff, 0xf0,
0x76, 0xe2, 0x55, 0x9c, 0x43, 0x68, 0x2f, 0x47, 0x33, 0xfa, 0x1c, 0x41, 0x7a, 0x35, 0x02, 0x4b,
0x21, 0x46, 0xf5, 0xc1, 0xb7, 0x1e, 0x73, 0x37, 0x3a, 0x42, 0x7a, 0xe5, 0xdb, 0x1d, 0x70, 0x37,
0xb2, 0x1c, 0xba, 0xda, 0x62, 0x39, 0xaf, 0x7d, 0xa3, 0x9d, 0xa3, 0xde, 0xfa, 0xdb, 0x61, 0xc2,
0x2d, 0x1e, 0xbf, 0xe4, 0x89, 0xb6, 0x78, 0x5c, 0xb3, 0x43, 0xe8, 0x6b, 0x81, 0x33, 0x59, 0x1a,
0x12, 0x9a, 0xb2, 0x8e, 0x57, 0x03, 0x8f, 0x4e, 0x1d, 0x61, 0x77, 0xa1, 0x17, 0x3e, 0x90, 0x58,
0xfb, 0x4a, 0x62, 0x7e, 0xc3, 0x83, 0x57, 0x58, 0xb3, 0x7b, 0x70, 0x53, 0xcb, 0x77, 0x56, 0x1a,
0x92, 0x75, 0x29, 0xc8, 0x97, 0x12, 0xf3, 0xfe, 0x86, 0xbd, 0xa0, 0xd1, 0xc7, 0x08, 0x0e, 0xb6,
0x47, 0x74, 0x5d, 0x86, 0x57, 0x43, 0x86, 0x70, 0x70, 0x29, 0xdc, 0x53, 0x61, 0x47, 0xdc, 0xb8,
0x75, 0x85, 0xe2, 0xed, 0x2b, 0x74, 0xd5, 0x50, 0xe7, 0x2f, 0x43, 0x47, 0xb7, 0xbf, 0xac, 0x87,
0xd1, 0xd7, 0xf5, 0x30, 0xfa, 0xbe, 0x1e, 0x46, 0x9f, 0x7e, 0x0c, 0xaf, 0xbd, 0xb9, 0x7e, 0xf1,
0x2b, 0xce, 0xba, 0x7e, 0xdb, 0x9f, 0xfe, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x88, 0x78, 0xfd, 0xdc,
0x37, 0x03, 0x00, 0x00,
var fileDescriptor_meta_93b4c98e367ac754 = []byte{
// 476 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xc1, 0x6e, 0xd3, 0x4e,
0x10, 0xc6, 0xff, 0xae, 0xe3, 0xfc, 0xc9, 0x84, 0xb4, 0x66, 0x55, 0x55, 0x06, 0xa4, 0xd4, 0x44,
0x20, 0xc2, 0xc5, 0x48, 0x20, 0x71, 0xe2, 0x42, 0x03, 0x87, 0x1e, 0x7a, 0xd9, 0x72, 0x42, 0x42,
0xd6, 0xd6, 0xde, 0x44, 0x51, 0xec, 0xd9, 0xb2, 0x3b, 0x0b, 0xc9, 0x5b, 0xf0, 0x28, 0x5c, 0x79,
0x03, 0x8e, 0x3c, 0x01, 0x42, 0xe1, 0x35, 0x38, 0xa0, 0xdd, 0x6d, 0x03, 0x94, 0x1c, 0x10, 0xb7,
0x99, 0x9f, 0xbc, 0xb3, 0xdf, 0xf7, 0x79, 0x16, 0xa0, 0x95, 0x24, 0x8a, 0x73, 0xad, 0x48, 0xb1,
0xbb, 0x95, 0x6a, 0x8b, 0x39, 0x4e, 0x1b, 0xbb, 0xac, 0x85, 0xa3, 0x8d, 0xa0, 0xa9, 0xd2, 0x6d,
0x41, 0xc2, 0x2c, 0x8a, 0x33, 0x51, 0x2d, 0x24, 0xd6, 0xb7, 0xf6, 0x67, 0x6a, 0xa6, 0xfc, 0x81,
0x87, 0xae, 0x0a, 0x67, 0x47, 0xdf, 0x77, 0x60, 0x70, 0x4a, 0x4a, 0xcb, 0x97, 0xc2, 0x2c, 0x4e,
0x24, 0x09, 0x76, 0x1f, 0xf6, 0x5a, 0xb1, 0x2c, 0x2b, 0x85, 0x95, 0xd5, 0x5a, 0x62, 0xb5, 0xca,
0xa2, 0x3c, 0x1a, 0x27, 0x7c, 0xb7, 0x15, 0xcb, 0xc9, 0x4f, 0xca, 0x1e, 0x40, 0xda, 0x08, 0x92,
0x86, 0xca, 0x4a, 0xb5, 0xe7, 0x8d, 0x24, 0x59, 0x67, 0x3b, 0x79, 0x34, 0x8e, 0xf9, 0x5e, 0xe0,
0x93, 0x4b, 0xcc, 0x0e, 0xa0, 0x6b, 0x48, 0x90, 0x35, 0x59, 0x9c, 0x47, 0xe3, 0x1e, 0xbf, 0xe8,
0x58, 0x05, 0x37, 0xc2, 0x38, 0x6a, 0x56, 0xa5, 0xb6, 0x88, 0x73, 0x9c, 0x65, 0x9d, 0x3c, 0x1e,
0xf7, 0x1f, 0x3d, 0x29, 0xfe, 0xc6, 0x55, 0xf1, 0x9b, 0x76, 0x6e, 0x91, 0xa7, 0x9b, 0x81, 0x3c,
0xcc, 0x63, 0xf7, 0x60, 0x57, 0x4e, 0xa7, 0xb2, 0xa2, 0xf9, 0x5b, 0x59, 0x56, 0x5a, 0x61, 0x96,
0x78, 0x11, 0x83, 0x0d, 0x9d, 0x68, 0x85, 0x6c, 0x1f, 0x92, 0x5a, 0x36, 0x62, 0x95, 0x75, 0xbd,
0xdb, 0xd0, 0xb0, 0xd7, 0xd0, 0x6f, 0x05, 0x5a, 0xd1, 0x38, 0x79, 0x26, 0x4b, 0xbd, 0xb6, 0xa7,
0xff, 0xa0, 0xed, 0xc4, 0x4f, 0x71, 0x0a, 0xa1, 0xbd, 0x2c, 0xcd, 0xe8, 0x63, 0x04, 0xe9, 0x55,
0x0b, 0x2c, 0x85, 0x18, 0xd5, 0x3b, 0x9f, 0x7a, 0xcc, 0x5d, 0xe9, 0x08, 0xe9, 0x95, 0x4f, 0x77,
0xc0, 0x5d, 0xc9, 0x72, 0xe8, 0x6a, 0x8b, 0xe5, 0xbc, 0xf6, 0x89, 0x76, 0x8e, 0x7a, 0xeb, 0x2f,
0x87, 0x09, 0xb7, 0x78, 0xfc, 0x9c, 0x27, 0xda, 0xe2, 0x71, 0xcd, 0x0e, 0xa1, 0xaf, 0x05, 0xce,
0x64, 0x69, 0x48, 0x68, 0xca, 0x3a, 0x7e, 0x1a, 0x78, 0x74, 0xea, 0x08, 0xbb, 0x0d, 0xbd, 0xf0,
0x81, 0xc4, 0xda, 0x47, 0x12, 0xf3, 0x6b, 0x1e, 0xbc, 0xc0, 0x9a, 0xdd, 0x81, 0xeb, 0x5a, 0xbe,
0xb1, 0xd2, 0x90, 0xac, 0x4b, 0x41, 0x3e, 0x94, 0x98, 0xf7, 0x37, 0xec, 0x19, 0x8d, 0x3e, 0x44,
0x70, 0xb0, 0xdd, 0xa2, 0xcb, 0x32, 0xdc, 0x1a, 0x3c, 0x84, 0xc6, 0xb9, 0x70, 0x57, 0x85, 0x1d,
0x71, 0xe5, 0xd6, 0x15, 0x8a, 0xb7, 0xaf, 0xd0, 0x55, 0x41, 0x9d, 0x3f, 0x04, 0xfd, 0x92, 0x49,
0xb2, 0x3d, 0x93, 0xa3, 0x9b, 0x9f, 0xd6, 0xc3, 0xe8, 0xf3, 0x7a, 0x18, 0x7d, 0x5d, 0x0f, 0xa3,
0xf7, 0xdf, 0x86, 0xff, 0xbd, 0xfa, 0xff, 0xe2, 0x67, 0x9d, 0x75, 0xfd, 0x7b, 0x78, 0xfc, 0x23,
0x00, 0x00, 0xff, 0xff, 0x35, 0x98, 0x09, 0x78, 0x59, 0x03, 0x00, 0x00,
}

View File

@ -64,4 +64,7 @@ message StoreTaskMetaManualRun {
// requested_at is the unix timestamp indicating when this run was requested.
int64 requested_at = 4;
// run_id is set ahead of time for retries of individual runs. Manually run time ranges do not receive an ID.
uint64 run_id = 5 [(gogoproto.customname) = "RunID"];
}

View File

@ -101,11 +101,11 @@ func TestMeta_CreateNextRun_Queue(t *testing.T) {
}
// Should run on 0, 60, and 120.
if err := stm.ManuallyRunTimeRange(0, 120, 3005); err != nil {
if err := stm.ManuallyRunTimeRange(0, 120, 3005, nil); err != nil {
t.Fatal(err)
}
// Should run once: 240.
if err := stm.ManuallyRunTimeRange(240, 240, 3005); err != nil {
if err := stm.ManuallyRunTimeRange(240, 240, 3005, nil); err != nil {
t.Fatal(err)
}
@ -228,7 +228,7 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) {
for i := int64(0); i < maxQueueSize; i++ {
j := i * 10
if err := stm.ManuallyRunTimeRange(j, j+5, j+now); err != nil {
if err := stm.ManuallyRunTimeRange(j, j+5, j+now, nil); err != nil {
t.Fatal(err)
}
if int64(len(stm.ManualRuns)) != i+1 {
@ -251,7 +251,7 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) {
}
// One more should cause ErrManualQueueFull.
if err := stm.ManuallyRunTimeRange(maxQueueSize*100, maxQueueSize*200, maxQueueSize+now); err != backend.ErrManualQueueFull {
if err := stm.ManuallyRunTimeRange(maxQueueSize*100, maxQueueSize*200, maxQueueSize+now, nil); err != backend.ErrManualQueueFull {
t.Fatalf("expected ErrManualQueueFull, got %v", err)
}
if len(stm.ManualRuns) != maxQueueSize {
@ -262,18 +262,18 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) {
stm.ManualRuns = stm.ManualRuns[:0]
// Duplicate manual run with single timestamp should be rejected.
if err := stm.ManuallyRunTimeRange(1, 1, 2); err != nil {
if err := stm.ManuallyRunTimeRange(1, 1, 2, nil); err != nil {
t.Fatal(err)
}
if exp, err := (backend.RetryAlreadyQueuedError{Start: 1, End: 1}), stm.ManuallyRunTimeRange(1, 1, 3); err != exp {
if exp, err := (backend.RetryAlreadyQueuedError{Start: 1, End: 1}), stm.ManuallyRunTimeRange(1, 1, 3, func() (platform.ID, error) { return platform.ID(1099), nil }); err != exp {
t.Fatalf("expected %v, got %v", exp, err)
}
// Duplicate manual run with time range should be rejected.
if err := stm.ManuallyRunTimeRange(100, 200, 201); err != nil {
if err := stm.ManuallyRunTimeRange(100, 200, 201, nil); err != nil {
t.Fatal(err)
}
if exp, err := (backend.RetryAlreadyQueuedError{Start: 100, End: 200}), stm.ManuallyRunTimeRange(100, 200, 202); err != exp {
if exp, err := (backend.RetryAlreadyQueuedError{Start: 100, End: 200}), stm.ManuallyRunTimeRange(100, 200, 202, nil); err != exp {
t.Fatalf("expected %v, got %v", exp, err)
}

View File

@ -64,6 +64,7 @@ const (
RunSuccess
RunFail
RunCanceled
RunScheduled
)
func (r RunStatus) String() string {
@ -76,6 +77,8 @@ func (r RunStatus) String() string {
return "failed"
case RunCanceled:
return "canceled"
case RunScheduled:
return "scheduled"
}
panic(fmt.Sprintf("unknown RunStatus: %d", r))
}
@ -223,7 +226,7 @@ type Store interface {
// ManuallyRunTimeRange enqueues a request to run the task with the given ID for all schedules no earlier than start and no later than end (Unix timestamps).
// requestedAt is the Unix timestamp when the request was initiated.
// ManuallyRunTimeRange must delegate to an underlying StoreTaskMeta's ManuallyRunTimeRange method.
ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) error
ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error)
// DeleteOrg deletes the org.
DeleteOrg(ctx context.Context, orgID platform.ID) error

View File

@ -807,12 +807,12 @@ from(bucket:"test") |> range(start:-1h)`
}
// Task is set to every minute. Should schedule once on 0 and once on 60.
if err := s.ManuallyRunTimeRange(context.Background(), taskID, 0, 60, 3000); err != nil {
if _, err := s.ManuallyRunTimeRange(context.Background(), taskID, 0, 60, 3000); err != nil {
t.Fatal(err)
}
// Should schedule once exactly on 180.
if err := s.ManuallyRunTimeRange(context.Background(), taskID, 180, 180, 3001); err != nil {
if _, err := s.ManuallyRunTimeRange(context.Background(), taskID, 180, 180, 3001); err != nil {
t.Fatal(err)
}
@ -925,7 +925,7 @@ from(bucket:"test") |> range(start:-1h)`
t.Fatal(err)
}
if err := s.ManuallyRunTimeRange(context.Background(), taskID, 1, 10, 0); err != nil {
if _, err := s.ManuallyRunTimeRange(context.Background(), taskID, 1, 10, 0); err != nil {
t.Fatal(err)
}

View File

@ -175,27 +175,37 @@ func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*pla
return p.r.FindRunByID(ctx, task.Org, id)
}
func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID, requestedAt int64) error {
func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID, requestedAt int64) (*platform.Run, error) {
task, err := p.s.FindTaskByID(ctx, taskID)
if err != nil {
return err
return nil, err
}
run, err := p.r.FindRunByID(ctx, task.Org, id)
if err != nil {
return err
return nil, err
}
if run.Status == backend.RunStarted.String() {
return backend.ErrRunNotFinished
return nil, backend.ErrRunNotFinished
}
scheduledTime, err := time.Parse(time.RFC3339, run.ScheduledFor)
if err != nil {
return err
return nil, err
}
t := scheduledTime.UTC().Unix()
return p.s.ManuallyRunTimeRange(ctx, run.TaskID, t, t, requestedAt)
m, err := p.s.ManuallyRunTimeRange(ctx, run.TaskID, t, t, requestedAt)
if err != nil {
return nil, err
}
return &platform.Run{
ID: platform.ID(m.RunID),
TaskID: run.TaskID,
RequestedAt: time.Unix(requestedAt, 0).Format(time.RFC3339),
Status: backend.RunScheduled.String(),
ScheduledFor: run.ScheduledFor,
}, nil
}
func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error {

View File

@ -324,10 +324,23 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatalf("unexpected FinishedAt; want %s, got %s", exp, runs[1].FinishedAt)
}
// look for a run that doesn't exit.
_, err = sys.ts.FindRunByID(sys.Ctx, task.ID, platform.ID(math.MaxUint64))
if err == nil {
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
}
// look for a taskID that doesn't exist.
_, err = sys.ts.FindRunByID(sys.Ctx, platform.ID(math.MaxUint64), runs[0].ID)
if err == nil {
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
}
foundRun0, err := sys.ts.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(foundRun0, runs[0]); diff != "" {
t.Fatalf("difference between listed run and found run: %s", diff)
}
@ -356,7 +369,8 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Non-existent ID should return the right error.
if err := sys.ts.RetryRun(sys.Ctx, task.ID, platform.ID(math.MaxUint64), 0); err != backend.ErrRunNotFound {
m, err := sys.ts.RetryRun(sys.Ctx, task.ID, platform.ID(math.MaxUint64), 0)
if err != backend.ErrRunNotFound {
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", backend.ErrRunNotFound, err)
}
@ -390,9 +404,24 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Now retry the run.
if err := sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix); err != nil {
m, err = sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix)
if err != nil {
t.Fatal(err)
}
if m.TaskID != task.ID {
t.Fatalf("wrong task ID on retried run: got %s, want %s", m.TaskID, task.ID)
}
if m.Status != "scheduled" {
t.Fatal("expected new retried run to have status of scheduled")
}
nowTime, err := time.Parse(time.RFC3339, m.ScheduledFor)
if err != nil {
t.Fatalf("expected scheduledFor to be a parsable time in RFC3339, but got %s", m.ScheduledFor)
}
if nowTime.Unix() != rc.Created.Now {
t.Fatalf("wrong scheduledFor on task: got %s, want %s", m.ScheduledFor, time.Unix(rc.Created.Now, 0).Format(time.RFC3339))
}
// Ensure the retry is added on the store task meta.
meta, err := sys.S.FindTaskMetaByID(sys.Ctx, task.ID)
if err != nil {
@ -410,8 +439,10 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatalf("didn't find matching manual run after successful RetryRun call; got: %v", meta.ManualRuns)
}
exp := backend.RetryAlreadyQueuedError{Start: rc.Created.Now, End: rc.Created.Now}
// Retrying a run which has been queued but not started, should be rejected.
if exp, err := (backend.RetryAlreadyQueuedError{Start: rc.Created.Now, End: rc.Created.Now}), sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix); err != exp {
if _, err = sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix); err != exp {
t.Fatalf("subsequent retry should have been rejected with %v; got %v", exp, err)
}
})