Merge pull request #14714 from influxdata/create_task_in_stack

chore(http): split taskservice from check and notification
pull/14719/head
kelwang 2019-08-20 11:20:15 -04:00 committed by GitHub
commit 99cebbbdc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 161 additions and 161 deletions

View File

@ -43,7 +43,6 @@ type CheckService interface {
UserResourceMappingService
// OrganizationService is needed for search filter
OrganizationService
TaskService
// FindCheckByID returns a single check by ID.
FindCheckByID(ctx context.Context, id ID) (Check, error)

View File

@ -167,6 +167,7 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
notificationRuleBackend := NewNotificationRuleBackend(b)
notificationRuleBackend.NotificationRuleStore = authorizer.NewNotificationRuleStore(b.NotificationRuleStore,
b.UserResourceMappingService, b.OrganizationService)
notificationRuleBackend.TaskService = b.TaskService
h.NotificationRuleHandler = NewNotificationRuleHandler(notificationRuleBackend)
notificationEndpointBackend := NewNotificationEndpointBackend(b)
@ -177,6 +178,7 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
checkBackend := NewCheckBackend(b)
checkBackend.CheckService = authorizer.NewCheckService(b.CheckService,
b.UserResourceMappingService, b.OrganizationService)
checkBackend.TaskService = b.TaskService
h.CheckHandler = NewCheckHandler(checkBackend)
writeBackend := NewWriteBackend(b)

View File

@ -21,6 +21,7 @@ type CheckBackend struct {
Logger *zap.Logger
CheckService influxdb.CheckService
TaskService influxdb.TaskService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
@ -48,6 +49,7 @@ type CheckHandler struct {
Logger *zap.Logger
CheckService influxdb.CheckService
TaskService influxdb.TaskService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
@ -73,6 +75,7 @@ func NewCheckHandler(b *CheckBackend) *CheckHandler {
Logger: b.Logger,
CheckService: b.CheckService,
TaskService: b.TaskService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
@ -378,6 +381,41 @@ func decodePatchCheckRequest(ctx context.Context, r *http.Request) (*patchCheckR
return req, nil
}
func createCheckTask(ctx context.Context, s influxdb.TaskService, c influxdb.Check) error {
if c.GetStatus() == influxdb.Inactive {
return nil
}
script, err := c.GenerateFlux()
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
tc := influxdb.TaskCreate{
Type: c.Type(),
Flux: script,
OwnerID: c.GetOwnerID(),
OrganizationID: c.GetOrgID(),
}
t, err := s.CreateTask(ctx, tc)
if err != nil {
return err
}
c.SetTaskID(t.ID)
return nil
}
func removeCheckTask(ctx context.Context, s influxdb.TaskService, c influxdb.Check) error {
err := s.DeleteTask(ctx, c.GetTaskID())
if err != nil {
return err
}
return nil
}
// handlePostCheck is the HTTP handler for the POST /api/v2/checks route.
func (h *CheckHandler) handlePostCheck(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -388,6 +426,10 @@ func (h *CheckHandler) handlePostCheck(w http.ResponseWriter, r *http.Request) {
h.HandleHTTPError(ctx, err, w)
return
}
if err := createCheckTask(ctx, h.TaskService, chk); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
@ -418,6 +460,16 @@ func (h *CheckHandler) handlePutCheck(w http.ResponseWriter, r *http.Request) {
return
}
if err = removeCheckTask(ctx, h.TaskService, chk); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err = createCheckTask(ctx, h.TaskService, chk); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
chk, err = h.CheckService.UpdateCheck(ctx, chk.GetID(), chk)
if err != nil {
h.HandleHTTPError(ctx, err, w)
@ -448,6 +500,18 @@ func (h *CheckHandler) handlePatchCheck(w http.ResponseWriter, r *http.Request)
return
}
if req.Update.Status != nil && *req.Update.Status == influxdb.Inactive {
chk, err := h.CheckService.FindCheckByID(ctx, req.ID)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err = removeCheckTask(ctx, h.TaskService, chk); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
chk, err := h.CheckService.PatchCheck(ctx, req.ID, req.Update)
if err != nil {
h.HandleHTTPError(ctx, err, w)

View File

@ -28,6 +28,7 @@ func NewMockCheckBackend() *CheckBackend {
Logger: zap.NewNop().With(zap.String("handler", "check")),
CheckService: mock.NewCheckService(),
TaskService: &mock.TaskService{},
UserResourceMappingService: mock.NewUserResourceMappingService(),
LabelService: mock.NewLabelService(),
UserService: mock.NewUserService(),
@ -468,6 +469,7 @@ func TestService_handleGetCheck(t *testing.T) {
func TestService_handlePostCheck(t *testing.T) {
type fields struct {
CheckService influxdb.CheckService
TaskService influxdb.TaskService
OrganizationService influxdb.OrganizationService
}
type args struct {
@ -489,6 +491,11 @@ func TestService_handlePostCheck(t *testing.T) {
{
name: "create a new check",
fields: fields{
TaskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return &influxdb.Task{ID: 3}, nil
},
},
CheckService: &mock.CheckService{
CreateCheckFn: func(ctx context.Context, c influxdb.Check, userID influxdb.ID) error {
c.SetID(influxTesting.MustIDBase16("020f755c3c082000"))
@ -582,6 +589,7 @@ func TestService_handlePostCheck(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
checkBackend := NewMockCheckBackend()
checkBackend.TaskService = tt.fields.TaskService
checkBackend.CheckService = tt.fields.CheckService
checkBackend.OrganizationService = tt.fields.OrganizationService
h := NewCheckHandler(checkBackend)
@ -891,6 +899,7 @@ func TestService_handlePatchCheck(t *testing.T) {
func TestService_handleUpdateCheck(t *testing.T) {
type fields struct {
TaskService influxdb.TaskService
CheckService influxdb.CheckService
}
type args struct {
@ -912,7 +921,15 @@ func TestService_handleUpdateCheck(t *testing.T) {
{
name: "update a check name",
fields: fields{
&mock.CheckService{
TaskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return &influxdb.Task{ID: 3}, nil
},
DeleteTaskFn: func(ctx context.Context, id influxdb.ID) error {
return nil
},
},
CheckService: &mock.CheckService{
UpdateCheckFn: func(ctx context.Context, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
if id == influxTesting.MustIDBase16("020f755c3c082000") {
d := &check.Deadman{
@ -991,7 +1008,15 @@ func TestService_handleUpdateCheck(t *testing.T) {
{
name: "check not found",
fields: fields{
&mock.CheckService{
TaskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return &influxdb.Task{ID: 3}, nil
},
DeleteTaskFn: func(ctx context.Context, id influxdb.ID) error {
return nil
},
},
CheckService: &mock.CheckService{
UpdateCheckFn: func(ctx context.Context, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
@ -1018,6 +1043,7 @@ func TestService_handleUpdateCheck(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
checkBackend := NewMockCheckBackend()
checkBackend.HTTPErrorHandler = ErrorHandler(0)
checkBackend.TaskService = tt.fields.TaskService
checkBackend.CheckService = tt.fields.CheckService
h := NewCheckHandler(checkBackend)

View File

@ -21,6 +21,7 @@ type NotificationRuleBackend struct {
Logger *zap.Logger
NotificationRuleStore influxdb.NotificationRuleStore
TaskService influxdb.TaskService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
@ -34,6 +35,7 @@ func NewNotificationRuleBackend(b *APIBackend) *NotificationRuleBackend {
Logger: b.Logger.With(zap.String("handler", "notification_rule")),
NotificationRuleStore: b.NotificationRuleStore,
TaskService: b.TaskService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
@ -48,6 +50,7 @@ type NotificationRuleHandler struct {
Logger *zap.Logger
NotificationRuleStore influxdb.NotificationRuleStore
TaskService influxdb.TaskService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
@ -73,6 +76,7 @@ func NewNotificationRuleHandler(b *NotificationRuleBackend) *NotificationRuleHan
Logger: b.Logger,
NotificationRuleStore: b.NotificationRuleStore,
TaskService: b.TaskService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
@ -135,6 +139,43 @@ type notificationRuleResponse struct {
Links notificationRuleLinks `json:"links"`
}
func createRuleTask(ctx context.Context,
s influxdb.TaskService,
nr influxdb.NotificationRule) error {
if nr.GetStatus() == influxdb.Inactive {
return nil
}
script, err := nr.GenerateFlux(nil)
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
tc := influxdb.TaskCreate{
Type: nr.Type(),
Flux: script,
OwnerID: nr.GetOwnerID(),
OrganizationID: nr.GetOrgID(),
}
t, err := s.CreateTask(ctx, tc)
if err != nil {
return err
}
nr.SetTaskID(t.ID)
return nil
}
func removeRuleTask(ctx context.Context, s influxdb.TaskService, nr influxdb.NotificationRule) error {
err := s.DeleteTask(ctx, nr.GetTaskID())
if err != nil {
return err
}
return nil
}
func (resp notificationRuleResponse) MarshalJSON() ([]byte, error) {
b1, err := json.Marshal(resp.NotificationRule)
if err != nil {
@ -412,6 +453,11 @@ func (h *NotificationRuleHandler) handlePostNotificationRule(w http.ResponseWrit
h.HandleHTTPError(ctx, err, w)
return
}
if err = createRuleTask(ctx, h.TaskService, nr); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
@ -440,6 +486,15 @@ func (h *NotificationRuleHandler) handlePutNotificationRule(w http.ResponseWrite
h.HandleHTTPError(ctx, err, w)
return
}
if err = removeRuleTask(ctx, h.TaskService, nr); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err = createRuleTask(ctx, h.TaskService, nr); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
@ -475,6 +530,17 @@ func (h *NotificationRuleHandler) handlePatchNotificationRule(w http.ResponseWri
h.HandleHTTPError(ctx, err, w)
return
}
if req.Update.Status != nil && *req.Update.Status == influxdb.Inactive {
nr, err := h.NotificationRuleStore.FindNotificationRuleByID(ctx, req.ID)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err = removeRuleTask(ctx, h.TaskService, nr); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
nr, err := h.NotificationRuleStore.PatchNotificationRule(ctx, req.ID, req.Update)
if err != nil {

View File

@ -323,13 +323,6 @@ func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.Check, user
c.SetCreatedAt(s.Now())
c.SetUpdatedAt(s.Now())
t, err := s.createCheckTask(ctx, tx, c)
if err != nil {
return err
}
c.SetTaskID(t.ID)
if err := s.putCheck(ctx, tx, c); err != nil {
return err
}
@ -340,27 +333,6 @@ func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.Check, user
return nil
}
func (s *Service) createCheckTask(ctx context.Context, tx Tx, c influxdb.Check) (*influxdb.Task, error) {
script, err := c.GenerateFlux()
if err != nil {
return nil, err
}
tc := influxdb.TaskCreate{
Type: c.Type(),
Flux: script,
OwnerID: c.GetOwnerID(),
OrganizationID: c.GetOrgID(),
}
t, err := s.createTask(ctx, tx, tc)
if err != nil {
return nil, err
}
return t, nil
}
// PutCheck will put a check without setting an ID.
func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
return s.kv.Update(ctx, func(tx Tx) error {
@ -571,19 +543,9 @@ func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk in
}
}
if err := s.deleteTask(ctx, tx, chk.GetTaskID()); err != nil {
return nil, err
}
chk.SetOwnerID(current.GetOwnerID())
t, err := s.createCheckTask(ctx, tx, chk)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
chk.SetTaskID(t.ID)
chk.SetID(current.GetID())
chk.SetOrgID(current.GetOrgID())
chk.SetCreatedAt(current.GetCRUDLog().CreatedAt)
@ -699,10 +661,6 @@ func (s *Service) deleteCheck(ctx context.Context, tx Tx, id influxdb.ID) error
return pe
}
if err := s.deleteTask(ctx, tx, c.GetTaskID()); err != nil {
return err
}
key, pe := checkIndexKey(c.GetOrgID(), c.GetName())
if pe != nil {
return pe

View File

@ -70,11 +70,6 @@ func initCheckService(s kv.Store, f influxdbtesting.CheckFields, t *testing.T) (
t.Fatalf("failed to populate checks")
}
}
for _, tc := range f.Tasks {
if _, err := svc.CreateTask(ctx, tc); err != nil {
t.Fatalf("failed to populate tasks: %v", err)
}
}
return svc, kv.OpPrefix, func() {
for _, o := range f.Organizations {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {

View File

@ -79,13 +79,6 @@ func (s *Service) createNotificationRule(ctx context.Context, tx Tx, nr influxdb
nr.SetCreatedAt(now)
nr.SetUpdatedAt(now)
t, err := s.createNotificationTask(ctx, tx, nr)
if err != nil {
return err
}
nr.SetTaskID(t.ID)
if err := s.putNotificationRule(ctx, tx, nr); err != nil {
return err
}
@ -99,34 +92,6 @@ func (s *Service) createNotificationRule(ctx context.Context, tx Tx, nr influxdb
return s.createUserResourceMapping(ctx, tx, urm)
}
func (s *Service) createNotificationTask(ctx context.Context, tx Tx, r influxdb.NotificationRule) (*influxdb.Task, error) {
// TODO(desa): figure out what to do about this
//ep, _, _, err := s.findNotificationEndpointByID(ctx, tx, r.GetEndpointID())
//if err != nil {
// return nil, err
//}
// TODO(desa): pass in non nil notification endpoint.
script, err := r.GenerateFlux(nil)
if err != nil {
return nil, err
}
tc := influxdb.TaskCreate{
Type: r.Type(),
Flux: script,
OwnerID: r.GetOwnerID(),
OrganizationID: r.GetOrgID(),
}
t, err := s.createTask(ctx, tx, tc)
if err != nil {
return nil, err
}
return t, nil
}
// UpdateNotificationRule updates a single notification rule.
// Returns the new notification rule after update.
func (s *Service) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRule, userID influxdb.ID) (influxdb.NotificationRule, error) {
@ -153,17 +118,6 @@ func (s *Service) updateNotificationRule(ctx context.Context, tx Tx, id influxdb
nr.SetUpdatedAt(s.TimeGenerator.Now())
nr.SetTaskID(current.GetTaskID())
if err := s.deleteTask(ctx, tx, nr.GetTaskID()); err != nil {
return nil, err
}
t, err := s.createNotificationTask(ctx, tx, nr)
if err != nil {
return nil, err
}
nr.SetTaskID(t.ID)
if err := s.putNotificationRule(ctx, tx, nr); err != nil {
return nil, err
}
@ -406,15 +360,11 @@ func (s *Service) DeleteNotificationRule(ctx context.Context, id influxdb.ID) er
}
func (s *Service) deleteNotificationRule(ctx context.Context, tx Tx, id influxdb.ID) error {
r, err := s.findNotificationRuleByID(ctx, tx, id)
_, err := s.findNotificationRuleByID(ctx, tx, id)
if err != nil {
return err
}
if err := s.deleteTask(ctx, tx, r.GetTaskID()); err != nil {
return err
}
encodedID, err := id.Encode()
if err != nil {
return ErrInvalidNotificationRuleID

View File

@ -74,12 +74,6 @@ func initNotificationRuleStore(s kv.Store, f influxdbtesting.NotificationRuleFie
}
}
for _, c := range f.Tasks {
if _, err := svc.CreateTask(ctx, c); err != nil {
t.Fatalf("failed to populate task: %v", err)
}
}
return svc, func() {
for _, nr := range f.NotificationRules {
if err := svc.DeleteNotificationRule(ctx, nr.GetID()); err != nil {

View File

@ -11,7 +11,6 @@ import (
type CheckService struct {
OrganizationService
UserResourceMappingService
TaskService
// Methods for an influxdb.CheckService
FindCheckByIDFn func(context.Context, influxdb.ID) (influxdb.Check, error)

View File

@ -111,7 +111,6 @@ type CheckFields struct {
Checks []influxdb.Check
Organizations []*influxdb.Organization
UserResourceMappings []*influxdb.UserResourceMapping
Tasks []influxdb.TaskCreate
}
type checkServiceF func(
@ -869,14 +868,6 @@ func DeleteCheck(
ID: MustIDBase16(orgOneID),
},
},
Tasks: []influxdb.TaskCreate{
{
Flux: `option task = { every: 10s, name: "foo" }
data = from(bucket: "telegraf") |> range(start: -1m)`,
OrganizationID: MustIDBase16(orgOneID),
OwnerID: MustIDBase16(sixID),
},
},
Checks: []influxdb.Check{
deadman1,
threshold1,
@ -901,14 +892,6 @@ data = from(bucket: "telegraf") |> range(start: -1m)`,
ID: MustIDBase16(orgOneID),
},
},
Tasks: []influxdb.TaskCreate{
{
Flux: `option task = { every: 10s, name: "foo" }
data = from(bucket: "telegraf") |> range(start: -1m)`,
OrganizationID: MustIDBase16(orgOneID),
OwnerID: MustIDBase16(sixID),
},
},
Checks: []influxdb.Check{
deadman1,
threshold1,
@ -1105,14 +1088,6 @@ func UpdateCheck(
Checks: []influxdb.Check{
deadman1,
},
Tasks: []influxdb.TaskCreate{
{
Flux: `option task = { every: 10s, name: "foo" }
data = from(bucket: "telegraf") |> range(start: -1m)`,
OrganizationID: MustIDBase16(orgOneID),
OwnerID: MustIDBase16(sixID),
},
},
},
args: args{
id: MustIDBase16(checkOneID),

View File

@ -20,7 +20,6 @@ type NotificationRuleFields struct {
NotificationRules []influxdb.NotificationRule
Orgs []*influxdb.Organization
UserResourceMappings []*influxdb.UserResourceMapping
Tasks []influxdb.TaskCreate
}
var notificationRuleCmpOptions = cmp.Options{
@ -1397,15 +1396,6 @@ func UpdateNotificationRule(
ResourceType: influxdb.NotificationRuleResourceType,
},
},
Tasks: []influxdb.TaskCreate{
{
OwnerID: MustIDBase16(sixID),
OrganizationID: MustIDBase16(fourID),
Flux: `from(bucket: "foo") |> range(start: -1m)
option task = {name: "bar", every: 1m}
`,
},
},
Orgs: []*influxdb.Organization{
{
ID: MustIDBase16(fourID),
@ -1868,15 +1858,6 @@ func DeleteNotificationRule(
ResourceType: influxdb.NotificationRuleResourceType,
},
},
Tasks: []influxdb.TaskCreate{
{
OwnerID: MustIDBase16(sixID),
OrganizationID: MustIDBase16(fourID),
Flux: `from(bucket: "foo") |> range(start: -1m)
option task = {name: "bar", every: 1m}
`,
},
},
IDGenerator: mock.NewIDGenerator(twoID, t),
Orgs: []*influxdb.Organization{
{
@ -2006,15 +1987,6 @@ func DeleteNotificationRule(
ResourceType: influxdb.NotificationRuleResourceType,
},
},
Tasks: []influxdb.TaskCreate{
{
OwnerID: MustIDBase16(sixID),
OrganizationID: MustIDBase16(fourID),
Flux: `from(bucket: "foo") |> range(start: -1m)
option task = {name: "bar", every: 1m}
`,
},
},
IDGenerator: mock.NewIDGenerator(twoID, t),
Orgs: []*influxdb.Organization{
{