feat(pkger): add dry run functionality

The dry run does not apply to mappings; mappings will be applied no matter
what until we remedy the uniqueness of resources from the API side. Applies
to labels and label mappings.
pull/15574/head
Johnny Steenbergen 2019-10-28 15:23:40 -07:00
parent fb5f00e05b
commit a03c497015
7 changed files with 1035 additions and 391 deletions
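For context, a minimal sketch (not part of the commit) of how a caller might drive the new dry-run flow. The helper name applyWithDryRun and the printed output are illustrative; the DryRun and Apply signatures come from the service changes below.

package pkgerexample

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/pkger"
)

// applyWithDryRun is a hypothetical helper showing the intended flow:
// diff the pkg against the platform first, then apply it.
func applyWithDryRun(ctx context.Context, svc *pkger.Service, orgID influxdb.ID, pkg *pkger.Pkg) error {
	// DryRun returns a summary and a diff, and marks the pkg verified so a
	// later Apply does not repeat the dry run.
	_, diff, err := svc.DryRun(ctx, orgID, pkg)
	if err != nil {
		return err
	}
	fmt.Printf("diff: %d buckets, %d labels, %d label mappings\n",
		len(diff.Buckets), len(diff.Labels), len(diff.LabelMappings))

	// Apply creates or updates everything in the pkg; on failure it rolls
	// the platform back to its prior state.
	sum, err := svc.Apply(ctx, orgID, pkg)
	if err != nil {
		return err
	}
	fmt.Printf("applied: %d buckets, %d labels\n", len(sum.Buckets), len(sum.Labels))
	return nil
}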


@ -6,13 +6,15 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/fatih/color"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influx/internal"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/pkger"
"github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"
input "github.com/tcnksm/go-input"
"go.uber.org/zap"
@ -53,7 +55,12 @@ func pkgApply(orgID, path *string) func(*cobra.Command, []string) error {
return err
}
printPkgSummary(pkg.Summary(), false)
_, diff, err := svc.DryRun(context.Background(), *influxOrgID, pkg)
if err != nil {
return err
}
printPkgDiff(diff)
ui := &input.UI{
Writer: os.Stdout,
@ -62,7 +69,7 @@ func pkgApply(orgID, path *string) func(*cobra.Command, []string) error {
confirm := getInput(ui, "Confirm application of the above resources (y/n)", "n")
if strings.ToLower(confirm) != "y" {
fmt.Fprintln(os.Stdout, "aborted application of manifest")
fmt.Fprintln(os.Stdout, "aborted application of package")
return nil
}
@ -71,7 +78,7 @@ func pkgApply(orgID, path *string) func(*cobra.Command, []string) error {
return err
}
printPkgSummary(summary, true)
printPkgSummary(summary)
return nil
}
@ -115,57 +122,172 @@ func pkgFromFile(path string) (*pkger.Pkg, error) {
return pkger.Parse(enc, pkger.FromFile(path))
}
func printPkgSummary(m pkger.Summary, withIDs bool) {
headerFn := func(headers ...string) []string {
allHeaders := make([]string, 0, len(headers)+1)
if withIDs {
allHeaders = append(allHeaders, "ID")
func printPkgDiff(diff pkger.Diff) {
red := color.New(color.FgRed).SprintfFunc()
green := color.New(color.FgHiGreen, color.Bold).SprintfFunc()
strDiff := func(isNew bool, old, new string) string {
if isNew {
return green(new)
}
allHeaders = append(allHeaders, headers...)
return allHeaders
if old == new {
return new
}
return fmt.Sprintf("%s\n%s", red("%q", old), green("%q", new))
}
w := internal.NewTabWriter(os.Stdout)
if labels := m.Labels; len(labels) > 0 {
w.WriteHeaders(strings.ToUpper("Labels"))
w.WriteHeaders(headerFn("Name", "Description", "Color")...)
for _, l := range labels {
base := map[string]interface{}{
"Name": l.Name,
"Description": l.Properties["description"],
"Color": l.Properties["color"],
}
if withIDs {
base["ID"] = l.ID
}
w.Write(base)
boolDiff := func(b bool) string {
bb := strconv.FormatBool(b)
if b {
return green(bb)
}
w.WriteHeaders()
return bb
}
if buckets := m.Buckets; len(buckets) > 0 {
w.WriteHeaders(strings.ToUpper("Buckets"))
w.WriteHeaders(headerFn("Name", "Retention", "Description", "Labels")...)
for _, bucket := range buckets {
labels := make([]string, 0, len(bucket.Associations))
for _, l := range bucket.Associations {
labels = append(labels, l.Name)
}
base := map[string]interface{}{
"Name": bucket.Name,
"Retention": formatDuration(bucket.RetentionPeriod),
"Description": bucket.Description,
"Labels": labels,
}
if withIDs {
base["ID"] = bucket.ID
}
w.Write(base)
durDiff := func(isNew bool, oldDur, newDur time.Duration) string {
o := oldDur.String()
if oldDur == 0 {
o = "inf"
}
w.WriteHeaders()
n := newDur.String()
if newDur == 0 {
n = "inf"
}
if isNew {
return green(n)
}
if oldDur == newDur {
return n
}
return fmt.Sprintf("%s\n%s", red(o), green(n))
}
w.Flush()
if len(diff.Labels) > 0 {
headers := []string{"New", "ID", "Name", "Color", "Description"}
tablePrinter("LABELS", headers, len(diff.Labels), func(w *tablewriter.Table) {
for _, l := range diff.Labels {
w.Append([]string{
boolDiff(l.IsNew()),
l.ID.String(),
l.Name,
strDiff(l.IsNew(), l.OldColor, l.NewColor),
strDiff(l.IsNew(), l.OldDesc, l.NewDesc),
})
}
})
}
if len(diff.Buckets) > 0 {
headers := []string{"New", "ID", "Name", "Retention Period", "Description"}
tablePrinter("BUCKETS", headers, len(diff.Buckets), func(w *tablewriter.Table) {
for _, b := range diff.Buckets {
w.Append([]string{
boolDiff(b.IsNew()),
b.ID.String(),
b.Name,
durDiff(b.IsNew(), b.OldRetention, b.NewRetention),
strDiff(b.IsNew(), b.OldDesc, b.NewDesc),
})
}
})
}
if len(diff.LabelMappings) > 0 {
headers := []string{"New", "Resource Type", "Resource Name", "Resource ID", "Label Name", "Label ID"}
tablePrinter("LABEL MAPPINGS", headers, len(diff.LabelMappings), func(w *tablewriter.Table) {
for _, m := range diff.LabelMappings {
w.Append([]string{
boolDiff(m.IsNew),
string(m.ResType),
m.ResName,
m.ResID.String(),
m.LabelName,
m.LabelID.String(),
})
}
})
}
}
func printPkgSummary(sum pkger.Summary) {
if labels := sum.Labels; len(labels) > 0 {
headers := []string{"ID", "Name", "Description", "Color"}
tablePrinter("LABELS", headers, len(labels), func(w *tablewriter.Table) {
for _, l := range labels {
w.Append([]string{
l.ID.String(),
l.Name,
l.Properties["description"],
l.Properties["color"],
})
}
})
}
if buckets := sum.Buckets; len(buckets) > 0 {
headers := []string{"ID", "Name", "Retention", "Description"}
tablePrinter("BUCKETS", headers, len(buckets), func(w *tablewriter.Table) {
for _, bucket := range buckets {
w.Append([]string{
bucket.ID.String(),
bucket.Name,
formatDuration(bucket.RetentionPeriod),
bucket.Description,
})
}
})
}
if mappings := sum.LabelMappings; len(mappings) > 0 {
headers := []string{"Resource Type", "Resource Name", "Resource ID", "Label Name", "Label ID"}
tablePrinter("LABEL MAPPINGS", headers, len(mappings), func(w *tablewriter.Table) {
for _, m := range mappings {
w.Append([]string{
string(m.ResourceType),
m.ResourceName,
m.ResourceID.String(),
m.LabelName,
m.LabelID.String(),
})
}
})
}
}
func tablePrinter(table string, headers []string, count int, appendFn func(w *tablewriter.Table)) {
descrCol := -1
for i, h := range headers {
if strings.ToLower(h) == "description" {
descrCol = i
break
}
}
w := tablewriter.NewWriter(os.Stdout)
w.SetBorder(false)
if descrCol != -1 {
w.SetAutoWrapText(false)
w.SetColMinWidth(descrCol, 30)
}
color.New(color.FgYellow, color.Bold).Fprintln(os.Stdout, strings.ToUpper(table))
w.SetHeader(headers)
var colors []tablewriter.Colors
for i := 0; i < len(headers); i++ {
colors = append(colors, tablewriter.Color(tablewriter.FgHiCyanColor))
}
w.SetHeaderColor(colors...)
appendFn(w)
footers := make([]string, len(headers))
footers[len(footers)-2] = "TOTAL"
footers[len(footers)-1] = strconv.Itoa(count)
w.SetFooter(footers)
w.SetFooterColor(colors...)
w.Render()
fmt.Fprintln(os.Stdout)
}
func formatDuration(d time.Duration) string {

go.mod

@ -22,6 +22,7 @@ require (
github.com/docker/docker v1.13.1 // indirect
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/fatih/color v1.7.0
github.com/getkin/kin-openapi v0.2.0
github.com/ghodss/yaml v1.0.0
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
@ -58,6 +59,7 @@ require (
github.com/nats-io/nats-streaming-server v0.11.2
github.com/nats-io/nkeys v0.0.2 // indirect
github.com/nats-io/nuid v1.0.0 // indirect
github.com/olekukonko/tablewriter v0.0.1
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opentracing/opentracing-go v1.1.0

go.sum

@ -339,6 +339,8 @@ github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8urCTFX88=
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=


@ -9,8 +9,8 @@ import (
const (
kindUnknown kind = ""
kindBucket kind = "bucket"
kindLabel kind = "label"
kindDashboard kind = "dashboard"
kindLabel kind = "label"
kindPackage kind = "package"
)
@ -33,6 +33,76 @@ type Metadata struct {
Version string `yaml:"pkgVersion" json:"pkgVersion"`
}
// Diff is the result of a service DryRun call. The diff outlines
// what is new and/or updated from the current state of the platform.
type Diff struct {
Buckets []DiffBucket
Labels []DiffLabel
LabelMappings []DiffLabelMapping
}
// DiffBucket is a diff of an individual bucket.
type DiffBucket struct {
ID influxdb.ID
Name string
OldDesc, NewDesc string
OldRetention, NewRetention time.Duration
}
// IsNew indicates whether a pkg bucket is going to be new to the platform.
func (d DiffBucket) IsNew() bool {
return d.ID == influxdb.ID(0)
}
func newDiffBucket(b *bucket, i influxdb.Bucket) DiffBucket {
return DiffBucket{
ID: i.ID,
Name: b.Name,
OldDesc: i.Description,
NewDesc: b.Description,
OldRetention: i.RetentionPeriod,
NewRetention: b.RetentionPeriod,
}
}
// DiffLabel is a diff of an individual label.
type DiffLabel struct {
ID influxdb.ID
Name string
OldColor, NewColor string
OldDesc, NewDesc string
}
// IsNew indicates whether a pkg label is going to be new to the platform.
func (d DiffLabel) IsNew() bool {
return d.ID == influxdb.ID(0)
}
func newDiffLabel(l *label, i influxdb.Label) DiffLabel {
return DiffLabel{
ID: i.ID,
Name: l.Name,
OldColor: i.Properties["color"],
NewColor: l.Color,
OldDesc: i.Properties["description"],
NewDesc: l.Description,
}
}
// DiffLabelMapping is a diff of an individual label mapping. A
// single resource may have multiple mappings to multiple labels.
// A label can have many mappings to other resources.
type DiffLabelMapping struct {
IsNew bool
ResType influxdb.ResourceType
ResID influxdb.ID
ResName string
LabelID influxdb.ID
LabelName string
}
// Summary is a definition of all the resources that have been or
// will be created from a pkg.
type Summary struct {
@ -44,23 +114,166 @@ type Summary struct {
Labels []struct {
influxdb.Label
}
LabelMappings []struct {
ResourceName string
LabelName string
influxdb.LabelMapping
}
}
type (
bucket struct {
ID influxdb.ID
OrgID influxdb.ID
Description string
Name string
RetentionPeriod time.Duration
labels []*label
type bucket struct {
ID influxdb.ID
OrgID influxdb.ID
Description string
Name string
RetentionPeriod time.Duration
labels []*label
// existing provides context for a resource that already
// exists in the platform. If a resource already exists,
// then the ID should be populated.
existing *influxdb.Bucket
}
func (b *bucket) summarize() struct {
influxdb.Bucket
Associations []influxdb.Label
} {
iBkt := struct {
influxdb.Bucket
Associations []influxdb.Label
}{
Bucket: influxdb.Bucket{
ID: b.ID,
OrgID: b.OrgID,
Name: b.Name,
Description: b.Description,
RetentionPeriod: b.RetentionPeriod,
},
}
for _, l := range b.labels {
iBkt.Associations = append(iBkt.Associations, influxdb.Label{
ID: l.ID,
OrgID: l.OrgID,
Name: l.Name,
Properties: l.properties(),
})
}
return iBkt
}
type labelMapKey struct {
resType influxdb.ResourceType
name string
}
type labelMapVal struct {
exists bool
v interface{}
}
func (l labelMapVal) bucket() (*bucket, bool) {
if l.v == nil {
return nil, false
}
b, ok := l.v.(*bucket)
return b, ok
}
type label struct {
ID influxdb.ID
OrgID influxdb.ID
Name string
Color string
Description string
mappings map[labelMapKey]labelMapVal
// existing provides context for a resource that already
// exists in the platform. If a resource already exists,
// then the ID should be populated.
existing *influxdb.Label
}
func (l *label) mappingSummary() []struct {
exists bool
ResourceName string
LabelName string
influxdb.LabelMapping
} {
var mappings []struct {
exists bool
ResourceName string
LabelName string
influxdb.LabelMapping
}
for k, lm := range l.mappings {
mappings = append(mappings, struct {
exists bool
ResourceName string
LabelName string
influxdb.LabelMapping
}{
exists: lm.exists,
ResourceName: k.name,
LabelName: l.Name,
LabelMapping: influxdb.LabelMapping{
LabelID: l.ID,
ResourceID: l.getMappedResourceID(k),
ResourceType: influxdb.BucketsResourceType,
},
})
}
label struct {
ID influxdb.ID
OrgID influxdb.ID
Name string
Color string
Description string
return mappings
}
func (l *label) getMappedResourceID(k labelMapKey) influxdb.ID {
switch k.resType {
case influxdb.BucketsResourceType:
b, ok := l.mappings[k].bucket()
if ok {
return b.ID
}
}
)
return 0
}
func (l *label) setBucketMapping(b *bucket, exists bool) {
if l == nil {
return
}
if l.mappings == nil {
l.mappings = make(map[labelMapKey]labelMapVal)
}
key := labelMapKey{
resType: influxdb.BucketsResourceType,
name: b.Name,
}
l.mappings[key] = labelMapVal{
exists: exists,
v: b,
}
}
func (l *label) summarize() struct {
influxdb.Label
} {
return struct{ influxdb.Label }{
Label: influxdb.Label{
ID: l.ID,
OrgID: l.OrgID,
Name: l.Name,
Properties: l.properties(),
},
}
}
func (l *label) properties() map[string]string {
return map[string]string{
"color": l.Color,
"description": l.Description,
}
}


@ -127,6 +127,8 @@ type Pkg struct {
mLabels map[string]*label
mBuckets map[string]*bucket
isVerified bool
}
// Summary returns a package summary that describes all the resources and
@ -135,59 +137,49 @@ type Pkg struct {
func (p *Pkg) Summary() Summary {
var sum Summary
type lbl struct {
influxdb.Label
}
for _, l := range p.mLabels {
sum.Labels = append(sum.Labels, lbl{
Label: influxdb.Label{
ID: l.ID,
OrgID: l.OrgID,
Name: l.Name,
Properties: map[string]string{
"color": l.Color,
"description": l.Description,
},
},
})
sum.Labels = append(sum.Labels, l.summarize())
}
sort.Slice(sum.Labels, func(i, j int) bool {
return sum.Labels[i].Name < sum.Labels[j].Name
})
type bkt struct {
influxdb.Bucket
Associations []influxdb.Label
}
for _, b := range p.mBuckets {
iBkt := bkt{
Bucket: influxdb.Bucket{
ID: b.ID,
OrgID: b.OrgID,
Name: b.Name,
Description: b.Description,
RetentionPeriod: b.RetentionPeriod,
},
}
for _, l := range b.labels {
iBkt.Associations = append(iBkt.Associations, influxdb.Label{
ID: l.ID,
OrgID: l.OrgID,
Name: l.Name,
Properties: map[string]string{
"color": l.Color,
"description": l.Description,
},
})
}
sum.Buckets = append(sum.Buckets, iBkt)
sum.Buckets = append(sum.Buckets, b.summarize())
}
sort.Slice(sum.Buckets, func(i, j int) bool {
return sum.Buckets[i].Name < sum.Buckets[j].Name
})
for _, m := range p.labelMappings() {
sum.LabelMappings = append(sum.LabelMappings, struct {
ResourceName string
LabelName string
influxdb.LabelMapping
}{
ResourceName: m.ResourceName,
LabelName: m.LabelName,
LabelMapping: m.LabelMapping,
})
}
// sort by res type ASC, then res name ASC, then label name ASC
sort.Slice(sum.LabelMappings, func(i, j int) bool {
n, m := sum.LabelMappings[i], sum.LabelMappings[j]
if n.ResourceType < m.ResourceType {
return true
}
if n.ResourceType > m.ResourceType {
return false
}
if n.ResourceName < m.ResourceName {
return true
}
if n.ResourceName > m.ResourceName {
return false
}
return n.LabelName < m.LabelName
})
return sum
}
@ -221,19 +213,20 @@ func (p *Pkg) labels() []*label {
// valid pairs of labels and resources of which all have IDs.
// If a resource does not exist yet, a label mapping will not
// be returned for it.
func (p *Pkg) labelMappings() []influxdb.LabelMapping {
var mappings []influxdb.LabelMapping
for _, b := range p.buckets() {
for _, l := range b.labels {
if l.ID == influxdb.ID(0) || b.ID == influxdb.ID(0) {
continue
}
mappings = append(mappings, influxdb.LabelMapping{
LabelID: l.ID,
ResourceID: b.ID,
ResourceType: influxdb.BucketsResourceType,
})
}
func (p *Pkg) labelMappings() []struct {
exists bool
ResourceName string
LabelName string
influxdb.LabelMapping
} {
var mappings []struct {
exists bool
ResourceName string
LabelName string
influxdb.LabelMapping
}
for _, l := range p.mLabels {
mappings = append(mappings, l.mappingSummary()...)
}
return mappings
@ -518,44 +511,11 @@ func (r Resource) nestedAssociations() []Resource {
return resources
}
func (r Resource) bool(key string) bool {
b, _ := r[key].(bool)
return b
}
func (r Resource) duration(key string) time.Duration {
dur, _ := time.ParseDuration(r.stringShort(key))
return dur
}
func (r Resource) float64(key string) float64 {
i, ok := r[key].(float64)
if !ok {
return 0
}
return i
}
func (r Resource) intShort(key string) int {
i, _ := r.int(key)
return i
}
func (r Resource) int(key string) (int, bool) {
i, ok := r[key].(int)
if ok {
return i, true
}
s, ok := r[key].(string)
if !ok {
return 0, false
}
i, err := strconv.Atoi(s)
return i, err == nil
}
func (r Resource) string(key string) (string, bool) {
s, ok := r[key].(string)
return s, ok
@ -566,22 +526,6 @@ func (r Resource) stringShort(key string) string {
return s
}
func (r Resource) slcStr(key string) ([]string, bool) {
v, ok := r[key].([]interface{})
if !ok {
return nil, false
}
out := make([]string, 0, len(v))
for _, iface := range v {
s, ok := iface.(string)
if ok {
out = append(out, s)
}
}
return out, true
}
func ifaceMapToResource(i interface{}) (Resource, bool) {
res, ok := i.(Resource)
if ok {


@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"
@ -11,12 +12,15 @@ import (
"go.uber.org/zap"
)
// Service provides the pkger business logic including all the dependencies to make
// this resource sausage.
type Service struct {
logger *zap.Logger
labelSVC influxdb.LabelService
bucketSVC influxdb.BucketService
}
// NewService is a constructor for a pkger Service.
func NewService(l *zap.Logger, bucketSVC influxdb.BucketService, labelSVC influxdb.LabelService) *Service {
svc := Service{
logger: zap.NewNop(),
@ -30,34 +34,201 @@ func NewService(l *zap.Logger, bucketSVC influxdb.BucketService, labelSVC influx
return &svc
}
func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum Summary, e error) {
var rollbacks []rollbacker
defer func() {
if e == nil {
return
}
for _, r := range rollbacks {
if err := r.fn(); err != nil {
s.logger.Error("failed to delete "+r.resource, zap.Error(err))
}
}
}()
// DryRun provides a dry run of the pkg application. The pkg will be marked verified
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summary, Diff, error) {
diffBuckets, err := s.dryRunBuckets(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
}
runTilEnd := func(appliers ...applier) error {
var errs []string
for _, r := range appliers {
rollbacks = append(rollbacks, r.rollbacker)
if err := r.creater(ctx, orgID); err != nil {
errs = append(errs, err.Error())
}
}
diffLabels, err := s.dryRunLabels(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
}
if len(errs) > 0 {
// TODO: fix error up to be more actionable
return errors.New(strings.Join(errs, "\n"))
diffLabelMappings, err := s.dryRunLabelMappings(ctx, pkg)
if err != nil {
return Summary{}, Diff{}, err
}
// mark the pkg as verified by a dry run. Apply requires this to have
// been run; if it has not, Apply runs the dry run again.
pkg.isVerified = true
diff := Diff{
Buckets: diffBuckets,
Labels: diffLabels,
LabelMappings: diffLabelMappings,
}
return pkg.Summary(), diff, nil
}
func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffBucket, error) {
mExistingBkts := make(map[string]DiffBucket)
bkts := pkg.buckets()
for i := range bkts {
b := bkts[i]
existingBkt, err := s.bucketSVC.FindBucketByName(ctx, orgID, b.Name)
switch err {
// TODO: this is useless until the http client provides this functionality; right now none of the http
// clients do :sad_face:
// TODO: add a case for a not found err here and another case to handle when
// err isn't a not found error (some other error)
case nil:
b.existing = existingBkt
b.ID = existingBkt.ID
mExistingBkts[b.Name] = newDiffBucket(b, *existingBkt)
default:
mExistingBkts[b.Name] = newDiffBucket(b, influxdb.Bucket{})
}
}
var diffs []DiffBucket
for _, diff := range mExistingBkts {
diffs = append(diffs, diff)
}
sort.Slice(diffs, func(i, j int) bool {
return diffs[i].Name < diffs[j].Name
})
return diffs, nil
}
func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffLabel, error) {
mExistingLabels := make(map[string]DiffLabel)
labels := pkg.labels()
for i := range labels {
l := labels[i]
existingLabels, err := s.labelSVC.FindLabels(ctx, influxdb.LabelFilter{
Name: l.Name,
OrgID: &orgID,
}, influxdb.FindOptions{Limit: 1})
switch {
// TODO: this is useless until the http client provides this functionality; right now none of the http
// clients do :sad_face:
// TODO: add a case for a not found err here and another case to handle when
// err isn't a not found error (some other error)
case err == nil && len(existingLabels) > 0:
existingLabel := existingLabels[0]
l.existing = existingLabel
l.ID = existingLabel.ID
mExistingLabels[l.Name] = newDiffLabel(l, *existingLabel)
default:
mExistingLabels[l.Name] = newDiffLabel(l, influxdb.Label{})
}
}
diffs := make([]DiffLabel, 0, len(mExistingLabels))
for _, diff := range mExistingLabels {
diffs = append(diffs, diff)
}
sort.Slice(diffs, func(i, j int) bool {
return diffs[i].Name < diffs[j].Name
})
return diffs, nil
}
func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) {
var diffs []DiffLabelMapping
for _, b := range pkg.buckets() {
err := s.dryRunBucketLabelMapping(ctx, b, func(labelID influxdb.ID, labelName string, isNew bool) {
pkg.mLabels[labelName].setBucketMapping(b, !isNew)
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: influxdb.BucketsResourceType,
ResID: b.ID,
ResName: b.Name,
LabelID: labelID,
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
}
// sort by res type ASC, then res name ASC, then label name ASC
sort.Slice(diffs, func(i, j int) bool {
n, m := diffs[i], diffs[j]
if n.ResType < m.ResType {
return true
}
if n.ResType > m.ResType {
return false
}
if n.ResName < m.ResName {
return true
}
if n.ResName > m.ResName {
return false
}
return n.LabelName < m.LabelName
})
return diffs, nil
}
type labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool)
func (s *Service) dryRunBucketLabelMapping(ctx context.Context, b *bucket, mappingFn labelMappingDiffFn) error {
if b.existing == nil {
for _, l := range b.labels {
mappingFn(l.ID, l.Name, true)
}
return nil
}
// loop through and hit the API for all labels associated with a bucket.
// look up the labels in the pkg and add them to the label mapping; if a label
// exists in the results from the API, mark it as existing.
labels, err := s.labelSVC.FindResourceLabels(ctx, influxdb.LabelMappingFilter{
ResourceID: b.ID,
ResourceType: influxdb.BucketsResourceType,
})
if err != nil {
// TODO: inspect err; if it's a not found error, do nothing; handle any other
// error better
return err
}
pkgLabels := labelSlcToMap(b.labels)
for _, l := range labels {
// ignore any labels that are not specified in the pkg
mappingFn(l.ID, l.Name, false)
delete(pkgLabels, l.Name)
}
// now we add labels that were not a part of the existing labels
for _, l := range pkgLabels {
mappingFn(l.ID, l.Name, true)
}
return nil
}
func labelSlcToMap(labels []*label) map[string]*label {
m := make(map[string]*label)
for i := range labels {
m[labels[i].Name] = labels[i]
}
return m
}
// Apply will apply all the resources identified in the provided pkg. The pkg is applied
// in its entirety. If a failure happens midway, then the entire pkg will be rolled back to the state
// from before the pkg was applied.
func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum Summary, e error) {
if !pkg.isVerified {
_, _, err := s.DryRun(ctx, orgID, pkg)
if err != nil {
return Summary{}, err
}
}
coordinator := new(rollbackCoordinator)
defer coordinator.rollback(s.logger, &e)
runners := [][]applier{
// each grouping here runs for its entirety, then returns an error that
@ -80,7 +251,8 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
}
for _, appliers := range runners {
if err := runTilEnd(appliers...); err != nil {
err := coordinator.runTilEnd(ctx, orgID, appliers...)
if err != nil {
return Summary{}, err
}
}
@ -88,6 +260,220 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
return pkg.Summary(), nil
}
func (s *Service) applyBuckets(buckets []*bucket) applier {
rollbackBuckets := make([]*bucket, 0, len(buckets))
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, b := range buckets {
buckets[i].OrgID = orgID
influxBucket, err := s.applyBucket(ctx, b)
if err != nil {
errs = append(errs, applyErrBody{
name: b.Name,
msg: err.Error(),
})
continue
}
buckets[i].ID = influxBucket.ID
rollbackBuckets = append(rollbackBuckets, buckets[i])
}
return errs.toError("bucket", "failed to create bucket")
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "bucket",
fn: func() error { return s.rollbackBuckets(rollbackBuckets) },
},
}
}
func (s *Service) rollbackBuckets(buckets []*bucket) error {
var errs []string
for _, b := range buckets {
if b.existing == nil {
err := s.bucketSVC.DeleteBucket(context.Background(), b.ID)
if err != nil {
errs = append(errs, b.ID.String())
}
continue
}
_, err := s.bucketSVC.UpdateBucket(context.Background(), b.ID, influxdb.BucketUpdate{
Description: &b.Description,
RetentionPeriod: &b.RetentionPeriod,
})
if err != nil {
errs = append(errs, b.ID.String())
}
}
if len(errs) > 0 {
// TODO: fixup error
return fmt.Errorf(`bucket_ids=[%s] err="unable to delete bucket"`, strings.Join(errs, ", "))
}
return nil
}
func (s *Service) applyBucket(ctx context.Context, b *bucket) (influxdb.Bucket, error) {
if b.existing != nil {
influxBucket, err := s.bucketSVC.UpdateBucket(ctx, b.ID, influxdb.BucketUpdate{
Description: &b.Description,
RetentionPeriod: &b.RetentionPeriod,
})
if err != nil {
return influxdb.Bucket{}, err
}
return *influxBucket, nil
}
influxBucket := influxdb.Bucket{
OrgID: b.OrgID,
Description: b.Description,
Name: b.Name,
RetentionPeriod: b.RetentionPeriod,
}
err := s.bucketSVC.CreateBucket(ctx, &influxBucket)
if err != nil {
return influxdb.Bucket{}, err
}
return influxBucket, nil
}
func (s *Service) applyLabels(labels []*label) applier {
rollBackLabels := make([]*label, 0, len(labels))
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, l := range labels {
labels[i].OrgID = orgID
influxLabel, err := s.applyLabel(ctx, l)
if err != nil {
errs = append(errs, applyErrBody{
name: l.Name,
msg: err.Error(),
})
continue
}
labels[i].ID = influxLabel.ID
rollBackLabels = append(rollBackLabels, labels[i])
}
return errs.toError("label", "failed to create label")
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "label",
fn: func() error { return s.rollbackLabels(rollBackLabels) },
},
}
}
func (s *Service) rollbackLabels(labels []*label) error {
var errs []string
for _, l := range labels {
err := s.labelSVC.DeleteLabel(context.Background(), l.ID)
if err != nil {
errs = append(errs, l.ID.String())
}
}
if len(errs) > 0 {
return fmt.Errorf(`label_ids=[%s] err="unable to delete label"`, strings.Join(errs, ", "))
}
return nil
}
func (s *Service) applyLabel(ctx context.Context, l *label) (influxdb.Label, error) {
if l.existing != nil {
updatedlabel, err := s.labelSVC.UpdateLabel(ctx, l.ID, influxdb.LabelUpdate{
Properties: l.properties(),
})
if err != nil {
return influxdb.Label{}, err
}
return *updatedlabel, nil
}
influxLabel := influxdb.Label{
OrgID: l.OrgID,
Name: l.Name,
Properties: l.properties(),
}
err := s.labelSVC.CreateLabel(ctx, &influxLabel)
if err != nil {
return influxdb.Label{}, err
}
return influxLabel, nil
}
func (s *Service) applyLabelMappings(pkg *Pkg) applier {
var mappings []influxdb.LabelMapping
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
labelMappings := pkg.labelMappings()
for i := range labelMappings {
mapping := labelMappings[i]
if mapping.exists {
// this block does two things: it does not write a
// mapping when one already exists, and it avoids having to worry
// about deleting an existing mapping, since it will never be
// passed to the delete function below because it is never added
// to the list of mappings that is referenced in the delete
// call.
continue
}
err := s.labelSVC.CreateLabelMapping(ctx, &mapping.LabelMapping)
if err != nil {
return err
}
mappings = append(mappings, mapping.LabelMapping)
}
return nil
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "label_mapping",
fn: func() error { return s.rollbackLabelMappings(mappings) },
},
}
}
func (s *Service) rollbackLabelMappings(mappings []influxdb.LabelMapping) error {
var errs []string
for i := range mappings {
l := mappings[i]
err := s.labelSVC.DeleteLabelMapping(context.Background(), &l)
if err != nil {
errs = append(errs, fmt.Sprintf("%s:%s", l.LabelID.String(), l.ResourceID.String()))
}
}
if len(errs) > 0 {
return fmt.Errorf(`label_resource_id_pairs=[%s] err="unable to delete label mapping"`, strings.Join(errs, ", "))
}
return nil
}
type (
applier struct {
creater creater
@ -102,6 +488,38 @@ type (
creater func(ctx context.Context, orgID influxdb.ID) error
)
type rollbackCoordinator struct {
rollbacks []rollbacker
}
func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID influxdb.ID, appliers ...applier) error {
var errs []string
for _, app := range appliers {
r.rollbacks = append(r.rollbacks, app.rollbacker)
if err := app.creater(ctx, orgID); err != nil {
errs = append(errs, fmt.Sprintf("failed %s create: %s", app.rollbacker.resource, err.Error()))
}
}
if len(errs) > 0 {
// TODO: fix error up to be more actionable
return errors.New(strings.Join(errs, "\n"))
}
return nil
}
func (r *rollbackCoordinator) rollback(l *zap.Logger, err *error) {
if *err == nil {
return
}
for _, r := range r.rollbacks {
if err := r.fn(); err != nil {
l.Error("failed to delete "+r.resource, zap.Error(err))
}
}
}
// TODO: clean up apply errors to inform the user in an actionable way
type applyErrBody struct {
name string
@ -120,163 +538,3 @@ func (a applyErrs) toError(resType, msg string) error {
}
return errors.New(errMsg)
}
func (s *Service) applyBuckets(buckets []*bucket) applier {
influxBuckets := make([]influxdb.Bucket, 0, len(buckets))
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, b := range buckets {
influxBucket := influxdb.Bucket{
OrgID: orgID,
Description: b.Description,
Name: b.Name,
RetentionPeriod: b.RetentionPeriod,
}
err := s.bucketSVC.CreateBucket(ctx, &influxBucket)
if err != nil {
errs = append(errs, applyErrBody{
name: b.Name,
msg: err.Error(),
})
continue
}
buckets[i].ID = influxBucket.ID
buckets[i].OrgID = influxBucket.OrgID
influxBuckets = append(influxBuckets, influxBucket)
}
return errs.toError("bucket", "failed to create bucket")
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "bucket",
fn: func() error { return s.deleteBuckets(influxBuckets) },
},
}
}
func (s *Service) deleteBuckets(buckets []influxdb.Bucket) error {
var errs []string
for _, b := range buckets {
err := s.bucketSVC.DeleteBucket(context.Background(), b.ID)
if err != nil {
errs = append(errs, b.ID.String())
}
}
if len(errs) > 0 {
// TODO: fixup error
return fmt.Errorf(`bucket_ids=[%s] err="unable to delete bucket"`, strings.Join(errs, ", "))
}
return nil
}
func (s *Service) applyLabels(labels []*label) applier {
influxLabels := make([]influxdb.Label, 0, len(labels))
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, l := range labels {
influxLabel := influxdb.Label{
OrgID: orgID,
Name: l.Name,
Properties: map[string]string{
"color": l.Color,
"description": l.Description,
},
}
err := s.labelSVC.CreateLabel(ctx, &influxLabel)
if err != nil {
errs = append(errs, applyErrBody{
name: l.Name,
msg: err.Error(),
})
continue
}
labels[i].ID = influxLabel.ID
labels[i].OrgID = influxLabel.OrgID
influxLabels = append(influxLabels, influxLabel)
}
return errs.toError("label", "failed to create label")
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "label",
fn: func() error { return s.deleteLabels(influxLabels) },
},
}
}
func (s *Service) deleteLabels(labels []influxdb.Label) error {
var errs []string
for _, l := range labels {
err := s.labelSVC.DeleteLabel(context.Background(), l.ID)
if err != nil {
errs = append(errs, l.ID.String())
}
}
if len(errs) > 0 {
return fmt.Errorf(`label_ids=[%s] err="unable to delete label"`, strings.Join(errs, ", "))
}
return nil
}
func (s *Service) applyLabelMappings(pkg *Pkg) applier {
var mappings []influxdb.LabelMapping
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
labelMappings := pkg.labelMappings()
for i := range labelMappings {
mapping := labelMappings[i]
err := s.labelSVC.CreateLabelMapping(ctx, &mapping)
if err != nil {
return err
}
mappings = append(mappings, mapping)
}
return nil
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "label_mapping",
fn: func() error { return s.deleteLabelMappings(mappings) },
},
}
}
func (s *Service) deleteLabelMappings(mappings []influxdb.LabelMapping) error {
var errs []string
for i := range mappings {
l := mappings[i]
err := s.labelSVC.DeleteLabelMapping(context.Background(), &l)
if err != nil {
errs = append(errs, fmt.Sprintf("%s:%s", l.LabelID.String(), l.ResourceID.String()))
}
}
if len(errs) > 0 {
return fmt.Errorf(`label_resource_id_pairs=[%s] err="unable to delete label"`, strings.Join(errs, ", "))
}
return nil
}


@ -14,6 +14,135 @@ import (
)
func TestService(t *testing.T) {
t.Run("DryRun", func(t *testing.T) {
t.Run("buckets", func(t *testing.T) {
t.Run("single bucket updated", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
return &influxdb.Bucket{
ID: influxdb.ID(1),
OrgID: orgID,
Name: name,
Description: "old desc",
RetentionPeriod: 30 * time.Hour,
}, nil
}
fakeLabelSVC := mock.NewLabelService()
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
require.Len(t, diff.Buckets, 1)
expected := DiffBucket{
ID: influxdb.ID(1),
Name: "rucket_11",
OldDesc: "old desc",
NewDesc: "bucket 1 description",
OldRetention: 30 * time.Hour,
NewRetention: time.Hour,
}
assert.Equal(t, expected, diff.Buckets[0])
})
})
t.Run("single bucket new", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
return nil, errors.New("not found")
}
fakeLabelSVC := mock.NewLabelService()
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
require.Len(t, diff.Buckets, 1)
expected := DiffBucket{
Name: "rucket_11",
NewDesc: "bucket 1 description",
NewRetention: time.Hour,
}
assert.Equal(t, expected, diff.Buckets[0])
})
})
})
t.Run("labels", func(t *testing.T) {
t.Run("two labels updated", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return []*influxdb.Label{
{
ID: influxdb.ID(1),
Name: filter.Name,
Properties: map[string]string{
"color": "old color",
"description": "old description",
},
},
}, nil
}
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
require.Len(t, diff.Labels, 2)
expected := DiffLabel{
ID: influxdb.ID(1),
Name: "label_1",
OldColor: "old color",
NewColor: "#FFFFFF",
OldDesc: "old description",
NewDesc: "label 1 description",
}
assert.Equal(t, expected, diff.Labels[0])
expected.Name = "label_2"
expected.NewColor = "#000000"
expected.NewDesc = "label 2 description"
assert.Equal(t, expected, diff.Labels[1])
})
})
t.Run("two labels created", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return nil, errors.New("no labels found")
}
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
require.Len(t, diff.Labels, 2)
expected := DiffLabel{
Name: "label_1",
NewColor: "#FFFFFF",
NewDesc: "label 1 description",
}
assert.Equal(t, expected, diff.Labels[0])
expected.Name = "label_2"
expected.NewColor = "#000000"
expected.NewDesc = "label 2 description"
assert.Equal(t, expected, diff.Labels[1])
})
})
})
})
t.Run("Apply", func(t *testing.T) {
t.Run("buckets", func(t *testing.T) {
t.Run("successfully creates pkg of buckets", func(t *testing.T) {
@ -23,6 +152,13 @@ func TestService(t *testing.T) {
b.ID = influxdb.ID(b.RetentionPeriod)
return nil
}
fakeBucketSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) {
// forces the bucket to be created anew
return nil, errors.New("an error")
}
fakeBucketSVC.UpdateBucketFn = func(_ context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
return &influxdb.Bucket{ID: id}, nil
}
svc := NewService(zap.NewNop(), fakeBucketSVC, nil)
@ -44,6 +180,10 @@ func TestService(t *testing.T) {
t.Run("rolls back all created buckets on an error", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
fakeBucketSVC := mock.NewBucketService()
fakeBucketSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) {
// forces the bucket to be created anew
return nil, errors.New("an error")
}
var c int
fakeBucketSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error {
if c == 2 {
@ -151,6 +291,10 @@ func TestService(t *testing.T) {
id++
return nil
}
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) {
// forces the bucket to be created anew
return nil, errors.New("an error")
}
fakeLabelSVC := mock.NewLabelService()
id = 1
@ -175,47 +319,6 @@ func TestService(t *testing.T) {
assert.Equal(t, 4, numLabelMappings)
})
})
t.Run("rolls back all created resources on an error", func(t *testing.T) {
testfileRunner(t, "testdata/bucket_associates_label", func(t *testing.T, pkg *Pkg) {
var deleteCount struct {
bkts, labels, mappings int
}
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.DeleteBucketFn = func(_ context.Context, _ influxdb.ID) error {
deleteCount.bkts++
return nil
}
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.DeleteLabelFn = func(_ context.Context, id influxdb.ID) error {
deleteCount.labels++
return nil
}
var createdLabelMappings int
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, _ *influxdb.LabelMapping) error {
if createdLabelMappings == 3 {
return errors.New("error")
}
createdLabelMappings++
return nil
}
fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, _ *influxdb.LabelMapping) error {
deleteCount.mappings++
return nil
}
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.Error(t, err)
assert.Equal(t, 3, deleteCount.bkts)
assert.Equal(t, 2, deleteCount.labels)
assert.Equal(t, 3, deleteCount.mappings)
})
})
})
})
}