feat(pkger): add label -> bucket mappings to pkger

pull/15574/head
Johnny Steenbergen 2019-10-25 19:11:47 -07:00
parent c0849acb9f
commit c7545f8951
8 changed files with 714 additions and 179 deletions

View File

@ -31,12 +31,12 @@ func pkgCmd() *cobra.Command {
orgID := cmd.Flags().String("org-id", "", "The ID of the organization that owns the bucket")
cmd.MarkFlagRequired("org-id")
cmd.RunE = manifestApply(orgID, path)
cmd.RunE = pkgApply(orgID, path)
return cmd
}
func manifestApply(orgID, path *string) func(*cobra.Command, []string) error {
func pkgApply(orgID, path *string) func(*cobra.Command, []string) error {
return func(cmd *cobra.Command, args []string) (e error) {
influxOrgID, err := influxdb.IDFromString(*orgID)
if err != nil {
@ -53,7 +53,7 @@ func manifestApply(orgID, path *string) func(*cobra.Command, []string) error {
return err
}
printManifestSummary(pkg.Summary())
printPkgSummary(pkg.Summary(), false)
ui := &input.UI{
Writer: os.Stdout,
@ -71,35 +71,7 @@ func manifestApply(orgID, path *string) func(*cobra.Command, []string) error {
return err
}
w := internal.NewTabWriter(os.Stdout)
if newLabels := summary.Labels; len(newLabels) > 0 {
w.WriteHeaders(strings.ToUpper("Labels"))
w.WriteHeaders("ID", "Name", "Description", "Color")
for _, l := range newLabels {
w.Write(map[string]interface{}{
"ID": l.ID,
"Name": l.Name,
"Description": l.Properties["description"],
"Color": l.Properties["color"],
})
}
w.WriteHeaders()
}
if newBuckets := summary.Buckets; len(newBuckets) > 0 {
w.WriteHeaders(strings.ToUpper("Buckets"))
w.WriteHeaders("ID", "Name", "Description", "Retention", "Created At")
for _, bucket := range newBuckets {
w.Write(map[string]interface{}{
"ID": bucket.ID.String(),
"Name": bucket.Name,
"Description": bucket.Description,
"Retention": formatDuration(bucket.RetentionPeriod),
})
}
w.WriteHeaders()
}
w.Flush()
printPkgSummary(summary, true)
return nil
}
@ -143,30 +115,53 @@ func pkgFromFile(path string) (*pkger.Pkg, error) {
return pkger.Parse(enc, pkger.FromFile(path))
}
func printManifestSummary(m pkger.Summary) {
func printPkgSummary(m pkger.Summary, withIDs bool) {
headerFn := func(headers ...string) []string {
allHeaders := make([]string, 0, len(headers)+1)
if withIDs {
allHeaders = append(allHeaders, "ID")
}
allHeaders = append(allHeaders, headers...)
return allHeaders
}
w := internal.NewTabWriter(os.Stdout)
if labels := m.Labels; len(labels) > 0 {
w.WriteHeaders(strings.ToUpper("Labels"))
w.WriteHeaders("Name", "Description", "Color")
w.WriteHeaders(headerFn("Name", "Description", "Color")...)
for _, l := range labels {
w.Write(map[string]interface{}{
base := map[string]interface{}{
"Name": l.Name,
"Description": l.Properties["description"],
"Color": l.Properties["color"],
})
}
if withIDs {
base["ID"] = l.ID
}
w.Write(base)
}
w.WriteHeaders()
}
if buckets := m.Buckets; len(buckets) > 0 {
w.WriteHeaders(strings.ToUpper("Buckets"))
w.WriteHeaders("Name", "Retention", "Description")
w.WriteHeaders(headerFn("Name", "Retention", "Description", "Labels")...)
for _, bucket := range buckets {
w.Write(map[string]interface{}{
labels := make([]string, 0, len(bucket.Associations))
for _, l := range bucket.Associations {
labels = append(labels, l.Name)
}
base := map[string]interface{}{
"Name": bucket.Name,
"Retention": formatDuration(bucket.RetentionPeriod),
"Description": bucket.Description,
})
"Labels": labels,
}
if withIDs {
base["ID"] = bucket.ID
}
w.Write(base)
}
w.WriteHeaders()
}

View File

@ -53,6 +53,7 @@ type (
Description string
Name string
RetentionPeriod time.Duration
labels []*label
}
label struct {

View File

@ -132,13 +132,13 @@ type Pkg struct {
// Summary returns a package summary that describes all the resources and
// associations the pkg contains. It is very useful for informing users of
// the changes that will take place when this pkg would be applied.
func (m *Pkg) Summary() Summary {
func (p *Pkg) Summary() Summary {
var sum Summary
type lbl struct {
influxdb.Label
}
for _, l := range m.mLabels {
for _, l := range p.mLabels {
sum.Labels = append(sum.Labels, lbl{
Label: influxdb.Label{
ID: l.ID,
@ -161,8 +161,8 @@ func (m *Pkg) Summary() Summary {
Associations []influxdb.Label
}
for _, b := range m.mBuckets {
sum.Buckets = append(sum.Buckets, bkt{
for _, b := range p.mBuckets {
iBkt := bkt{
Bucket: influxdb.Bucket{
ID: b.ID,
OrgID: b.OrgID,
@ -170,7 +170,19 @@ func (m *Pkg) Summary() Summary {
Description: b.Description,
RetentionPeriod: b.RetentionPeriod,
},
})
}
for _, l := range b.labels {
iBkt.Associations = append(iBkt.Associations, influxdb.Label{
ID: l.ID,
OrgID: l.OrgID,
Name: l.Name,
Properties: map[string]string{
"color": l.Color,
"description": l.Description,
},
})
}
sum.Buckets = append(sum.Buckets, iBkt)
}
sort.Slice(sum.Buckets, func(i, j int) bool {
return sum.Buckets[i].Name < sum.Buckets[j].Name
@ -179,9 +191,9 @@ func (m *Pkg) Summary() Summary {
return sum
}
func (m *Pkg) buckets() []*bucket {
buckets := make([]*bucket, 0, len(m.mBuckets))
for _, b := range m.mBuckets {
func (p *Pkg) buckets() []*bucket {
buckets := make([]*bucket, 0, len(p.mBuckets))
for _, b := range p.mBuckets {
buckets = append(buckets, b)
}
@ -192,9 +204,9 @@ func (m *Pkg) buckets() []*bucket {
return buckets
}
func (m *Pkg) labels() []*label {
labels := make([]*label, 0, len(m.mLabels))
for _, b := range m.mLabels {
func (p *Pkg) labels() []*label {
labels := make([]*label, 0, len(p.mLabels))
for _, b := range p.mLabels {
labels = append(labels, b)
}
@ -205,34 +217,56 @@ func (m *Pkg) labels() []*label {
return labels
}
func (m *Pkg) validMetadata() error {
// labelMappings returns the mappings that will be created for
// valid pairs of labels and resources of which all have IDs.
// If a resource does not exist yet, a label mapping will not
// be returned for it.
func (p *Pkg) labelMappings() []influxdb.LabelMapping {
var mappings []influxdb.LabelMapping
for _, b := range p.buckets() {
for _, l := range b.labels {
if l.ID == influxdb.ID(0) || b.ID == influxdb.ID(0) {
continue
}
mappings = append(mappings, influxdb.LabelMapping{
LabelID: l.ID,
ResourceID: b.ID,
ResourceType: influxdb.BucketsResourceType,
})
}
}
return mappings
}
func (p *Pkg) validMetadata() error {
var failures []*failure
if m.APIVersion != "0.1.0" {
if p.APIVersion != "0.1.0" {
failures = append(failures, &failure{
field: "apiVersion",
msg: "must be version 1.0.0",
Field: "apiVersion",
Msg: "must be version 0.1.0",
})
}
mKind := kind(strings.TrimSpace(strings.ToLower(m.Kind)))
mKind := kind(strings.TrimSpace(strings.ToLower(p.Kind)))
if mKind != kindPackage {
failures = append(failures, &failure{
field: "kind",
msg: `must be of kind "Package"`,
Field: "kind",
Msg: `must be of kind "Package"`,
})
}
if m.Metadata.Version == "" {
if p.Metadata.Version == "" {
failures = append(failures, &failure{
field: "pkgVersion",
msg: "version is required",
Field: "pkgVersion",
Msg: "version is required",
})
}
if m.Metadata.Name == "" {
if p.Metadata.Name == "" {
failures = append(failures, &failure{
field: "pkgName",
msg: "must be at least 1 char",
Field: "pkgName",
Msg: "must be at least 1 char",
})
}
@ -245,12 +279,12 @@ func (m *Pkg) validMetadata() error {
Idx: -1,
}
for _, f := range failures {
res.ValidationFailures = append(res.ValidationFailures, struct {
res.ValidationFails = append(res.ValidationFails, struct {
Field string
Msg string
}{
Field: f.field,
Msg: f.msg,
Field: f.Field,
Msg: f.Msg,
})
}
var err ParseErr
@ -258,10 +292,11 @@ func (m *Pkg) validMetadata() error {
return &err
}
func (m *Pkg) graphResources() error {
func (p *Pkg) graphResources() error {
graphFns := []func() error{
m.graphLabels,
m.graphBuckets,
// labels are first to validate associations with other resources
p.graphLabels,
p.graphBuckets,
}
for _, fn := range graphFns {
@ -270,54 +305,77 @@ func (m *Pkg) graphResources() error {
}
}
//todo: make sure a resource was created....
// TODO: make sure a resource was created....
return nil
}
func (m *Pkg) graphBuckets() error {
m.mBuckets = make(map[string]*bucket)
return m.eachResource(kindBucket, func(r Resource) *failure {
func (p *Pkg) graphBuckets() error {
p.mBuckets = make(map[string]*bucket)
return p.eachResource(kindBucket, func(r Resource) []failure {
if r.Name() == "" {
return &failure{
field: "name",
msg: "must be a string of at least 2 chars in length",
}
return []failure{{
Field: "name",
Msg: "must be a string of at least 2 chars in length",
}}
}
if _, ok := m.mBuckets[r.Name()]; ok {
return &failure{
field: "name",
msg: "duplicate name: " + r.Name(),
}
if _, ok := p.mBuckets[r.Name()]; ok {
return []failure{{
Field: "name",
Msg: "duplicate name: " + r.Name(),
}}
}
m.mBuckets[r.Name()] = &bucket{
bkt := &bucket{
Name: r.Name(),
Description: r.stringShort("description"),
RetentionPeriod: r.duration("retention_period"),
}
return nil
nestedLabels := make(map[string]*label)
var failures []failure
for i, nr := range r.nestedAssociations() {
fail := p.processNestedLabel(i, nr, func(l *label) error {
if _, ok := nestedLabels[l.Name]; ok {
return fmt.Errorf("duplicate nested label: %q", l.Name)
}
nestedLabels[l.Name] = l
bkt.labels = append(bkt.labels, l)
return nil
})
if fail != nil {
failures = append(failures, *fail)
}
}
sort.Slice(bkt.labels, func(i, j int) bool {
return bkt.labels[i].Name < bkt.labels[j].Name
})
p.mBuckets[r.Name()] = bkt
return failures
})
}
func (m *Pkg) graphLabels() error {
m.mLabels = make(map[string]*label)
return m.eachResource(kindLabel, func(r Resource) *failure {
func (p *Pkg) graphLabels() error {
p.mLabels = make(map[string]*label)
return p.eachResource(kindLabel, func(r Resource) []failure {
if r.Name() == "" {
return &failure{
field: "name",
msg: "must be a string of at least 2 chars in length",
}
return []failure{{
Field: "name",
Msg: "must be a string of at least 2 chars in length",
}}
}
if _, ok := m.mLabels[r.Name()]; ok {
return &failure{
field: "name",
msg: "duplicate name: " + r.Name(),
}
if _, ok := p.mLabels[r.Name()]; ok {
return []failure{{
Field: "name",
Msg: "duplicate name: " + r.Name(),
}}
}
m.mLabels[r.Name()] = &label{
p.mLabels[r.Name()] = &label{
Name: r.Name(),
Color: r.stringShort("color"),
Description: r.stringShort("description"),
@ -327,15 +385,15 @@ func (m *Pkg) graphLabels() error {
})
}
func (m *Pkg) eachResource(resourceKind kind, fn func(r Resource) *failure) error {
func (p *Pkg) eachResource(resourceKind kind, fn func(r Resource) []failure) error {
var parseErr ParseErr
for i, r := range m.Spec.Resources {
for i, r := range p.Spec.Resources {
k, err := r.kind()
if err != nil {
parseErr.append(errResource{
Type: k.String(),
Idx: i,
ValidationFailures: []struct {
ValidationFails: []struct {
Field string
Msg string
}{
@ -351,20 +409,26 @@ func (m *Pkg) eachResource(resourceKind kind, fn func(r Resource) *failure) erro
continue
}
if errAt := fn(r); errAt != nil {
parseErr.append(errResource{
if failures := fn(r); failures != nil {
err := errResource{
Type: resourceKind.String(),
Idx: i,
ValidationFailures: []struct {
}
for _, f := range failures {
if f.fromAssociation {
err.AssociationFails = append(err.AssociationFails, struct {
Field string
Msg string
Index int
}{Field: f.Field, Msg: f.Msg, Index: f.Index})
continue
}
err.ValidationFails = append(err.ValidationFails, struct {
Field string
Msg string
}{
{
Field: errAt.field,
Msg: errAt.msg,
},
},
})
}{Field: f.Field, Msg: f.Msg})
}
parseErr.append(err)
}
}
@ -374,6 +438,41 @@ func (m *Pkg) eachResource(resourceKind kind, fn func(r Resource) *failure) erro
return nil
}
func (p *Pkg) processNestedLabel(idx int, nr Resource, fn func(lb *label) error) *failure {
k, err := nr.kind()
if err != nil {
return &failure{
Field: "kind",
Msg: err.Error(),
fromAssociation: true,
Index: idx,
}
}
if k != kindLabel {
return nil
}
lb, found := p.mLabels[nr.Name()]
if !found {
return &failure{
Field: "associations",
Msg: fmt.Sprintf("label %q does not exist in pkg", nr.Name()),
fromAssociation: true,
Index: idx,
}
}
if err := fn(lb); err != nil {
return &failure{
Field: "associations",
Msg: err.Error(),
fromAssociation: true,
Index: idx,
}
}
return nil
}
// Resource is a pkger Resource kind. It can be one of any of
// available kinds that are supported.
type Resource map[string]interface{}
@ -393,11 +492,11 @@ func (r Resource) kind() (kind, error) {
}
func (r Resource) Name() string {
return r.stringShort("name")
return strings.TrimSpace(r.stringShort("name"))
}
func (r Resource) nestedResources() []Resource {
v, ok := r["resources"]
func (r Resource) nestedAssociations() []Resource {
v, ok := r["associations"]
if !ok {
return nil
}
@ -408,8 +507,8 @@ func (r Resource) nestedResources() []Resource {
}
var resources []Resource
for _, res := range ifaces {
newRes, ok := ifaceMapToResource(res)
for _, iface := range ifaces {
newRes, ok := ifaceMapToResource(iface)
if !ok {
continue
}
@ -484,6 +583,15 @@ func (r Resource) slcStr(key string) ([]string, bool) {
}
func ifaceMapToResource(i interface{}) (Resource, bool) {
res, ok := i.(Resource)
if ok {
return res, true
}
if m, ok := i.(map[string]interface{}); ok {
return m, true
}
m, ok := i.(map[interface{}]interface{})
if !ok {
return nil, false
@ -507,12 +615,17 @@ func ifaceMapToResource(i interface{}) (Resource, bool) {
// have multiple validation failures.
type ParseErr struct {
Resources []struct {
Type string
Idx int
ValidationFailures []struct {
Type string
Idx int
ValidationFails []struct {
Field string
Msg string
}
AssociationFails []struct {
Field string
Msg string
Index int
}
}
}
@ -520,17 +633,20 @@ type ParseErr struct {
func (e *ParseErr) Error() string {
var errMsg []string
for _, r := range e.Resources {
resIndex := fmt.Sprintf("%d", r.Idx)
resIndex := strconv.Itoa(r.Idx)
if r.Idx == -1 {
resIndex = "root"
}
err := fmt.Sprintf("resource_index=%s resource_type=%q", r.Type, resIndex)
errMsg = append(errMsg, err)
for _, f := range r.ValidationFailures {
for _, f := range r.ValidationFails {
// for time being we go to new line and indent them (mainly for CLI)
// other callers (i.e. HTTP client) can inspect the resource and print it out
// or we provide a format option of sorts. We'll see
errMsg = append(errMsg, fmt.Sprintf("\tfield=%q reason=%q", f.Field, f.Msg))
errMsg = append(errMsg, fmt.Sprintf("\terr_type=%q field=%q reason=%q", "validation", f.Field, f.Msg))
}
for _, f := range r.AssociationFails {
errMsg = append(errMsg, fmt.Sprintf("\terr_type=%q field=%q association_index=%d reason=%q", "association", f.Field, f.Index, f.Msg))
}
}
@ -552,14 +668,21 @@ func IsParseErr(err error) (*ParseErr, bool) {
}
type errResource struct {
Type string
Idx int
ValidationFailures []struct {
Type string
Idx int
ValidationFails []struct {
Field string
Msg string
}
AssociationFails []struct {
Field string
Msg string
Index int
}
}
type failure struct {
field, msg string
Field, Msg string
fromAssociation bool
Index int
}

View File

@ -9,7 +9,7 @@ import (
)
func TestParse(t *testing.T) {
t.Run("file has all necessary metadata", func(t *testing.T) {
t.Run("pkg has all necessary metadata", func(t *testing.T) {
t.Run("has valid metadata and at least 1 resource", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", nil)
})
@ -127,9 +127,9 @@ spec:
failedResource := pErr.Resources[0]
assert.Equal(t, "Package", failedResource.Type)
require.Len(t, failedResource.ValidationFailures, len(tt.expectedFields))
require.Len(t, failedResource.ValidationFails, len(tt.expectedFields))
for _, f := range failedResource.ValidationFailures {
for _, f := range failedResource.ValidationFails {
containsField(t, tt.expectedFields, f.Field)
}
}
@ -139,7 +139,7 @@ spec:
})
})
t.Run("file with just a bucket", func(t *testing.T) {
t.Run("pkg with just a bucket", func(t *testing.T) {
t.Run("with valid bucket pkg should be valid", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
buckets := pkg.buckets()
@ -245,13 +245,13 @@ spec:
require.True(t, ok)
assert.Len(t, pErr.Resources, 1)
fields := pErr.Resources[0].ValidationFailures
fields := pErr.Resources[0].ValidationFails
require.Len(t, fields, 1)
assert.Equal(t, "name", fields[0].Field)
})
})
t.Run("file with just a label", func(t *testing.T) {
t.Run("pkg with just a label", func(t *testing.T) {
t.Run("with valid label pkg should be valid", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
labels := pkg.labels()
@ -335,6 +335,156 @@ spec:
}
})
})
t.Run("pkg with buckets and labels associated", func(t *testing.T) {
testfileRunner(t, "testdata/bucket_associates_label", func(t *testing.T, pkg *Pkg) {
sum := pkg.Summary()
require.Len(t, sum.Labels, 2)
bkts := sum.Buckets
require.Len(t, bkts, 3)
expectedLabels := []struct {
bktName string
labels []string
}{
{
bktName: "rucket_1",
labels: []string{"label_1"},
},
{
bktName: "rucket_2",
labels: []string{"label_2"},
},
{
bktName: "rucket_3",
labels: []string{"label_1", "label_2"},
},
}
for i, expected := range expectedLabels {
bkt := bkts[i]
require.Len(t, bkt.Associations, len(expected.labels))
for j, label := range expected.labels {
assert.Equal(t, label, bkt.Associations[j].Name)
}
}
})
t.Run("association doesn't exist then provides an error", func(t *testing.T) {
tests := []struct {
name string
numErrs int
in string
errIdxs []int
}{
{
name: "no labels provided",
numErrs: 1,
errIdxs: []int{0},
in: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: label_pkg
pkgVersion: 1
spec:
resources:
- kind: Bucket
name: buck_1
associations:
- kind: Label
name: label_1
`,
},
{
name: "mixed found and not found",
numErrs: 1,
errIdxs: []int{1},
in: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: label_pkg
pkgVersion: 1
spec:
resources:
- kind: Label
name: label_1
- kind: Bucket
name: buck_1
associations:
- kind: Label
name: label_1
- kind: Label
name: unfound label
`,
},
{
name: "multiple not found",
numErrs: 1,
errIdxs: []int{0, 1},
in: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: label_pkg
pkgVersion: 1
spec:
resources:
- kind: Label
name: label_1
- kind: Bucket
name: buck_1
associations:
- kind: Label
name: not found 1
- kind: Label
name: unfound label
`,
},
{
name: "duplicate valid nested labels",
numErrs: 1,
errIdxs: []int{1},
in: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: label_pkg
pkgVersion: 1
spec:
resources:
- kind: Label
name: label_1
- kind: Bucket
name: buck_1
associations:
- kind: Label
name: label_1
- kind: Label
name: label_1
`,
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
_, err := Parse(EncodingYAML, FromString(tt.in))
pErr, ok := IsParseErr(err)
require.True(t, ok)
require.Len(t, pErr.Resources, tt.numErrs)
assFails := pErr.Resources[0].AssociationFails
require.Len(t, assFails, len(tt.errIdxs))
assert.Equal(t, "associations", assFails[0].Field)
for i, f := range assFails {
assert.Equal(t, tt.errIdxs[i], f.Index)
}
}
t.Run(tt.name, fn)
}
})
})
}
type baseAsserts struct {

View File

@ -2,6 +2,7 @@ package pkger
import (
"context"
"errors"
"fmt"
"strings"
"time"
@ -42,14 +43,44 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
}
}()
runners := []applier{
s.applyLabels(pkg.labels()),
s.applyBuckets(pkg.buckets()),
runTilEnd := func(appliers ...applier) error {
var errs []string
for _, r := range appliers {
rollbacks = append(rollbacks, r.rollbacker)
if err := r.creater(ctx, orgID); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
// TODO: fix error up to be more actionable
return errors.New(strings.Join(errs, "\n"))
}
return nil
}
for _, r := range runners {
rollbacks = append(rollbacks, r.rollbacker)
if err := r.creater(ctx, orgID); err != nil {
runners := [][]applier{
// each grouping here runs for its entirety, then returns an error that
// is indicative of running all appliers provided. For instance, the labels
// may have 1 label fail and one of the buckets fails. The errors aggregate so
// the caller will be informed of both the failed label and the failed bucket.
// the groupings here allow for steps to occur before exiting. The first step is
// adding the primary resources. Here we get all the errors associated with them.
// If those are all good, then we run the secondary(dependent) resources which
// rely on the primary resources having been created.
{
// primary resources
s.applyLabels(pkg.labels()),
s.applyBuckets(pkg.buckets()),
},
{
// secondary (dependent) resources
s.applyLabelMappings(pkg),
},
}
for _, appliers := range runners {
if err := runTilEnd(appliers...); err != nil {
return Summary{}, err
}
}
@ -57,26 +88,46 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
return pkg.Summary(), nil
}
type applier struct {
creater creater
rollbacker rollbacker
type (
applier struct {
creater creater
rollbacker rollbacker
}
rollbacker struct {
resource string
fn func() error
}
creater func(ctx context.Context, orgID influxdb.ID) error
)
// TODO: clean up apply errors to inform the user in an actionable way
type applyErrBody struct {
name string
msg string
}
type rollbacker struct {
resource string
fn func() error
}
type applyErrs []applyErrBody
type creater func(ctx context.Context, orgID influxdb.ID) error
func (a applyErrs) toError(resType, msg string) error {
if len(a) == 0 {
return nil
}
errMsg := fmt.Sprintf(`resource_type=%q err=%q`, resType, msg)
for _, e := range a {
errMsg += fmt.Sprintf("\n\tname=%q err_msg=%q", e.name, e.msg)
}
return errors.New(errMsg)
}
func (s *Service) applyBuckets(buckets []*bucket) applier {
const resource = "bucket"
influxBuckets := make([]influxdb.Bucket, 0, len(buckets))
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, b := range buckets {
influxBucket := influxdb.Bucket{
OrgID: orgID,
@ -87,7 +138,11 @@ func (s *Service) applyBuckets(buckets []*bucket) applier {
err := s.bucketSVC.CreateBucket(ctx, &influxBucket)
if err != nil {
return err
errs = append(errs, applyErrBody{
name: b.Name,
msg: err.Error(),
})
continue
}
buckets[i].ID = influxBucket.ID
buckets[i].OrgID = influxBucket.OrgID
@ -95,13 +150,13 @@ func (s *Service) applyBuckets(buckets []*bucket) applier {
influxBuckets = append(influxBuckets, influxBucket)
}
return nil
return errs.toError("bucket", "failed to create bucket")
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: resource,
resource: "bucket",
fn: func() error { return s.deleteBuckets(influxBuckets) },
},
}
@ -130,6 +185,7 @@ func (s *Service) applyLabels(labels []*label) applier {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var errs applyErrs
for i, l := range labels {
influxLabel := influxdb.Label{
OrgID: orgID,
@ -141,14 +197,18 @@ func (s *Service) applyLabels(labels []*label) applier {
}
err := s.labelSVC.CreateLabel(ctx, &influxLabel)
if err != nil {
return err
errs = append(errs, applyErrBody{
name: l.Name,
msg: err.Error(),
})
continue
}
labels[i].ID = influxLabel.ID
labels[i].OrgID = influxLabel.OrgID
influxLabels = append(influxLabels, influxLabel)
}
return nil
return errs.toError("label", "failed to create label")
}
return applier{
@ -175,3 +235,48 @@ func (s *Service) deleteLabels(labels []influxdb.Label) error {
return nil
}
func (s *Service) applyLabelMappings(pkg *Pkg) applier {
var mappings []influxdb.LabelMapping
createFn := func(ctx context.Context, orgID influxdb.ID) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
labelMappings := pkg.labelMappings()
for i := range labelMappings {
mapping := labelMappings[i]
err := s.labelSVC.CreateLabelMapping(ctx, &mapping)
if err != nil {
return err
}
mappings = append(mappings, mapping)
}
return nil
}
return applier{
creater: createFn,
rollbacker: rollbacker{
resource: "label_mapping",
fn: func() error { return s.deleteLabelMappings(mappings) },
},
}
}
func (s *Service) deleteLabelMappings(mappings []influxdb.LabelMapping) error {
var errs []string
for i := range mappings {
l := mappings[i]
err := s.labelSVC.DeleteLabelMapping(context.Background(), &l)
if err != nil {
errs = append(errs, fmt.Sprintf("%s:%s", l.LabelID.String(), l.ResourceID.String()))
}
}
if len(errs) > 0 {
return fmt.Errorf(`label_resource_id_pairs=[%s] err="unable to delete label"`, strings.Join(errs, ", "))
}
return nil
}

View File

@ -42,7 +42,7 @@ func TestService(t *testing.T) {
})
})
t.Run("rollsback all created buckets on an error", func(t *testing.T) {
t.Run("rolls back all created buckets on an error", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
fakeBucketSVC := mock.NewBucketService()
var c int
@ -79,8 +79,8 @@ func TestService(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
fakeLabelSVC := mock.NewLabelService()
id := 1
fakeLabelSVC.CreateLabelFn = func(_ context.Context, b *influxdb.Label) error {
b.ID = influxdb.ID(id)
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
l.ID = influxdb.ID(id)
id++
return nil
}
@ -107,37 +107,114 @@ func TestService(t *testing.T) {
assert.Equal(t, "#000000", label2.Properties["color"])
assert.Equal(t, "label 2 description", label2.Properties["description"])
})
})
t.Run("rollsback all created labels on an error", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
fakeLabelSVC := mock.NewLabelService()
var c int
fakeLabelSVC.CreateLabelFn = func(_ context.Context, b *influxdb.Label) error {
// 4th label will return the error here, and 3 before should be rolled back
if c == 3 {
return errors.New("blowed up ")
}
c++
return nil
t.Run("rolls back all created labels on an error", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
fakeLabelSVC := mock.NewLabelService()
var c int
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
// 4th label will return the error here, and 3 before should be rolled back
if c == 3 {
return errors.New("blowed up ")
}
var count int
fakeLabelSVC.DeleteLabelFn = func(_ context.Context, id influxdb.ID) error {
count++
return nil
c++
return nil
}
var count int
fakeLabelSVC.DeleteLabelFn = func(_ context.Context, id influxdb.ID) error {
count++
return nil
}
pkg.mLabels["copy1"] = pkg.mLabels["label_1"]
pkg.mLabels["copy2"] = pkg.mLabels["label_2"]
svc := NewService(zap.NewNop(), nil, fakeLabelSVC)
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.Error(t, err)
assert.Equal(t, 3, count)
})
})
})
t.Run("label mapping", func(t *testing.T) {
t.Run("successfully creates pkg of labels", func(t *testing.T) {
testfileRunner(t, "testdata/bucket_associates_label", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
id := 1
fakeBktSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error {
b.ID = influxdb.ID(id)
id++
return nil
}
fakeLabelSVC := mock.NewLabelService()
id = 1
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
l.ID = influxdb.ID(id)
id++
return nil
}
numLabelMappings := 0
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error {
numLabelMappings++
return nil
}
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.NoError(t, err)
assert.Equal(t, 4, numLabelMappings)
})
})
t.Run("rolls back all created resources on an error", func(t *testing.T) {
testfileRunner(t, "testdata/bucket_associates_label", func(t *testing.T, pkg *Pkg) {
var deleteCount struct {
bkts, labels, mappings int
}
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.DeleteBucketFn = func(_ context.Context, _ influxdb.ID) error {
deleteCount.bkts++
return nil
}
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.DeleteLabelFn = func(_ context.Context, id influxdb.ID) error {
deleteCount.labels++
return nil
}
var createdLabelMappings int
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, _ *influxdb.LabelMapping) error {
if createdLabelMappings == 3 {
return errors.New("error")
}
createdLabelMappings++
return nil
}
fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, _ *influxdb.LabelMapping) error {
deleteCount.mappings++
return nil
}
pkg.mLabels["copy1"] = pkg.mLabels["label_1"]
pkg.mLabels["copy2"] = pkg.mLabels["label_2"]
svc := NewService(zap.NewNop(), fakeBktSVC, fakeLabelSVC)
svc := NewService(zap.NewNop(), nil, fakeLabelSVC)
orgID := influxdb.ID(9000)
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.Error(t, err)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.Error(t, err)
assert.Equal(t, 3, count)
})
assert.Equal(t, 3, deleteCount.bkts)
assert.Equal(t, 2, deleteCount.labels)
assert.Equal(t, 3, deleteCount.mappings)
})
})
})

View File

@ -0,0 +1,55 @@
{
"apiVersion": "0.1.0",
"kind": "Package",
"meta": {
"pkgName": "pkg_name",
"pkgVersion": "1",
"description": "pack description"
},
"spec": {
"resources": [
{
"kind": "Label",
"name": "label_1"
},
{
"kind": "Label",
"name": "label_2"
},
{
"kind": "Bucket",
"name": "rucket_1",
"associations": [
{
"kind": "Label",
"name": "label_1"
}
]
},
{
"kind": "Bucket",
"name": "rucket_2",
"associations": [
{
"kind": "Label",
"name": "label_2"
}
]
},
{
"kind": "Bucket",
"name": "rucket_3",
"associations": [
{
"kind": "Label",
"name": "label_1"
},
{
"kind": "Label",
"name": "label_2"
}
]
}
]
}
}

View File

@ -0,0 +1,29 @@
apiVersion: 0.1.0
kind: Package
meta:
pkgName: pkg_name
pkgVersion: 1
description: pack description
spec:
resources:
- kind: Label
name: label_1
- kind: Label
name: label_2
- kind: Bucket
name: rucket_1
associations:
- kind: Label
name: label_1
- kind: Bucket
name: rucket_2
associations:
- kind: Label
name: label_2
- kind: Bucket
name: rucket_3
associations:
- kind: Label
name: label_1
- kind: Label
name: label_2