chore(pkger): utilize http client for integration tests for pkger

pull/16222/head
Johnny Steenbergen 2019-12-12 11:09:32 -08:00 committed by Johnny Steenbergen
parent 86a359f34b
commit 39e89eafdf
15 changed files with 556 additions and 330 deletions

View File

@ -47,11 +47,6 @@ func newHTTPClient() (*httpc.Client, error) {
return httpClient, nil
}
type httpClientOpts struct {
token, addr string
skipVerify bool
}
type genericCLIOptfn func(*genericCLIOpts)
type genericCLIOpts struct {
@ -140,14 +135,6 @@ type Flags struct {
skipVerify bool
}
func (f Flags) httpClientOpts() httpClientOpts {
return httpClientOpts{
addr: f.host,
token: f.token,
skipVerify: f.skipVerify,
}
}
var flags Flags
func defaultTokenPath() (string, string, error) {

View File

@ -27,7 +27,7 @@ import (
"gopkg.in/yaml.v3"
)
type pkgSVCsFn func(cliReq httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error)
type pkgSVCsFn func() (pkger.SVC, influxdb.OrganizationService, error)
func cmdPkg(svcFn pkgSVCsFn, opts ...genericCLIOptfn) *cobra.Command {
return newCmdPkgBuilder(svcFn, opts...).cmdPkg()
@ -38,7 +38,6 @@ type cmdPkgBuilder struct {
svcFn pkgSVCsFn
applyReqLimit int
file string
hasColor bool
hasTableBorders bool
@ -93,7 +92,6 @@ func (b *cmdPkgBuilder) cmdPkgApply() *cobra.Command {
cmd.Flags().StringVarP(&b.file, "file", "f", "", "Path to package file")
cmd.MarkFlagFilename("file", "yaml", "yml", "json")
cmd.Flags().BoolVarP(&b.quiet, "quiet", "q", false, "disable output printing")
cmd.Flags().IntVarP(&b.applyReqLimit, "req-limit", "r", 0, "Request limit for applying a pkg, defaults to 5(recommended for OSS).")
cmd.Flags().StringVar(&b.applyOpts.force, "force", "", `TTY input, if package will have destructive changes, proceed if set "true".`)
cmd.Flags().StringVarP(&b.orgID, "org-id", "", "", "The ID of the organization that owns the bucket")
@ -123,7 +121,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error {
}
color.NoColor = !b.hasColor
svc, orgSVC, err := b.svcFn(flags.httpClientOpts(), pkger.WithApplyReqLimit(b.applyReqLimit))
svc, orgSVC, err := b.svcFn()
if err != nil {
return err
}
@ -138,7 +136,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error {
return err
}
_, diff, err := svc.DryRun(context.Background(), influxOrgID, pkg)
_, diff, err := svc.DryRun(context.Background(), influxOrgID, 0, pkg)
if err != nil {
return err
}
@ -165,7 +163,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error {
return errors.New("package has conflicts with existing resources and cannot safely apply")
}
summary, err := svc.Apply(context.Background(), influxOrgID, pkg)
summary, err := svc.Apply(context.Background(), influxOrgID, 0, pkg)
if err != nil {
return err
}
@ -212,7 +210,7 @@ func (b *cmdPkgBuilder) pkgNewRunEFn() func(*cobra.Command, []string) error {
}
}
pkgSVC, _, err := b.svcFn(flags.httpClientOpts())
pkgSVC, _, err := b.svcFn()
if err != nil {
return err
}
@ -244,7 +242,7 @@ func (b *cmdPkgBuilder) cmdPkgExport() *cobra.Command {
func (b *cmdPkgBuilder) pkgExportRunEFn() func(*cobra.Command, []string) error {
return func(cmd *cobra.Command, args []string) error {
pkgSVC, _, err := b.svcFn(flags.httpClientOpts())
pkgSVC, _, err := b.svcFn()
if err != nil {
return err
}
@ -312,11 +310,7 @@ func (b *cmdPkgBuilder) cmdPkgExportAll() *cobra.Command {
func (b *cmdPkgBuilder) pkgExportAllRunEFn() func(*cobra.Command, []string) error {
return func(cmd *cobra.Command, args []string) error {
if err := b.validOrgFlags(); err != nil {
return err
}
pkgSVC, orgSVC, err := b.svcFn(flags.httpClientOpts())
pkgSVC, orgSVC, err := b.svcFn()
if err != nil {
return err
}
@ -503,7 +497,7 @@ func createPkgBuf(pkg *pkger.Pkg, outPath string) (*bytes.Buffer, error) {
return &buf, nil
}
func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error) {
func newPkgerSVC() (pkger.SVC, influxdb.OrganizationService, error) {
httpClient, err := newHTTPClient()
if err != nil {
return nil, nil, err
@ -513,16 +507,7 @@ func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkge
Client: httpClient,
}
return pkger.NewService(
append(opts,
pkger.WithBucketSVC(&ihttp.BucketService{Client: httpClient}),
pkger.WithDashboardSVC(&ihttp.DashboardService{Client: httpClient}),
pkger.WithLabelSVC(&ihttp.LabelService{Client: httpClient}),
pkger.WithNoticationEndpointSVC(ihttp.NewNotificationEndpointService(httpClient)),
pkger.WithTelegrafSVC(ihttp.NewTelegrafService(httpClient)),
pkger.WithVariableSVC(&ihttp.VariableService{Client: httpClient}),
)...,
), orgSvc, nil
return &ihttp.PkgerService{Client: httpClient}, orgSvc, nil
}
func pkgFromReader(stdin io.Reader) (*pkger.Pkg, error) {
@ -704,8 +689,8 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) {
return []string{
l.ID.String(),
l.Name,
l.Properties["description"],
l.Properties["color"],
l.Properties.Description,
l.Properties.Color,
}
})
}
@ -751,10 +736,10 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) {
tablePrintFn("NOTIFICATION ENDPOINTS", headers, len(endpoints), func(i int) []string {
v := endpoints[i]
return []string{
v.GetID().String(),
v.GetName(),
v.GetDescription(),
string(v.GetStatus()),
v.NotificationEndpoint.GetID().String(),
v.NotificationEndpoint.GetName(),
v.NotificationEndpoint.GetDescription(),
string(v.NotificationEndpoint.GetStatus()),
}
})
}
@ -764,9 +749,9 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) {
tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string {
t := teles[i]
return []string{
t.ID.String(),
t.Name,
t.Description,
t.TelegrafConfig.ID.String(),
t.TelegrafConfig.Name,
t.TelegrafConfig.Description,
}
})
}

View File

@ -22,7 +22,7 @@ import (
func Test_Pkg(t *testing.T) {
fakeSVCFn := func(svc pkger.SVC) pkgSVCsFn {
return func(opts httpClientOpts, _ ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error) {
return func() (pkger.SVC, influxdb.OrganizationService, error) {
return svc, &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
return &influxdb.Organization{ID: influxdb.ID(9000), Name: "influxdata"}, nil
@ -481,8 +481,8 @@ func testPkgWritesToBuffer(newCmdFn func() *cobra.Command, args pkgFileArgs, ass
type fakePkgSVC struct {
createFn func(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error)
dryRunFn func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error)
}
func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
@ -492,16 +492,16 @@ func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSe
panic("not implemented")
}
func (f *fakePkgSVC) DryRun(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
func (f *fakePkgSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
if f.dryRunFn != nil {
return f.dryRunFn(ctx, orgID, pkg)
return f.dryRunFn(ctx, orgID, userID, pkg)
}
panic("not implemented")
}
func (f *fakePkgSVC) Apply(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
if f.applyFn != nil {
return f.applyFn(ctx, orgID, pkg)
return f.applyFn(ctx, orgID, userID, pkg)
}
panic("not implemented")
}

View File

@ -848,7 +848,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var pkgHTTPServer *http.HandlerPkg
{
pkgHTTPServer = http.NewHandlerPkg(m.apibackend.HTTPErrorHandler, pkgSVC)
pkgServerLogger := m.log.With(zap.String("handler", "pkger"))
pkgHTTPServer = http.NewHandlerPkg(pkgServerLogger, m.apibackend.HTTPErrorHandler, pkgSVC)
}
// HTTP server

View File

@ -22,6 +22,7 @@ import (
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/pkger"
"github.com/influxdata/influxdb/query"
)
@ -349,6 +350,10 @@ func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.Notific
return http.NewNotificationEndpointService(tl.HTTPClient(tb))
}
func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC {
return &http.PkgerService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService {
tb.Helper()
return http.NewTelegrafService(tl.HTTPClient(tb))

View File

@ -19,14 +19,7 @@ func TestLauncher_Pkger(t *testing.T) {
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
svc := pkger.NewService(
pkger.WithBucketSVC(l.BucketService(t)),
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
svc := l.PkgerService(t)
t.Run("create a new package", func(t *testing.T) {
newPkg, err := svc.CreatePkg(timedCtx(time.Second),
@ -56,7 +49,7 @@ func TestLauncher_Pkger(t *testing.T) {
pkger.WithVariableSVC(l.VariableService(t)),
)
_, err := svc.Apply(ctx, l.Org.ID, newPkg(t))
_, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t))
require.Error(t, err)
bkts, _, err := l.BucketService(t).FindBuckets(ctx, influxdb.BucketFilter{OrganizationID: &l.Org.ID})
@ -96,7 +89,7 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Empty(t, vars)
})
hasLabelAssociations := func(t *testing.T, associations []influxdb.Label, numAss int, expectedNames ...string) {
hasLabelAssociations := func(t *testing.T, associations []pkger.SummaryLabel, numAss int, expectedNames ...string) {
t.Helper()
require.Len(t, associations, numAss)
@ -128,7 +121,7 @@ func TestLauncher_Pkger(t *testing.T) {
}
t.Run("dry run a package with no existing resources", func(t *testing.T) {
sum, diff, err := svc.DryRun(ctx, l.Org.ID, newPkg(t))
sum, diff, err := svc.DryRun(ctx, l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
diffBkts := diff.Buckets
@ -164,14 +157,14 @@ func TestLauncher_Pkger(t *testing.T) {
endpoints := sum.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName())
assert.Equal(t, "http none auth desc", endpoints[0].GetDescription())
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].NotificationEndpoint.GetName())
assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription())
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
teles := sum.TelegrafConfigs
require.Len(t, teles, 1)
assert.Equal(t, "first_tele_config", teles[0].Name)
assert.Equal(t, "desc", teles[0].Description)
assert.Equal(t, "first_tele_config", teles[0].TelegrafConfig.Name)
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
hasLabelAssociations(t, teles[0].LabelAssociations, 1, "label_1")
vars := sum.Variables
@ -189,7 +182,7 @@ func TestLauncher_Pkger(t *testing.T) {
t.Run("apply a package of all new resources", func(t *testing.T) {
// this initial test is also setup for the sub tests
sum1, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, newPkg(t))
sum1, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
labels := sum1.Labels
@ -209,22 +202,24 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1")
require.Len(t, dashs[0].Charts, 1)
assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType())
endpoints := sum1.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.NotZero(t, endpoints[0].GetID())
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName())
assert.Equal(t, "http none auth desc", endpoints[0].GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].GetStatus()))
assert.NotZero(t, endpoints[0].NotificationEndpoint.GetID())
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].NotificationEndpoint.GetName())
assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus()))
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].ID)
assert.Equal(t, l.Org.ID, teles[0].OrgID)
assert.Equal(t, "first_tele_config", teles[0].Name)
assert.Equal(t, "desc", teles[0].Description)
assert.Len(t, teles[0].Plugins, 2)
assert.NotZero(t, teles[0].TelegrafConfig.ID)
assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID)
assert.Equal(t, "first_tele_config", teles[0].TelegrafConfig.Name)
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
assert.Len(t, teles[0].TelegrafConfig.Plugins, 2)
vars := sum1.Variables
require.Len(t, vars, 1)
@ -239,29 +234,27 @@ func TestLauncher_Pkger(t *testing.T) {
Language: "flux",
}, varArgs.Values)
newSumMapping := func(id influxdb.ID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
newSumMapping := func(id pkger.SafeID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
return pkger.SummaryLabelMapping{
ResourceName: name,
LabelName: labels[0].Name,
LabelMapping: influxdb.LabelMapping{
LabelID: labels[0].ID,
ResourceID: id,
ResourceType: rt,
},
LabelID: labels[0].ID,
ResourceID: pkger.SafeID(id),
ResourceType: rt,
}
}
mappings := sum1.LabelMappings
require.Len(t, mappings, 5)
hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType))
hasMapping(t, mappings, newSumMapping(influxdb.ID(dashs[0].ID), dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
hasMapping(t, mappings, newSumMapping(teles[0].ID, teles[0].Name, influxdb.TelegrafsResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
t.Run("pkg with same bkt-var-label does nto create new resources for them", func(t *testing.T) {
// validate the new package doesn't create new resources for bkts/labels/vars
// since names collide.
sum2, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, newPkg(t))
sum2, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
require.Equal(t, sum1.Buckets, sum2.Buckets)
@ -277,7 +270,7 @@ func TestLauncher_Pkger(t *testing.T) {
resToClone := []pkger.ResourceToClone{
{
Kind: pkger.KindBucket,
ID: bkts[0].ID,
ID: influxdb.ID(bkts[0].ID),
},
{
Kind: pkger.KindDashboard,
@ -285,11 +278,11 @@ func TestLauncher_Pkger(t *testing.T) {
},
{
Kind: pkger.KindLabel,
ID: labels[0].ID,
ID: influxdb.ID(labels[0].ID),
},
{
Kind: pkger.KindTelegraf,
ID: teles[0].ID,
ID: teles[0].TelegrafConfig.ID,
},
}
@ -297,7 +290,7 @@ func TestLauncher_Pkger(t *testing.T) {
{
Kind: pkger.KindVariable,
Name: "new name",
ID: vars[0].ID,
ID: influxdb.ID(vars[0].ID),
},
}
@ -334,10 +327,12 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1")
require.Len(t, dashs[0].Charts, 1)
assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType())
require.Len(t, newSum.TelegrafConfigs, 1)
assert.Equal(t, teles[0].Name, newSum.TelegrafConfigs[0].Name)
assert.Equal(t, teles[0].Description, newSum.TelegrafConfigs[0].Description)
assert.Equal(t, teles[0].TelegrafConfig.Name, newSum.TelegrafConfigs[0].TelegrafConfig.Name)
assert.Equal(t, teles[0].TelegrafConfig.Description, newSum.TelegrafConfigs[0].TelegrafConfig.Description)
vars := newSum.Variables
require.Len(t, vars, 1)
@ -369,21 +364,25 @@ func TestLauncher_Pkger(t *testing.T) {
pkger.WithVariableSVC(l.VariableService(t)),
)
_, err = svc.Apply(ctx, l.Org.ID, updatePkg)
_, err = svc.Apply(ctx, l.Org.ID, 0, updatePkg)
require.Error(t, err)
bkt, err := l.BucketService(t).FindBucketByID(ctx, bkts[0].ID)
bkt, err := l.BucketService(t).FindBucketByID(ctx, influxdb.ID(bkts[0].ID))
require.NoError(t, err)
// make sure the desc change is not applied and is rolled back to prev desc
assert.Equal(t, bkt.Description, bkts[0].Description)
assert.Equal(t, bkts[0].Description, bkt.Description)
label, err := l.LabelService(t).FindLabelByID(ctx, labels[0].ID)
label, err := l.LabelService(t).FindLabelByID(ctx, influxdb.ID(labels[0].ID))
require.NoError(t, err)
assert.Equal(t, label.Properties["description"], labels[0].Properties["description"])
assert.Equal(t, labels[0].Properties.Description, label.Properties["description"])
v, err := l.VariableService(t).FindVariableByID(ctx, vars[0].ID)
endpoint, err := l.NotificationEndpointService(t).FindNotificationEndpointByID(ctx, endpoints[0].NotificationEndpoint.GetID())
require.NoError(t, err)
assert.Equal(t, v.Description, vars[0].Description)
assert.Equal(t, endpoints[0].NotificationEndpoint.GetDescription(), endpoint.GetDescription())
v, err := l.VariableService(t).FindVariableByID(ctx, influxdb.ID(vars[0].ID))
require.NoError(t, err)
assert.Equal(t, vars[0].Description, v.Description)
})
})
}
@ -506,6 +505,13 @@ spec:
associations:
- kind: Label
name: label_1
- kind: NotificationEndpointHTTP
name: http_none_auth_notification_endpoint
type: none
description: new desc
method: GET
url: https://www.example.com/endpoint/noneauth
status: active
`
type fakeBucketSVC struct {

View File

@ -10,22 +10,29 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/pkger"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
const prefixPackages = "/api/v2/packages"
// HandlerPkg is a server that manages the packages HTTP transport.
type HandlerPkg struct {
chi.Router
influxdb.HTTPErrorHandler
svc pkger.SVC
logger *zap.Logger
svc pkger.SVC
}
// NewHandlerPkg constructs a new http server.
func NewHandlerPkg(errHandler influxdb.HTTPErrorHandler, svc pkger.SVC) *HandlerPkg {
func NewHandlerPkg(log *zap.Logger, errHandler influxdb.HTTPErrorHandler, svc pkger.SVC) *HandlerPkg {
svr := &HandlerPkg{
HTTPErrorHandler: errHandler,
logger: log,
svc: svc,
}
@ -48,17 +55,17 @@ func NewHandlerPkg(errHandler influxdb.HTTPErrorHandler, svc pkger.SVC) *Handler
// Prefix provides the prefix to this route tree.
func (s *HandlerPkg) Prefix() string {
return "/api/v2/packages"
return prefixPackages
}
type (
// ReqCreatePkg is a request body for the create pkg endpoint.
ReqCreatePkg struct {
PkgName string `json:"pkgName"`
PkgDescription string `json:"pkgDescription"`
PkgVersion string `json:"pkgVersion"`
Resources []pkger.ResourceToClone `json:"resources"`
PkgName string `json:"pkgName"`
PkgDescription string `json:"pkgDescription"`
PkgVersion string `json:"pkgVersion"`
OrgIDs []string `json:"orgIDs"`
Resources []pkger.ResourceToClone `json:"resources"`
}
// RespCreatePkg is a response body for the create pkg endpoint.
@ -76,15 +83,25 @@ func (s *HandlerPkg) createPkg(w http.ResponseWriter, r *http.Request) {
}
defer r.Body.Close()
newPkg, err := s.svc.CreatePkg(r.Context(),
opts := []pkger.CreatePkgSetFn{
pkger.CreateWithMetadata(pkger.Metadata{
Description: reqBody.PkgDescription,
Name: reqBody.PkgName,
Version: reqBody.PkgVersion,
}),
pkger.CreateWithExistingResources(reqBody.Resources...),
)
}
for _, orgIDStr := range reqBody.OrgIDs {
orgID, err := influxdb.IDFromString(orgIDStr)
if err != nil {
continue
}
opts = append(opts, pkger.CreateWithAllOrgResources(*orgID))
}
newPkg, err := s.svc.CreatePkg(r.Context(), opts...)
if err != nil {
s.logger.Error("failed to create pkg", zap.Error(err))
s.HandleHTTPError(r.Context(), err, w)
return
}
@ -137,8 +154,15 @@ func (s *HandlerPkg) applyPkg(w http.ResponseWriter, r *http.Request) {
return
}
auth, err := pctx.GetAuthorizer(r.Context())
if err != nil {
s.HandleHTTPError(r.Context(), err, w)
return
}
userID := auth.GetUserID()
parsedPkg := reqBody.Pkg
sum, diff, err := s.svc.DryRun(r.Context(), *orgID, parsedPkg)
sum, diff, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg)
if pkger.IsParseErr(err) {
s.encJSONResp(r.Context(), w, http.StatusUnprocessableEntity, RespApplyPkg{
Diff: diff,
@ -148,6 +172,7 @@ func (s *HandlerPkg) applyPkg(w http.ResponseWriter, r *http.Request) {
return
}
if err != nil {
s.logger.Error("failed to dry run pkg", zap.Error(err))
s.HandleHTTPError(r.Context(), err, w)
return
}
@ -161,8 +186,9 @@ func (s *HandlerPkg) applyPkg(w http.ResponseWriter, r *http.Request) {
return
}
sum, err = s.svc.Apply(r.Context(), *orgID, parsedPkg)
sum, err = s.svc.Apply(r.Context(), *orgID, userID, parsedPkg)
if err != nil && !pkger.IsParseErr(err) {
s.logger.Error("failed to apply pkg", zap.Error(err))
s.HandleHTTPError(r.Context(), err, w)
return
}
@ -170,6 +196,7 @@ func (s *HandlerPkg) applyPkg(w http.ResponseWriter, r *http.Request) {
s.encJSONResp(r.Context(), w, http.StatusCreated, RespApplyPkg{
Diff: diff,
Summary: sum,
Errors: convertParseErr(err),
})
}
@ -215,6 +242,83 @@ func (s *HandlerPkg) encJSONResp(ctx context.Context, w http.ResponseWriter, cod
s.encResp(ctx, w, newJSONEnc(w), code, res)
}
// PkgerService provides an http client that is fluent in all things pkger.
type PkgerService struct {
Client *httpc.Client
}
var _ pkger.SVC = (*PkgerService)(nil)
// CreatePkg will produce a pkg from the parameters provided.
func (s *PkgerService) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
var opt pkger.CreateOpt
for _, setter := range setters {
if err := setter(&opt); err != nil {
return nil, err
}
}
var orgIDs []string
for orgID := range opt.OrgIDs {
orgIDs = append(orgIDs, orgID.String())
}
reqBody := ReqCreatePkg{
PkgDescription: opt.Metadata.Description,
PkgName: opt.Metadata.Name,
PkgVersion: opt.Metadata.Version,
OrgIDs: orgIDs,
Resources: opt.Resources,
}
var newPkg RespCreatePkg
err := s.Client.
PostJSON(reqBody, prefixPackages).
DecodeJSON(&newPkg).
Do(ctx)
if err != nil {
return nil, err
}
if err := newPkg.Validate(pkger.ValidWithoutResources()); err != nil {
return nil, err
}
return newPkg.Pkg, nil
}
// DryRun provides a dry run of the pkg application. The pkg will be marked verified
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *PkgerService) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
return s.apply(ctx, orgID, pkg, true)
}
// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg were applied.
func (s *PkgerService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
sum, _, err := s.apply(ctx, orgID, pkg, false)
return sum, err
}
func (s *PkgerService) apply(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg, dryRun bool) (pkger.Summary, pkger.Diff, error) {
reqBody := ReqApplyPkg{
OrgID: orgID.String(),
DryRun: dryRun,
Pkg: pkg,
}
var resp RespApplyPkg
err := s.Client.
PostJSON(reqBody, prefixPackages, "/apply").
DecodeJSON(&resp).
Do(ctx)
if err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
return resp.Summary, resp.Diff, pkger.NewParseError(resp.Errors...)
}
func convertParseErr(err error) []pkger.ValidationErr {
pErr, ok := err.(pkger.ParseError)
if !ok {

View File

@ -10,12 +10,14 @@ import (
"github.com/go-chi/chi"
"github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
fluxTTP "github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/testttp"
"github.com/influxdata/influxdb/pkger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
@ -29,8 +31,8 @@ func TestPkgerHTTPServer(t *testing.T) {
}, nil
}
svc := pkger.NewService(pkger.WithLabelSVC(fakeLabelSVC))
pkgHandler := fluxTTP.NewHandlerPkg(fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler)
pkgHandler := fluxTTP.NewHandlerPkg(zap.NewNop(), fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler, 1)
body := newReqBody(t, fluxTTP.ReqCreatePkg{
PkgName: "name1",
@ -88,7 +90,7 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -103,8 +105,8 @@ func TestPkgerHTTPServer(t *testing.T) {
},
}
pkgHandler := fluxTTP.NewHandlerPkg(fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler)
pkgHandler := fluxTTP.NewHandlerPkg(zap.NewNop(), fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler, 1)
body := newReqBody(t, fluxTTP.ReqApplyPkg{
DryRun: true,
@ -147,7 +149,7 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -162,8 +164,8 @@ func TestPkgerHTTPServer(t *testing.T) {
},
}
pkgHandler := fluxTTP.NewHandlerPkg(fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler)
pkgHandler := fluxTTP.NewHandlerPkg(zap.NewNop(), fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler, 1)
body := newReqApplyYMLBody(t, influxdb.ID(9000), true)
@ -187,7 +189,7 @@ func TestPkgerHTTPServer(t *testing.T) {
t.Run("apply a pkg", func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -200,13 +202,13 @@ func TestPkgerHTTPServer(t *testing.T) {
}
return sum, diff, nil
},
ApplyFn: func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
ApplyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
return pkg.Summary(), nil
},
}
pkgHandler := fluxTTP.NewHandlerPkg(fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler)
pkgHandler := fluxTTP.NewHandlerPkg(zap.NewNop(), fluxTTP.ErrorHandler(0), svc)
svr := newMountedHandler(pkgHandler, 1)
body := newReqBody(t, fluxTTP.ReqApplyPkg{
OrgID: influxdb.ID(9000).String(),
@ -309,31 +311,41 @@ func newReqBody(t *testing.T, v interface{}) *bytes.Buffer {
}
type fakeSVC struct {
DryRunFn func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error)
ApplyFn func(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error)
DryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error)
ApplyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error)
}
func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
panic("not implemented")
}
func (f *fakeSVC) DryRun(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) {
if f.DryRunFn == nil {
panic("not implemented")
}
return f.DryRunFn(ctx, orgID, pkg)
return f.DryRunFn(ctx, orgID, userID, pkg)
}
func (f *fakeSVC) Apply(ctx context.Context, orgID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, error) {
if f.ApplyFn == nil {
panic("not implemented")
}
return f.ApplyFn(ctx, orgID, pkg)
return f.ApplyFn(ctx, orgID, userID, pkg)
}
func newMountedHandler(rh fluxTTP.ResourceHandler) chi.Router {
func newMountedHandler(rh fluxTTP.ResourceHandler, userID influxdb.ID) chi.Router {
r := chi.NewRouter()
r.Mount(rh.Prefix(), rh)
r.Mount(rh.Prefix(), authMW(userID)(rh))
return r
}
func authMW(userID influxdb.ID) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(pcontext.SetAuthorizer(r.Context(), &influxdb.Session{UserID: userID}))
next.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}

View File

@ -7169,18 +7169,26 @@ components:
buckets:
type: array
items:
allOf:
- $ref: "#/components/schemas/Bucket"
- type: object
properties:
labelAssociations:
type: object
properties:
id:
type: string
orgID:
type: string
name:
type: string
description:
type: string
retentionPeriod:
type: integer
labelAssociations:
type: array
items:
$ref: "#/components/schemas/Label"
$ref: "#/components/schemas/PkgSummaryLabel"
labels:
type: array
items:
$ref: "#/components/schemas/Label"
$ref: "#/components/schemas/PkgSummaryLabel"
dashboards:
type: array
items:
@ -7242,11 +7250,19 @@ components:
variables:
type: array
items:
allOf:
- $ref: "#/components/schemas/Variable"
- type: object
properties:
labelAssociations:
type: object
properties:
id:
type: string
orgID:
type: string
name:
type: string
description:
type: string
arguments:
$ref: "#/components/schemas/VariableProperties"
labelAssociations:
type: array
items:
$ref: "#/components/schemas/Label"
@ -7386,6 +7402,19 @@ components:
type: array
items:
type: integer
PkgSummaryLabel:
type: object
properties:
id:
type: string
orgID:
type: string
name:
type: string
description:
type: string
retentionPeriod:
type: string
PkgChart:
type: object
properties:

View File

@ -1,6 +1,7 @@
package pkger
import (
"encoding/json"
"errors"
"fmt"
"net/url"
@ -176,8 +177,8 @@ type DiffBucketValues struct {
// DiffBucket is a diff of an individual bucket.
type DiffBucket struct {
ID SafeID
Name string
ID SafeID `json:"id"`
Name string `json:"name"`
New DiffBucketValues `json:"new"`
Old *DiffBucketValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil
}
@ -299,6 +300,16 @@ type DiffNotificationEndpointValues struct {
influxdb.NotificationEndpoint
}
// UnmarshalJSON decodes the notification endpoint. This is necessary unfortunately.
func (d *DiffNotificationEndpointValues) UnmarshalJSON(b []byte) error {
e, err := endpoint.UnmarshalJSON(b)
if err != nil {
fmt.Println("broken here")
}
d.NotificationEndpoint = e
return err
}
// DiffNotificationEndpoint is a diff of an individual notification endpoint.
type DiffNotificationEndpoint struct {
ID SafeID `json:"id"`
@ -396,8 +407,13 @@ type Summary struct {
// SummaryBucket provides a summary of a pkg bucket.
type SummaryBucket struct {
influxdb.Bucket
LabelAssociations []influxdb.Label `json:"labelAssociations"`
ID SafeID `json:"id,omitempty"`
OrgID SafeID `json:"orgID,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
// TODO: return retention rules?
RetentionPeriod time.Duration `json:"retentionPeriod"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
// SummaryDashboard provides a summary of a pkg dashboard.
@ -408,7 +424,7 @@ type SummaryDashboard struct {
Description string `json:"description"`
Charts []SummaryChart `json:"charts"`
LabelAssociations []influxdb.Label `json:"labelAssociations"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
// chartKind identifies what kind of chart is eluded too. Each
@ -447,7 +463,7 @@ func (c chartKind) title() string {
// SummaryChart provides a summary of a pkg dashboard's chart.
type SummaryChart struct {
Properties influxdb.ViewProperties `json:"properties"`
Properties influxdb.ViewProperties `json:"-"`
XPosition int `json:"xPos"`
YPosition int `json:"yPos"`
@ -455,35 +471,103 @@ type SummaryChart struct {
Width int `json:"width"`
}
// MarshalJSON marshals a summary chart.
func (s *SummaryChart) MarshalJSON() ([]byte, error) {
b, err := influxdb.MarshalViewPropertiesJSON(s.Properties)
if err != nil {
return nil, err
}
type alias SummaryChart
out := struct {
Props json.RawMessage `json:"properties"`
alias
}{
Props: b,
alias: alias(*s),
}
return json.Marshal(out)
}
// UnmarshalJSON unmarshals a view properities and other data.
func (s *SummaryChart) UnmarshalJSON(b []byte) error {
type alias SummaryChart
a := (*alias)(s)
if err := json.Unmarshal(b, a); err != nil {
return err
}
s.XPosition = a.XPosition
s.XPosition = a.YPosition
s.Height = a.Height
s.Width = a.Width
vp, err := influxdb.UnmarshalViewPropertiesJSON(b)
if err != nil {
return err
}
s.Properties = vp
return nil
}
// SummaryNotificationEndpoint provides a summary of a pkg endpoint rule.
type SummaryNotificationEndpoint struct {
influxdb.NotificationEndpoint
LabelAssociations []influxdb.Label `json:"labelAssociations"`
NotificationEndpoint influxdb.NotificationEndpoint `json:"notificationEndpoint"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
// UnmarshalJSON unmarshals the notificatio endpoint. This is necessary b/c of
// the notification endpoint does not have a means ot unmarshal itself.
func (s *SummaryNotificationEndpoint) UnmarshalJSON(b []byte) error {
var a struct {
NotificationEndpoint json.RawMessage `json:"notificationEndpoint"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
if err := json.Unmarshal(b, &a); err != nil {
return err
}
s.LabelAssociations = a.LabelAssociations
e, err := endpoint.UnmarshalJSON(a.NotificationEndpoint)
s.NotificationEndpoint = e
return err
}
// SummaryLabel provides a summary of a pkg label.
type SummaryLabel struct {
influxdb.Label
ID SafeID `json:"id"`
OrgID SafeID `json:"orgID"`
Name string `json:"name"`
Properties struct {
Color string `json:"color"`
Description string `json:"description"`
} `json:"properties"`
}
// SummaryLabelMapping provides a summary of a label mapped with a single resource.
type SummaryLabelMapping struct {
exists bool
ResourceName string `json:"resourceName"`
LabelName string `json:"labelName"`
influxdb.LabelMapping
ResourceID SafeID `json:"resourceID"`
ResourceName string `json:"resourceName"`
ResourceType influxdb.ResourceType `json:"resourceType"`
LabelName string `json:"labelName"`
LabelID SafeID `json:"labelID"`
}
// SummaryTelegraf provides a summary of a pkg telegraf config.
type SummaryTelegraf struct {
influxdb.TelegrafConfig
LabelAssociations []influxdb.Label `json:"labelAssociations"`
TelegrafConfig influxdb.TelegrafConfig `json:"telegrafConfig"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
// SummaryVariable provides a summary of a pkg variable.
type SummaryVariable struct {
influxdb.Variable
LabelAssociations []influxdb.Label `json:"labelAssociations"`
ID SafeID `json:"id,omitempty"`
OrgID SafeID `json:"orgID,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
Arguments *influxdb.VariableArguments `json:"arguments"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
const (
@ -544,14 +628,12 @@ func (b *bucket) Exists() bool {
func (b *bucket) summarize() SummaryBucket {
return SummaryBucket{
Bucket: influxdb.Bucket{
ID: b.ID(),
OrgID: b.OrgID,
Name: b.Name(),
Description: b.Description,
RetentionPeriod: b.RetentionRules.RP(),
},
LabelAssociations: toInfluxLabels(b.labels...),
ID: SafeID(b.ID()),
OrgID: SafeID(b.OrgID),
Name: b.Name(),
Description: b.Description,
RetentionPeriod: b.RetentionRules.RP(),
LabelAssociations: toSummaryLabels(b.labels...),
}
}
@ -730,11 +812,15 @@ func (l *label) shouldApply() bool {
func (l *label) summarize() SummaryLabel {
return SummaryLabel{
Label: influxdb.Label{
ID: l.ID(),
OrgID: l.OrgID,
Name: l.Name(),
Properties: l.properties(),
ID: SafeID(l.ID()),
OrgID: SafeID(l.OrgID),
Name: l.Name(),
Properties: struct {
Color string `json:"color"`
Description string `json:"description"`
}{
Color: l.Color,
Description: l.Description,
},
}
}
@ -745,13 +831,11 @@ func (l *label) mappingSummary() []SummaryLabelMapping {
for _, v := range vals {
mappings = append(mappings, SummaryLabelMapping{
exists: v.exists,
ResourceID: SafeID(v.ID()),
ResourceName: resource.name,
ResourceType: resource.resType,
LabelID: SafeID(l.ID()),
LabelName: l.Name(),
LabelMapping: influxdb.LabelMapping{
LabelID: l.ID(),
ResourceID: v.ID(),
ResourceType: resource.resType,
},
})
}
}
@ -775,10 +859,10 @@ func (l *label) toInfluxLabel() influxdb.Label {
}
}
func toInfluxLabels(labels ...*label) []influxdb.Label {
var iLabels []influxdb.Label
func toSummaryLabels(labels ...*label) []SummaryLabel {
var iLabels []SummaryLabel
for _, l := range labels {
iLabels = append(iLabels, l.toInfluxLabel())
iLabels = append(iLabels, l.summarize())
}
return iLabels
}
@ -864,13 +948,18 @@ func (n *notificationEndpoint) ResourceType() influxdb.ResourceType {
}
func (n *notificationEndpoint) base() endpoint.Base {
return endpoint.Base{
ID: n.ID(),
OrgID: n.OrgID,
e := endpoint.Base{
Name: n.Name(),
Description: n.description,
Status: influxdb.TaskStatusActive,
}
if id := n.ID(); id > 0 {
e.ID = &id
}
if orgID := n.OrgID; orgID > 0 {
e.OrgID = &orgID
}
return e
}
func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
@ -879,7 +968,7 @@ func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
base.Status = influxdb.Status(n.status)
}
sum := SummaryNotificationEndpoint{
LabelAssociations: toInfluxLabels(n.labels...),
LabelAssociations: toSummaryLabels(n.labels...),
}
switch n.kind {
@ -921,7 +1010,7 @@ func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
return sum
}
var validHTTPMethods = map[string]bool{
var validEndpointHTTPMethods = map[string]bool{
"DELETE": true,
"GET": true,
"HEAD": true,
@ -952,11 +1041,11 @@ func (n *notificationEndpoint) valid() []validationErr {
if n.routingKey == "" {
failures = append(failures, validationErr{
Field: fieldNotificationEndpointRoutingKey,
Msg: "must provide non empty string",
Msg: "must be provide",
})
}
case notificationKindHTTP:
if !validHTTPMethods[n.method] {
if !validEndpointHTTPMethods[n.method] {
failures = append(failures, validationErr{
Field: fieldNotificationEndpointHTTPMethod,
Msg: "http method must be a valid HTTP verb",
@ -1044,7 +1133,7 @@ func (t *telegraf) Exists() bool {
func (t *telegraf) summarize() SummaryTelegraf {
return SummaryTelegraf{
TelegrafConfig: t.config,
LabelAssociations: toInfluxLabels(t.labels...),
LabelAssociations: toSummaryLabels(t.labels...),
}
}
@ -1112,14 +1201,12 @@ func (v *variable) shouldApply() bool {
func (v *variable) summarize() SummaryVariable {
return SummaryVariable{
Variable: influxdb.Variable{
ID: v.ID(),
OrganizationID: v.OrgID,
Name: v.Name(),
Description: v.Description,
Arguments: v.influxVarArgs(),
},
LabelAssociations: toInfluxLabels(v.labels...),
ID: SafeID(v.ID()),
OrgID: SafeID(v.OrgID),
Name: v.Name(),
Description: v.Description,
Arguments: v.influxVarArgs(),
LabelAssociations: toSummaryLabels(v.labels...),
}
}
@ -1225,7 +1312,7 @@ func (d *dashboard) summarize() SummaryDashboard {
OrgID: SafeID(d.OrgID),
Name: d.Name(),
Description: d.Description,
LabelAssociations: toInfluxLabels(d.labels...),
LabelAssociations: toSummaryLabels(d.labels...),
}
for _, c := range d.Charts {
iDash.Charts = append(iDash.Charts, SummaryChart{

View File

@ -37,8 +37,8 @@ func TestPkg(t *testing.T) {
require.Len(t, summary.Buckets, len(pkg.mBuckets))
for i := 1; i <= len(summary.Buckets); i++ {
buck := summary.Buckets[i-1]
assert.Equal(t, influxdb.ID(i), buck.ID)
assert.Equal(t, influxdb.ID(100), buck.OrgID)
assert.Equal(t, SafeID(i), buck.ID)
assert.Equal(t, SafeID(100), buck.OrgID)
assert.Equal(t, "desc"+strconv.Itoa(i), buck.Description)
assert.Equal(t, "name"+strconv.Itoa(i), buck.Name)
assert.Equal(t, time.Duration(i)*time.Hour, buck.RetentionPeriod)
@ -69,18 +69,18 @@ func TestPkg(t *testing.T) {
require.Len(t, summary.Labels, len(pkg.mLabels))
label1 := summary.Labels[0]
assert.Equal(t, influxdb.ID(1), label1.ID)
assert.Equal(t, influxdb.ID(100), label1.OrgID)
assert.Equal(t, "desc1", label1.Properties["description"])
assert.Equal(t, SafeID(1), label1.ID)
assert.Equal(t, SafeID(100), label1.OrgID)
assert.Equal(t, "name1", label1.Name)
assert.Equal(t, "peru", label1.Properties["color"])
assert.Equal(t, "desc1", label1.Properties.Description)
assert.Equal(t, "peru", label1.Properties.Color)
label2 := summary.Labels[1]
assert.Equal(t, influxdb.ID(2), label2.ID)
assert.Equal(t, influxdb.ID(100), label2.OrgID)
assert.Equal(t, "desc2", label2.Properties["description"])
assert.Equal(t, SafeID(2), label2.ID)
assert.Equal(t, SafeID(100), label2.OrgID)
assert.Equal(t, "desc2", label2.Properties.Description)
assert.Equal(t, "name2", label2.Name)
assert.Equal(t, "blurple", label2.Properties["color"])
assert.Equal(t, "blurple", label2.Properties.Color)
})
t.Run("label mappings returned in asc order by name", func(t *testing.T) {
@ -116,10 +116,10 @@ func TestPkg(t *testing.T) {
require.Len(t, summary.LabelMappings, 1)
mapping1 := summary.LabelMappings[0]
assert.Equal(t, bucket1.id, mapping1.ResourceID)
assert.Equal(t, SafeID(bucket1.id), mapping1.ResourceID)
assert.Equal(t, bucket1.Name(), mapping1.ResourceName)
assert.Equal(t, influxdb.BucketsResourceType, mapping1.ResourceType)
assert.Equal(t, label1.id, mapping1.LabelID)
assert.Equal(t, SafeID(label1.id), mapping1.LabelID)
assert.Equal(t, label1.Name(), mapping1.LabelName)
})
})

View File

@ -162,13 +162,7 @@ func (p *Pkg) Summary() Summary {
sum.Labels = append(sum.Labels, l.summarize())
}
for _, m := range p.labelMappings() {
sum.LabelMappings = append(sum.LabelMappings, SummaryLabelMapping{
ResourceName: m.ResourceName,
LabelName: m.LabelName,
LabelMapping: m.LabelMapping,
})
}
sum.LabelMappings = p.labelMappings()
for _, n := range p.notificationEndpoints() {
sum.NotificationEndpoints = append(sum.NotificationEndpoints, n.summarize())
@ -729,10 +723,10 @@ func (p *Pkg) parseNestedLabel(nr Resource, fn func(lb *label) error) *validatio
k, err := nr.kind()
if err != nil {
return &validationErr{
Field: "associations",
Field: fieldAssociations,
Nested: []validationErr{
{
Field: "kind",
Field: fieldKind,
Msg: err.Error(),
},
},
@ -745,14 +739,14 @@ func (p *Pkg) parseNestedLabel(nr Resource, fn func(lb *label) error) *validatio
lb, found := p.mLabels[nr.Name()]
if !found {
return &validationErr{
Field: "associations",
Field: fieldAssociations,
Msg: fmt.Sprintf("label %q does not exist in pkg", nr.Name()),
}
}
if err := fn(lb); err != nil {
return &validationErr{
Field: "associations",
Field: fieldAssociations,
Msg: err.Error(),
}
}
@ -1123,9 +1117,18 @@ type ParseError interface {
ValidationErrs() []ValidationErr
}
// NewParseError creates a new parse error from existing validation errors.
func NewParseError(errs ...ValidationErr) error {
if len(errs) == 0 {
return nil
}
return &parseErr{rawErrs: errs}
}
type (
parseErr struct {
Resources []resourceErr
rawErrs []ValidationErr
}
// resourceErr describes the error for a particular resource. In
@ -1150,7 +1153,7 @@ type (
// Error implements the error interface.
func (e *parseErr) Error() string {
var errMsg []string
for _, ve := range e.ValidationErrs() {
for _, ve := range append(e.ValidationErrs(), e.rawErrs...) {
errMsg = append(errMsg, ve.Error())
}
@ -1158,9 +1161,8 @@ func (e *parseErr) Error() string {
}
func (e *parseErr) ValidationErrs() []ValidationErr {
var errs []ValidationErr
errs := e.rawErrs[:]
for _, r := range e.Resources {
rootErr := ValidationErr{
Kind: r.Kind,
}

View File

@ -118,11 +118,9 @@ spec:
actual := buckets[0]
expectedBucket := SummaryBucket{
Bucket: influxdb.Bucket{
Name: "rucket_11",
Description: "bucket 1 description",
RetentionPeriod: time.Hour,
},
Name: "rucket_11",
Description: "bucket 1 description",
RetentionPeriod: time.Hour,
}
assert.Equal(t, expectedBucket, actual)
})
@ -345,7 +343,7 @@ spec:
require.Len(t, sum.LabelMappings, len(expectedMappings))
for i, expected := range expectedMappings {
expected.LabelMapping.ResourceType = influxdb.BucketsResourceType
expected.ResourceType = influxdb.BucketsResourceType
assert.Equal(t, expected, sum.LabelMappings[i])
}
})
@ -2605,7 +2603,7 @@ spec:
require.Len(t, sum.LabelMappings, len(expectedMappings))
for i, expected := range expectedMappings {
expected.LabelMapping.ResourceType = influxdb.DashboardsResourceType
expected.ResourceType = influxdb.DashboardsResourceType
assert.Equal(t, expected, sum.LabelMappings[i])
}
})
@ -2782,11 +2780,9 @@ spec:
require.Len(t, sum.LabelMappings, len(expectedEndpoints))
expectedMapping := SummaryLabelMapping{
ResourceName: expected.GetName(),
ResourceName: expected.NotificationEndpoint.GetName(),
LabelName: "label_1",
LabelMapping: influxdb.LabelMapping{
ResourceType: influxdb.NotificationEndpointResourceType,
},
ResourceType: influxdb.NotificationEndpointResourceType,
}
assert.Contains(t, sum.LabelMappings, expectedMapping)
}
@ -3091,8 +3087,8 @@ spec:
require.Len(t, sum.TelegrafConfigs, 1)
actual := sum.TelegrafConfigs[0]
assert.Equal(t, "first_tele_config", actual.Name)
assert.Equal(t, "desc", actual.Description)
assert.Equal(t, "first_tele_config", actual.TelegrafConfig.Name)
assert.Equal(t, "desc", actual.TelegrafConfig.Description)
require.Len(t, actual.LabelAssociations, 1)
assert.Equal(t, "label_1", actual.LabelAssociations[0].Name)
@ -3101,9 +3097,7 @@ spec:
expectedMapping := SummaryLabelMapping{
ResourceName: "first_tele_config",
LabelName: "label_1",
LabelMapping: influxdb.LabelMapping{
ResourceType: influxdb.TelegrafsResourceType,
},
ResourceType: influxdb.TelegrafsResourceType,
}
assert.Equal(t, expectedMapping, sum.LabelMappings[0])
})
@ -3366,7 +3360,7 @@ spec:
require.Len(t, sum.LabelMappings, len(expectedMappings))
for i, expected := range expectedMappings {
expected.LabelMapping.ResourceType = influxdb.VariablesResourceType
expected.ResourceType = influxdb.VariablesResourceType
assert.Equal(t, expected, sum.LabelMappings[i])
}
})

View File

@ -21,8 +21,8 @@ const APIVersion = "0.1.0"
// SVC is the packages service interface.
type SVC interface {
CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summary, Diff, error)
Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summary, error)
DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (Summary, Diff, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (Summary, error)
}
type serviceOpt struct {
@ -89,15 +89,6 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn {
}
}
// WithApplyReqLimit sets the concurrency request limit.
func WithApplyReqLimit(limit int) ServiceSetterFn {
return func(o *serviceOpt) {
if limit > 0 {
o.applyReqLimit = limit
}
}
}
// Service provides the pkger business logic including all the dependencies to make
// this resource sausage.
type Service struct {
@ -113,6 +104,8 @@ type Service struct {
applyReqLimit int
}
var _ SVC = (*Service)(nil)
// NewService is a constructor for a pkger Service.
func NewService(opts ...ServiceSetterFn) *Service {
opt := &serviceOpt{
@ -399,11 +392,10 @@ func (s *Service) resourceCloneToResource(ctx context.Context, r ResourceToClone
}
newResource = bucketToResource(*bkt, r.Name)
case r.Kind.is(KindDashboard):
dash, err := s.dashSVC.FindDashboardByID(ctx, r.ID)
dash, err := s.findDashboardByIDFull(ctx, r.ID)
if err != nil {
return nil, err
}
newResource = dashboardToResource(*dash, r.Name)
case r.Kind.is(KindLabel):
l, err := s.labelSVC.FindLabelByID(ctx, r.ID)
@ -487,7 +479,7 @@ func (s *Service) resourceCloneAssociationsGen() cloneAssociationsFn {
// DryRun provides a dry run of the pkg application. The pkg will be marked verified
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summary, Diff, error) {
func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (Summary, Diff, error) {
// so here's the deal, when we have issues with the parsing validation, we
// continue to do the diff anyhow. any resource that does not have a name
// will be skipped, and won't bleed into the dry run here. We can now return
@ -806,7 +798,7 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci
// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg were applied.
func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum Summary, e error) {
func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (sum Summary, e error) {
if !pkg.isParsed {
if err := pkg.Validate(); err != nil {
return Summary{}, err
@ -814,7 +806,7 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
}
if !pkg.isVerified {
_, _, err := s.DryRun(ctx, orgID, pkg)
_, _, err := s.DryRun(ctx, orgID, userID, pkg)
if err != nil {
return Summary{}, err
}
@ -850,7 +842,7 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
}
for _, group := range appliers {
if err := coordinator.runTilEnd(ctx, orgID, group...); err != nil {
if err := coordinator.runTilEnd(ctx, orgID, userID, group...); err != nil {
return Summary{}, err
}
}
@ -858,7 +850,7 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
// secondary resources
// this last grouping relies on the above 2 steps having completely successfully
secondary := []applier{s.applyLabelMappings(pkg.labelMappings())}
if err := coordinator.runTilEnd(ctx, orgID, secondary...); err != nil {
if err := coordinator.runTilEnd(ctx, orgID, userID, secondary...); err != nil {
return Summary{}, err
}
@ -871,7 +863,7 @@ func (s *Service) applyBuckets(buckets []*bucket) applier {
mutex := new(doMutex)
rollbackBuckets := make([]*bucket, 0, len(buckets))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var b bucket
mutex.Do(func() {
buckets[i].OrgID = orgID
@ -971,7 +963,7 @@ func (s *Service) applyDashboards(dashboards []*dashboard) applier {
mutex := new(doMutex)
rollbackDashboards := make([]*dashboard, 0, len(dashboards))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var d dashboard
mutex.Do(func() {
dashboards[i].OrgID = orgID
@ -1051,7 +1043,7 @@ func (s *Service) applyLabels(labels []*label) applier {
mutex := new(doMutex)
rollBackLabels := make([]*label, 0, len(labels))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var l label
mutex.Do(func() {
labels[i].OrgID = orgID
@ -1141,14 +1133,14 @@ func (s *Service) applyNotificationEndpoints(endpoints []*notificationEndpoint)
mutex := new(doMutex)
rollbackEndpoints := make([]*notificationEndpoint, 0, len(endpoints))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var endpoint notificationEndpoint
mutex.Do(func() {
endpoints[i].OrgID = orgID
endpoint = *endpoints[i]
})
influxEndpoint, err := s.applyNotificationEndpoint(ctx, endpoint)
influxEndpoint, err := s.applyNotificationEndpoint(ctx, endpoint, userID)
if err != nil {
return &applyErrBody{
name: endpoint.Name(),
@ -1178,7 +1170,7 @@ func (s *Service) applyNotificationEndpoints(endpoints []*notificationEndpoint)
}
}
func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationEndpoint) (influxdb.NotificationEndpoint, error) {
func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
if e.existing != nil {
// stub out userID since we're always using hte http client which will fill it in for us with the token
// feels a bit broken that is required.
@ -1191,7 +1183,7 @@ func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationE
}
actual := e.summarize().NotificationEndpoint
err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, 0)
err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, userID)
if err != nil {
return nil, err
}
@ -1229,14 +1221,14 @@ func (s *Service) applyTelegrafs(teles []*telegraf) applier {
mutex := new(doMutex)
rollbackTelegrafs := make([]*telegraf, 0, len(teles))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var cfg influxdb.TelegrafConfig
mutex.Do(func() {
teles[i].config.OrgID = orgID
cfg = teles[i].config
})
err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, 0)
err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, userID)
if err != nil {
return &applyErrBody{
name: cfg.Name,
@ -1274,7 +1266,7 @@ func (s *Service) applyVariables(vars []*variable) applier {
mutex := new(doMutex)
rollBackVars := make([]*variable, 0, len(vars))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var v variable
mutex.Do(func() {
vars[i].OrgID = orgID
@ -1369,7 +1361,7 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie
mutex := new(doMutex)
rollbackMappings := make([]influxdb.LabelMapping, 0, len(labelMappings))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var mapping SummaryLabelMapping
mutex.Do(func() {
mapping = labelMappings[i]
@ -1384,7 +1376,12 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie
return nil
}
err := s.labelSVC.CreateLabelMapping(ctx, &mapping.LabelMapping)
m := influxdb.LabelMapping{
LabelID: influxdb.ID(mapping.LabelID),
ResourceID: influxdb.ID(mapping.ResourceID),
ResourceType: mapping.ResourceType,
}
err := s.labelSVC.CreateLabelMapping(ctx, &m)
if err != nil {
return &applyErrBody{
name: fmt.Sprintf("%s:%s:%s", mapping.ResourceType, mapping.ResourceID, mapping.LabelID),
@ -1393,8 +1390,7 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie
}
mutex.Do(func() {
labelMappings[i].LabelMapping = mapping.LabelMapping
rollbackMappings = append(rollbackMappings, mapping.LabelMapping)
rollbackMappings = append(rollbackMappings, m)
})
return nil
@ -1446,6 +1442,21 @@ func (s *Service) deleteByIDs(resource string, numIDs int, deleteFn func(context
return nil
}
func (s *Service) findDashboardByIDFull(ctx context.Context, id influxdb.ID) (*influxdb.Dashboard, error) {
dash, err := s.dashSVC.FindDashboardByID(ctx, id)
if err != nil {
return nil, err
}
for _, cell := range dash.Cells {
v, err := s.dashSVC.GetDashboardCellView(ctx, id, cell.ID)
if err != nil {
return nil, err
}
cell.View = v
}
return dash, nil
}
type doMutex struct {
sync.Mutex
}
@ -1469,7 +1480,7 @@ type (
creater struct {
entries int
fn func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody
fn func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody
}
)
@ -1479,7 +1490,7 @@ type rollbackCoordinator struct {
sem chan struct{}
}
func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID influxdb.ID, appliers ...applier) error {
func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID, userID influxdb.ID, appliers ...applier) error {
errStr := newErrStream(ctx)
wg := new(sync.WaitGroup)
@ -1501,7 +1512,7 @@ func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID influxdb.ID,
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := app.creater.fn(ctx, i, orgID); err != nil {
if err := app.creater.fn(ctx, i, orgID, userID); err != nil {
errStr.add(errMsg{resource: resource, err: *err})
}
}(idx, app.rollbacker.resource)

View File

@ -56,7 +56,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithBucketSVC(fakeBktSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.Buckets, 1)
@ -85,7 +85,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithBucketSVC(fakeBktSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.Buckets, 1)
@ -120,7 +120,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithLabelSVC(fakeLabelSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.Labels, 2)
@ -154,7 +154,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithLabelSVC(fakeLabelSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.Labels, 2)
@ -179,9 +179,10 @@ func TestService(t *testing.T) {
t.Run("notification endpoints", func(t *testing.T) {
testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) {
fakeEndpointSVC := mock.NewNotificationEndpointService()
id := influxdb.ID(1)
existing := &endpoint.HTTP{
Base: endpoint.Base{
ID: 1,
ID: &id,
Name: "http_none_auth_notification_endpoint",
Description: "old desc",
Status: influxdb.TaskStatusInactive,
@ -196,7 +197,7 @@ func TestService(t *testing.T) {
svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.NotificationEndpoints, 5)
@ -224,7 +225,7 @@ func TestService(t *testing.T) {
New: DiffNotificationEndpointValues{
NotificationEndpoint: &endpoint.HTTP{
Base: endpoint.Base{
ID: 1,
ID: &id,
Name: "http_none_auth_notification_endpoint",
Description: "http none auth desc",
Status: influxdb.TaskStatusActive,
@ -253,7 +254,7 @@ func TestService(t *testing.T) {
}
svc := newTestService(WithVariableSVC(fakeVarSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), 0, pkg)
require.NoError(t, err)
require.Len(t, diff.Variables, 4)
@ -311,13 +312,13 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Buckets, 1)
buck1 := sum.Buckets[0]
assert.Equal(t, influxdb.ID(time.Hour), buck1.ID)
assert.Equal(t, orgID, buck1.OrgID)
assert.Equal(t, SafeID(time.Hour), buck1.ID)
assert.Equal(t, SafeID(orgID), buck1.OrgID)
assert.Equal(t, "rucket_11", buck1.Name)
assert.Equal(t, time.Hour, buck1.RetentionPeriod)
assert.Equal(t, "bucket 1 description", buck1.Description)
@ -346,13 +347,13 @@ func TestService(t *testing.T) {
svc := newTestService(WithBucketSVC(fakeBktSVC))
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Buckets, 1)
buck1 := sum.Buckets[0]
assert.Equal(t, influxdb.ID(3), buck1.ID)
assert.Equal(t, orgID, buck1.OrgID)
assert.Equal(t, SafeID(3), buck1.ID)
assert.Equal(t, SafeID(orgID), buck1.OrgID)
assert.Equal(t, "rucket_11", buck1.Name)
assert.Equal(t, time.Hour, buck1.RetentionPeriod)
assert.Equal(t, "bucket 1 description", buck1.Description)
@ -382,7 +383,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeBktSVC.DeleteBucketCalls.Count(), 1)
@ -407,23 +408,23 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Labels, 2)
label1 := sum.Labels[0]
assert.Equal(t, influxdb.ID(1), label1.ID)
assert.Equal(t, orgID, label1.OrgID)
assert.Equal(t, SafeID(1), label1.ID)
assert.Equal(t, SafeID(orgID), label1.OrgID)
assert.Equal(t, "label_1", label1.Name)
assert.Equal(t, "#FFFFFF", label1.Properties["color"])
assert.Equal(t, "label 1 description", label1.Properties["description"])
assert.Equal(t, "#FFFFFF", label1.Properties.Color)
assert.Equal(t, "label 1 description", label1.Properties.Description)
label2 := sum.Labels[1]
assert.Equal(t, influxdb.ID(2), label2.ID)
assert.Equal(t, orgID, label2.OrgID)
assert.Equal(t, SafeID(2), label2.ID)
assert.Equal(t, SafeID(orgID), label2.OrgID)
assert.Equal(t, "label_2", label2.Name)
assert.Equal(t, "#000000", label2.Properties["color"])
assert.Equal(t, "label 2 description", label2.Properties["description"])
assert.Equal(t, "#000000", label2.Properties.Color)
assert.Equal(t, "label 2 description", label2.Properties.Description)
})
})
@ -445,7 +446,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelCalls.Count(), 1)
@ -486,23 +487,23 @@ func TestService(t *testing.T) {
svc := newTestService(WithLabelSVC(fakeLabelSVC))
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Labels, 2)
label1 := sum.Labels[0]
assert.Equal(t, influxdb.ID(1), label1.ID)
assert.Equal(t, orgID, label1.OrgID)
assert.Equal(t, SafeID(1), label1.ID)
assert.Equal(t, SafeID(orgID), label1.OrgID)
assert.Equal(t, "label_1", label1.Name)
assert.Equal(t, "#FFFFFF", label1.Properties["color"])
assert.Equal(t, "label 1 description", label1.Properties["description"])
assert.Equal(t, "#FFFFFF", label1.Properties.Color)
assert.Equal(t, "label 1 description", label1.Properties.Description)
label2 := sum.Labels[1]
assert.Equal(t, influxdb.ID(2), label2.ID)
assert.Equal(t, orgID, label2.OrgID)
assert.Equal(t, SafeID(2), label2.ID)
assert.Equal(t, SafeID(orgID), label2.OrgID)
assert.Equal(t, "label_2", label2.Name)
assert.Equal(t, "#000000", label2.Properties["color"])
assert.Equal(t, "label 2 description", label2.Properties["description"])
assert.Equal(t, "#000000", label2.Properties.Color)
assert.Equal(t, "label 2 description", label2.Properties.Description)
assert.Equal(t, 1, fakeLabelSVC.CreateLabelCalls.Count()) // only called for second label
})
@ -525,7 +526,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Dashboards, 1)
@ -560,7 +561,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.True(t, deletedDashs[1])
@ -593,7 +594,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count())
@ -635,7 +636,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), numExpected)
@ -726,13 +727,14 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.NotificationEndpoints, 5)
containsWithID := func(t *testing.T, name string) {
for _, actual := range sum.NotificationEndpoints {
for _, actualNotification := range sum.NotificationEndpoints {
actual := actualNotification.NotificationEndpoint
if actual.GetID() == 0 {
assert.NotZero(t, actual.GetID())
}
@ -776,7 +778,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5)
@ -797,12 +799,12 @@ func TestService(t *testing.T) {
svc := newTestService(WithTelegrafSVC(fakeTeleSVC))
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.TelegrafConfigs, 1)
assert.Equal(t, "first_tele_config", sum.TelegrafConfigs[0].Name)
assert.Equal(t, "desc", sum.TelegrafConfigs[0].Description)
assert.Equal(t, "first_tele_config", sum.TelegrafConfigs[0].TelegrafConfig.Name)
assert.Equal(t, "desc", sum.TelegrafConfigs[0].TelegrafConfig.Description)
})
})
@ -829,7 +831,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.Equal(t, 1, fakeTeleSVC.DeleteTelegrafConfigCalls.Count())
@ -854,20 +856,20 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Variables, 4)
expected := sum.Variables[0]
assert.Equal(t, influxdb.ID(3), expected.ID)
assert.Equal(t, orgID, expected.OrganizationID)
assert.Equal(t, SafeID(3), expected.ID)
assert.Equal(t, SafeID(orgID), expected.OrgID)
assert.Equal(t, "var_const_3", expected.Name)
assert.Equal(t, "var_const_3 desc", expected.Description)
require.NotNil(t, expected.Arguments)
assert.Equal(t, influxdb.VariableConstantValues{"first val"}, expected.Arguments.Values)
for _, actual := range sum.Variables {
assert.Contains(t, []influxdb.ID{1, 2, 3, 4}, actual.ID)
assert.Contains(t, []SafeID{1, 2, 3, 4}, actual.ID)
}
})
})
@ -887,7 +889,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeVarSVC.DeleteVariableCalls.Count(), 1)
@ -927,12 +929,12 @@ func TestService(t *testing.T) {
svc := newTestService(WithVariableSVC(fakeVarSVC))
sum, err := svc.Apply(context.TODO(), orgID, pkg)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Variables, 4)
expected := sum.Variables[0]
assert.Equal(t, influxdb.ID(1), expected.ID)
assert.Equal(t, SafeID(1), expected.ID)
assert.Equal(t, "var_const_3", expected.Name)
assert.Equal(t, 3, fakeVarSVC.CreateVariableCalls.Count()) // only called for last 3 labels
@ -1376,7 +1378,8 @@ func TestService(t *testing.T) {
expectedName = tt.newName
}
assert.Equal(t, expectedName, actual.Name)
assert.Equal(t, expectedLabel.Properties, actual.Properties)
assert.Equal(t, expectedLabel.Properties["color"], actual.Properties.Color)
assert.Equal(t, expectedLabel.Properties["description"], actual.Properties.Description)
}
t.Run(tt.name, fn)
}