Merge pull request #12951 from influxdata/fix/task-options-flux-duration-parser

fix(tasks): make durations visibly show up nicely
pull/13050/head
Jorge Landivar 2019-04-01 09:59:15 -05:00 committed by GitHub
commit e983809bb0
17 changed files with 469 additions and 163 deletions

View File

@ -2,7 +2,9 @@
// Feel free to add more pointerification functions for more types as you need them.
package pointer
import "time"
import (
"time"
)
// Duration returns a pointer to its argument.
func Duration(d time.Duration) *time.Duration {

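The pointer helpers exist so a caller can take the address of a literal value in a single expression, and the file comment above explicitly invites adding more of them as needed. A minimal, self-contained sketch of the pattern (the local helper below re-implements pointer.Duration purely for illustration):

package main

import (
    "fmt"
    "time"
)

// duration mirrors pkg/pointer's Duration helper: it returns a pointer to its
// argument, letting callers take the address of a literal in one expression.
func duration(d time.Duration) *time.Duration {
    return &d
}

func main() {
    // Instead of: tmp := 5 * time.Second; offset := &tmp
    offset := duration(5 * time.Second)
    fmt.Println(*offset) // 5s
}

Elsewhere in this diff the same pattern shows up as pointer.Int64(1) to populate the optional Concurrency and Retry fields.
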
task.go (44 changes)
View File

@ -6,9 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/flux/parser"
@ -160,11 +158,11 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
// Every represents a fixed period to repeat execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Every flux.Duration `json:"every,omitempty"`
Every options.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Offset *flux.Duration `json:"offset,omitempty"`
Offset *options.Duration `json:"offset,omitempty"`
Concurrency *int64 `json:"concurrency,omitempty"`
@ -178,9 +176,9 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
}
t.Options.Name = jo.Name
t.Options.Cron = jo.Cron
t.Options.Every = time.Duration(jo.Every)
t.Options.Every = jo.Every
if jo.Offset != nil {
offset := time.Duration(*jo.Offset)
offset := *jo.Offset
t.Options.Offset = &offset
}
t.Options.Concurrency = jo.Concurrency
@ -202,10 +200,10 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
Every flux.Duration `json:"every,omitempty"`
Every options.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
Offset *flux.Duration `json:"offset,omitempty"`
Offset *options.Duration `json:"offset,omitempty"`
Concurrency *int64 `json:"concurrency,omitempty"`
@ -215,9 +213,9 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
}{}
jo.Name = t.Options.Name
jo.Cron = t.Options.Cron
jo.Every = flux.Duration(t.Options.Every)
jo.Every = t.Options.Every
if t.Options.Offset != nil {
offset := flux.Duration(*t.Options.Offset)
offset := *t.Options.Offset
jo.Offset = &offset
}
jo.Concurrency = t.Options.Concurrency
@ -230,7 +228,7 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
func (t TaskUpdate) Validate() error {
switch {
case t.Options.Every != 0 && t.Options.Cron != "":
case !t.Options.Every.IsZero() && t.Options.Cron != "":
return errors.New("cannot specify both every and cron")
case t.Flux == nil && t.Status == nil && t.Options.IsZero() && t.Token == "":
return errors.New("cannot update task without content")
@ -252,25 +250,23 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
return ast.GetError(parsedPKG)
}
parsed := parsedPKG.Files[0]
if t.Options.Every != 0 && t.Options.Cron != "" {
return errors.New("cannot specify both every and cron")
if !t.Options.Every.IsZero() && t.Options.Cron != "" {
return errors.New("cannot specify both cron and every")
}
op := make(map[string]ast.Expression, 4)
if t.Options.Name != "" {
op["name"] = &ast.StringLiteral{Value: t.Options.Name}
}
if t.Options.Every != 0 {
d := ast.Duration{Magnitude: int64(t.Options.Every), Unit: "ns"}
op["every"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
if !t.Options.Every.IsZero() {
op["every"] = &t.Options.Every.Node
}
if t.Options.Cron != "" {
op["cron"] = &ast.StringLiteral{Value: t.Options.Cron}
}
if t.Options.Offset != nil {
if *t.Options.Offset != 0 {
d := ast.Duration{Magnitude: int64(*t.Options.Offset), Unit: "ns"}
op["offset"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
if !t.Options.Offset.IsZero() {
op["offset"] = &t.Options.Offset.Node
} else {
toDelete["offset"] = struct{}{}
}
@ -300,12 +296,12 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
case "offset":
if offset, ok := op["offset"]; ok && t.Options.Offset != nil {
delete(op, "offset")
p.Value = offset
p.Value = offset.Copy().(*ast.DurationLiteral)
}
case "every":
if every, ok := op["every"]; ok && t.Options.Every != 0 {
if every, ok := op["every"]; ok && !t.Options.Every.IsZero() {
p.Value = every.Copy().(*ast.DurationLiteral)
delete(op, "every")
p.Value = every
} else if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
delete(op, "cron")
p.Value = cron
@ -315,10 +311,10 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
delete(op, "cron")
p.Value = cron
} else if every, ok := op["every"]; ok && t.Options.Every != 0 {
} else if every, ok := op["every"]; ok && !t.Options.Every.IsZero() {
delete(op, "every")
p.Key = &ast.Identifier{Name: "every"}
p.Value = every
p.Value = every.Copy().(*ast.DurationLiteral)
}
}
}

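The field-type switch above changes the JSON shape of a TaskUpdate: every and offset now round-trip as the human-readable duration strings the user wrote, rather than nanosecond counts. A minimal sketch of that behaviour, mirroring the TestOptionsMarshal test later in this diff (it assumes this revision of the influxdb module is importable):

package main

import (
    "encoding/json"
    "fmt"

    platform "github.com/influxdata/influxdb"
)

func main() {
    tu := &platform.TaskUpdate{}
    if err := json.Unmarshal([]byte(`{"every":"10s", "offset":"1h"}`), tu); err != nil {
        panic(err)
    }
    fmt.Println(tu.Options.Every.String())  // 10s
    fmt.Println(tu.Options.Offset.String()) // 1h

    out, err := json.Marshal(tu)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out)) // every and offset are emitted back as duration strings
}
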
View File

@ -14,6 +14,7 @@ import (
// This file contains helper methods for the StoreTaskMeta type defined in protobuf.
// NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options.
// Do not call this without validating the request and options first.
func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta {
stm := StoreTaskMeta{
Status: string(req.Status),
@ -26,7 +27,8 @@ func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta {
stm.MaxConcurrency = int32(*o.Concurrency)
}
if o.Offset != nil {
stm.Offset = int32(*o.Offset / time.Second)
offset, _ := o.Offset.DurationFrom(time.Unix(req.ScheduleAfter, 0)) // we can do this because it is validated already.
stm.Offset = offset.String()
}
if stm.Status == "" {
@ -43,20 +45,29 @@ func (stm *StoreTaskMeta) AlignLatestCompleted() {
if strings.HasPrefix(stm.EffectiveCron, "@every ") {
everyString := strings.TrimPrefix(stm.EffectiveCron, "@every ")
every, err := time.ParseDuration(everyString)
every := options.Duration{}
err := every.Parse(everyString)
if err != nil {
// We cannot align an invalid time
return
}
t := time.Unix(stm.LatestCompleted, 0).Truncate(every).Unix()
if t == stm.LatestCompleted {
t := time.Unix(stm.LatestCompleted, 0)
everyDur, err := every.DurationFrom(t)
if err != nil {
return
}
t = t.Truncate(everyDur)
if t.Unix() == stm.LatestCompleted {
// For example, every 1m truncates to exactly on the minute.
// But the input request is schedule after, not "on or after".
// Add one interval.
t += int64(every / time.Second)
tafter, err := every.Add(t)
if err != nil {
return
}
t = tafter
}
stm.LatestCompleted = t
stm.LatestCompleted = t.Truncate(time.Second).Unix()
}
}
@ -123,15 +134,23 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e
latest = cr.Now
}
}
nowTime := time.Unix(now, 0)
nextScheduled := sch.Next(time.Unix(latest, 0))
nextScheduledUnix := nextScheduled.Unix()
if dueAt := nextScheduledUnix + int64(stm.Offset); dueAt > now {
offset := &options.Duration{}
if err := offset.Parse(stm.Offset); err != nil {
return RunCreation{}, err
}
dueAt, err := offset.Add(nextScheduled)
if err != nil {
return RunCreation{}, err
}
if dueAt.After(nowTime) {
// Can't schedule yet.
if len(stm.ManualRuns) > 0 {
return stm.createNextRunFromQueue(now, dueAt, sch, makeID)
return stm.createNextRunFromQueue(now, dueAt.Unix(), sch, makeID)
}
return RunCreation{}, RunNotYetDueError{DueAt: dueAt}
return RunCreation{}, RunNotYetDueError{DueAt: dueAt.Unix()}
}
id, err := makeID()
@ -145,12 +164,16 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e
RunID: uint64(id),
})
nextDue, err := offset.Add(sch.Next(nextScheduled))
if err != nil {
return RunCreation{}, err
}
return RunCreation{
Created: QueuedRun{
RunID: id,
Now: nextScheduledUnix,
},
NextDue: sch.Next(nextScheduled).Unix() + int64(stm.Offset),
NextDue: nextDue.Unix(),
HasQueue: len(stm.ManualRuns) > 0,
}, nil
}
@ -229,8 +252,15 @@ func (stm *StoreTaskMeta) NextDueRun() (int64, error) {
latest = cr.Now
}
}
return sch.Next(time.Unix(latest, 0)).Unix() + int64(stm.Offset), nil
offset := &options.Duration{}
if err := offset.Parse(stm.Offset); err != nil {
return 0, err
}
nextDue, err := offset.Add(sch.Next(time.Unix(latest, 0)))
if err != nil {
return 0, err
}
return nextDue.Unix(), nil
}
// ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end.

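Because StoreTaskMeta.Offset is now a duration string rather than a count of seconds, the scheduling helpers above parse it back into an options.Duration and apply it to a concrete time. A standalone sketch of that pattern (the timestamps are made up for illustration):

package main

import (
    "fmt"
    "time"

    "github.com/influxdata/influxdb/task/options"
)

func main() {
    // Parse the offset string as it is stored in StoreTaskMeta.Offset.
    offset := &options.Duration{}
    if err := offset.Parse("5s"); err != nil {
        panic(err)
    }

    // Pretend this is the next cron boundary computed by the scheduler.
    nextScheduled := time.Unix(360, 0)

    // The run becomes due at the scheduled time plus the configured offset.
    dueAt, err := offset.Add(nextScheduled)
    if err != nil {
        panic(err)
    }
    fmt.Println(dueAt.Unix() - nextScheduled.Unix()) // 5
}
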
View File

@ -35,9 +35,9 @@ type StoreTaskMeta struct {
// effective_cron is the effective cron string as reported by the task's options.
EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"`
// Task's configured delay, in seconds.
Offset int32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"`
CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
Offset string `protobuf:"bytes,6,opt,name=offset,proto3" json:"offset,omitempty"`
CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
// The Authorization ID associated with the task.
AuthorizationID uint64 `protobuf:"varint,9,opt,name=authorization_id,json=authorizationId,proto3" json:"authorization_id,omitempty"`
ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns,proto3" json:"manual_runs,omitempty"`
@ -47,7 +47,7 @@ func (m *StoreTaskMeta) Reset() { *m = StoreTaskMeta{} }
func (m *StoreTaskMeta) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMeta) ProtoMessage() {}
func (*StoreTaskMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_841ef32afee093f0, []int{0}
return fileDescriptor_meta_b8385560be3db2c8, []int{0}
}
func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -111,11 +111,11 @@ func (m *StoreTaskMeta) GetEffectiveCron() string {
return ""
}
func (m *StoreTaskMeta) GetOffset() int32 {
func (m *StoreTaskMeta) GetOffset() string {
if m != nil {
return m.Offset
}
return 0
return ""
}
func (m *StoreTaskMeta) GetCreatedAt() int64 {
@ -164,7 +164,7 @@ func (m *StoreTaskMetaRun) Reset() { *m = StoreTaskMetaRun{} }
func (m *StoreTaskMetaRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaRun) ProtoMessage() {}
func (*StoreTaskMetaRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_841ef32afee093f0, []int{1}
return fileDescriptor_meta_b8385560be3db2c8, []int{1}
}
func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -254,7 +254,7 @@ func (m *StoreTaskMetaManualRun) Reset() { *m = StoreTaskMetaManualRun{}
func (m *StoreTaskMetaManualRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaManualRun) ProtoMessage() {}
func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_841ef32afee093f0, []int{2}
return fileDescriptor_meta_b8385560be3db2c8, []int{2}
}
func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -372,10 +372,11 @@ func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintMeta(dAtA, i, uint64(len(m.EffectiveCron)))
i += copy(dAtA[i:], m.EffectiveCron)
}
if m.Offset != 0 {
dAtA[i] = 0x30
if len(m.Offset) > 0 {
dAtA[i] = 0x32
i++
i = encodeVarintMeta(dAtA, i, uint64(m.Offset))
i = encodeVarintMeta(dAtA, i, uint64(len(m.Offset)))
i += copy(dAtA[i:], m.Offset)
}
if m.CreatedAt != 0 {
dAtA[i] = 0x38
@ -535,8 +536,9 @@ func (m *StoreTaskMeta) Size() (n int) {
if l > 0 {
n += 1 + l + sovMeta(uint64(l))
}
if m.Offset != 0 {
n += 1 + sovMeta(uint64(m.Offset))
l = len(m.Offset)
if l > 0 {
n += 1 + l + sovMeta(uint64(l))
}
if m.CreatedAt != 0 {
n += 1 + sovMeta(uint64(m.CreatedAt))
@ -777,10 +779,10 @@ func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error {
m.EffectiveCron = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 6:
if wireType != 0 {
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType)
}
m.Offset = 0
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMeta
@ -790,11 +792,21 @@ func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
m.Offset |= (int32(b) & 0x7F) << shift
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMeta
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Offset = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 7:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType)
@ -1318,42 +1330,42 @@ var (
ErrIntOverflowMeta = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_841ef32afee093f0) }
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_b8385560be3db2c8) }
var fileDescriptor_meta_841ef32afee093f0 = []byte{
// 543 bytes of a gzipped FileDescriptorProto (generated binary payload omitted)
}
var fileDescriptor_meta_b8385560be3db2c8 = []byte{
// 544 bytes of a gzipped FileDescriptorProto (generated binary payload omitted)
}

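The regenerated marshalling code changes because switching offset from int32 to string also changes its protobuf wire type: field 6 moves from wire type 0 (varint) to wire type 2 (length-delimited), which is why the tag byte goes from 0x30 to 0x32 and Size now counts the string length. A small sketch of how that tag byte is derived (standard protobuf encoding, not code from this PR):

package main

import "fmt"

// tag computes a protobuf tag byte: (field_number << 3) | wire_type.
func tag(field, wire uint64) uint64 {
    return field<<3 | wire
}

func main() {
    fmt.Printf("field 6 as varint:           %#x\n", tag(6, 0)) // 0x30, the old int32 Offset
    fmt.Printf("field 6 as length-delimited: %#x\n", tag(6, 2)) // 0x32, the new string Offset
}
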
View File

@ -25,7 +25,7 @@ message StoreTaskMeta {
string effective_cron = 5;
// Task's configured delay, in seconds.
int32 offset = 6;
string offset = 6;
int64 created_at = 7;
int64 updated_at = 8;

View File

@ -190,7 +190,7 @@ func TestMeta_CreateNextRun_Delay(t *testing.T) {
MaxConcurrency: 2,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
Offset: 5,
Offset: "5s",
LatestCompleted: 30, // Arbitrary non-overlap starting point.
}
@ -219,7 +219,7 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) {
MaxConcurrency: 2,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
Offset: 5,
Offset: "5s",
LatestCompleted: 30, // Arbitrary non-overlap starting point.
}

View File

@ -12,6 +12,7 @@ import (
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
)
var idGen = snowflake.NewIDGenerator()
@ -522,8 +523,15 @@ from(bucket:"test") |> range(start:-1h)`
if meta.EffectiveCron != "* * * * *" {
t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron)
}
if time.Duration(meta.Offset)*time.Second != 5*time.Second {
duration := options.Duration{}
if err := duration.Parse(meta.Offset); err != nil {
t.Fatal(err)
}
dur, err := duration.DurationFrom(time.Now()) // is time.Now() the best option here?
if err != nil {
t.Fatal(err)
}
if dur != 5*time.Second {
t.Fatalf("unexpected delay stored in meta: %v", meta.Offset)
}
@ -683,7 +691,12 @@ from(bucket:"test") |> range(start:-1h)`
t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron)
}
if time.Duration(meta.Offset)*time.Second != 5*time.Second {
duration := options.Duration{}
if err := duration.Parse(meta.Offset); err != nil {
t.Fatal(err)
}
if duration.String() != "5s" {
t.Fatalf("unexpected delay stored in meta: %v", meta.Offset)
}

View File

@ -224,10 +224,10 @@ func ToInfluxTask(t *StoreTask, m *StoreTaskMeta) (*influxdb.Task, error) {
Cron: opts.Cron,
AuthorizationID: influxdb.ID(m.AuthorizationID),
}
if opts.Every != 0 {
if !opts.Every.IsZero() {
pt.Every = opts.Every.String()
}
if opts.Offset != nil && *opts.Offset != 0 {
if opts.Offset != nil && !opts.Offset.IsZero() {
pt.Offset = opts.Offset.String()
}
if m != nil {

View File

@ -8,6 +8,10 @@ import (
"sync"
"time"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
@ -40,31 +44,111 @@ type Options struct {
// Every represents a fixed period to repeat execution.
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
Every time.Duration `json:"every,omitempty"`
Every Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
Offset *time.Duration `json:"offset,omitempty"`
Offset *Duration `json:"offset,omitempty"`
Concurrency *int64 `json:"concurrency,omitempty"`
Retry *int64 `json:"retry,omitempty"`
}
// Duration is a time span that supports the same units as the flux parser's time duration, as well as negative length time spans.
type Duration struct {
Node ast.DurationLiteral
}
func (a Duration) String() string {
return ast.Format(&a.Node)
}
// Parse parses a string into a Duration.
func (a *Duration) Parse(s string) error {
q, err := parseSignedDuration(s)
if err != nil {
return err
}
a.Node = *q
return nil
}
// MustParseDuration parses a string and returns a duration. It panics if there is an error.
func MustParseDuration(s string) (dur *Duration) {
dur = &Duration{}
if err := dur.Parse(s); err != nil {
panic(err)
}
return dur
}
// parseSignedDuration is a helper wrapper around parser.ParseSignedDuration.
// We use it because we need to clear the BaseNode, which flux's parser leaves populated.
func parseSignedDuration(text string) (*ast.DurationLiteral, error) {
q, err := parser.ParseSignedDuration(text)
if err != nil {
return nil, err
}
q.BaseNode = ast.BaseNode{}
return q, err
}
// UnmarshalText unmarshals text into a Duration.
func (a *Duration) UnmarshalText(text []byte) error {
q, err := parseSignedDuration(string(text))
if err != nil {
return err
}
a.Node = *q
return nil
}
// MarshalText marshals a Duration into text.
func (a Duration) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}
// IsZero reports whether every segment of the duration is zero. It does not check whether the segments sum to zero, only that each one is individually zero.
func (a *Duration) IsZero() bool {
for i := range a.Node.Values {
if a.Node.Values[i].Magnitude != 0 {
return false
}
}
return true
}
// DurationFrom returns the time.Duration this Duration represents relative to the given time.
// Currently, because of how flux works, this is only an approximation for any time unit larger than hours.
func (a *Duration) DurationFrom(t time.Time) (time.Duration, error) {
return ast.DurationFrom(&a.Node, t)
}
// Add adds the duration to a time.
func (a *Duration) Add(t time.Time) (time.Time, error) {
d, err := ast.DurationFrom(&a.Node, t)
if err != nil {
return time.Time{}, err
}
return t.Add(d), nil
}
// Clear clears out all options in the options struct. It is useful if you wish to reuse it.
func (o *Options) Clear() {
o.Name = ""
o.Cron = ""
o.Every = 0
o.Every = Duration{}
o.Offset = nil
o.Concurrency = nil
o.Retry = nil
}
// IsZero tells us whether the options struct has been zeroed out.
func (o *Options) IsZero() bool {
return o.Name == "" &&
o.Cron == "" &&
o.Every == 0 &&
o.Every.IsZero() &&
o.Offset == nil &&
o.Concurrency == nil &&
o.Retry == nil
@ -80,6 +164,50 @@ const (
optRetry = "retry"
)
// contains is a helper function that reports whether a slice of strings contains a given string.
func contains(s []string, e string) bool {
for i := range s {
if s[i] == e {
return true
}
}
return false
}
func grabTaskOptionAST(p *ast.Package, keys ...string) map[string]ast.Expression {
res := make(map[string]ast.Expression, 2) // we preallocate two keys for the map, as that is how many we will use at maximum (offset and every)
for i := range p.Files {
for j := range p.Files[i].Body {
if p.Files[i].Body[j].Type() != "OptionStatement" {
continue
}
opt := (p.Files[i].Body[j]).(*ast.OptionStatement)
if opt.Assignment.Type() != "VariableAssignment" {
continue
}
asmt, ok := opt.Assignment.(*ast.VariableAssignment)
if !ok {
continue
}
if asmt.ID.Key() != "task" {
continue
}
ae, ok := asmt.Init.(*ast.ObjectExpression)
if !ok {
continue
}
for k := range ae.Properties {
prop := ae.Properties[k]
if key := prop.Key.Key(); prop != nil && contains(keys, key) {
res[key] = prop.Value
}
}
return res
}
}
return res
}
// FromScript extracts Options from a Flux script.
func FromScript(script string) (Options, error) {
if optionCache != nil {
@ -93,7 +221,12 @@ func FromScript(script string) (Options, error) {
}
opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}
_, scope, err := flux.Eval(script)
fluxAST, err := flux.Parse(script)
if err != nil {
return opt, err
}
durTypes := grabTaskOptionAST(fluxAST, optEvery, optOffset)
_, scope, err := flux.EvalAST(fluxAST)
if err != nil {
return opt, err
}
@ -103,6 +236,10 @@ func FromScript(script string) (Options, error) {
if !ok {
return opt, errors.New("missing required option: 'task'")
}
// check to make sure task is an object
if err := checkNature(task.PolyType().Nature(), semantic.Object); err != nil {
return opt, err
}
optObject := task.Object()
if err := validateOptionNames(optObject); err != nil {
return opt, err
@ -138,14 +275,39 @@ func FromScript(script string) (Options, error) {
if err := checkNature(everyVal.PolyType().Nature(), semantic.Duration); err != nil {
return opt, err
}
opt.Every = everyVal.Duration().Duration()
dur, ok := durTypes["every"]
if !ok || dur == nil {
return opt, errors.New("failed to parse `every` in task")
}
durNode, err := parseSignedDuration(dur.Location().Source)
if err != nil {
return opt, err
}
if !ok || durNode == nil {
return opt, errors.New("failed to parse `every` in task")
}
durNode.BaseNode = ast.BaseNode{}
opt.Every.Node = *durNode
}
if offsetVal, ok := optObject.Get(optOffset); ok {
if err := checkNature(offsetVal.PolyType().Nature(), semantic.Duration); err != nil {
return opt, err
}
opt.Offset = pointer.Duration(offsetVal.Duration().Duration())
dur, ok := durTypes["offset"]
if !ok || dur == nil {
return opt, errors.New("failed to parse `offset` in task")
}
durNode, err := parseSignedDuration(dur.Location().Source)
if err != nil {
return opt, err
}
if !ok || durNode == nil {
return opt, errors.New("failed to parse `offset` in task")
}
durNode.BaseNode = ast.BaseNode{}
opt.Offset = &Duration{}
opt.Offset.Node = *durNode
}
if concurrencyVal, ok := optObject.Get(optConcurrency); ok {
@ -177,13 +339,14 @@ func FromScript(script string) (Options, error) {
// Validate returns an error if the options aren't valid.
func (o *Options) Validate() error {
now := time.Now()
var errs []string
if o.Name == "" {
errs = append(errs, "name required")
}
cronPresent := o.Cron != ""
everyPresent := o.Every != 0
everyPresent := !o.Every.IsZero()
if cronPresent == everyPresent {
// They're both present or both missing.
errs = append(errs, "must specify exactly one of either cron or every")
@ -193,16 +356,25 @@ func (o *Options) Validate() error {
errs = append(errs, "cron invalid: "+err.Error())
}
} else if everyPresent {
if o.Every < time.Second {
every, err := o.Every.DurationFrom(now)
if err != nil {
return err
}
if every < time.Second {
errs = append(errs, "every option must be at least 1 second")
} else if o.Every.Truncate(time.Second) != o.Every {
} else if every.Truncate(time.Second) != every {
errs = append(errs, "every option must be expressible as whole seconds")
}
}
if o.Offset != nil && o.Offset.Truncate(time.Second) != *o.Offset {
// For now, allowing negative offset delays. Maybe they're useful for forecasting?
errs = append(errs, "offset option must be expressible as whole seconds")
if o.Offset != nil {
offset, err := o.Offset.DurationFrom(now)
if err != nil {
return err
}
if offset.Truncate(time.Second) != offset {
// For now, allowing negative offset delays. Maybe they're useful for forecasting?
errs = append(errs, "offset option must be expressible as whole seconds")
}
}
if o.Concurrency != nil {
if *o.Concurrency < 1 {
@ -231,11 +403,15 @@ func (o *Options) Validate() error {
// If the every option was specified, it is converted into a cron string using "@every".
// Otherwise, the empty string is returned.
// The value of the offset option is not considered.
// TODO(docmerlin): create an EffectiveCronStringFrom(t time.Time) string,
// that works from a unit of time.
// Do not use this if you haven't checked for validity already.
func (o *Options) EffectiveCronString() string {
if o.Cron != "" {
return o.Cron
}
if o.Every > 0 {
every, _ := o.Every.DurationFrom(time.Now()) // we can ignore errors here because we have already checked for validity.
if every > 0 {
return "@every " + o.Every.String()
}
return ""

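The central idea in this file is that options.Duration keeps the parsed ast.DurationLiteral instead of collapsing it to a time.Duration at parse time: the units the user wrote survive for display and editing, and conversion to a concrete time.Duration happens only relative to a reference time via DurationFrom or Add. A minimal sketch of that API, consistent with the tests in this PR (it assumes this revision of the module is importable):

package main

import (
    "fmt"
    "time"

    "github.com/influxdata/influxdb/task/options"
)

func main() {
    every := options.MustParseDuration("1h10m3s")
    fmt.Println(every.String(), every.IsZero()) // 1h10m3s false

    now := time.Now()
    d, err := every.DurationFrom(now) // concrete time.Duration relative to now
    if err != nil {
        panic(err)
    }
    fmt.Println(d == time.Hour+10*time.Minute+3*time.Second) // true

    later, err := every.Add(now) // now plus the duration
    if err != nil {
        panic(err)
    }
    fmt.Println(later.Sub(now)) // 1h10m3s

    // Negative durations are supported as well, as exercised by TestNegDurations.
    neg := options.MustParseDuration("-1m")
    nd, _ := neg.DurationFrom(now)
    fmt.Println(nd) // -1m0s
}
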
View File

@ -21,10 +21,10 @@ func scriptGenerator(opt options.Options, body string) string {
if opt.Cron != "" {
taskData = fmt.Sprintf("%s cron: %q,\n", taskData, opt.Cron)
}
if opt.Every != 0 {
if !opt.Every.IsZero() {
taskData = fmt.Sprintf("%s every: %s,\n", taskData, opt.Every.String())
}
if opt.Offset != nil && *opt.Offset != 0 {
if opt.Offset != nil && !(*opt.Offset).IsZero() {
taskData = fmt.Sprintf("%s offset: %s,\n", taskData, opt.Offset.String())
}
if opt.Concurrency != nil && *opt.Concurrency != 0 {
@ -45,20 +45,36 @@ func scriptGenerator(opt options.Options, body string) string {
%s`, taskData, body)
}
func TestNegDurations(t *testing.T) {
dur := options.MustParseDuration("-1m")
d, err := dur.DurationFrom(time.Now())
if err != nil {
t.Fatal(err)
}
if d != -time.Minute {
t.Fatalf("expected duration to be -1m but was %s", d)
}
}
func TestFromScript(t *testing.T) {
for _, c := range []struct {
script string
exp options.Options
shouldErr bool
}{
{script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: pointer.Duration(-time.Minute)}, ""), exp: options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: pointer.Duration(-time.Minute)}},
{script: scriptGenerator(options.Options{Name: "name1", Every: 5 * time.Second}, ""), exp: options.Options{Name: "name1", Every: 5 * time.Second, Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}},
{script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: options.MustParseDuration("-1m")}, ""),
exp: options.Options{Name: "name0",
Cron: "* * * * *",
Concurrency: pointer.Int64(2),
Retry: pointer.Int64(3),
Offset: options.MustParseDuration("-1m")}},
{script: scriptGenerator(options.Options{Name: "name1", Every: *(options.MustParseDuration("5s"))}, ""), exp: options.Options{Name: "name1", Every: *(options.MustParseDuration("5s")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}},
{script: scriptGenerator(options.Options{Name: "name2", Cron: "* * * * *"}, ""), exp: options.Options{Name: "name2", Cron: "* * * * *", Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}},
{script: scriptGenerator(options.Options{Name: "name3", Every: time.Hour, Cron: "* * * * *"}, ""), shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: time.Hour}, ""), shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name3", Every: *(options.MustParseDuration("1h")), Cron: "* * * * *"}, ""), shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true},
{script: "option task = {\n name: \"name5\",\n concurrency: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
{script: "option task = {\n name: \"name6\",\n concurrency: 1,\n every: 1,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: time.Hour}, ""), shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true},
{script: "option task = {\n name: \"name8\",\n retry: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
{script: scriptGenerator(options.Options{Name: "name9"}, ""), shouldErr: true},
{script: scriptGenerator(options.Options{}, ""), shouldErr: true},
@ -125,7 +141,7 @@ func TestValidate(t *testing.T) {
}
*bad = good
bad.Every = time.Minute
bad.Every = *options.MustParseDuration("1m")
if err := bad.Validate(); err == nil {
t.Error("expected error for options with both cron and every")
}
@ -138,13 +154,13 @@ func TestValidate(t *testing.T) {
*bad = good
bad.Cron = ""
bad.Every = -1 * time.Minute
bad.Every = *options.MustParseDuration("-1m")
if err := bad.Validate(); err == nil {
t.Error("expected error for negative every")
}
*bad = good
bad.Offset = pointer.Duration(1500 * time.Millisecond)
bad.Offset = options.MustParseDuration("1500ms")
if err := bad.Validate(); err == nil {
t.Error("expected error for sub-second delay resolution")
}
@ -177,11 +193,11 @@ func TestValidate(t *testing.T) {
func TestEffectiveCronString(t *testing.T) {
for _, c := range []struct {
c string
e time.Duration
e options.Duration
exp string
}{
{c: "10 * * * *", exp: "10 * * * *"},
{e: 10 * time.Second, exp: "@every 10s"},
{e: *(options.MustParseDuration("10s")), exp: "@every 10s"},
{exp: ""},
} {
o := options.Options{Cron: c.c, Every: c.e}
@ -191,3 +207,68 @@ func TestEffectiveCronString(t *testing.T) {
}
}
}
func TestDurationMarshaling(t *testing.T) {
t.Run("unmarshaling", func(t *testing.T) {
now := time.Now()
dur1 := options.Duration{}
if err := dur1.UnmarshalText([]byte("1h10m3s")); err != nil {
t.Fatal(err)
}
d1, err1 := dur1.DurationFrom(now)
if err1 != nil {
t.Fatal(err1)
}
dur2 := options.Duration{}
if err := dur2.Parse("1h10m3s"); err != nil {
t.Fatal(err)
}
d2, err2 := dur2.DurationFrom(now)
if err2 != nil {
t.Fatal(err2)
}
if d1 != d2 || d1 != time.Hour+10*time.Minute+3*time.Second {
t.Fatal("Parse and UnmarshalText do not give the same result")
}
})
t.Run("marshaling", func(t *testing.T) {
dur := options.Duration{}
if err := dur.UnmarshalText([]byte("1h10m3s")); err != nil {
t.Fatal(err)
}
if dur.String() != "1h10m3s" {
t.Fatalf("duration string should be \"1h10m3s\" but was %s", dur.String())
}
text, err := dur.MarshalText()
if err != nil {
t.Fatal(err)
}
if string(text) != "1h10m3s" {
t.Fatalf("duration text should be \"1h10m3s\" but was %s", text)
}
})
t.Run("parse zero", func(t *testing.T) {
dur := options.Duration{}
if err := dur.UnmarshalText([]byte("0h0s")); err != nil {
t.Fatal(err)
}
if !dur.IsZero() {
t.Fatalf("expected duration \"0s\" to be zero but was %s", dur.String())
}
})
}
func TestDurationMath(t *testing.T) {
dur := options.MustParseDuration("10s")
d, err := dur.DurationFrom(time.Now())
if err != nil {
t.Fatal(err)
}
if d != 10*time.Second {
t.Fatalf("expected duration to be 10s but it was %s", d)
}
}

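FromScript, exercised by the tests above, is the other half of the change: it re-parses the every and offset properties from the task option's source text so the resulting Options carries the same AST-backed durations. A minimal usage sketch (the blank import mirrors the test setup in this repository, which registers Flux's builtins before a script can be evaluated):

package main

import (
    "fmt"

    _ "github.com/influxdata/influxdb/query/builtin" // register Flux builtins for evaluation
    "github.com/influxdata/influxdb/task/options"
)

func main() {
    script := `option task = {
  name: "example",
  every: 1h,
  offset: 5m,
}

from(bucket: "test") |> range(start: -1h)`

    opt, err := options.FromScript(script)
    if err != nil {
        panic(err)
    }
    fmt.Println(opt.Name)                  // example
    fmt.Println(opt.Every.String())        // 1h
    fmt.Println(opt.Offset.String())       // 5m
    fmt.Println(opt.EffectiveCronString()) // @every 1h
}
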
View File

@ -176,10 +176,10 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
AuthorizationID: req.AuthorizationID,
}
if opts.Every != 0 {
if !opts.Every.IsZero() {
task.Every = opts.Every.String()
}
if opts.Offset != nil && *opts.Offset != 0 {
if opts.Offset != nil && !(*opts.Offset).IsZero() {
task.Offset = opts.Offset.String()
}
@ -429,10 +429,10 @@ func (p *pAdapter) toPlatformTask(ctx context.Context, t backend.StoreTask, m *b
Flux: t.Script,
Cron: opts.Cron,
}
if opts.Every != 0 {
if !opts.Every.IsZero() {
pt.Every = opts.Every.String()
}
if opts.Offset != nil && *opts.Offset != 0 {
if opts.Offset != nil && !(*opts.Offset).IsZero() {
pt.Offset = opts.Offset.String()
}
if m != nil {

View File

@ -18,7 +18,6 @@ import (
"github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/pkg/pointer"
"github.com/influxdata/influxdb/task"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
@ -276,8 +275,8 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: switch to every.
newStatus = string(backend.TaskActive)
newFlux = "import \"http\"\n\noption task = {\n\tname: \"task-changed #98\",\n\tevery: 30000000000ns,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> http.to(url: \"http://example.com\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: 30 * time.Second}})
newFlux = "import \"http\"\n\noption task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> http.to(url: \"http://example.com\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}})
if err != nil {
t.Fatal(err)
}
@ -361,7 +360,7 @@ from(bucket: "b")
expectedFlux := `import "http"
option task = {name: "task-Options-Update", every: 10000000000ns, concurrency: 100}
option task = {name: "task-Options-Update", every: 10s, concurrency: 100}
from(bucket: "b")
|> http.to(url: "http://example.com")`
@ -378,7 +377,7 @@ from(bucket: "b")
if err != nil {
t.Fatal(err)
}
f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{Options: options.Options{Offset: pointer.Duration(0), Every: 10 * time.Second}})
f, err := sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{Options: options.Options{Offset: &options.Duration{}, Every: *(options.MustParseDuration("10s"))}})
if err != nil {
t.Fatal(err)
}
@ -937,7 +936,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
// Create a run for the last task we found.
// The script should run every minute, so use max now.
tid := tasks[len(tasks)-1].ID
if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64); err != nil {
if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use >>6 here because math.MaxInt64 is too large and causes problems when converting back and forth to time.Time
// This may have errored due to the task being deleted. Check if the task still exists.
if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == backend.ErrTaskNotFound {

View File

@ -3,11 +3,9 @@ package influxdb_test
import (
"encoding/json"
"testing"
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/pointer"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/options"
)
@ -18,10 +16,10 @@ func TestOptionsMarshal(t *testing.T) {
if err := json.Unmarshal([]byte(`{"every":"10s", "offset":"1h"}`), tu); err != nil {
t.Fatal(err)
}
if tu.Options.Every != 10*time.Second {
if tu.Options.Every.String() != "10s" {
t.Fatalf("option.every not properly unmarshaled, expected 10s got %s", tu.Options.Every)
}
if *tu.Options.Offset != time.Hour {
if tu.Options.Offset.String() != "1h" {
t.Fatalf("option.offset not properly unmarshaled, expected 1h got %s", tu.Options.Offset)
}
@ -38,22 +36,22 @@ func TestOptionsMarshal(t *testing.T) {
func TestOptionsEdit(t *testing.T) {
tu := &platform.TaskUpdate{}
tu.Options.Every = 10 * time.Second
tu.Options.Every = *(options.MustParseDuration("10s"))
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
t.Fatal(err)
}
t.Run("zeroing", func(t *testing.T) {
if tu.Options.Every != 0 {
t.Errorf("expected Every to be zeroed but it wasn't")
if !tu.Options.Every.IsZero() {
t.Errorf("expected Every to be zeroed but it was not")
}
})
t.Run("fmt string", func(t *testing.T) {
t.Skip("This won't work until the flux formatter formats durations in a nicer way")
expected := `option task = {every: 10s, name: "foo"}
from(bucket:"x")
|> range(start:-1h)`
from(bucket: "x")
|> range(start: -1h)`
if *tu.Flux != expected {
t.Errorf("got the wrong task back, expected %s,\n got %s\n", expected, *tu.Flux)
t.Errorf("got the wrong task back, expected %s,\n got %s\n diff: %s", expected, *tu.Flux, cmp.Diff(expected, *tu.Flux))
}
})
t.Run("replacement", func(t *testing.T) {
@ -61,15 +59,14 @@ func TestOptionsEdit(t *testing.T) {
if err != nil {
t.Error(err)
}
if op.Every != 10*time.Second {
if op.Every.String() != "10s" {
t.Logf("expected every to be 10s but was %s", op.Every)
t.Fail()
}
})
t.Run("add new option", func(t *testing.T) {
tu := &platform.TaskUpdate{}
ofst := 30 * time.Second
tu.Options.Offset = &ofst
tu.Options.Offset = options.MustParseDuration("30s")
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
t.Fatal(err)
}
@ -77,7 +74,7 @@ func TestOptionsEdit(t *testing.T) {
if err != nil {
t.Error(err)
}
if op.Offset == nil || *op.Offset != 30*time.Second {
if op.Offset == nil || op.Offset.String() != "30s" {
t.Fatalf("expected offset to be 30s but was %s", op.Offset)
}
})
@ -91,7 +88,7 @@ func TestOptionsEdit(t *testing.T) {
if err != nil {
t.Error(err)
}
if op.Every != 0 {
if !op.Every.IsZero() {
t.Fatalf("expected every to be 0 but was %s", op.Every)
}
if op.Cron != "* * * * *" {
@ -100,7 +97,7 @@ func TestOptionsEdit(t *testing.T) {
})
t.Run("switching from cron to every", func(t *testing.T) {
tu := &platform.TaskUpdate{}
tu.Options.Every = 10 * time.Second
tu.Options.Every = *(options.MustParseDuration("10s"))
if err := tu.UpdateFlux(`option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
t.Fatal(err)
}
@ -108,7 +105,7 @@ func TestOptionsEdit(t *testing.T) {
if err != nil {
t.Error(err)
}
if op.Every != 10*time.Second {
if op.Every.String() != "10s" {
t.Fatalf("expected every to be 10s but was %s", op.Every)
}
if op.Cron != "" {
@ -117,7 +114,7 @@ func TestOptionsEdit(t *testing.T) {
})
t.Run("delete deletable option", func(t *testing.T) {
tu := &platform.TaskUpdate{}
tu.Options.Offset = pointer.Duration(0)
tu.Options.Offset = &options.Duration{}
expscript := `option task = {cron: "* * * * *", name: "foo"}
from(bucket: "x")
@ -129,7 +126,7 @@ from(bucket: "x")
if err != nil {
t.Error(err)
}
if op.Every != 0 {
if !op.Every.IsZero() {
t.Fatalf("expected every to be 0s but was %s", op.Every)
}
if op.Cron != "* * * * *" {

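These tests drive TaskUpdate.UpdateFlux, which rewrites the task option block inside an existing Flux script in place. A condensed sketch of the same flow as the "switching from cron to every" case above (it assumes this revision of the influxdb module is importable):

package main

import (
    "fmt"

    platform "github.com/influxdata/influxdb"
    "github.com/influxdata/influxdb/task/options"
)

func main() {
    tu := &platform.TaskUpdate{}
    tu.Options.Every = *options.MustParseDuration("10s")

    script := `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`
    if err := tu.UpdateFlux(script); err != nil {
        panic(err)
    }

    // The rewritten script now reads every: 10s (formatted as the user would write it,
    // not as nanoseconds) and the cron option has been replaced.
    fmt.Println(*tu.Flux)
}
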
View File

@ -21,7 +21,7 @@ describe('Tasks', () => {
cy.getByTestID('dropdown--item New Task').click()
cy.getByInputName('name').type(taskName)
cy.getByInputName('interval').type('1d')
cy.getByInputName('interval').type('24h')
cy.getByInputName('offset').type('20m')
cy.get<Bucket>('@bucket').then(({name}) => {
@ -103,7 +103,7 @@ describe('Tasks', () => {
cy.getByTestID('dropdown--item New Task').click()
cy.getByInputName('name').type('🦄ask')
cy.getByInputName('interval').type('1d')
cy.getByInputName('interval').type('24h')
cy.getByInputName('offset').type('20m')
cy.getByTestID('flux-editor').within(() => {

View File

@ -61,7 +61,7 @@ export const createTask = (
): Cypress.Chainable<Cypress.Response> => {
const flux = `option task = {
name: "${name}",
every: 1d,
every: 24h,
offset: 20m
}
from(bucket: "defbuck")

View File

@ -17,7 +17,7 @@ const myfavetask: Task = {
authorizationID: '037b084ed9abc000',
every: '24h0m0s',
flux:
'option task = {name: "lala", every: 86400000000000ns, offset: 60000000000ns}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)',
'option task = {name: "lala", every: 24h0m0s, offset: 1m0s}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)',
id: '037b0877b359a000',
labels: [
{
@ -194,7 +194,7 @@ describe('resourceToTemplate', () => {
attributes: {
every: '24h0m0s',
flux:
'option task = {name: "lala", every: 86400000000000ns, offset: 60000000000ns}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)',
'option task = {name: "lala", every: 24h0m0s, offset: 1m0s}\n\nfrom(bucket: "defnuck")\n\t|> range(start: -task.every)',
name: 'lala',
offset: '1m0s',
status: 'active',

View File

@ -30,7 +30,7 @@ describe('tasksReducer', () => {
it('clears the interval property from the task options when cron is selected', () => {
const initialState = defaultState
initialState.taskOptions = {...defaultTaskOptions, interval: '1d'}
initialState.taskOptions = {...defaultTaskOptions, interval: '24h'} // todo(docmerlin): allow for time units larger than 1d; right now h is the longest unit supported
const actual = tasksReducer(
initialState,