From 492d8406aa5e6a481eeec88d66e9337091d2a99d Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 27 Mar 2019 15:24:53 -0500 Subject: [PATCH] fix(tasks): make durations visibly show up nicely --- pkg/pointer/pointer.go | 4 +- task.go | 44 ++-- task/backend/meta.go | 58 +++-- task/backend/meta.pb.go | 118 +++++----- task/backend/meta.proto | 2 +- task/backend/meta_test.go | 4 +- task/backend/storetest/storetest.go | 19 +- task/backend/task.go | 4 +- task/options/options.go | 206 ++++++++++++++++-- task/options/options_test.go | 105 ++++++++- task/platform_adapter.go | 8 +- task/servicetest/servicetest.go | 11 +- task_test.go | 37 ++-- ui/cypress/e2e/tasks.test.ts | 4 +- ui/cypress/support/commands.ts | 2 +- .../shared/utils/resourceToTemplate.test.ts | 4 +- ui/src/tasks/reducers/tasks.test.ts | 2 +- 17 files changed, 469 insertions(+), 163 deletions(-) diff --git a/pkg/pointer/pointer.go b/pkg/pointer/pointer.go index ec5c37a46b..51a6147b8b 100644 --- a/pkg/pointer/pointer.go +++ b/pkg/pointer/pointer.go @@ -2,7 +2,9 @@ // Feel free to add more pointerification functions for more types as you need them. package pointer -import "time" +import ( + "time" +) // Duration returns a pointer to its argument. func Duration(d time.Duration) *time.Duration { diff --git a/task.go b/task.go index 3012c81864..eaaa520b68 100644 --- a/task.go +++ b/task.go @@ -6,9 +6,7 @@ import ( "errors" "fmt" "strconv" - "time" - "github.com/influxdata/flux" "github.com/influxdata/flux/ast" "github.com/influxdata/flux/ast/edit" "github.com/influxdata/flux/parser" @@ -160,11 +158,11 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error { // Every represents a fixed period to repeat execution. // It gets marshalled from a string duration, i.e.: "10s" is 10 seconds - Every flux.Duration `json:"every,omitempty"` + Every options.Duration `json:"every,omitempty"` // Offset represents a delay before execution. // It gets marshalled from a string duration, i.e.: "10s" is 10 seconds - Offset *flux.Duration `json:"offset,omitempty"` + Offset *options.Duration `json:"offset,omitempty"` Concurrency *int64 `json:"concurrency,omitempty"` @@ -178,9 +176,9 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error { } t.Options.Name = jo.Name t.Options.Cron = jo.Cron - t.Options.Every = time.Duration(jo.Every) + t.Options.Every = jo.Every if jo.Offset != nil { - offset := time.Duration(*jo.Offset) + offset := *jo.Offset t.Options.Offset = &offset } t.Options.Concurrency = jo.Concurrency @@ -202,10 +200,10 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) { Cron string `json:"cron,omitempty"` // Every represents a fixed period to repeat execution. - Every flux.Duration `json:"every,omitempty"` + Every options.Duration `json:"every,omitempty"` // Offset represents a delay before execution. - Offset *flux.Duration `json:"offset,omitempty"` + Offset *options.Duration `json:"offset,omitempty"` Concurrency *int64 `json:"concurrency,omitempty"` @@ -215,9 +213,9 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) { }{} jo.Name = t.Options.Name jo.Cron = t.Options.Cron - jo.Every = flux.Duration(t.Options.Every) + jo.Every = t.Options.Every if t.Options.Offset != nil { - offset := flux.Duration(*t.Options.Offset) + offset := *t.Options.Offset jo.Offset = &offset } jo.Concurrency = t.Options.Concurrency @@ -230,7 +228,7 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) { func (t TaskUpdate) Validate() error { switch { - case t.Options.Every != 0 && t.Options.Cron != "": + case !t.Options.Every.IsZero() && t.Options.Cron != "": return errors.New("cannot specify both every and cron") case t.Flux == nil && t.Status == nil && t.Options.IsZero() && t.Token == "": return errors.New("cannot update task without content") @@ -252,25 +250,23 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error { return ast.GetError(parsedPKG) } parsed := parsedPKG.Files[0] - if t.Options.Every != 0 && t.Options.Cron != "" { - return errors.New("cannot specify both every and cron") + if !t.Options.Every.IsZero() && t.Options.Cron != "" { + return errors.New("cannot specify both cron and every") } op := make(map[string]ast.Expression, 4) if t.Options.Name != "" { op["name"] = &ast.StringLiteral{Value: t.Options.Name} } - if t.Options.Every != 0 { - d := ast.Duration{Magnitude: int64(t.Options.Every), Unit: "ns"} - op["every"] = &ast.DurationLiteral{Values: []ast.Duration{d}} + if !t.Options.Every.IsZero() { + op["every"] = &t.Options.Every.Node } if t.Options.Cron != "" { op["cron"] = &ast.StringLiteral{Value: t.Options.Cron} } if t.Options.Offset != nil { - if *t.Options.Offset != 0 { - d := ast.Duration{Magnitude: int64(*t.Options.Offset), Unit: "ns"} - op["offset"] = &ast.DurationLiteral{Values: []ast.Duration{d}} + if !t.Options.Offset.IsZero() { + op["offset"] = &t.Options.Offset.Node } else { toDelete["offset"] = struct{}{} } @@ -300,12 +296,12 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error { case "offset": if offset, ok := op["offset"]; ok && t.Options.Offset != nil { delete(op, "offset") - p.Value = offset + p.Value = offset.Copy().(*ast.DurationLiteral) } case "every": - if every, ok := op["every"]; ok && t.Options.Every != 0 { + if every, ok := op["every"]; ok && !t.Options.Every.IsZero() { + p.Value = every.Copy().(*ast.DurationLiteral) delete(op, "every") - p.Value = every } else if cron, ok := op["cron"]; ok && t.Options.Cron != "" { delete(op, "cron") p.Value = cron @@ -315,10 +311,10 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error { if cron, ok := op["cron"]; ok && t.Options.Cron != "" { delete(op, "cron") p.Value = cron - } else if every, ok := op["every"]; ok && t.Options.Every != 0 { + } else if every, ok := op["every"]; ok && !t.Options.Every.IsZero() { delete(op, "every") p.Key = &ast.Identifier{Name: "every"} - p.Value = every + p.Value = every.Copy().(*ast.DurationLiteral) } } } diff --git a/task/backend/meta.go b/task/backend/meta.go index 9cca1b7172..7a6a31739b 100644 --- a/task/backend/meta.go +++ b/task/backend/meta.go @@ -14,6 +14,7 @@ import ( // This file contains helper methods for the StoreTaskMeta type defined in protobuf. // NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options. +// Do not call this without validating the request and options first. func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta { stm := StoreTaskMeta{ Status: string(req.Status), @@ -26,7 +27,8 @@ func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta { stm.MaxConcurrency = int32(*o.Concurrency) } if o.Offset != nil { - stm.Offset = int32(*o.Offset / time.Second) + offset, _ := o.Offset.DurationFrom(time.Unix(req.ScheduleAfter, 0)) // we can do this because it is validated already. + stm.Offset = offset.String() } if stm.Status == "" { @@ -43,20 +45,29 @@ func (stm *StoreTaskMeta) AlignLatestCompleted() { if strings.HasPrefix(stm.EffectiveCron, "@every ") { everyString := strings.TrimPrefix(stm.EffectiveCron, "@every ") - every, err := time.ParseDuration(everyString) + every := options.Duration{} + err := every.Parse(everyString) if err != nil { // We cannot align a invalid time return } - - t := time.Unix(stm.LatestCompleted, 0).Truncate(every).Unix() - if t == stm.LatestCompleted { + t := time.Unix(stm.LatestCompleted, 0) + everyDur, err := every.DurationFrom(t) + if err != nil { + return + } + t = t.Truncate(everyDur) + if t.Unix() == stm.LatestCompleted { // For example, every 1m truncates to exactly on the minute. // But the input request is schedule after, not "on or after". // Add one interval. - t += int64(every / time.Second) + tafter, err := every.Add(t) + if err != nil { + return + } + t = tafter } - stm.LatestCompleted = t + stm.LatestCompleted = t.Truncate(time.Second).Unix() } } @@ -123,15 +134,23 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e latest = cr.Now } } - + nowTime := time.Unix(now, 0) nextScheduled := sch.Next(time.Unix(latest, 0)) nextScheduledUnix := nextScheduled.Unix() - if dueAt := nextScheduledUnix + int64(stm.Offset); dueAt > now { + offset := &options.Duration{} + if err := offset.Parse(stm.Offset); err != nil { + return RunCreation{}, err + } + dueAt, err := offset.Add(nextScheduled) + if err != nil { + return RunCreation{}, err + } + if dueAt.After(nowTime) { // Can't schedule yet. if len(stm.ManualRuns) > 0 { - return stm.createNextRunFromQueue(now, dueAt, sch, makeID) + return stm.createNextRunFromQueue(now, dueAt.Unix(), sch, makeID) } - return RunCreation{}, RunNotYetDueError{DueAt: dueAt} + return RunCreation{}, RunNotYetDueError{DueAt: dueAt.Unix()} } id, err := makeID() @@ -145,12 +164,16 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e RunID: uint64(id), }) + nextDue, err := offset.Add(sch.Next(nextScheduled)) + if err != nil { + return RunCreation{}, err + } return RunCreation{ Created: QueuedRun{ RunID: id, Now: nextScheduledUnix, }, - NextDue: sch.Next(nextScheduled).Unix() + int64(stm.Offset), + NextDue: nextDue.Unix(), HasQueue: len(stm.ManualRuns) > 0, }, nil } @@ -229,8 +252,15 @@ func (stm *StoreTaskMeta) NextDueRun() (int64, error) { latest = cr.Now } } - - return sch.Next(time.Unix(latest, 0)).Unix() + int64(stm.Offset), nil + offset := &options.Duration{} + if err := offset.Parse(stm.Offset); err != nil { + return 0, err + } + nextDue, err := offset.Add(sch.Next(time.Unix(latest, 0))) + if err != nil { + return 0, err + } + return nextDue.Unix(), nil } // ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end. diff --git a/task/backend/meta.pb.go b/task/backend/meta.pb.go index d18c156673..8f1cb8b68d 100644 --- a/task/backend/meta.pb.go +++ b/task/backend/meta.pb.go @@ -35,9 +35,9 @@ type StoreTaskMeta struct { // effective_cron is the effective cron string as reported by the task's options. EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"` // Task's configured delay, in seconds. - Offset int32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"` - CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` - UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` + Offset string `protobuf:"bytes,6,opt,name=offset,proto3" json:"offset,omitempty"` + CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` // The Authorization ID associated with the task. AuthorizationID uint64 `protobuf:"varint,9,opt,name=authorization_id,json=authorizationId,proto3" json:"authorization_id,omitempty"` ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns,proto3" json:"manual_runs,omitempty"` @@ -47,7 +47,7 @@ func (m *StoreTaskMeta) Reset() { *m = StoreTaskMeta{} } func (m *StoreTaskMeta) String() string { return proto.CompactTextString(m) } func (*StoreTaskMeta) ProtoMessage() {} func (*StoreTaskMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_841ef32afee093f0, []int{0} + return fileDescriptor_meta_b8385560be3db2c8, []int{0} } func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -111,11 +111,11 @@ func (m *StoreTaskMeta) GetEffectiveCron() string { return "" } -func (m *StoreTaskMeta) GetOffset() int32 { +func (m *StoreTaskMeta) GetOffset() string { if m != nil { return m.Offset } - return 0 + return "" } func (m *StoreTaskMeta) GetCreatedAt() int64 { @@ -164,7 +164,7 @@ func (m *StoreTaskMetaRun) Reset() { *m = StoreTaskMetaRun{} } func (m *StoreTaskMetaRun) String() string { return proto.CompactTextString(m) } func (*StoreTaskMetaRun) ProtoMessage() {} func (*StoreTaskMetaRun) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_841ef32afee093f0, []int{1} + return fileDescriptor_meta_b8385560be3db2c8, []int{1} } func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -254,7 +254,7 @@ func (m *StoreTaskMetaManualRun) Reset() { *m = StoreTaskMetaManualRun{} func (m *StoreTaskMetaManualRun) String() string { return proto.CompactTextString(m) } func (*StoreTaskMetaManualRun) ProtoMessage() {} func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_841ef32afee093f0, []int{2} + return fileDescriptor_meta_b8385560be3db2c8, []int{2} } func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -372,10 +372,11 @@ func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintMeta(dAtA, i, uint64(len(m.EffectiveCron))) i += copy(dAtA[i:], m.EffectiveCron) } - if m.Offset != 0 { - dAtA[i] = 0x30 + if len(m.Offset) > 0 { + dAtA[i] = 0x32 i++ - i = encodeVarintMeta(dAtA, i, uint64(m.Offset)) + i = encodeVarintMeta(dAtA, i, uint64(len(m.Offset))) + i += copy(dAtA[i:], m.Offset) } if m.CreatedAt != 0 { dAtA[i] = 0x38 @@ -535,8 +536,9 @@ func (m *StoreTaskMeta) Size() (n int) { if l > 0 { n += 1 + l + sovMeta(uint64(l)) } - if m.Offset != 0 { - n += 1 + sovMeta(uint64(m.Offset)) + l = len(m.Offset) + if l > 0 { + n += 1 + l + sovMeta(uint64(l)) } if m.CreatedAt != 0 { n += 1 + sovMeta(uint64(m.CreatedAt)) @@ -777,10 +779,10 @@ func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error { m.EffectiveCron = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 6: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) } - m.Offset = 0 + var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMeta @@ -790,11 +792,21 @@ func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Offset |= (int32(b) & 0x7F) << shift + stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { break } } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMeta + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Offset = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex case 7: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType) @@ -1318,42 +1330,42 @@ var ( ErrIntOverflowMeta = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_841ef32afee093f0) } +func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_b8385560be3db2c8) } -var fileDescriptor_meta_841ef32afee093f0 = []byte{ - // 543 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x41, 0x6f, 0xd3, 0x30, - 0x14, 0xc7, 0x1b, 0xd2, 0x74, 0xab, 0x4b, 0xd7, 0x60, 0xa6, 0x29, 0x02, 0x91, 0x66, 0x15, 0x88, - 0x72, 0x09, 0x12, 0x48, 0x9c, 0x10, 0x52, 0x57, 0x38, 0xec, 0xb0, 0x8b, 0xc7, 0x09, 0x09, 0x45, - 0x5e, 0xe2, 0x94, 0xa8, 0x89, 0x5d, 0x9c, 0x67, 0x68, 0xf9, 0x14, 0x7c, 0x14, 0xae, 0x7c, 0x03, - 0x8e, 0x3b, 0x72, 0x9a, 0x50, 0xfb, 0x35, 0x38, 0x20, 0x3b, 0x69, 0xd9, 0x46, 0x0f, 0x68, 0xb7, - 0xe7, 0xdf, 0x8b, 0x9f, 0xdf, 0xff, 0xff, 0x5e, 0x10, 0x2a, 0x18, 0xd0, 0x70, 0x26, 0x05, 0x08, - 0xfc, 0x30, 0x16, 0x45, 0x98, 0xf1, 0x34, 0x57, 0xf3, 0x84, 0x6a, 0x9a, 0x53, 0x48, 0x85, 0x2c, - 0x42, 0xa0, 0xe5, 0x34, 0x3c, 0xa3, 0xf1, 0x94, 0xf1, 0xe4, 0xde, 0xfe, 0x44, 0x4c, 0x84, 0xb9, - 0xf0, 0x54, 0x47, 0xd5, 0xdd, 0xc1, 0x6f, 0x1b, 0x75, 0x4f, 0x41, 0x48, 0xf6, 0x96, 0x96, 0xd3, - 0x13, 0x06, 0x14, 0x3f, 0x46, 0xbd, 0x82, 0xce, 0xa3, 0x58, 0xf0, 0x58, 0x49, 0xc9, 0x78, 0xbc, - 0xf0, 0xac, 0xc0, 0x1a, 0x3a, 0x64, 0xaf, 0xa0, 0xf3, 0xf1, 0x5f, 0x8a, 0x9f, 0x20, 0x37, 0xa7, - 0xc0, 0x4a, 0x88, 0x62, 0x51, 0xcc, 0x72, 0x06, 0x2c, 0xf1, 0x6e, 0x05, 0xd6, 0xd0, 0x26, 0xbd, - 0x8a, 0x8f, 0xd7, 0x18, 0x1f, 0xa0, 0x56, 0x09, 0x14, 0x54, 0xe9, 0xd9, 0x81, 0x35, 0x6c, 0x93, - 0xfa, 0x84, 0x63, 0x74, 0xa7, 0x2a, 0x07, 0xf9, 0x22, 0x92, 0x8a, 0xf3, 0x8c, 0x4f, 0xbc, 0x66, - 0x60, 0x0f, 0x3b, 0xcf, 0x5e, 0x84, 0xff, 0xa3, 0x2a, 0xbc, 0xd2, 0x3b, 0x51, 0x9c, 0xb8, 0x9b, - 0x82, 0xa4, 0xaa, 0x87, 0x1f, 0xa1, 0x3d, 0x96, 0xa6, 0x2c, 0x86, 0xec, 0x13, 0x8b, 0x62, 0x29, - 0xb8, 0xe7, 0x98, 0x26, 0xba, 0x1b, 0x3a, 0x96, 0x82, 0xeb, 0x1e, 0x45, 0x9a, 0x96, 0x0c, 0xbc, - 0x96, 0x91, 0x5b, 0x9f, 0xf0, 0x03, 0x84, 0x62, 0xc9, 0x28, 0xb0, 0x24, 0xa2, 0xe0, 0xed, 0x18, - 0x81, 0xed, 0x9a, 0x8c, 0x4c, 0x5a, 0xcd, 0x92, 0x75, 0x7a, 0xb7, 0x4a, 0xd7, 0x64, 0x04, 0xf8, - 0x15, 0x72, 0xa9, 0x82, 0x0f, 0x42, 0x66, 0x5f, 0x28, 0x64, 0x82, 0x47, 0x59, 0xe2, 0xb5, 0x03, - 0x6b, 0xd8, 0x3c, 0xba, 0xbb, 0xbc, 0xe8, 0xf7, 0x46, 0x97, 0x73, 0xc7, 0xaf, 0x49, 0xef, 0xca, - 0xc7, 0xc7, 0x09, 0x7e, 0x8f, 0x3a, 0x05, 0xe5, 0x8a, 0xe6, 0xda, 0x9e, 0xd2, 0x73, 0x8d, 0x37, - 0x2f, 0x6f, 0xe0, 0xcd, 0x89, 0xa9, 0xa2, 0x1d, 0x42, 0xc5, 0x3a, 0x2c, 0x07, 0xdf, 0x2d, 0xe4, - 0x5e, 0xb7, 0x10, 0xbb, 0xc8, 0xe6, 0xe2, 0xb3, 0x99, 0xba, 0x4d, 0x74, 0xa8, 0x09, 0xc8, 0x85, - 0x99, 0x6e, 0x97, 0xe8, 0x10, 0x07, 0xa8, 0x25, 0x95, 0x51, 0x63, 0x1b, 0x35, 0xed, 0xe5, 0x45, - 0xdf, 0x21, 0x4a, 0x6b, 0x70, 0xa4, 0xd2, 0x9d, 0xf7, 0x51, 0x47, 0x52, 0x3e, 0x61, 0x51, 0x09, - 0x54, 0x82, 0xd7, 0x34, 0xd5, 0x90, 0x41, 0xa7, 0x9a, 0xe0, 0xfb, 0xa8, 0x5d, 0x7d, 0xc0, 0x78, - 0x62, 0x46, 0x62, 0x93, 0x5d, 0x03, 0xde, 0xf0, 0x04, 0x1f, 0xa2, 0xdb, 0x92, 0x7d, 0x54, 0xac, - 0xac, 0x8d, 0x6d, 0x99, 0x7c, 0x67, 0xc3, 0x46, 0x30, 0xf8, 0x66, 0xa1, 0x83, 0xed, 0x12, 0xf1, - 0x3e, 0x72, 0xaa, 0x57, 0x2b, 0x0d, 0xd5, 0x41, 0xab, 0xd0, 0x4f, 0x55, 0x3b, 0xaa, 0xc3, 0xad, - 0x2b, 0x6c, 0x6f, 0x5f, 0xe1, 0xeb, 0x0d, 0x35, 0xff, 0x69, 0xe8, 0x92, 0x27, 0xce, 0x76, 0x4f, - 0x8e, 0x0e, 0x7f, 0x2c, 0x7d, 0xeb, 0x7c, 0xe9, 0x5b, 0xbf, 0x96, 0xbe, 0xf5, 0x75, 0xe5, 0x37, - 0xce, 0x57, 0x7e, 0xe3, 0xe7, 0xca, 0x6f, 0xbc, 0xdb, 0xa9, 0x87, 0x76, 0xd6, 0x32, 0xff, 0xe5, - 0xf3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x74, 0xb0, 0xab, 0x79, 0xe1, 0x03, 0x00, 0x00, +var fileDescriptor_meta_b8385560be3db2c8 = []byte{ + // 544 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xc1, 0x6e, 0xd3, 0x4c, + 0x10, 0xae, 0x7f, 0xc7, 0x69, 0xb3, 0xf9, 0xd3, 0x98, 0xa5, 0xaa, 0x2c, 0x10, 0x8e, 0x1b, 0x81, + 0x08, 0x17, 0x23, 0x81, 0xc4, 0x09, 0x21, 0xa5, 0x81, 0x43, 0x0f, 0xbd, 0x6c, 0x39, 0x21, 0x21, + 0x6b, 0x6b, 0xaf, 0x83, 0x15, 0x7b, 0x37, 0xac, 0x67, 0x21, 0xe1, 0x29, 0x78, 0x14, 0xae, 0xbc, + 0x01, 0xc7, 0x1e, 0x39, 0x55, 0x28, 0x79, 0x0d, 0x0e, 0x68, 0x77, 0x93, 0xd0, 0x96, 0x1c, 0x10, + 0xb7, 0x99, 0x6f, 0x76, 0x67, 0xbf, 0xef, 0x9b, 0x59, 0x84, 0x2a, 0x06, 0x34, 0x9e, 0x4a, 0x01, + 0x02, 0xdf, 0x4f, 0x45, 0x15, 0x17, 0x3c, 0x2f, 0xd5, 0x2c, 0xa3, 0x1a, 0x2d, 0x29, 0xe4, 0x42, + 0x56, 0x31, 0xd0, 0x7a, 0x12, 0x9f, 0xd3, 0x74, 0xc2, 0x78, 0x76, 0xe7, 0x60, 0x2c, 0xc6, 0xc2, + 0x5c, 0x78, 0xac, 0x23, 0x7b, 0xb7, 0xff, 0xd3, 0x45, 0x9d, 0x33, 0x10, 0x92, 0xbd, 0xa6, 0xf5, + 0xe4, 0x94, 0x01, 0xc5, 0x0f, 0x51, 0xb7, 0xa2, 0xb3, 0x24, 0x15, 0x3c, 0x55, 0x52, 0x32, 0x9e, + 0xce, 0x03, 0x27, 0x72, 0x06, 0x1e, 0xd9, 0xaf, 0xe8, 0x6c, 0xf4, 0x1b, 0xc5, 0x8f, 0x90, 0x5f, + 0x52, 0x60, 0x35, 0x24, 0xa9, 0xa8, 0xa6, 0x25, 0x03, 0x96, 0x05, 0xff, 0x45, 0xce, 0xc0, 0x25, + 0x5d, 0x8b, 0x8f, 0xd6, 0x30, 0x3e, 0x44, 0xcd, 0x1a, 0x28, 0xa8, 0x3a, 0x70, 0x23, 0x67, 0xd0, + 0x22, 0xab, 0x0c, 0xa7, 0xe8, 0x96, 0x6d, 0x07, 0xe5, 0x3c, 0x91, 0x8a, 0xf3, 0x82, 0x8f, 0x83, + 0x46, 0xe4, 0x0e, 0xda, 0x4f, 0x9e, 0xc5, 0x7f, 0xa3, 0x2a, 0xbe, 0xc6, 0x9d, 0x28, 0x4e, 0xfc, + 0x4d, 0x43, 0x62, 0xfb, 0xe1, 0x07, 0x68, 0x9f, 0xe5, 0x39, 0x4b, 0xa1, 0xf8, 0xc0, 0x92, 0x54, + 0x0a, 0x1e, 0x78, 0x86, 0x44, 0x67, 0x83, 0x8e, 0xa4, 0xe0, 0x9a, 0xa3, 0xc8, 0xf3, 0x9a, 0x41, + 0xd0, 0xb4, 0x1c, 0x6d, 0x86, 0xef, 0x21, 0x94, 0x4a, 0x46, 0x81, 0x65, 0x09, 0x85, 0x60, 0xd7, + 0x08, 0x6c, 0xad, 0x90, 0xa1, 0x29, 0xab, 0x69, 0xb6, 0x2e, 0xef, 0xd9, 0xf2, 0x0a, 0x19, 0x02, + 0x7e, 0x81, 0x7c, 0xaa, 0xe0, 0x9d, 0x90, 0xc5, 0x27, 0x0a, 0x85, 0xe0, 0x49, 0x91, 0x05, 0xad, + 0xc8, 0x19, 0x34, 0x8e, 0x6f, 0x2f, 0x2e, 0x7b, 0xdd, 0xe1, 0xd5, 0xda, 0xc9, 0x4b, 0xd2, 0xbd, + 0x76, 0xf8, 0x24, 0xc3, 0x6f, 0x51, 0xbb, 0xa2, 0x5c, 0xd1, 0x52, 0xdb, 0x53, 0x07, 0xbe, 0xf1, + 0xe6, 0xf9, 0x3f, 0x78, 0x73, 0x6a, 0xba, 0x68, 0x87, 0x50, 0xb5, 0x0e, 0xeb, 0xfe, 0x57, 0x07, + 0xf9, 0x37, 0x2d, 0xc4, 0x3e, 0x72, 0xb9, 0xf8, 0x68, 0xa6, 0xee, 0x12, 0x1d, 0x6a, 0x04, 0xe4, + 0xdc, 0x4c, 0xb7, 0x43, 0x74, 0x88, 0x23, 0xd4, 0x94, 0xca, 0xa8, 0x71, 0x8d, 0x9a, 0xd6, 0xe2, + 0xb2, 0xe7, 0x11, 0xa5, 0x35, 0x78, 0x52, 0x69, 0xe6, 0x3d, 0xd4, 0x96, 0x94, 0x8f, 0x59, 0x52, + 0x03, 0x95, 0x10, 0x34, 0x4c, 0x37, 0x64, 0xa0, 0x33, 0x8d, 0xe0, 0xbb, 0xa8, 0x65, 0x0f, 0x30, + 0x9e, 0x99, 0x91, 0xb8, 0x64, 0xcf, 0x00, 0xaf, 0x78, 0x86, 0x8f, 0xd0, 0xff, 0x92, 0xbd, 0x57, + 0xac, 0x5e, 0x19, 0xdb, 0x34, 0xf5, 0xf6, 0x06, 0x1b, 0x42, 0xff, 0x8b, 0x83, 0x0e, 0xb7, 0x4b, + 0xc4, 0x07, 0xc8, 0xb3, 0xaf, 0x5a, 0x0d, 0x36, 0xd1, 0x2a, 0xf4, 0x53, 0x76, 0x47, 0x75, 0xb8, + 0x75, 0x85, 0xdd, 0xed, 0x2b, 0x7c, 0x93, 0x50, 0xe3, 0x0f, 0x42, 0x57, 0x3c, 0xf1, 0xb6, 0x7b, + 0x72, 0x7c, 0xf4, 0x6d, 0x11, 0x3a, 0x17, 0x8b, 0xd0, 0xf9, 0xb1, 0x08, 0x9d, 0xcf, 0xcb, 0x70, + 0xe7, 0x62, 0x19, 0xee, 0x7c, 0x5f, 0x86, 0x3b, 0x6f, 0x76, 0x57, 0x43, 0x3b, 0x6f, 0x9a, 0x7f, + 0xf9, 0xf4, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0xca, 0x64, 0xf6, 0x93, 0xe1, 0x03, 0x00, 0x00, } diff --git a/task/backend/meta.proto b/task/backend/meta.proto index 0c12fe52b6..f82aefc5fb 100644 --- a/task/backend/meta.proto +++ b/task/backend/meta.proto @@ -25,7 +25,7 @@ message StoreTaskMeta { string effective_cron = 5; // Task's configured delay, in seconds. - int32 offset = 6; + string offset = 6; int64 created_at = 7; int64 updated_at = 8; diff --git a/task/backend/meta_test.go b/task/backend/meta_test.go index 8bcd43cd64..8dfa2b9a81 100644 --- a/task/backend/meta_test.go +++ b/task/backend/meta_test.go @@ -190,7 +190,7 @@ func TestMeta_CreateNextRun_Delay(t *testing.T) { MaxConcurrency: 2, Status: "enabled", EffectiveCron: "* * * * *", // Every minute. - Offset: 5, + Offset: "5s", LatestCompleted: 30, // Arbitrary non-overlap starting point. } @@ -219,7 +219,7 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) { MaxConcurrency: 2, Status: "enabled", EffectiveCron: "* * * * *", // Every minute. - Offset: 5, + Offset: "5s", LatestCompleted: 30, // Arbitrary non-overlap starting point. } diff --git a/task/backend/storetest/storetest.go b/task/backend/storetest/storetest.go index 810fc6ee58..35dc587b01 100644 --- a/task/backend/storetest/storetest.go +++ b/task/backend/storetest/storetest.go @@ -12,6 +12,7 @@ import ( platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/snowflake" "github.com/influxdata/influxdb/task/backend" + "github.com/influxdata/influxdb/task/options" ) var idGen = snowflake.NewIDGenerator() @@ -522,8 +523,15 @@ from(bucket:"test") |> range(start:-1h)` if meta.EffectiveCron != "* * * * *" { t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron) } - - if time.Duration(meta.Offset)*time.Second != 5*time.Second { + duration := options.Duration{} + if err := duration.Parse(meta.Offset); err != nil { + t.Fatal(err) + } + dur, err := duration.DurationFrom(time.Now()) // is time.Now() the best option here + if err != nil { + t.Fatal(err) + } + if dur != 5*time.Second { t.Fatalf("unexpected delay stored in meta: %v", meta.Offset) } @@ -683,7 +691,12 @@ from(bucket:"test") |> range(start:-1h)` t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron) } - if time.Duration(meta.Offset)*time.Second != 5*time.Second { + duration := options.Duration{} + + if err := duration.Parse(meta.Offset); err != nil { + t.Fatal(err) + } + if duration.String() != "5s" { t.Fatalf("unexpected delay stored in meta: %v", meta.Offset) } diff --git a/task/backend/task.go b/task/backend/task.go index c6bcafb804..09f9f613c7 100644 --- a/task/backend/task.go +++ b/task/backend/task.go @@ -224,10 +224,10 @@ func ToInfluxTask(t *StoreTask, m *StoreTaskMeta) (*influxdb.Task, error) { Cron: opts.Cron, AuthorizationID: influxdb.ID(m.AuthorizationID), } - if opts.Every != 0 { + if !opts.Every.IsZero() { pt.Every = opts.Every.String() } - if opts.Offset != nil && *opts.Offset != 0 { + if opts.Offset != nil && !opts.Offset.IsZero() { pt.Offset = opts.Offset.String() } if m != nil { diff --git a/task/options/options.go b/task/options/options.go index 8ca6dba96c..b6e0e1e592 100644 --- a/task/options/options.go +++ b/task/options/options.go @@ -8,6 +8,10 @@ import ( "sync" "time" + "github.com/influxdata/flux/parser" + + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux" "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/values" @@ -40,31 +44,111 @@ type Options struct { // Every represents a fixed period to repeat execution. // this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day - Every time.Duration `json:"every,omitempty"` + Every Duration `json:"every,omitempty"` // Offset represents a delay before execution. // this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day - Offset *time.Duration `json:"offset,omitempty"` + Offset *Duration `json:"offset,omitempty"` Concurrency *int64 `json:"concurrency,omitempty"` Retry *int64 `json:"retry,omitempty"` } +// Duration is a time span that supports the same units as the flux parser's time duration, as well as negative length time spans. +type Duration struct { + Node ast.DurationLiteral +} + +func (a Duration) String() string { + return ast.Format(&a.Node) +} + +// Parse parses a string into a Duration. +func (a *Duration) Parse(s string) error { + q, err := parseSignedDuration(s) + if err != nil { + return err + } + a.Node = *q + return nil +} + +// MustParseDuration parses a string and returns a duration. It panics if there is an error. +func MustParseDuration(s string) (dur *Duration) { + dur = &Duration{} + if err := dur.Parse(s); err != nil { + panic(err) + } + return dur +} + +// parseSignedDuration is a helper wrapper around parser.ParseSignedDuration. +// We use it because we need to clear the basenode, but flux does not. +func parseSignedDuration(text string) (*ast.DurationLiteral, error) { + q, err := parser.ParseSignedDuration(text) + if err != nil { + return nil, err + } + q.BaseNode = ast.BaseNode{} + return q, err +} + +// UnmarshalText unmarshals text into a Duration. +func (a *Duration) UnmarshalText(text []byte) error { + q, err := parseSignedDuration(string(text)) + if err != nil { + return err + } + a.Node = *q + return nil +} + +// UnmarshalText marshals text into a Duration. +func (a Duration) MarshalText() ([]byte, error) { + return []byte(a.String()), nil +} + +// IsZero checks if each segment of the duration is zero, it doesn't check if the Duration sums to zero, just if each internal duration is zero. +func (a *Duration) IsZero() bool { + for i := range a.Node.Values { + if a.Node.Values[i].Magnitude != 0 { + return false + } + } + return true +} + +// DurationFrom gives us a time.Duration from a time. +// Currently because of how flux works, this is just an approfimation for any time unit larger than hours. +func (a *Duration) DurationFrom(t time.Time) (time.Duration, error) { + return ast.DurationFrom(&a.Node, t) +} + +// Add adds the duration to a time. +func (a *Duration) Add(t time.Time) (time.Time, error) { + d, err := ast.DurationFrom(&a.Node, t) + if err != nil { + return time.Time{}, err + } + return t.Add(d), nil +} + // Clear clears out all options in the options struct, it us useful if you wish to reuse it. func (o *Options) Clear() { o.Name = "" o.Cron = "" - o.Every = 0 + o.Every = Duration{} o.Offset = nil o.Concurrency = nil o.Retry = nil } +// IsZero tells us if the options has been zeroed out. func (o *Options) IsZero() bool { return o.Name == "" && o.Cron == "" && - o.Every == 0 && + o.Every.IsZero() && o.Offset == nil && o.Concurrency == nil && o.Retry == nil @@ -80,6 +164,50 @@ const ( optRetry = "retry" ) +// contains is a helper function to see if an array of strings contains a string +func contains(s []string, e string) bool { + for i := range s { + if s[i] == e { + return true + } + } + return false +} + +func grabTaskOptionAST(p *ast.Package, keys ...string) map[string]ast.Expression { + res := make(map[string]ast.Expression, 2) // we preallocate two keys for the map, as that is how many we will use at maximum (offset and every) + for i := range p.Files { + for j := range p.Files[i].Body { + if p.Files[i].Body[j].Type() != "OptionStatement" { + continue + } + opt := (p.Files[i].Body[j]).(*ast.OptionStatement) + if opt.Assignment.Type() != "VariableAssignment" { + continue + } + asmt, ok := opt.Assignment.(*ast.VariableAssignment) + if !ok { + continue + } + if asmt.ID.Key() != "task" { + continue + } + ae, ok := asmt.Init.(*ast.ObjectExpression) + if !ok { + continue + } + for k := range ae.Properties { + prop := ae.Properties[k] + if key := prop.Key.Key(); prop != nil && contains(keys, key) { + res[key] = prop.Value + } + } + return res + } + } + return res +} + // FromScript extracts Options from a Flux script. func FromScript(script string) (Options, error) { if optionCache != nil { @@ -93,7 +221,12 @@ func FromScript(script string) (Options, error) { } opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)} - _, scope, err := flux.Eval(script) + fluxAST, err := flux.Parse(script) + if err != nil { + return opt, err + } + durTypes := grabTaskOptionAST(fluxAST, optEvery, optOffset) + _, scope, err := flux.EvalAST(fluxAST) if err != nil { return opt, err } @@ -103,6 +236,10 @@ func FromScript(script string) (Options, error) { if !ok { return opt, errors.New("missing required option: 'task'") } + // check to make sure task is an object + if err := checkNature(task.PolyType().Nature(), semantic.Object); err != nil { + return opt, err + } optObject := task.Object() if err := validateOptionNames(optObject); err != nil { return opt, err @@ -138,14 +275,39 @@ func FromScript(script string) (Options, error) { if err := checkNature(everyVal.PolyType().Nature(), semantic.Duration); err != nil { return opt, err } - opt.Every = everyVal.Duration().Duration() + dur, ok := durTypes["every"] + if !ok || dur == nil { + return opt, errors.New("failed to parse `every` in task") + } + durNode, err := parseSignedDuration(dur.Location().Source) + if err != nil { + return opt, err + } + if !ok || durNode == nil { + return opt, errors.New("failed to parse `every` in task") + } + durNode.BaseNode = ast.BaseNode{} + opt.Every.Node = *durNode } if offsetVal, ok := optObject.Get(optOffset); ok { if err := checkNature(offsetVal.PolyType().Nature(), semantic.Duration); err != nil { return opt, err } - opt.Offset = pointer.Duration(offsetVal.Duration().Duration()) + dur, ok := durTypes["offset"] + if !ok || dur == nil { + return opt, errors.New("failed to parse `offset` in task") + } + durNode, err := parseSignedDuration(dur.Location().Source) + if err != nil { + return opt, err + } + if !ok || durNode == nil { + return opt, errors.New("failed to parse `offset` in task") + } + durNode.BaseNode = ast.BaseNode{} + opt.Offset = &Duration{} + opt.Offset.Node = *durNode } if concurrencyVal, ok := optObject.Get(optConcurrency); ok { @@ -177,13 +339,14 @@ func FromScript(script string) (Options, error) { // Validate returns an error if the options aren't valid. func (o *Options) Validate() error { + now := time.Now() var errs []string if o.Name == "" { errs = append(errs, "name required") } cronPresent := o.Cron != "" - everyPresent := o.Every != 0 + everyPresent := !o.Every.IsZero() if cronPresent == everyPresent { // They're both present or both missing. errs = append(errs, "must specify exactly one of either cron or every") @@ -193,16 +356,25 @@ func (o *Options) Validate() error { errs = append(errs, "cron invalid: "+err.Error()) } } else if everyPresent { - if o.Every < time.Second { + every, err := o.Every.DurationFrom(now) + if err != nil { + return err + } + if every < time.Second { errs = append(errs, "every option must be at least 1 second") - } else if o.Every.Truncate(time.Second) != o.Every { + } else if every.Truncate(time.Second) != every { errs = append(errs, "every option must be expressible as whole seconds") } } - - if o.Offset != nil && o.Offset.Truncate(time.Second) != *o.Offset { - // For now, allowing negative offset delays. Maybe they're useful for forecasting? - errs = append(errs, "offset option must be expressible as whole seconds") + if o.Offset != nil { + offset, err := o.Offset.DurationFrom(now) + if err != nil { + return err + } + if offset.Truncate(time.Second) != offset { + // For now, allowing negative offset delays. Maybe they're useful for forecasting? + errs = append(errs, "offset option must be expressible as whole seconds") + } } if o.Concurrency != nil { if *o.Concurrency < 1 { @@ -231,11 +403,15 @@ func (o *Options) Validate() error { // If the every option was specified, it is converted into a cron string using "@every". // Otherwise, the empty string is returned. // The value of the offset option is not considered. +// TODO(docmerlin): create an EffectiveCronStringFrom(t time.Time) string, +// that works from a unit of time. +// Do not use this if you haven't checked for validity already. func (o *Options) EffectiveCronString() string { if o.Cron != "" { return o.Cron } - if o.Every > 0 { + every, _ := o.Every.DurationFrom(time.Now()) // we can ignore errors here because we have alreach checked for validity. + if every > 0 { return "@every " + o.Every.String() } return "" diff --git a/task/options/options_test.go b/task/options/options_test.go index 5f77f51658..e2a96b675f 100644 --- a/task/options/options_test.go +++ b/task/options/options_test.go @@ -21,10 +21,10 @@ func scriptGenerator(opt options.Options, body string) string { if opt.Cron != "" { taskData = fmt.Sprintf("%s cron: %q,\n", taskData, opt.Cron) } - if opt.Every != 0 { + if !opt.Every.IsZero() { taskData = fmt.Sprintf("%s every: %s,\n", taskData, opt.Every.String()) } - if opt.Offset != nil && *opt.Offset != 0 { + if opt.Offset != nil && !(*opt.Offset).IsZero() { taskData = fmt.Sprintf("%s offset: %s,\n", taskData, opt.Offset.String()) } if opt.Concurrency != nil && *opt.Concurrency != 0 { @@ -45,20 +45,36 @@ func scriptGenerator(opt options.Options, body string) string { %s`, taskData, body) } +func TestNegDurations(t *testing.T) { + dur := options.MustParseDuration("-1m") + d, err := dur.DurationFrom(time.Now()) + if err != nil { + t.Fatal(err) + } + if d != -time.Minute { + t.Fatalf("expected duration to be -1m but was %s", d) + } +} + func TestFromScript(t *testing.T) { for _, c := range []struct { script string exp options.Options shouldErr bool }{ - {script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: pointer.Duration(-time.Minute)}, ""), exp: options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: pointer.Duration(-time.Minute)}}, - {script: scriptGenerator(options.Options{Name: "name1", Every: 5 * time.Second}, ""), exp: options.Options{Name: "name1", Every: 5 * time.Second, Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}}, + {script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: options.MustParseDuration("-1m")}, ""), + exp: options.Options{Name: "name0", + Cron: "* * * * *", + Concurrency: pointer.Int64(2), + Retry: pointer.Int64(3), + Offset: options.MustParseDuration("-1m")}}, + {script: scriptGenerator(options.Options{Name: "name1", Every: *(options.MustParseDuration("5s"))}, ""), exp: options.Options{Name: "name1", Every: *(options.MustParseDuration("5s")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}}, {script: scriptGenerator(options.Options{Name: "name2", Cron: "* * * * *"}, ""), exp: options.Options{Name: "name2", Cron: "* * * * *", Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}}, - {script: scriptGenerator(options.Options{Name: "name3", Every: time.Hour, Cron: "* * * * *"}, ""), shouldErr: true}, - {script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: time.Hour}, ""), shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name3", Every: *(options.MustParseDuration("1h")), Cron: "* * * * *"}, ""), shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true}, {script: "option task = {\n name: \"name5\",\n concurrency: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, {script: "option task = {\n name: \"name6\",\n concurrency: 1,\n every: 1,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, - {script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: time.Hour}, ""), shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true}, {script: "option task = {\n name: \"name8\",\n retry: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, {script: scriptGenerator(options.Options{Name: "name9"}, ""), shouldErr: true}, {script: scriptGenerator(options.Options{}, ""), shouldErr: true}, @@ -125,7 +141,7 @@ func TestValidate(t *testing.T) { } *bad = good - bad.Every = time.Minute + bad.Every = *options.MustParseDuration("1m") if err := bad.Validate(); err == nil { t.Error("expected error for options with both cron and every") } @@ -138,13 +154,13 @@ func TestValidate(t *testing.T) { *bad = good bad.Cron = "" - bad.Every = -1 * time.Minute + bad.Every = *options.MustParseDuration("-1m") if err := bad.Validate(); err == nil { t.Error("expected error for negative every") } *bad = good - bad.Offset = pointer.Duration(1500 * time.Millisecond) + bad.Offset = options.MustParseDuration("1500ms") if err := bad.Validate(); err == nil { t.Error("expected error for sub-second delay resolution") } @@ -177,11 +193,11 @@ func TestValidate(t *testing.T) { func TestEffectiveCronString(t *testing.T) { for _, c := range []struct { c string - e time.Duration + e options.Duration exp string }{ {c: "10 * * * *", exp: "10 * * * *"}, - {e: 10 * time.Second, exp: "@every 10s"}, + {e: *(options.MustParseDuration("10s")), exp: "@every 10s"}, {exp: ""}, } { o := options.Options{Cron: c.c, Every: c.e} @@ -191,3 +207,68 @@ func TestEffectiveCronString(t *testing.T) { } } } + +func TestDurationMarshaling(t *testing.T) { + t.Run("unmarshaling", func(t *testing.T) { + now := time.Now() + dur1 := options.Duration{} + if err := dur1.UnmarshalText([]byte("1h10m3s")); err != nil { + t.Fatal(err) + } + d1, err1 := dur1.DurationFrom(now) + if err1 != nil { + t.Fatal(err1) + } + + dur2 := options.Duration{} + if err := dur2.Parse("1h10m3s"); err != nil { + t.Fatal(err) + } + d2, err2 := dur2.DurationFrom(now) + if err2 != nil { + t.Fatal(err2) + } + + if d1 != d2 || d1 != time.Hour+10*time.Minute+3*time.Second { + t.Fatal("Parse and Marshaling do not give us the same result") + } + }) + + t.Run("marshaling", func(t *testing.T) { + dur := options.Duration{} + if err := dur.UnmarshalText([]byte("1h10m3s")); err != nil { + t.Fatal(err) + } + if dur.String() != "1h10m3s" { + t.Fatalf("duration string should be \"1h10m3s\" but was %s", dur.String()) + } + text, err := dur.MarshalText() + if err != nil { + t.Fatal(err) + } + if string(text) != "1h10m3s" { + t.Fatalf("duration text should be \"1h10m3s\" but was %s", text) + } + }) + + t.Run("parse zero", func(t *testing.T) { + dur := options.Duration{} + if err := dur.UnmarshalText([]byte("0h0s")); err != nil { + t.Fatal(err) + } + if !dur.IsZero() { + t.Fatalf("expected duration \"0s\" to be zero but was %s", dur.String()) + } + }) +} + +func TestDurationMath(t *testing.T) { + dur := options.MustParseDuration("10s") + d, err := dur.DurationFrom(time.Now()) + if err != nil { + t.Fatal(err) + } + if d != 10*time.Second { + t.Fatalf("expected duration to be 10s but it was %s", d) + } +} diff --git a/task/platform_adapter.go b/task/platform_adapter.go index 8740434bba..f9ff265d49 100644 --- a/task/platform_adapter.go +++ b/task/platform_adapter.go @@ -176,10 +176,10 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf AuthorizationID: req.AuthorizationID, } - if opts.Every != 0 { + if !opts.Every.IsZero() { task.Every = opts.Every.String() } - if opts.Offset != nil && *opts.Offset != 0 { + if opts.Offset != nil && !(*opts.Offset).IsZero() { task.Offset = opts.Offset.String() } @@ -429,10 +429,10 @@ func (p *pAdapter) toPlatformTask(ctx context.Context, t backend.StoreTask, m *b Flux: t.Script, Cron: opts.Cron, } - if opts.Every != 0 { + if !opts.Every.IsZero() { pt.Every = opts.Every.String() } - if opts.Offset != nil && *opts.Offset != 0 { + if opts.Offset != nil && !(*opts.Offset).IsZero() { pt.Offset = opts.Offset.String() } if m != nil { diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 1481bbef21..21bb238358 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -18,7 +18,6 @@ import ( "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/inmem" - "github.com/influxdata/influxdb/pkg/pointer" "github.com/influxdata/influxdb/task" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/options" @@ -276,8 +275,8 @@ func testTaskCRUD(t *testing.T, sys *System) { // Update task: switch to every. newStatus = string(backend.TaskActive) - newFlux = "import \"http\"\n\noption task = {\n\tname: \"task-changed #98\",\n\tevery: 30000000000ns,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> http.to(url: \"http://example.com\")" - f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: 30 * time.Second}}) + newFlux = "import \"http\"\n\noption task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> http.to(url: \"http://example.com\")" + f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}}) if err != nil { t.Fatal(err) } @@ -361,7 +360,7 @@ from(bucket: "b") expectedFlux := `import "http" -option task = {name: "task-Options-Update", every: 10000000000ns, concurrency: 100} +option task = {name: "task-Options-Update", every: 10s, concurrency: 100} from(bucket: "b") |> http.to(url: "http://example.com")` @@ -378,7 +377,7 @@ from(bucket: "b") if err != nil { t.Fatal(err) } - f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{Options: options.Options{Offset: pointer.Duration(0), Every: 10 * time.Second}}) + f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{Options: options.Options{Offset: &options.Duration{}, Every: *(options.MustParseDuration("10s"))}}) if err != nil { t.Fatal(err) } @@ -937,7 +936,7 @@ func testTaskConcurrency(t *testing.T, sys *System) { // Create a run for the last task we found. // The script should run every minute, so use max now. tid := tasks[len(tasks)-1].ID - if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64); err != nil { + if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use the >>6 here because math.MaxInt64 is too large which causes problems when converting back and forth from time // This may have errored due to the task being deleted. Check if the task still exists. if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == backend.ErrTaskNotFound { diff --git a/task_test.go b/task_test.go index d898536217..1723e9f7dc 100644 --- a/task_test.go +++ b/task_test.go @@ -3,11 +3,9 @@ package influxdb_test import ( "encoding/json" "testing" - "time" "github.com/google/go-cmp/cmp" platform "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/pkg/pointer" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/task/options" ) @@ -18,10 +16,10 @@ func TestOptionsMarshal(t *testing.T) { if err := json.Unmarshal([]byte(`{"every":"10s", "offset":"1h"}`), tu); err != nil { t.Fatal(err) } - if tu.Options.Every != 10*time.Second { + if tu.Options.Every.String() != "10s" { t.Fatalf("option.every not properly unmarshaled, expected 10s got %s", tu.Options.Every) } - if *tu.Options.Offset != time.Hour { + if tu.Options.Offset.String() != "1h" { t.Fatalf("option.every not properly unmarshaled, expected 1h got %s", tu.Options.Offset) } @@ -38,22 +36,22 @@ func TestOptionsMarshal(t *testing.T) { func TestOptionsEdit(t *testing.T) { tu := &platform.TaskUpdate{} - tu.Options.Every = 10 * time.Second + tu.Options.Every = *(options.MustParseDuration("10s")) if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } t.Run("zeroing", func(t *testing.T) { - if tu.Options.Every != 0 { - t.Errorf("expected Every to be zeroed but it wasn't") + if !tu.Options.Every.IsZero() { + t.Errorf("expected Every to be zeroed but it was not") } }) t.Run("fmt string", func(t *testing.T) { - t.Skip("This won't work until the flux formatter formats durations in a nicer way") expected := `option task = {every: 10s, name: "foo"} - from(bucket:"x") - |> range(start:-1h)` + +from(bucket: "x") + |> range(start: -1h)` if *tu.Flux != expected { - t.Errorf("got the wrong task back, expected %s,\n got %s\n", expected, *tu.Flux) + t.Errorf("got the wrong task back, expected %s,\n got %s\n diff: %s", expected, *tu.Flux, cmp.Diff(expected, *tu.Flux)) } }) t.Run("replacement", func(t *testing.T) { @@ -61,15 +59,14 @@ func TestOptionsEdit(t *testing.T) { if err != nil { t.Error(err) } - if op.Every != 10*time.Second { + if op.Every.String() != "10s" { t.Logf("expected every to be 10s but was %s", op.Every) t.Fail() } }) t.Run("add new option", func(t *testing.T) { tu := &platform.TaskUpdate{} - ofst := 30 * time.Second - tu.Options.Offset = &ofst + tu.Options.Offset = options.MustParseDuration("30s") if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } @@ -77,7 +74,7 @@ func TestOptionsEdit(t *testing.T) { if err != nil { t.Error(err) } - if op.Offset == nil || *op.Offset != 30*time.Second { + if op.Offset == nil || op.Offset.String() != "30s" { t.Fatalf("expected every to be 30s but was %s", op.Every) } }) @@ -91,7 +88,7 @@ func TestOptionsEdit(t *testing.T) { if err != nil { t.Error(err) } - if op.Every != 0 { + if !op.Every.IsZero() { t.Fatalf("expected every to be 0 but was %s", op.Every) } if op.Cron != "* * * * *" { @@ -100,7 +97,7 @@ func TestOptionsEdit(t *testing.T) { }) t.Run("switching from cron to every", func(t *testing.T) { tu := &platform.TaskUpdate{} - tu.Options.Every = 10 * time.Second + tu.Options.Every = *(options.MustParseDuration("10s")) if err := tu.UpdateFlux(`option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } @@ -108,7 +105,7 @@ func TestOptionsEdit(t *testing.T) { if err != nil { t.Error(err) } - if op.Every != 10*time.Second { + if op.Every.String() != "10s" { t.Fatalf("expected every to be 10s but was %s", op.Every) } if op.Cron != "" { @@ -117,7 +114,7 @@ func TestOptionsEdit(t *testing.T) { }) t.Run("delete deletable option", func(t *testing.T) { tu := &platform.TaskUpdate{} - tu.Options.Offset = pointer.Duration(0) + tu.Options.Offset = &options.Duration{} expscript := `option task = {cron: "* * * * *", name: "foo"} from(bucket: "x") @@ -129,7 +126,7 @@ from(bucket: "x") if err != nil { t.Error(err) } - if op.Every != 0 { + if !op.Every.IsZero() { t.Fatalf("expected every to be 0s but was %s", op.Every) } if op.Cron != "* * * * *" { diff --git a/ui/cypress/e2e/tasks.test.ts b/ui/cypress/e2e/tasks.test.ts index 73dea882c6..947c25bbc5 100644 --- a/ui/cypress/e2e/tasks.test.ts +++ b/ui/cypress/e2e/tasks.test.ts @@ -21,7 +21,7 @@ describe('Tasks', () => { cy.getByTestID('dropdown--item New Task').click() cy.getByInputName('name').type(taskName) - cy.getByInputName('interval').type('1d') + cy.getByInputName('interval').type('24h') cy.getByInputName('offset').type('20m') cy.get('@bucket').then(({name}) => { @@ -103,7 +103,7 @@ describe('Tasks', () => { cy.getByTestID('dropdown--item New Task').click() cy.getByInputName('name').type('🦄ask') - cy.getByInputName('interval').type('1d') + cy.getByInputName('interval').type('24h') cy.getByInputName('offset').type('20m') cy.getByTestID('flux-editor').within(() => { diff --git a/ui/cypress/support/commands.ts b/ui/cypress/support/commands.ts index 789f711b35..e446af18cb 100644 --- a/ui/cypress/support/commands.ts +++ b/ui/cypress/support/commands.ts @@ -61,7 +61,7 @@ export const createTask = ( ): Cypress.Chainable => { const flux = `option task = { name: "${name}", - every: 1d, + every: 24h, offset: 20m } from(bucket: "defbuck") diff --git a/ui/src/shared/utils/resourceToTemplate.test.ts b/ui/src/shared/utils/resourceToTemplate.test.ts index cc82fe3b97..b9925fb061 100644 --- a/ui/src/shared/utils/resourceToTemplate.test.ts +++ b/ui/src/shared/utils/resourceToTemplate.test.ts @@ -17,7 +17,7 @@ const myfavetask: Task = { authorizationID: '037b084ed9abc000', every: '24h0m0s', flux: - 'option task = {name: "lala", every: 86400000000000ns, offset: 60000000000ns}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)', + 'option task = {name: "lala", every: 24h0m0s, offset: 1m0s}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)', id: '037b0877b359a000', labels: [ { @@ -194,7 +194,7 @@ describe('resourceToTemplate', () => { attributes: { every: '24h0m0s', flux: - 'option task = {name: "lala", every: 86400000000000ns, offset: 60000000000ns}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)', + 'option task = {name: "lala", every: 24h0m0s, offset: 1m0s}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)', name: 'lala', offset: '1m0s', status: 'active', diff --git a/ui/src/tasks/reducers/tasks.test.ts b/ui/src/tasks/reducers/tasks.test.ts index 5e41ced56a..2057b85649 100644 --- a/ui/src/tasks/reducers/tasks.test.ts +++ b/ui/src/tasks/reducers/tasks.test.ts @@ -30,7 +30,7 @@ describe('tasksReducer', () => { it('clears the interval property from the task options when cron is selected', () => { const initialState = defaultState - initialState.taskOptions = {...defaultTaskOptions, interval: '1d'} + initialState.taskOptions = {...defaultTaskOptions, interval: '24h'} // todo(docmerlin): allow for time units larger than 1d, right now h is the longest unit our s const actual = tasksReducer( initialState,