refactor(notification): move rule service into own package ()

* refactor(notification): move rule service into own package

* chore(launcher): fix tests to use clients as opposed to direct kv service

* chore(influx): update task cli to consume core domain model task from client

* chore(kv): remove rule service behaviours from kv

This also introduces the org id resolver type. Which is transplanted
from the kv service. As this one function coupled all resource
capabilities onto the kv service. Making removing these capabilities
impossible. Moving this type out into its own package which depends on
each service explicitly ensures we don't have one type which has to
implement all the service contracts.

* fix(launcher): remove double reference to influxdb package
pull/19818/head
George 2020-10-27 11:45:05 +00:00 committed by GitHub
parent 87061bb1cb
commit 3d643e0681
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1365 additions and 1576 deletions

View File

@ -284,7 +284,7 @@ func AuthorizeFindChecks(ctx context.Context, rs []influxdb.Check) ([]influxdb.C
}
// AuthorizeFindUserResourceMappings takes the given items and returns only the ones that the user is authorized to read.
func AuthorizeFindUserResourceMappings(ctx context.Context, os OrganizationService, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) {
func AuthorizeFindUserResourceMappings(ctx context.Context, os OrgIDResolver, rs []*influxdb.UserResourceMapping) ([]*influxdb.UserResourceMapping, int, error) {
// This filters without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
rrs := rs[:0]

View File

@ -2,7 +2,6 @@ package authorizer
import (
"context"
"errors"
"github.com/influxdata/influxdb/v2"
)
@ -12,16 +11,16 @@ var _ influxdb.LabelService = (*LabelService)(nil)
// LabelService wraps a influxdb.LabelService and authorizes actions
// against it appropriately.
type LabelService struct {
s influxdb.LabelService
orgSvc OrganizationService
s influxdb.LabelService
orgIDResolver OrgIDResolver
}
// NewLabelServiceWithOrg constructs an instance of an authorizing label serivce.
// Replaces NewLabelService.
func NewLabelServiceWithOrg(s influxdb.LabelService, orgSvc OrganizationService) *LabelService {
func NewLabelServiceWithOrg(s influxdb.LabelService, orgIDResolver OrgIDResolver) *LabelService {
return &LabelService{
s: s,
orgSvc: orgSvc,
s: s,
orgIDResolver: orgIDResolver,
}
}
@ -55,10 +54,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L
if err := filter.ResourceType.Valid(); err != nil {
return nil, err
}
if s.orgSvc == nil {
return nil, errors.New("failed to find orgSvc")
}
orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)
if err != nil {
return nil, err
}

View File

@ -6,19 +6,19 @@ import (
"github.com/influxdata/influxdb/v2"
)
type OrganizationService interface {
type OrgIDResolver interface {
FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error)
}
type URMService struct {
s influxdb.UserResourceMappingService
orgService OrganizationService
s influxdb.UserResourceMappingService
orgIDResolver OrgIDResolver
}
func NewURMService(orgSvc OrganizationService, s influxdb.UserResourceMappingService) *URMService {
func NewURMService(orgIDResolver OrgIDResolver, s influxdb.UserResourceMappingService) *URMService {
return &URMService{
s: s,
orgService: orgSvc,
s: s,
orgIDResolver: orgIDResolver,
}
}
@ -27,11 +27,11 @@ func (s *URMService) FindUserResourceMappings(ctx context.Context, filter influx
if err != nil {
return nil, 0, err
}
return AuthorizeFindUserResourceMappings(ctx, s.orgService, urms)
return AuthorizeFindUserResourceMappings(ctx, s.orgIDResolver, urms)
}
func (s *URMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
orgID, err := s.orgService.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID)
if err != nil {
return err
}
@ -49,7 +49,7 @@ func (s *URMService) DeleteUserResourceMapping(ctx context.Context, resourceID i
}
for _, urm := range urms {
orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
if err != nil {
return err
}

View File

@ -23,7 +23,7 @@ func (s *OrgService) FindResourceOrganizationID(ctx context.Context, rt influxdb
func TestURMService_FindUserResourceMappings(t *testing.T) {
type fields struct {
UserResourceMappingService influxdb.UserResourceMappingService
OrgService authorizer.OrganizationService
OrgService authorizer.OrgIDResolver
}
type args struct {
permission influxdb.Permission
@ -146,7 +146,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) {
func TestURMService_WriteUserResourceMapping(t *testing.T) {
type fields struct {
UserResourceMappingService influxdb.UserResourceMappingService
OrgService authorizer.OrganizationService
OrgService authorizer.OrgIDResolver
}
type args struct {
permission influxdb.Permission

View File

@ -171,7 +171,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
}
filter.Limit = taskFindFlags.limit
var tasks []http.Task
var tasks []*influxdb.Task
if taskFindFlags.id != "" {
id, err := influxdb.IDFromString(taskFindFlags.id)
@ -184,7 +184,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
return err
}
tasks = append(tasks, *task)
tasks = append(tasks, task)
} else {
tasks, _, err = s.FindTasks(context.Background(), filter)
if err != nil {
@ -322,8 +322,8 @@ func taskDeleteF(cmd *cobra.Command, args []string) error {
type taskPrintOpts struct {
hideHeaders bool
json bool
task *http.Task
tasks []http.Task
task *influxdb.Task
tasks []*influxdb.Task
}
func printTasks(w io.Writer, opts taskPrintOpts) error {
@ -351,7 +351,7 @@ func printTasks(w io.Writer, opts taskPrintOpts) error {
)
if opts.task != nil {
opts.tasks = append(opts.tasks, *opts.task)
opts.tasks = append(opts.tasks, opts.task)
}
for _, t := range opts.tasks {

View File

@ -31,6 +31,7 @@ import (
iqlquery "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/internal/resource"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/feature"
overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override"
@ -45,6 +46,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
"github.com/influxdata/influxdb/v2/query"
@ -989,7 +991,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
if err != nil {
return err
}
// tasks service notification middleware which keeps task service up to date
// with persisted changes to notification rules.
notificationRuleSvc = middleware.NewNotificationRuleStore(notificationRuleSvc, m.kvService, coordinator)
}
// NATS streaming server
@ -1121,6 +1130,25 @@ func (m *Launcher) run(ctx context.Context) (err error) {
onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics
onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging
// orgIDResolver is a deprecated type which combines the lookups
// of multiple resources into one type, used to resolve the resources
// associated org ID. It is a stop-gap while we move this behaviour
// off of *kv.Service to aid in reducing the coupling on this type.
orgIDResolver := &resource.OrgIDResolver{
AuthorizationFinder: authSvc,
BucketFinder: ts.BucketService,
OrganizationFinder: ts.OrganizationService,
DashboardFinder: dashboardSvc,
SourceFinder: sourceSvc,
TaskFinder: taskSvc,
TelegrafConfigFinder: telegrafSvc,
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationRuleFinder: notificationRuleSvc,
}
m.apibackend = &http.APIBackend{
AssetsPath: m.assetsPath,
HTTPErrorHandler: kithttp.ErrorHandler(0),
@ -1169,7 +1197,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
SecretService: secretSvc,
LookupService: lookupSvc,
DocumentService: m.kvService,
OrgLookupService: m.kvService,
OrgLookupService: orgIDResolver,
WriteEventRecorder: infprom.NewEventRecorder("write"),
QueryEventRecorder: infprom.NewEventRecorder("query"),
Flagger: m.flagger,

View File

@ -16,7 +16,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/http"
@ -37,10 +37,10 @@ type TestLauncher struct {
Path string
// Initialized after calling the Setup() helper.
User *platform.User
Org *platform.Organization
Bucket *platform.Bucket
Auth *platform.Authorization
User *influxdb.User
Org *influxdb.Organization
Bucket *influxdb.Bucket
Auth *influxdb.Authorization
httpClient *httpc.Client
@ -127,7 +127,7 @@ func (tl *TestLauncher) ShutdownOrFail(tb testing.TB, ctx context.Context) {
// Setup creates a new user, bucket, org, and auth token.
func (tl *TestLauncher) Setup() error {
results, err := tl.OnBoard(&platform.OnboardingRequest{
results, err := tl.OnBoard(&influxdb.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
Org: "ORG",
@ -153,13 +153,13 @@ func (tl *TestLauncher) SetupOrFail(tb testing.TB) {
// OnBoard attempts an on-boarding request.
// The on-boarding status is also reset to allow multiple user/org/buckets to be created.
func (tl *TestLauncher) OnBoard(req *platform.OnboardingRequest) (*platform.OnboardingResults, error) {
func (tl *TestLauncher) OnBoard(req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) {
return tl.apibackend.OnboardingService.OnboardInitialUser(context.Background(), req)
}
// OnBoardOrFail attempts an on-boarding request or fails on error.
// The on-boarding status is also reset to allow multiple user/org/buckets to be created.
func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingRequest) *platform.OnboardingResults {
func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *influxdb.OnboardingRequest) *influxdb.OnboardingResults {
tb.Helper()
res, err := tl.OnBoard(req)
if err != nil {
@ -169,7 +169,7 @@ func (tl *TestLauncher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingReq
}
// WriteOrFail attempts a write to the organization and bucket identified by to or fails if there is an error.
func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *platform.OnboardingResults, data string) {
func (tl *TestLauncher) WriteOrFail(tb testing.TB, to *influxdb.OnboardingResults, data string) {
tb.Helper()
resp, err := nethttp.DefaultClient.Do(tl.NewHTTPRequestOrFail(tb, "POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", to.Org.ID, to.Bucket.ID), to.Auth.Token, data))
if err != nil {
@ -295,7 +295,7 @@ func (tl *TestLauncher) QueryAndNopConsume(ctx context.Context, req *query.Reque
// FluxQueryOrFail performs a query to the specified organization and returns the results
// or fails if there is an error.
func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organization, token string, query string) string {
func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *influxdb.Organization, token string, query string) string {
tb.Helper()
b, err := http.SimpleQuery(tl.URL(), query, org.Name, token)
@ -308,7 +308,7 @@ func (tl *TestLauncher) FluxQueryOrFail(tb testing.TB, org *platform.Organizatio
// QueryFlux returns the csv response from a flux query.
// It also removes all the \r to make it easier to write tests.
func (tl *TestLauncher) QueryFlux(tb testing.TB, org *platform.Organization, token, query string) string {
func (tl *TestLauncher) QueryFlux(tb testing.TB, org *influxdb.Organization, token, query string) string {
tb.Helper()
b, err := http.SimpleQuery(tl.URL(), query, org.Name, token)
@ -367,7 +367,7 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) CheckService() platform.CheckService {
func (tl *TestLauncher) CheckService() influxdb.CheckService {
return tl.kvService
}
@ -386,11 +386,12 @@ func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.Notific
return http.NewNotificationEndpointService(tl.HTTPClient(tb))
}
func (tl *TestLauncher) NotificationRuleService() platform.NotificationRuleStore {
return tl.kvService
func (tl *TestLauncher) NotificationRuleService(tb testing.TB) influxdb.NotificationRuleStore {
tb.Helper()
return http.NewNotificationRuleService(tl.HTTPClient(tb))
}
func (tl *TestLauncher) OrgService(tb testing.TB) platform.OrganizationService {
func (tl *TestLauncher) OrgService(tb testing.TB) influxdb.OrganizationService {
return tl.kvService
}
@ -398,7 +399,7 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC {
return &pkger.HTTPRemoteService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) TaskServiceKV() platform.TaskService {
func (tl *TestLauncher) TaskServiceKV(tb testing.TB) influxdb.TaskService {
return tl.kvService
}
@ -416,7 +417,7 @@ func (tl *TestLauncher) AuthorizationService(tb testing.TB) *http.AuthorizationS
return &http.AuthorizationService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) TaskService(tb testing.TB) *http.TaskService {
func (tl *TestLauncher) TaskService(tb testing.TB) influxdb.TaskService {
return &http.TaskService{Client: tl.HTTPClient(tb)}
}

View File

@ -13,7 +13,6 @@ import (
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/check"
@ -833,9 +832,9 @@ func TestLauncher_Pkger(t *testing.T) {
deleteKillCount: 3,
}),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)),
pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTaskSVC(l.TaskServiceKV(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -1134,11 +1133,11 @@ func TestLauncher_Pkger(t *testing.T) {
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(&fakeRuleStore{
NotificationRuleStore: l.NotificationRuleService(),
NotificationRuleStore: l.NotificationRuleService(t),
createKillCount: 2,
}),
pkger.WithStore(pkger.NewStoreKV(l.Launcher.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTaskSVC(l.TaskServiceKV(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -2319,10 +2318,10 @@ func TestLauncher_Pkger(t *testing.T) {
createKillCount: 2, // hits error on 3rd attempt at creating a mapping
}),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)),
pkger.WithOrganizationService(l.OrganizationService()),
pkger.WithStore(pkger.NewStoreKV(l.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTaskSVC(l.TaskServiceKV(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -2356,13 +2355,13 @@ func TestLauncher_Pkger(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, endpoints)
rules, _, err := l.NotificationRuleService().FindNotificationRules(ctx, influxdb.NotificationRuleFilter{
rules, _, err := l.NotificationRuleService(t).FindNotificationRules(ctx, influxdb.NotificationRuleFilter{
OrgID: &l.Org.ID,
})
require.NoError(t, err)
assert.Empty(t, rules)
tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{
tasks, _, err := l.TaskServiceKV(t).FindTasks(ctx, influxdb.TaskFilter{
OrganizationID: &l.Org.ID,
})
require.NoError(t, err)
@ -3445,10 +3444,10 @@ spec:
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithNotificationRuleSVC(l.NotificationRuleService(t)),
pkger.WithOrganizationService(l.OrganizationService()),
pkger.WithStore(pkger.NewStoreKV(l.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTaskSVC(l.TaskServiceKV(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -4902,7 +4901,7 @@ func (r resourceChecker) mustDeleteLabel(t *testing.T, id influxdb.ID) {
func (r resourceChecker) getRule(t *testing.T, getOpt getResourceOptFn) (influxdb.NotificationRule, error) {
t.Helper()
ruleSVC := r.tl.NotificationRuleService()
ruleSVC := r.tl.NotificationRuleService(t)
var (
rule influxdb.NotificationRule
@ -4944,16 +4943,16 @@ func (r resourceChecker) mustGetRule(t *testing.T, getOpt getResourceOptFn) infl
func (r resourceChecker) mustDeleteRule(t *testing.T, id influxdb.ID) {
t.Helper()
require.NoError(t, r.tl.NotificationRuleService().DeleteNotificationRule(ctx, id))
require.NoError(t, r.tl.NotificationRuleService(t).DeleteNotificationRule(ctx, id))
}
func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Task, error) {
func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (influxdb.Task, error) {
t.Helper()
taskSVC := r.tl.TaskService(t)
var (
task *http.Task
task *influxdb.Task
err error
)
switch opt := getOpt(); {
@ -4963,11 +4962,11 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta
OrganizationID: &r.tl.Org.ID,
})
if err != nil {
return http.Task{}, err
return influxdb.Task{}, err
}
for _, tt := range tasks {
if tt.Name == opt.name {
task = &tasks[0]
task = tasks[0]
break
}
}
@ -4977,13 +4976,13 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta
require.Fail(t, "did not provide a valid get option")
}
if task == nil {
return http.Task{}, errors.New("did not find expected task by name")
return influxdb.Task{}, errors.New("did not find expected task by name")
}
return *task, err
}
func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http.Task {
func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) influxdb.Task {
t.Helper()
task, err := r.getTask(t, getOpt)

View File

@ -87,7 +87,7 @@ type APIBackend struct {
SecretService influxdb.SecretService
LookupService influxdb.LookupService
ChronografService *server.Service
OrgLookupService authorizer.OrganizationService
OrgLookupService authorizer.OrgIDResolver
DocumentService influxdb.DocumentService
NotificationRuleStore influxdb.NotificationRuleStore
NotificationEndpointService influxdb.NotificationEndpointService

View File

@ -65,13 +65,14 @@ func setup(t *testing.T) (func(auth influxdb.Authorizer) *httptest.Server, func(
if err := ds.CreateDocument(ctx, adoc); err != nil {
panic(err)
}
// Organizations are needed only for creation.
// Need to cleanup for comparison later.
adoc.Organizations = nil
backend := NewMockDocumentBackend(t)
backend.HTTPErrorHandler = http.ErrorHandler(0)
backend.DocumentService = authorizer.NewDocumentService(svc)
backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, svc)
backend.LabelService = authorizer.NewLabelServiceWithOrg(svc, staticOrgIDResolver(org.ID))
serverFn := func(auth influxdb.Authorizer) *httptest.Server {
handler := httpmock.NewAuthMiddlewareHandler(NewDocumentHandler(backend), auth)
return httptest.NewServer(handler)
@ -751,3 +752,9 @@ func DeleteLabel(t *testing.T) {
}
})
}
type staticOrgIDResolver influxdb.ID
func (s staticOrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) {
return (influxdb.ID)(s), nil
}

View File

@ -163,21 +163,41 @@ type notificationRuleResponse struct {
LastRunError string `json:"LastRunError,omitempty"`
}
type ruleResponseMeta struct {
Labels []influxdb.Label `json:"labels"`
Links notificationRuleLinks `json:"links"`
Status string `json:"status"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
}
func (resp *notificationRuleResponse) UnmarshalJSON(v []byte) (err error) {
var responseMeta ruleResponseMeta
if err = json.Unmarshal(v, &responseMeta); err != nil {
return
}
resp.Labels = responseMeta.Labels
resp.Links = responseMeta.Links
resp.Status = responseMeta.Status
resp.LatestCompleted = responseMeta.LatestCompleted
resp.LatestScheduled = responseMeta.LatestScheduled
resp.LastRunStatus = responseMeta.LastRunStatus
resp.LastRunError = responseMeta.LastRunError
resp.NotificationRule, err = rule.UnmarshalJSON(v)
return
}
func (resp notificationRuleResponse) MarshalJSON() ([]byte, error) {
b1, err := json.Marshal(resp.NotificationRule)
if err != nil {
return nil, err
}
b2, err := json.Marshal(struct {
Labels []influxdb.Label `json:"labels"`
Links notificationRuleLinks `json:"links"`
Status string `json:"status"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
}{
b2, err := json.Marshal(ruleResponseMeta{
Links: resp.Links,
Labels: resp.Labels,
Status: resp.Status,
@ -825,6 +845,7 @@ func (s *NotificationRuleService) FindNotificationRules(ctx context.Context, fil
for _, r := range resp.NotificationRules {
rules = append(rules, r.rule)
}
return rules, len(rules), nil
}

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/pkg/httpc"
"github.com/influxdata/influxdb/v2/task/options"
"go.uber.org/zap"
)
@ -220,6 +221,54 @@ func NewFrontEndTask(t influxdb.Task) Task {
}
}
func convertTask(t Task) *influxdb.Task {
var (
latestCompleted time.Time
createdAt time.Time
updatedAt time.Time
offset time.Duration
)
if t.LatestCompleted != "" {
latestCompleted, _ = time.Parse(time.RFC3339, t.LatestCompleted)
}
if t.CreatedAt != "" {
createdAt, _ = time.Parse(time.RFC3339, t.CreatedAt)
}
if t.UpdatedAt != "" {
updatedAt, _ = time.Parse(time.RFC3339, t.UpdatedAt)
}
if t.Offset != "" {
var duration options.Duration
if err := duration.Parse(t.Offset); err == nil {
offset, _ = duration.DurationFrom(time.Now())
}
}
return &influxdb.Task{
ID: t.ID,
OrganizationID: t.OrganizationID,
Organization: t.Organization,
OwnerID: t.OwnerID,
Name: t.Name,
Description: t.Description,
Status: t.Status,
Flux: t.Flux,
Every: t.Every,
Cron: t.Cron,
Offset: offset,
LatestCompleted: latestCompleted,
LastRunStatus: t.LastRunStatus,
LastRunError: t.LastRunError,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Metadata: t.Metadata,
}
}
func customParseDuration(d time.Duration) string {
str := ""
if d < 0 {
@ -1414,7 +1463,7 @@ type TaskService struct {
}
// FindTaskByID returns a single task
func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, error) {
func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1424,12 +1473,12 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, e
return nil, err
}
return &tr.Task, nil
return convertTask(tr.Task), nil
}
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]Task, int, error) {
func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1470,15 +1519,15 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter)
return nil, 0, err
}
tasks := make([]Task, len(tr.Tasks))
tasks := make([]*influxdb.Task, len(tr.Tasks))
for i := range tr.Tasks {
tasks[i] = tr.Tasks[i].Task
tasks[i] = convertTask(tr.Tasks[i].Task)
}
return tasks, len(tasks), nil
}
// CreateTask creates a new task.
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) {
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var tr taskResponse
@ -1491,11 +1540,11 @@ func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*T
return nil, err
}
return &tr.Task, nil
return convertTask(tr.Task), nil
}
// UpdateTask updates a single task with changeset.
func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*Task, error) {
func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1507,7 +1556,7 @@ func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxd
return nil, err
}
return &tr.Task, nil
return convertTask(tr.Task), nil
}
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.

194
internal/resource/org_id.go Normal file
View File

@ -0,0 +1,194 @@
package resource
import (
"context"
"fmt"
"github.com/influxdata/influxdb/v2"
)
// OrgIDResolver is a type which combines multiple resource services
// in order to resolve the resources associated org ID.
// Ideally you do not need to use this type, it is mostly a stop-gap
// while we migrate responsibilities off of *kv.Service.
// Consider it deprecated.
type OrgIDResolver struct {
AuthorizationFinder interface {
FindAuthorizationByID(context.Context, influxdb.ID) (*influxdb.Authorization, error)
}
BucketFinder interface {
FindBucketByID(context.Context, influxdb.ID) (*influxdb.Bucket, error)
}
OrganizationFinder interface {
FindOrganizationByID(context.Context, influxdb.ID) (*influxdb.Organization, error)
}
DashboardFinder interface {
FindDashboardByID(context.Context, influxdb.ID) (*influxdb.Dashboard, error)
}
SourceFinder interface {
FindSourceByID(context.Context, influxdb.ID) (*influxdb.Source, error)
}
TaskFinder interface {
FindTaskByID(context.Context, influxdb.ID) (*influxdb.Task, error)
}
TelegrafConfigFinder interface {
FindTelegrafConfigByID(context.Context, influxdb.ID) (*influxdb.TelegrafConfig, error)
}
VariableFinder interface {
FindVariableByID(context.Context, influxdb.ID) (*influxdb.Variable, error)
}
TargetFinder interface {
GetTargetByID(context.Context, influxdb.ID) (*influxdb.ScraperTarget, error)
}
CheckFinder interface {
FindCheckByID(context.Context, influxdb.ID) (influxdb.Check, error)
}
NotificationEndpointFinder interface {
FindNotificationEndpointByID(context.Context, influxdb.ID) (influxdb.NotificationEndpoint, error)
}
NotificationRuleFinder interface {
FindNotificationRuleByID(context.Context, influxdb.ID) (influxdb.NotificationRule, error)
}
}
// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type.
func (o *OrgIDResolver) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) {
switch rt {
case influxdb.AuthorizationsResourceType:
if o.AuthorizationFinder == nil {
break
}
r, err := o.AuthorizationFinder.FindAuthorizationByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.BucketsResourceType:
if o.BucketFinder == nil {
break
}
r, err := o.BucketFinder.FindBucketByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.OrgsResourceType:
if o.OrganizationFinder == nil {
break
}
r, err := o.OrganizationFinder.FindOrganizationByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.ID, nil
case influxdb.DashboardsResourceType:
if o.DashboardFinder == nil {
break
}
r, err := o.DashboardFinder.FindDashboardByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.SourcesResourceType:
if o.SourceFinder == nil {
break
}
r, err := o.SourceFinder.FindSourceByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.TasksResourceType:
if o.TaskFinder == nil {
break
}
r, err := o.TaskFinder.FindTaskByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.TelegrafsResourceType:
if o.TelegrafConfigFinder == nil {
break
}
r, err := o.TelegrafConfigFinder.FindTelegrafConfigByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.VariablesResourceType:
if o.VariableFinder == nil {
break
}
r, err := o.VariableFinder.FindVariableByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.ScraperResourceType:
if o.TargetFinder == nil {
break
}
r, err := o.TargetFinder.GetTargetByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.ChecksResourceType:
if o.CheckFinder == nil {
break
}
r, err := o.CheckFinder.FindCheckByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
case influxdb.NotificationEndpointResourceType:
if o.NotificationEndpointFinder == nil {
break
}
r, err := o.NotificationEndpointFinder.FindNotificationEndpointByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
case influxdb.NotificationRuleResourceType:
if o.NotificationRuleFinder == nil {
break
}
r, err := o.NotificationRuleFinder.FindNotificationRuleByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
}
return influxdb.InvalidID(), &influxdb.Error{
Msg: fmt.Sprintf("unsupported resource type %s", rt),
}
}

View File

@ -46,7 +46,7 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
telegrafBucket,
telegrafPluginsBucket,
urmBucket,
notificationRuleBucket,
[]byte("notificationRulev1"),
userBucket,
userIndex,
sourceBucket,

View File

@ -1,498 +0,0 @@
package kv
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/influxdb/v2/notification/rule"
"go.uber.org/zap"
"github.com/influxdata/influxdb/v2"
)
var (
notificationRuleBucket = []byte("notificationRulev1")
// ErrNotificationRuleNotFound is used when the notification rule is not found.
ErrNotificationRuleNotFound = &influxdb.Error{
Msg: "notification rule not found",
Code: influxdb.ENotFound,
}
// ErrInvalidNotificationRuleID is used when the service was provided
// an invalid ID format.
ErrInvalidNotificationRuleID = &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "provided notification rule ID has invalid format",
}
)
var _ influxdb.NotificationRuleStore = (*Service)(nil)
// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the
// store, it means the store is not available at the moment (e.g. network).
func UnavailableNotificationRuleStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err),
Op: "kv/notificationRule",
}
}
// InternalNotificationRuleStoreError is used when the error comes from an
// internal system.
func InternalNotificationRuleStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err),
Op: "kv/notificationRule",
}
}
func (s *Service) notificationRuleBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(notificationRuleBucket)
if err != nil {
return nil, UnavailableNotificationRuleStoreError(err)
}
return b, nil
}
// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier.
func (s *Service) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
return s.createNotificationRule(ctx, tx, nr, userID)
})
}
func (s *Service) createNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error {
id := s.IDGenerator.ID()
nr.SetID(id)
now := s.TimeGenerator.Now()
nr.SetOwnerID(userID)
nr.SetCreatedAt(now)
nr.SetUpdatedAt(now)
t, err := s.createNotificationTask(ctx, tx, nr)
if err != nil {
return err
}
nr.SetTaskID(t.ID)
if err := nr.Valid(); err != nil {
return err
}
if err := nr.Status.Valid(); err != nil {
return err
}
if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil {
return err
}
urm := &influxdb.UserResourceMapping{
ResourceID: id,
UserID: userID,
UserType: influxdb.Owner,
ResourceType: influxdb.NotificationRuleResourceType,
}
return s.createUserResourceMapping(ctx, tx, urm)
}
func (s *Service) createNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) {
ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID())
if err != nil {
return nil, err
}
script, err := r.GenerateFlux(ep)
if err != nil {
return nil, err
}
status := string(r.Status)
tc := influxdb.TaskCreate{
Type: r.Type(),
Flux: script,
OwnerID: r.GetOwnerID(),
OrganizationID: r.GetOrgID(),
Status: status,
}
t, err := s.createTask(ctx, tx, tc)
if err != nil {
return nil, err
}
return t, nil
}
func (s *Service) updateNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) {
ep, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID())
if err != nil {
return nil, err
}
script, err := r.GenerateFlux(ep)
if err != nil {
return nil, err
}
tu := influxdb.TaskUpdate{
Flux: &script,
Description: strPtr(r.GetDescription()),
Status: status,
}
t, err := s.updateTask(ctx, tx, r.GetTaskID(), tu)
if err != nil {
return nil, err
}
return t, nil
}
// UpdateNotificationRule updates a single notification rule.
// Returns the new notification rule after update.
func (s *Service) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) {
var err error
var rule influxdb.NotificationRule
err = s.kv.Update(ctx, func(tx Tx) error {
rule, err = s.updateNotificationRule(ctx, tx, id, nr, userID)
return err
})
return rule, err
}
func (s *Service) updateNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) {
current, err := s.findNotificationRuleByID(ctx, tx, id)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
nr.SetID(current.GetID())
nr.SetOrgID(current.GetOrgID())
nr.SetOwnerID(current.GetOwnerID())
nr.SetCreatedAt(current.GetCRUDLog().CreatedAt)
nr.SetUpdatedAt(s.TimeGenerator.Now())
nr.SetTaskID(current.GetTaskID())
if err := nr.Valid(); err != nil {
return nil, err
}
if err := nr.Status.Valid(); err != nil {
return nil, err
}
_, err = s.updateNotificationTask(ctx, tx, nr, strPtr(string(nr.Status)))
if err != nil {
return nil, err
}
if err := s.putNotificationRule(ctx, tx, nr.NotificationRule); err != nil {
return nil, err
}
return nr.NotificationRule, nil
}
// PatchNotificationRule updates a single notification rule with changeset.
// Returns the new notification rule state after update.
func (s *Service) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
var nr influxdb.NotificationRule
if err := s.kv.Update(ctx, func(tx Tx) (err error) {
nr, err = s.patchNotificationRule(ctx, tx, id, upd)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return nr, nil
}
func (s *Service) patchNotificationRule(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
nr, err := s.findNotificationRuleByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
nr.SetName(*upd.Name)
}
if upd.Description != nil {
nr.SetDescription(*upd.Description)
}
var status *string
if upd.Status != nil {
status = strPtr(string(*upd.Status))
}
nr.SetUpdatedAt(s.TimeGenerator.Now())
if err := nr.Valid(); err != nil {
return nil, err
}
_, err = s.updateNotificationTask(ctx, tx, nr, status)
if err != nil {
return nil, err
}
err = s.putNotificationRule(ctx, tx, nr)
if err != nil {
return nil, err
}
return nr, nil
}
// PutNotificationRule put a notification rule to storage.
func (s *Service) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error {
return s.kv.Update(ctx, func(tx Tx) (err error) {
if err := nr.Valid(); err != nil {
return err
}
if err := nr.Status.Valid(); err != nil {
return err
}
return s.putNotificationRule(ctx, tx, nr)
})
}
func (s *Service) putNotificationRule(ctx context.Context, tx Tx, nr influxdb.NotificationRule) error {
encodedID, _ := nr.GetID().Encode()
v, err := json.Marshal(nr)
if err != nil {
return err
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableNotificationRuleStoreError(err)
}
return nil
}
// FindNotificationRuleByID returns a single notification rule by ID.
func (s *Service) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
var (
nr influxdb.NotificationRule
err error
)
err = s.kv.View(ctx, func(tx Tx) error {
nr, err = s.findNotificationRuleByID(ctx, tx, id)
return err
})
return nr, err
}
func (s *Service) findNotificationRuleByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationRule, error) {
encID, err := id.Encode()
if err != nil {
return nil, ErrInvalidNotificationRuleID
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return nil, err
}
v, err := bucket.Get(encID)
if IsNotFound(err) {
return nil, ErrNotificationRuleNotFound
}
if err != nil {
return nil, InternalNotificationRuleStoreError(err)
}
return rule.UnmarshalJSON(v)
}
// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules.
// Additional options provide pagination & sorting.
func (s *Service) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) {
err = s.kv.View(ctx, func(tx Tx) error {
nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...)
return err
})
return nrs, n, err
}
func (s *Service) findNotificationRules(ctx context.Context, tx Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) {
nrs := make([]influxdb.NotificationRule, 0)
m, err := s.findUserResourceMappings(ctx, tx, filter.UserResourceMappingFilter)
if err != nil {
return nil, 0, err
}
if len(m) == 0 {
return nrs, 0, nil
}
idMap := make(map[influxdb.ID]bool)
for _, item := range m {
idMap[item.ResourceID] = false
}
if filter.OrgID != nil || filter.Organization != nil {
o, err := s.findOrganization(ctx, tx, influxdb.OrganizationFilter{
ID: filter.OrgID,
Name: filter.Organization,
})
if err != nil {
return nrs, 0, err
}
filter.OrgID = &o.ID
}
var offset, limit, count int
var descending bool
if len(opt) > 0 {
offset = opt[0].Offset
limit = opt[0].Limit
descending = opt[0].Descending
}
filterFn := filterNotificationRulesFn(idMap, filter)
err = s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool {
if filterFn(nr) {
if count >= offset {
nrs = append(nrs, nr)
}
count++
}
if limit > 0 && len(nrs) >= limit {
return false
}
return true
})
return nrs, len(nrs), err
}
// forEachNotificationRule will iterate through all notification rules while fn returns true.
func (s *Service) forEachNotificationRule(ctx context.Context, tx Tx, descending bool, fn func(influxdb.NotificationRule) bool) error {
bkt, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
direction := CursorAscending
if descending {
direction = CursorDescending
}
cur, err := bkt.ForwardCursor(nil, WithCursorDirection(direction))
if err != nil {
return err
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
nr, err := rule.UnmarshalJSON(v)
if err != nil {
return err
}
if !fn(nr) {
break
}
}
return nil
}
func filterNotificationRulesFn(idMap map[influxdb.ID]bool, filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool {
if filter.OrgID != nil {
return func(nr influxdb.NotificationRule) bool {
if !nr.MatchesTags(filter.Tags) {
return false
}
_, ok := idMap[nr.GetID()]
return nr.GetOrgID() == *filter.OrgID && ok
}
}
return func(nr influxdb.NotificationRule) bool {
if !nr.MatchesTags(filter.Tags) {
return false
}
_, ok := idMap[nr.GetID()]
return ok
}
}
// DeleteNotificationRule removes a notification rule by ID.
func (s *Service) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
return s.deleteNotificationRule(ctx, tx, id)
})
}
func (s *Service) deleteNotificationRule(ctx context.Context, tx Tx, id influxdb.ID) error {
r, err := s.findNotificationRuleByID(ctx, tx, id)
if err != nil {
return err
}
if err := s.deleteTask(ctx, tx, r.GetTaskID()); err != nil {
return err
}
encodedID, err := id.Encode()
if err != nil {
return ErrInvalidNotificationRuleID
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
_, err = bucket.Get(encodedID)
if IsNotFound(err) {
return ErrNotificationRuleNotFound
}
if err != nil {
return InternalNotificationRuleStoreError(err)
}
if err := bucket.Delete(encodedID); err != nil {
return InternalNotificationRuleStoreError(err)
}
if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.NotificationRuleResourceType,
}); err != nil {
// TODO(desa): it is possible that there were no user resource mappings for a resource so this likely shouldn't be a blocking
// condition for deleting a notification rule.
s.log.Info("Failed to remove user resource mappings for notification rule", zap.Error(err), zap.Stringer("rule_id", id))
}
return nil
}

View File

@ -1,93 +0,0 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/query/fluxlang"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestBoltNotificationRuleStore(t *testing.T) {
influxdbtesting.NotificationRuleStore(initBoltNotificationRuleStore, t)
}
func initBoltNotificationRuleStore(f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initNotificationRuleStore(s, f, t)
return svc, func() {
closeSvc()
closeBolt()
}
}
func initNotificationRuleStore(s kv.SchemaStore, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
for _, o := range f.Orgs {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
for _, e := range f.Endpoints {
if err := svc.CreateNotificationEndpoint(ctx, e, 1); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}
for _, nr := range f.NotificationRules {
nrc := influxdb.NotificationRuleCreate{
NotificationRule: nr,
Status: influxdb.Active,
}
if err := svc.PutNotificationRule(ctx, nrc); err != nil {
t.Fatalf("failed to populate notification rule: %v", err)
}
}
for _, c := range f.Tasks {
if _, err := svc.CreateTask(ctx, c); err != nil {
t.Fatalf("failed to populate task: %v", err)
}
}
return svc, func() {
for _, nr := range f.NotificationRules {
if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil {
t.Logf("failed to remove notification rule: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
for _, o := range f.Orgs {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
t.Fatalf("failed to remove org: %v", err)
}
}
}
}

View File

@ -717,88 +717,6 @@ func (s *Service) appendOrganizationEventToLog(ctx context.Context, tx Tx, id in
return s.addLogEntry(ctx, tx, k, v, s.Now())
}
// FindResourceOrganizationID is used to find the organization that a resource belongs to five the id of a resource and a resource type.
func (s *Service) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) {
switch rt {
case influxdb.AuthorizationsResourceType:
r, err := s.FindAuthorizationByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.BucketsResourceType:
r, err := s.FindBucketByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.OrgsResourceType:
r, err := s.FindOrganizationByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.ID, nil
case influxdb.DashboardsResourceType:
r, err := s.FindDashboardByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.SourcesResourceType:
r, err := s.FindSourceByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.TasksResourceType:
r, err := s.FindTaskByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.TelegrafsResourceType:
r, err := s.FindTelegrafConfigByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.VariablesResourceType:
r, err := s.FindVariableByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrganizationID, nil
case influxdb.ScraperResourceType:
r, err := s.GetTargetByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.OrgID, nil
case influxdb.ChecksResourceType:
r, err := s.FindCheckByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
case influxdb.NotificationEndpointResourceType:
r, err := s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
case influxdb.NotificationRuleResourceType:
r, err := s.FindNotificationRuleByID(ctx, id)
if err != nil {
return influxdb.InvalidID(), err
}
return r.GetOrgID(), nil
}
return influxdb.InvalidID(), &influxdb.Error{
Msg: fmt.Sprintf("unsupported resource type %s", rt),
}
}
// OrgAlreadyExistsError is used when creating a new organization with
// a name that has already been used. Organization names must be unique.
func OrgAlreadyExistsError(o *influxdb.Organization) error {

View File

@ -2,7 +2,6 @@ package label
import (
"context"
"errors"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
@ -11,15 +10,15 @@ import (
var _ influxdb.LabelService = (*AuthedLabelService)(nil)
type AuthedLabelService struct {
s influxdb.LabelService
orgSvc authorizer.OrganizationService
s influxdb.LabelService
orgIDResolver authorizer.OrgIDResolver
}
// NewAuthedLabelService constructs an instance of an authorizing label serivce.
func NewAuthedLabelService(s influxdb.LabelService, orgSvc authorizer.OrganizationService) *AuthedLabelService {
func NewAuthedLabelService(s influxdb.LabelService, orgIDResolver authorizer.OrgIDResolver) *AuthedLabelService {
return &AuthedLabelService{
s: s,
orgSvc: orgSvc,
s: s,
orgIDResolver: orgIDResolver,
}
}
func (s *AuthedLabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error {
@ -59,10 +58,7 @@ func (s *AuthedLabelService) FindResourceLabels(ctx context.Context, filter infl
return nil, err
}
if s.orgSvc == nil {
return nil, errors.New("failed to find orgSvc")
}
orgID, err := s.orgSvc.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)
orgID, err := s.orgIDResolver.FindResourceOrganizationID(ctx, filter.ResourceType, filter.ResourceID)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,497 @@
package service
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/notification/rule"
"github.com/influxdata/influxdb/v2/pkg/pointer"
"github.com/influxdata/influxdb/v2/snowflake"
"go.uber.org/zap"
)
var (
notificationRuleBucket = []byte("notificationRulev1")
// ErrNotificationRuleNotFound is used when the notification rule is not found.
ErrNotificationRuleNotFound = &influxdb.Error{
Msg: "notification rule not found",
Code: influxdb.ENotFound,
}
// ErrInvalidNotificationRuleID is used when the service was provided
// an invalid ID format.
ErrInvalidNotificationRuleID = &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "provided notification rule ID has invalid format",
}
)
// RuleService is an implementation of the influxdb CheckService
// It is backed by the kv store abstraction.
type RuleService struct {
log *zap.Logger
kv kv.Store
tasks influxdb.TaskService
orgs influxdb.OrganizationService
endpoints influxdb.NotificationEndpointService
idGenerator influxdb.IDGenerator
timeGenerator influxdb.TimeGenerator
}
// NewRuleService constructs and configures a notification rule service
func NewRuleService(logger *zap.Logger, store kv.Store, tasks influxdb.TaskService, orgs influxdb.OrganizationService, endpoints influxdb.NotificationEndpointService) (*RuleService, error) {
s := &RuleService{
log: logger,
kv: store,
tasks: tasks,
orgs: orgs,
endpoints: endpoints,
timeGenerator: influxdb.RealTimeGenerator{},
idGenerator: snowflake.NewIDGenerator(),
}
ctx := context.Background()
if err := store.Update(ctx, func(tx kv.Tx) error {
return s.initializeNotificationRule(ctx, tx)
}); err != nil {
return nil, err
}
return s, nil
}
var _ influxdb.NotificationRuleStore = (*RuleService)(nil)
func (s *RuleService) initializeNotificationRule(ctx context.Context, tx kv.Tx) error {
if _, err := s.notificationRuleBucket(tx); err != nil {
return err
}
return nil
}
// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the
// store, it means the store is not available at the moment (e.g. network).
func UnavailableNotificationRuleStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unable to connect to notification rule store service. Please try again; Err: %v", err),
Op: "kv/notificationRule",
}
}
// InternalNotificationRuleStoreError is used when the error comes from an
// internal system.
func InternalNotificationRuleStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unknown internal notificationRule data error; Err: %v", err),
Op: "kv/notificationRule",
}
}
func (s *RuleService) notificationRuleBucket(tx kv.Tx) (kv.Bucket, error) {
b, err := tx.Bucket(notificationRuleBucket)
if err != nil {
return nil, UnavailableNotificationRuleStoreError(err)
}
return b, nil
}
// CreateNotificationRule creates a new notification rule and sets b.ID with the new identifier.
func (s *RuleService) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error {
// set notification rule ID
id := s.idGenerator.ID()
nr.SetID(id)
// set notification rule created / updated times
now := s.timeGenerator.Now()
nr.SetOwnerID(userID)
nr.SetCreatedAt(now)
nr.SetUpdatedAt(now)
// create backing task and set ID (in inactive state initially)
t, err := s.createNotificationTask(ctx, nr)
if err != nil {
return err
}
nr.SetTaskID(t.ID)
if err := s.kv.Update(ctx, func(tx kv.Tx) error {
return s.createNotificationRule(ctx, tx, nr, userID)
}); err != nil {
// remove associated task
if derr := s.tasks.DeleteTask(ctx, t.ID); derr != nil {
s.log.Error("failed to remove task for invalid notification rule", zap.Error(derr))
}
return err
}
// set task to notification rule create status
_, err = s.tasks.UpdateTask(ctx, t.ID, influxdb.TaskUpdate{Status: pointer.String(string(nr.Status))})
return err
}
func (s *RuleService) createNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRuleCreate, userID influxdb.ID) error {
if err := nr.Valid(); err != nil {
return err
}
if err := nr.Status.Valid(); err != nil {
return err
}
return s.putNotificationRule(ctx, tx, nr.NotificationRule)
}
func (s *RuleService) createNotificationTask(ctx context.Context, r influxdb.NotificationRuleCreate) (*influxdb.Task, error) {
ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID())
if err != nil {
return nil, err
}
script, err := r.GenerateFlux(ep)
if err != nil {
return nil, err
}
tc := influxdb.TaskCreate{
Type: r.Type(),
Flux: script,
OwnerID: r.GetOwnerID(),
OrganizationID: r.GetOrgID(),
// create task initially in inactive status
Status: string(influxdb.Inactive),
}
t, err := s.tasks.CreateTask(ctx, tc)
if err != nil {
return nil, err
}
return t, nil
}
// UpdateNotificationRule updates a single notification rule.
// Returns the new notification rule after update.
func (s *RuleService) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRuleCreate, userID influxdb.ID) (influxdb.NotificationRule, error) {
rule, err := s.FindNotificationRuleByID(ctx, id)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
nr.SetID(rule.GetID())
nr.SetOrgID(rule.GetOrgID())
nr.SetOwnerID(rule.GetOwnerID())
nr.SetCreatedAt(rule.GetCRUDLog().CreatedAt)
nr.SetUpdatedAt(s.timeGenerator.Now())
nr.SetTaskID(rule.GetTaskID())
if err := nr.Valid(); err != nil {
return nil, err
}
if err := nr.Status.Valid(); err != nil {
return nil, err
}
_, err = s.updateNotificationTask(ctx, nr, pointer.String(string(nr.Status)))
if err != nil {
return nil, err
}
err = s.kv.Update(ctx, func(tx kv.Tx) error {
return s.putNotificationRule(ctx, tx, nr.NotificationRule)
})
return nr.NotificationRule, err
}
func (s *RuleService) updateNotificationTask(ctx context.Context, r influxdb.NotificationRule, status *string) (*influxdb.Task, error) {
ep, err := s.endpoints.FindNotificationEndpointByID(ctx, r.GetEndpointID())
if err != nil {
return nil, err
}
script, err := r.GenerateFlux(ep)
if err != nil {
return nil, err
}
tu := influxdb.TaskUpdate{
Flux: &script,
Description: pointer.String(r.GetDescription()),
Status: status,
}
t, err := s.tasks.UpdateTask(ctx, r.GetTaskID(), tu)
if err != nil {
return nil, err
}
return t, nil
}
// PatchNotificationRule updates a single notification rule with changeset.
// Returns the new notification rule state after update.
func (s *RuleService) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
nr, err := s.FindNotificationRuleByID(ctx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
nr.SetName(*upd.Name)
}
if upd.Description != nil {
nr.SetDescription(*upd.Description)
}
var status *string
if upd.Status != nil {
status = pointer.String(string(*upd.Status))
}
nr.SetUpdatedAt(s.timeGenerator.Now())
if err := nr.Valid(); err != nil {
return nil, err
}
_, err = s.updateNotificationTask(ctx, nr, status)
if err != nil {
return nil, err
}
if err := s.kv.Update(ctx, func(tx kv.Tx) (err error) {
return s.putNotificationRule(ctx, tx, nr)
}); err != nil {
return nil, err
}
return nr, nil
}
// PutNotificationRule put a notification rule to storage.
func (s *RuleService) PutNotificationRule(ctx context.Context, nr influxdb.NotificationRuleCreate) error {
return s.kv.Update(ctx, func(tx kv.Tx) (err error) {
if err := nr.Valid(); err != nil {
return err
}
if err := nr.Status.Valid(); err != nil {
return err
}
return s.putNotificationRule(ctx, tx, nr)
})
}
func (s *RuleService) putNotificationRule(ctx context.Context, tx kv.Tx, nr influxdb.NotificationRule) error {
encodedID, _ := nr.GetID().Encode()
v, err := json.Marshal(nr)
if err != nil {
return err
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableNotificationRuleStoreError(err)
}
return nil
}
// FindNotificationRuleByID returns a single notification rule by ID.
func (s *RuleService) FindNotificationRuleByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
var (
nr influxdb.NotificationRule
err error
)
err = s.kv.View(ctx, func(tx kv.Tx) error {
nr, err = s.findNotificationRuleByID(ctx, tx, id)
return err
})
return nr, err
}
func (s *RuleService) findNotificationRuleByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (influxdb.NotificationRule, error) {
encID, err := id.Encode()
if err != nil {
return nil, ErrInvalidNotificationRuleID
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return nil, err
}
v, err := bucket.Get(encID)
if kv.IsNotFound(err) {
return nil, ErrNotificationRuleNotFound
}
if err != nil {
return nil, InternalNotificationRuleStoreError(err)
}
return rule.UnmarshalJSON(v)
}
// FindNotificationRules returns a list of notification rules that match filter and the total count of matching notification rules.
// Additional options provide pagination & sorting.
func (s *RuleService) FindNotificationRules(ctx context.Context, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) (nrs []influxdb.NotificationRule, n int, err error) {
if filter.OrgID == nil && filter.Organization != nil {
o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{
Name: filter.Organization,
})
if err != nil {
return nrs, 0, err
}
filter.OrgID = &o.ID
}
err = s.kv.View(ctx, func(tx kv.Tx) error {
nrs, n, err = s.findNotificationRules(ctx, tx, filter, opt...)
return err
})
return nrs, n, err
}
func (s *RuleService) findNotificationRules(ctx context.Context, tx kv.Tx, filter influxdb.NotificationRuleFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationRule, int, error) {
var (
nrs = make([]influxdb.NotificationRule, 0)
offset int
limit int
count int
descending bool
)
if len(opt) > 0 {
offset = opt[0].Offset
limit = opt[0].Limit
descending = opt[0].Descending
}
filterFn := filterNotificationRulesFn(filter)
err := s.forEachNotificationRule(ctx, tx, descending, func(nr influxdb.NotificationRule) bool {
if filterFn(nr) {
if count >= offset {
nrs = append(nrs, nr)
}
count++
}
if limit > 0 && len(nrs) >= limit {
return false
}
return true
})
return nrs, len(nrs), err
}
// forEachNotificationRule will iterate through all notification rules while fn returns true.
func (s *RuleService) forEachNotificationRule(ctx context.Context, tx kv.Tx, descending bool, fn func(influxdb.NotificationRule) bool) error {
bkt, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
direction := kv.CursorAscending
if descending {
direction = kv.CursorDescending
}
cur, err := bkt.ForwardCursor(nil, kv.WithCursorDirection(direction))
if err != nil {
return err
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
nr, err := rule.UnmarshalJSON(v)
if err != nil {
return err
}
if !fn(nr) {
break
}
}
return nil
}
func filterNotificationRulesFn(filter influxdb.NotificationRuleFilter) func(nr influxdb.NotificationRule) bool {
if filter.OrgID != nil {
return func(nr influxdb.NotificationRule) bool {
if !nr.MatchesTags(filter.Tags) {
return false
}
return nr.GetOrgID() == *filter.OrgID
}
}
return func(nr influxdb.NotificationRule) bool {
return nr.MatchesTags(filter.Tags)
}
}
// DeleteNotificationRule removes a notification rule by ID.
func (s *RuleService) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error {
r, err := s.FindNotificationRuleByID(ctx, id)
if err != nil {
return err
}
if err := s.tasks.DeleteTask(ctx, r.GetTaskID()); err != nil {
return err
}
return s.kv.Update(ctx, func(tx kv.Tx) error {
return s.deleteNotificationRule(ctx, tx, r)
})
}
func (s *RuleService) deleteNotificationRule(ctx context.Context, tx kv.Tx, r influxdb.NotificationRule) error {
encodedID, err := r.GetID().Encode()
if err != nil {
return ErrInvalidNotificationRuleID
}
bucket, err := s.notificationRuleBucket(tx)
if err != nil {
return err
}
_, err = bucket.Get(encodedID)
if kv.IsNotFound(err) {
return ErrNotificationRuleNotFound
}
if err != nil {
return InternalNotificationRuleStoreError(err)
}
if err := bucket.Delete(encodedID); err != nil {
return InternalNotificationRuleStoreError(err)
}
return nil
}

View File

@ -0,0 +1,162 @@
package service
import (
"context"
"errors"
"io/ioutil"
"os"
"testing"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/tenant"
"go.uber.org/zap/zaptest"
)
func TestInmemNotificationRuleStore(t *testing.T) {
NotificationRuleStore(initInmemNotificationRuleStore, t)
}
func initInmemNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) {
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t)
return svc, tsvc, func() {
closeSvc()
}
}
func initBoltNotificationRuleStore(f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) {
store, closeBolt, err := newTestBoltStore(t)
if err != nil {
t.Fatal(err)
}
svc, tsvc, closeSvc := initNotificationRuleStore(store, f, t)
return svc, tsvc, func() {
closeSvc()
closeBolt()
}
}
func TestBoltNotificationRuleStore(t *testing.T) {
NotificationRuleStore(initBoltNotificationRuleStore, t)
}
func initNotificationRuleStore(s kv.Store, f NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, influxdb.TaskService, func()) {
logger := zaptest.NewLogger(t)
kvsvc := kv.NewService(logger, s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
kvsvc.IDGenerator = f.IDGenerator
kvsvc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
kvsvc.TimeGenerator = influxdb.RealTimeGenerator{}
}
var (
tenantStore = tenant.NewStore(s)
tenantSvc = tenant.NewService(tenantStore)
)
svc, err := NewRuleService(logger, s, kvsvc, tenantSvc, kvsvc)
if err != nil {
t.Fatal(err)
}
svc.idGenerator = f.IDGenerator
if f.TimeGenerator != nil {
svc.timeGenerator = f.TimeGenerator
}
ctx := context.Background()
for _, o := range f.Orgs {
withOrgID(tenantStore, o.ID, func() {
if err := tenantSvc.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}
for _, e := range f.Endpoints {
if err := kvsvc.CreateNotificationEndpoint(ctx, e, 1); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}
for _, nr := range f.NotificationRules {
nrc := influxdb.NotificationRuleCreate{
NotificationRule: nr,
Status: influxdb.Active,
}
if err := svc.PutNotificationRule(ctx, nrc); err != nil {
t.Fatalf("failed to populate notification rule: %v", err)
}
}
for _, c := range f.Tasks {
if _, err := kvsvc.CreateTask(ctx, c); err != nil {
t.Fatalf("failed to populate task: %v", err)
}
}
return svc, kvsvc, func() {
for _, nr := range f.NotificationRules {
if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil {
t.Logf("failed to remove notification rule: %v", err)
}
}
for _, o := range f.Orgs {
if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil {
t.Fatalf("failed to remove org: %v", err)
}
}
}
}
func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()
store.OrgIDGen = mock.NewStaticIDGenerator(orgID)
fn()
}
func newTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) {
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
ctx := context.Background()
logger := zaptest.NewLogger(t)
path := f.Name()
// skip fsync to improve test performance
s := bolt.NewKVStore(logger, path, bolt.WithNoSync)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
}
return s, close, nil
}

View File

@ -3536,10 +3536,11 @@ func (r *rollbackCoordinator) runTilEnd(ctx context.Context, orgID, userID influ
defer cancel()
defer func() {
if recover() != nil {
if err := recover(); err != nil {
r.logger.Error(
"panic applying "+resource,
zap.String("stack_trace", fmt.Sprintf("%+v", stack.Trace())),
zap.Reflect("panic", err),
)
errStr.add(errMsg{
resource: resource,

View File

@ -27,7 +27,6 @@ type NotificationEndpointFields struct {
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
var time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC)
var notificationEndpointCmpOptions = cmp.Options{
cmp.Transformer("Sort", func(in []influxdb.NotificationEndpoint) []influxdb.NotificationEndpoint {