feat(pkger): move bucket pkger model to reflect http APIs retention rules in palce of retention period
parent
2b4e6283ce
commit
888baa2db6
|
@ -63,7 +63,7 @@ func bucketToResource(bkt influxdb.Bucket, name string) Resource {
|
|||
r[fieldDescription] = bkt.Description
|
||||
}
|
||||
if bkt.RetentionPeriod != 0 {
|
||||
r[fieldBucketRetentionPeriod] = bkt.RetentionPeriod.String()
|
||||
r[fieldBucketRetentionRules] = retentionRules{newRetentionRule(bkt.RetentionPeriod)}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
@ -351,7 +351,7 @@ func variableToResource(v influxdb.Variable, name string) Resource {
|
|||
case fieldArgTypeQuery:
|
||||
vals, ok := args.Values.(influxdb.VariableQueryValues)
|
||||
if ok {
|
||||
r[fieldVarLanguage] = vals.Language
|
||||
r[fieldLanguage] = vals.Language
|
||||
r[fieldQuery] = vals.Query
|
||||
}
|
||||
}
|
||||
|
|
117
pkger/models.go
117
pkger/models.go
|
@ -3,6 +3,7 @@ package pkger
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -151,7 +152,7 @@ func newDiffBucket(b *bucket, i influxdb.Bucket) DiffBucket {
|
|||
OldDesc: i.Description,
|
||||
NewDesc: b.Description,
|
||||
OldRetention: i.RetentionPeriod,
|
||||
NewRetention: b.RetentionPeriod,
|
||||
NewRetention: b.RetentionRules.RP(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -366,6 +367,7 @@ const (
|
|||
fieldAssociations = "associations"
|
||||
fieldDescription = "description"
|
||||
fieldKind = "kind"
|
||||
fieldLanguage = "language"
|
||||
fieldName = "name"
|
||||
fieldPrefix = "prefix"
|
||||
fieldQuery = "query"
|
||||
|
@ -376,16 +378,16 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
fieldBucketRetentionPeriod = "retention_period"
|
||||
fieldBucketRetentionRules = "retentionRules"
|
||||
)
|
||||
|
||||
type bucket struct {
|
||||
id influxdb.ID
|
||||
OrgID influxdb.ID
|
||||
Description string
|
||||
Name string
|
||||
RetentionPeriod time.Duration
|
||||
labels []*label
|
||||
id influxdb.ID
|
||||
OrgID influxdb.ID
|
||||
Description string
|
||||
Name string
|
||||
RetentionRules retentionRules
|
||||
labels []*label
|
||||
|
||||
// existing provides context for a resource that already
|
||||
// exists in the platform. If a resource already exists
|
||||
|
@ -415,17 +417,84 @@ func (b *bucket) summarize() SummaryBucket {
|
|||
OrgID: b.OrgID,
|
||||
Name: b.Name,
|
||||
Description: b.Description,
|
||||
RetentionPeriod: b.RetentionPeriod,
|
||||
RetentionPeriod: b.RetentionRules.RP(),
|
||||
},
|
||||
LabelAssociations: toInfluxLabels(b.labels...),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bucket) valid() []validationErr {
|
||||
return b.RetentionRules.valid()
|
||||
}
|
||||
|
||||
func (b *bucket) shouldApply() bool {
|
||||
return b.existing == nil ||
|
||||
b.Description != b.existing.Description ||
|
||||
b.Name != b.existing.Name ||
|
||||
b.RetentionPeriod != b.existing.RetentionPeriod
|
||||
b.RetentionRules.RP() != b.existing.RetentionPeriod
|
||||
}
|
||||
|
||||
const (
|
||||
retentionRuleTypeExpire = "expire"
|
||||
)
|
||||
|
||||
type retentionRule struct {
|
||||
Type string `json:"type" yaml:"type"`
|
||||
Seconds int `json:"everySeconds" yaml:"everySeconds"`
|
||||
}
|
||||
|
||||
func newRetentionRule(d time.Duration) retentionRule {
|
||||
return retentionRule{
|
||||
Type: retentionRuleTypeExpire,
|
||||
Seconds: int(d.Round(time.Second) / time.Second),
|
||||
}
|
||||
}
|
||||
|
||||
func (r retentionRule) valid() []validationErr {
|
||||
const hour = 3600
|
||||
var ff []validationErr
|
||||
if r.Seconds < hour {
|
||||
ff = append(ff, validationErr{
|
||||
Field: fieldRetentionRulesEverySeconds,
|
||||
Msg: "seconds must be a minimum of " + strconv.Itoa(hour),
|
||||
})
|
||||
}
|
||||
if r.Type != retentionRuleTypeExpire {
|
||||
ff = append(ff, validationErr{
|
||||
Field: fieldType,
|
||||
Msg: `type must be "expire"`,
|
||||
})
|
||||
}
|
||||
return ff
|
||||
}
|
||||
|
||||
const (
|
||||
fieldRetentionRulesEverySeconds = "everySeconds"
|
||||
)
|
||||
|
||||
type retentionRules []retentionRule
|
||||
|
||||
func (r retentionRules) RP() time.Duration {
|
||||
// TODO: this feels very odd to me, will need to follow up with
|
||||
// team to better understand this
|
||||
for _, rule := range r {
|
||||
return time.Duration(rule.Seconds) * time.Second
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r retentionRules) valid() []validationErr {
|
||||
var failures []validationErr
|
||||
for i, rule := range r {
|
||||
if ff := rule.valid(); len(ff) > 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldBucketRetentionRules,
|
||||
Index: intPtr(i),
|
||||
Nested: ff,
|
||||
})
|
||||
}
|
||||
}
|
||||
return failures
|
||||
}
|
||||
|
||||
type assocMapKey struct {
|
||||
|
@ -615,7 +684,6 @@ const (
|
|||
fieldArgTypeConstant = "constant"
|
||||
fieldArgTypeMap = "map"
|
||||
fieldArgTypeQuery = "query"
|
||||
fieldVarLanguage = "language"
|
||||
)
|
||||
|
||||
type variable struct {
|
||||
|
@ -693,27 +761,27 @@ func (v *variable) valid() []validationErr {
|
|||
case "map":
|
||||
if len(v.MapValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: "values",
|
||||
Field: fieldValues,
|
||||
Msg: "map variable must have at least 1 key/val pair",
|
||||
})
|
||||
}
|
||||
case "constant":
|
||||
if len(v.ConstValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: "values",
|
||||
Field: fieldValues,
|
||||
Msg: "constant variable must have a least 1 value provided",
|
||||
})
|
||||
}
|
||||
case "query":
|
||||
if v.Query == "" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: "query",
|
||||
Field: fieldQuery,
|
||||
Msg: "query variable must provide a query string",
|
||||
})
|
||||
}
|
||||
if v.Language != "influxql" && v.Language != "flux" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: "language",
|
||||
Field: fieldLanguage,
|
||||
Msg: fmt.Sprintf(`query variable language must be either "influxql" or "flux"; got %q`, v.Language),
|
||||
})
|
||||
}
|
||||
|
@ -990,7 +1058,7 @@ func validGeometry(geom string) []validationErr {
|
|||
msg = "type provided is not supported"
|
||||
}
|
||||
return []validationErr{{
|
||||
Field: "geom",
|
||||
Field: fieldChartGeom,
|
||||
Msg: fmt.Sprintf("%s: %q", msg, geom),
|
||||
}}
|
||||
}
|
||||
|
@ -1002,14 +1070,14 @@ func (c chart) validBaseProps() []validationErr {
|
|||
var fails []validationErr
|
||||
if c.Width <= 0 {
|
||||
fails = append(fails, validationErr{
|
||||
Field: "width",
|
||||
Field: fieldChartWidth,
|
||||
Msg: "must be greater than 0",
|
||||
})
|
||||
}
|
||||
|
||||
if c.Height <= 0 {
|
||||
fails = append(fails, validationErr{
|
||||
Field: "height",
|
||||
Field: fieldChartHeight,
|
||||
Msg: "must be greater than 0",
|
||||
})
|
||||
}
|
||||
|
@ -1103,12 +1171,12 @@ func (c colors) valid() []validationErr {
|
|||
var fails []validationErr
|
||||
for i, cc := range c {
|
||||
cErr := validationErr{
|
||||
Field: "colors",
|
||||
Field: fieldChartColors,
|
||||
Index: intPtr(i),
|
||||
}
|
||||
if cc.Hex == "" {
|
||||
cErr.Nested = append(cErr.Nested, validationErr{
|
||||
Field: "hex",
|
||||
Field: fieldColorHex,
|
||||
Msg: "a color must have a hex value provided",
|
||||
})
|
||||
}
|
||||
|
@ -1144,19 +1212,19 @@ func (q queries) valid() []validationErr {
|
|||
var fails []validationErr
|
||||
if len(q) == 0 {
|
||||
fails = append(fails, validationErr{
|
||||
Field: "queries",
|
||||
Field: fieldChartQueries,
|
||||
Msg: "at least 1 query must be provided",
|
||||
})
|
||||
}
|
||||
|
||||
for i, qq := range q {
|
||||
qErr := validationErr{
|
||||
Field: "queries",
|
||||
Field: fieldChartQueries,
|
||||
Index: intPtr(i),
|
||||
}
|
||||
if qq.Query == "" {
|
||||
qErr.Nested = append(fails, validationErr{
|
||||
Field: "query",
|
||||
Field: fieldQuery,
|
||||
Msg: "a query must be provided",
|
||||
})
|
||||
}
|
||||
|
@ -1220,7 +1288,7 @@ func (a axes) hasAxes(expectedAxes ...string) []validationErr {
|
|||
for _, expected := range expectedAxes {
|
||||
if !mAxes[expected] {
|
||||
failures = append(failures, validationErr{
|
||||
Field: "axes",
|
||||
Field: fieldChartAxes,
|
||||
Msg: fmt.Sprintf("axis not found: %q", expected),
|
||||
})
|
||||
}
|
||||
|
@ -1230,7 +1298,6 @@ func (a axes) hasAxes(expectedAxes ...string) []validationErr {
|
|||
}
|
||||
|
||||
const (
|
||||
fieldLegendLanguage = "language"
|
||||
fieldLegendOrientation = "orientation"
|
||||
)
|
||||
|
||||
|
|
|
@ -16,18 +16,18 @@ func TestPkg(t *testing.T) {
|
|||
pkg := Pkg{
|
||||
mBuckets: map[string]*bucket{
|
||||
"buck_2": {
|
||||
id: influxdb.ID(2),
|
||||
OrgID: influxdb.ID(100),
|
||||
Description: "desc2",
|
||||
Name: "name2",
|
||||
RetentionPeriod: 2 * time.Hour,
|
||||
id: influxdb.ID(2),
|
||||
OrgID: influxdb.ID(100),
|
||||
Description: "desc2",
|
||||
Name: "name2",
|
||||
RetentionRules: retentionRules{newRetentionRule(2 * time.Hour)},
|
||||
},
|
||||
"buck_1": {
|
||||
id: influxdb.ID(1),
|
||||
OrgID: influxdb.ID(100),
|
||||
Name: "name1",
|
||||
Description: "desc1",
|
||||
RetentionPeriod: time.Hour,
|
||||
id: influxdb.ID(1),
|
||||
OrgID: influxdb.ID(100),
|
||||
Name: "name1",
|
||||
Description: "desc1",
|
||||
RetentionRules: retentionRules{newRetentionRule(time.Hour)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -424,9 +424,18 @@ func (p *Pkg) graphBuckets() error {
|
|||
}
|
||||
|
||||
bkt := &bucket{
|
||||
Name: r.Name(),
|
||||
Description: r.stringShort(fieldDescription),
|
||||
RetentionPeriod: r.duration(fieldBucketRetentionPeriod),
|
||||
Name: r.Name(),
|
||||
Description: r.stringShort(fieldDescription),
|
||||
}
|
||||
if rules, ok := r[fieldBucketRetentionRules].(retentionRules); ok {
|
||||
bkt.RetentionRules = rules
|
||||
} else {
|
||||
for _, r := range r.slcResource(fieldBucketRetentionRules) {
|
||||
bkt.RetentionRules = append(bkt.RetentionRules, retentionRule{
|
||||
Type: r.stringShort(fieldType),
|
||||
Seconds: r.intShort(fieldRetentionRulesEverySeconds),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
failures := p.parseNestedLabels(r, func(l *label) error {
|
||||
|
@ -440,7 +449,7 @@ func (p *Pkg) graphBuckets() error {
|
|||
|
||||
p.mBuckets[r.Name()] = bkt
|
||||
|
||||
return failures
|
||||
return append(failures, bkt.valid()...)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -535,7 +544,7 @@ func (p *Pkg) graphVariables() error {
|
|||
Description: r.stringShort(fieldDescription),
|
||||
Type: strings.ToLower(r.stringShort(fieldType)),
|
||||
Query: strings.TrimSpace(r.stringShort(fieldQuery)),
|
||||
Language: strings.ToLower(strings.TrimSpace(r.stringShort(fieldLegendLanguage))),
|
||||
Language: strings.ToLower(strings.TrimSpace(r.stringShort(fieldLanguage))),
|
||||
ConstValues: r.slcStr(fieldValues),
|
||||
MapValues: r.mapStrStr(fieldValues),
|
||||
}
|
||||
|
@ -809,11 +818,6 @@ func (r Resource) boolShort(key string) bool {
|
|||
return b
|
||||
}
|
||||
|
||||
func (r Resource) duration(key string) time.Duration {
|
||||
dur, _ := time.ParseDuration(r.stringShort(key))
|
||||
return dur
|
||||
}
|
||||
|
||||
func (r Resource) float64(key string) (float64, bool) {
|
||||
f, ok := r[key].(float64)
|
||||
if ok {
|
||||
|
|
|
@ -117,9 +117,9 @@ spec:
|
|||
|
||||
actual := buckets[0]
|
||||
expectedBucket := bucket{
|
||||
Name: "rucket_11",
|
||||
Description: "bucket 1 description",
|
||||
RetentionPeriod: time.Hour,
|
||||
Name: "rucket_11",
|
||||
Description: "bucket 1 description",
|
||||
RetentionRules: retentionRules{newRetentionRule(time.Hour)},
|
||||
}
|
||||
assert.Equal(t, expectedBucket, *actual)
|
||||
})
|
||||
|
|
|
@ -829,9 +829,10 @@ func (s *Service) rollbackBuckets(buckets []*bucket) error {
|
|||
continue
|
||||
}
|
||||
|
||||
rp := b.RetentionRules.RP()
|
||||
_, err := s.bucketSVC.UpdateBucket(context.Background(), b.ID(), influxdb.BucketUpdate{
|
||||
Description: &b.Description,
|
||||
RetentionPeriod: &b.RetentionPeriod,
|
||||
RetentionPeriod: &rp,
|
||||
})
|
||||
if err != nil {
|
||||
errs = append(errs, b.ID().String())
|
||||
|
@ -847,10 +848,11 @@ func (s *Service) rollbackBuckets(buckets []*bucket) error {
|
|||
}
|
||||
|
||||
func (s *Service) applyBucket(ctx context.Context, b *bucket) (influxdb.Bucket, error) {
|
||||
rp := b.RetentionRules.RP()
|
||||
if b.existing != nil {
|
||||
influxBucket, err := s.bucketSVC.UpdateBucket(ctx, b.ID(), influxdb.BucketUpdate{
|
||||
Description: &b.Description,
|
||||
RetentionPeriod: &b.RetentionPeriod,
|
||||
RetentionPeriod: &rp,
|
||||
})
|
||||
if err != nil {
|
||||
return influxdb.Bucket{}, err
|
||||
|
@ -862,7 +864,7 @@ func (s *Service) applyBucket(ctx context.Context, b *bucket) (influxdb.Bucket,
|
|||
OrgID: b.OrgID,
|
||||
Description: b.Description,
|
||||
Name: b.Name,
|
||||
RetentionPeriod: b.RetentionPeriod,
|
||||
RetentionPeriod: rp,
|
||||
}
|
||||
err := s.bucketSVC.CreateBucket(ctx, &influxBucket)
|
||||
if err != nil {
|
||||
|
|
|
@ -234,7 +234,7 @@ func TestService(t *testing.T) {
|
|||
OrgID: orgID,
|
||||
Name: pkgBkt.Name,
|
||||
Description: pkgBkt.Description,
|
||||
RetentionPeriod: pkgBkt.RetentionPeriod,
|
||||
RetentionPeriod: pkgBkt.RetentionRules.RP(),
|
||||
}
|
||||
|
||||
fakeBktSVC := mock.NewBucketService()
|
||||
|
|
|
@ -11,6 +11,12 @@
|
|||
{
|
||||
"kind": "Bucket",
|
||||
"name": "rucket_11",
|
||||
"retentionRules": [
|
||||
{
|
||||
"type": "expire",
|
||||
"everySeconds": 3600
|
||||
}
|
||||
],
|
||||
"retention_period": "1h",
|
||||
"description": "bucket 1 description"
|
||||
}
|
||||
|
|
|
@ -8,5 +8,7 @@ spec:
|
|||
resources:
|
||||
- kind: Bucket
|
||||
name: rucket_11
|
||||
retention_period: 1h
|
||||
description: bucket 1 description
|
||||
retentionRules:
|
||||
- type: expire
|
||||
everySeconds: 3600
|
||||
|
|
Loading…
Reference in New Issue