From fa31037d374248b9a6635b49eef218298b20345e Mon Sep 17 00:00:00 2001 From: William Baker Date: Wed, 9 Jun 2021 16:20:22 -0400 Subject: [PATCH] feat(annotations): annotations api handlers (#21645) * feat(annotations): annotations handler; annotations & streams router * chore: fix typos & clarify comments --- annotation.go | 16 +- annotations/transport/annotations_router.go | 236 ++++++++++++++++++ .../transport/annotations_router_test.go | 218 ++++++++++++++++ annotations/transport/helpers_test.go | 54 ++++ annotations/transport/http.go | 104 ++++++++ annotations/transport/streams_router.go | 178 +++++++++++++ annotations/transport/streams_router_test.go | 174 +++++++++++++ mock/annotation_service.go | 76 +++--- 8 files changed, 1010 insertions(+), 46 deletions(-) create mode 100644 annotations/transport/annotations_router.go create mode 100644 annotations/transport/annotations_router_test.go create mode 100644 annotations/transport/helpers_test.go create mode 100644 annotations/transport/http.go create mode 100644 annotations/transport/streams_router.go create mode 100644 annotations/transport/streams_router_test.go diff --git a/annotation.go b/annotation.go index 085b43b27c..6eb15c5b33 100644 --- a/annotation.go +++ b/annotation.go @@ -70,24 +70,24 @@ type AnnotationService interface { // ListAnnotations lists all annotations matching the filter. ListAnnotations(ctx context.Context, orgID platform.ID, filter AnnotationListFilter) (ReadAnnotations, error) // GetAnnotation gets an annotation by id. - GetAnnotation(ctx context.Context, orgID, id platform.ID) (*AnnotationEvent, error) + GetAnnotation(ctx context.Context, id platform.ID) (*AnnotationEvent, error) // DeleteAnnotations deletes annotations matching the filter. DeleteAnnotations(ctx context.Context, orgID platform.ID, delete AnnotationDeleteFilter) error // DeleteAnnotation deletes an annotation by id. - DeleteAnnotation(ctx context.Context, orgID, id platform.ID) error + DeleteAnnotation(ctx context.Context, id platform.ID) error // UpdateAnnotation updates an annotation. - UpdateAnnotation(ctx context.Context, orgID, id platform.ID, update AnnotationCreate) (*AnnotationEvent, error) + UpdateAnnotation(ctx context.Context, id platform.ID, update AnnotationCreate) (*AnnotationEvent, error) // ListStreams lists all streams matching the filter. ListStreams(ctx context.Context, orgID platform.ID, filter StreamListFilter) ([]ReadStream, error) // CreateOrUpdateStream creates or updates the matching stream by name. - CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream Stream) (*ReadStream, error) + CreateOrUpdateStream(ctx context.Context, stream Stream) (*ReadStream, error) // UpdateStream updates the stream by the ID. - UpdateStream(ctx context.Context, orgID, id platform.ID, stream Stream) (*ReadStream, error) - // DeleteStream deletes the stream metadata by name. - DeleteStream(ctx context.Context, orgID platform.ID, streamName string) error + UpdateStream(ctx context.Context, id platform.ID, stream Stream) (*ReadStream, error) + // DeleteStreams deletes one or more streams by name. + DeleteStreams(ctx context.Context, delete BasicStream) error // DeleteStreamByID deletes the stream metadata by id. - DeleteStreamByID(ctx context.Context, orgID, id platform.ID) error + DeleteStreamByID(ctx context.Context, id platform.ID) error } // AnnotationEvent contains fields for annotating an event. diff --git a/annotations/transport/annotations_router.go b/annotations/transport/annotations_router.go new file mode 100644 index 0000000000..ccd25a5178 --- /dev/null +++ b/annotations/transport/annotations_router.go @@ -0,0 +1,236 @@ +package transport + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/go-chi/chi" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" +) + +func (h *AnnotationHandler) annotationsRouter() http.Handler { + r := chi.NewRouter() + + r.Post("/", h.handleCreateAnnotations) + r.Get("/", h.handleGetAnnotations) + r.Delete("/", h.handleDeleteAnnotations) + + r.Route("/{id}", func(r chi.Router) { + r.Get("/", h.handleGetAnnotation) + r.Delete("/", h.handleDeleteAnnotation) + r.Put("/", h.handleUpdateAnnotation) + }) + + return r +} + +func (h *AnnotationHandler) handleCreateAnnotations(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + o, err := platform.IDFromString(r.URL.Query().Get("orgID")) + if err != nil { + h.api.Err(w, r, errBadOrg) + return + } + + c, err := decodeCreateAnnotationsRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + l, err := h.annotationService.CreateAnnotations(ctx, *o, c) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, l) +} + +func (h *AnnotationHandler) handleGetAnnotations(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + o, err := platform.IDFromString(r.URL.Query().Get("orgID")) + if err != nil { + h.api.Err(w, r, errBadOrg) + return + } + + f, err := decodeListAnnotationsRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + l, err := h.annotationService.ListAnnotations(ctx, *o, *f) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, l) +} + +func (h *AnnotationHandler) handleDeleteAnnotations(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + o, err := platform.IDFromString(r.URL.Query().Get("orgID")) + if err != nil { + h.api.Err(w, r, errBadOrg) + return + } + + f, err := decodeDeleteAnnotationsRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + if err = h.annotationService.DeleteAnnotations(ctx, *o, *f); err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusNoContent, nil) +} + +func (h *AnnotationHandler) handleGetAnnotation(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadAnnotationId) + return + } + + a, err := h.annotationService.GetAnnotation(ctx, *id) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, a) +} + +func (h *AnnotationHandler) handleDeleteAnnotation(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadAnnotationId) + return + } + + if err := h.annotationService.DeleteAnnotation(ctx, *id); err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusNoContent, nil) +} + +func (h *AnnotationHandler) handleUpdateAnnotation(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadAnnotationId) + return + } + + u, err := decodeUpdateAnnotationRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + a, err := h.annotationService.UpdateAnnotation(ctx, *id, *u) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, a) +} + +func decodeCreateAnnotationsRequest(r *http.Request) ([]influxdb.AnnotationCreate, error) { + cs := []influxdb.AnnotationCreate{} + if err := json.NewDecoder(r.Body).Decode(&cs); err != nil { + return nil, err + } + + for _, c := range cs { + if err := c.Validate(time.Now); err != nil { + return nil, err + } + } + + return cs, nil +} + +func decodeListAnnotationsRequest(r *http.Request) (*influxdb.AnnotationListFilter, error) { + startTime, endTime, err := tFromReq(r) + if err != nil { + return nil, err + } + + f := &influxdb.AnnotationListFilter{ + StreamIncludes: r.URL.Query()["streamIncludes"], + BasicFilter: influxdb.BasicFilter{ + EndTime: endTime, + StartTime: startTime, + }, + } + f.SetStickerIncludes(r.URL.Query()) + if err := f.Validate(time.Now); err != nil { + return nil, err + } + + return f, nil +} + +func decodeDeleteAnnotationsRequest(r *http.Request) (*influxdb.AnnotationDeleteFilter, error) { + // Try to get a stream ID from the query params. The stream ID is not required, + // so if one is not set we can leave streamID as the zero value. + var streamID platform.ID + if qid := chi.URLParam(r, "streamID"); qid != "" { + id, err := platform.IDFromString(qid) + // if a streamID parameter was provided but is not valid, return an error + if err != nil { + return nil, errBadStreamId + } + streamID = *id + } + + startTime, endTime, err := tFromReq(r) + if err != nil { + return nil, err + } + + f := &influxdb.AnnotationDeleteFilter{ + StreamTag: r.URL.Query().Get("stream"), + StreamID: streamID, + EndTime: endTime, + StartTime: startTime, + } + f.SetStickers(r.URL.Query()) + if err := f.Validate(); err != nil { + return nil, err + } + + return f, nil +} + +func decodeUpdateAnnotationRequest(r *http.Request) (*influxdb.AnnotationCreate, error) { + u := &influxdb.AnnotationCreate{} + if err := json.NewDecoder(r.Body).Decode(u); err != nil { + return nil, err + } else if err := u.Validate(time.Now); err != nil { + return nil, err + } + + return u, nil +} diff --git a/annotations/transport/annotations_router_test.go b/annotations/transport/annotations_router_test.go new file mode 100644 index 0000000000..13c9f190c4 --- /dev/null +++ b/annotations/transport/annotations_router_test.go @@ -0,0 +1,218 @@ +package transport + +import ( + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2" + influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/stretchr/testify/require" +) + +var ( + testCreateAnnotation = influxdb.AnnotationCreate{ + StreamTag: "sometag", + Summary: "testing the api", + EndTime: &now, + StartTime: &now, + } + + testEvent = influxdb.AnnotationEvent{ + ID: *id, + AnnotationCreate: testCreateAnnotation, + } + + testReadAnnotation1 = influxdb.ReadAnnotation{ + ID: *influxdbtesting.IDPtr(1), + } + + testReadAnnotation2 = influxdb.ReadAnnotation{ + ID: *influxdbtesting.IDPtr(2), + } +) + +func TestAnnotationRouter(t *testing.T) { + t.Parallel() + + t.Run("get annotations happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL+"/annotations", nil) + + q := req.URL.Query() + q.Add("orgID", orgStr) + q.Add("endTime", now.Format(time.RFC3339)) + q.Add("stickerIncludes[product]", "oss") + q.Add("stickerIncludes[env]", "dev") + q.Add("streamIncludes", "stream1") + q.Add("streamIncludes", "stream2") + req.URL.RawQuery = q.Encode() + + want := []influxdb.AnnotationList{ + { + StreamTag: "stream1", + Annotations: []influxdb.ReadAnnotation{testReadAnnotation1}, + }, + { + StreamTag: "stream2", + Annotations: []influxdb.ReadAnnotation{testReadAnnotation2}, + }, + } + + svc.EXPECT(). + ListAnnotations(gomock.Any(), *orgID, influxdb.AnnotationListFilter{ + StickerIncludes: map[string]string{"product": "oss", "env": "dev"}, + StreamIncludes: []string{"stream1", "stream2"}, + BasicFilter: influxdb.BasicFilter{ + StartTime: &time.Time{}, + EndTime: &now, + }, + }). + Return(influxdb.ReadAnnotations{ + "stream1": []influxdb.ReadAnnotation{testReadAnnotation1}, + "stream2": []influxdb.ReadAnnotation{testReadAnnotation2}, + }, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := []influxdb.AnnotationList{} + err := json.NewDecoder(res.Body).Decode(&got) + require.NoError(t, err) + require.ElementsMatch(t, want, got) + }) + + t.Run("create annotations happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + createAnnotations := []influxdb.AnnotationCreate{testCreateAnnotation} + + req := newTestRequest(t, "POST", ts.URL+"/annotations", createAnnotations) + + q := req.URL.Query() + q.Add("orgID", orgStr) + req.URL.RawQuery = q.Encode() + + want := []influxdb.AnnotationEvent{testEvent} + + svc.EXPECT(). + CreateAnnotations(gomock.Any(), *orgID, createAnnotations). + Return(want, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := []influxdb.AnnotationEvent{} + err := json.NewDecoder(res.Body).Decode(&got) + require.NoError(t, err) + require.Equal(t, want, got) + }) + + t.Run("delete annotations happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "DELETE", ts.URL+"/annotations", nil) + q := req.URL.Query() + q.Add("orgID", orgStr) + q.Add("stream", "someTag") + q.Add("startTime", now.Format(time.RFC3339)) + q.Add("endTime", later.Format(time.RFC3339)) + req.URL.RawQuery = q.Encode() + + svc.EXPECT(). + DeleteAnnotations(gomock.Any(), *orgID, influxdb.AnnotationDeleteFilter{ + StreamTag: "someTag", + StartTime: &now, + EndTime: &later, + Stickers: map[string]string{}, + }). + Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("get annotation happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL+"/annotations/"+idStr, nil) + + svc.EXPECT(). + GetAnnotation(gomock.Any(), *id). + Return(&testEvent, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := &influxdb.AnnotationEvent{} + err := json.NewDecoder(res.Body).Decode(got) + require.NoError(t, err) + require.Equal(t, &testEvent, got) + }) + + t.Run("delete annotation happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "DELETE", ts.URL+"/annotations/"+idStr, nil) + + svc.EXPECT(). + DeleteAnnotation(gomock.Any(), *id). + Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("update annotation happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "PUT", ts.URL+"/annotations/"+idStr, testCreateAnnotation) + + svc.EXPECT(). + UpdateAnnotation(gomock.Any(), *id, testCreateAnnotation). + Return(&testEvent, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := &influxdb.AnnotationEvent{} + err := json.NewDecoder(res.Body).Decode(got) + require.NoError(t, err) + require.Equal(t, &testEvent, got) + }) + + t.Run("invalid org ids return 400 when required", func(t *testing.T) { + methods := []string{"POST", "GET", "DELETE"} + + for _, m := range methods { + t.Run(m, func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, m, ts.URL+"/annotations", nil) + q := req.URL.Query() + q.Add("orgID", "badid") + req.URL.RawQuery = q.Encode() + + doTestRequest(t, req, http.StatusBadRequest, false) + }) + } + }) + + t.Run("invalid annotation ids return 400 when required", func(t *testing.T) { + methods := []string{"GET", "DELETE", "PUT"} + + for _, m := range methods { + t.Run(m, func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, m, ts.URL+"/annotations/badID", nil) + doTestRequest(t, req, http.StatusBadRequest, false) + }) + } + }) +} diff --git a/annotations/transport/helpers_test.go b/annotations/transport/helpers_test.go new file mode 100644 index 0000000000..6b0c501b95 --- /dev/null +++ b/annotations/transport/helpers_test.go @@ -0,0 +1,54 @@ +package transport + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +var ( + orgStr = "1234123412341234" + orgID, _ = platform.IDFromString(orgStr) + idStr = "4321432143214321" + id, _ = platform.IDFromString(idStr) + now = time.Now().UTC().Truncate(time.Second) + later = now.Add(5 * time.Minute) +) + +func newTestServer(t *testing.T) (*httptest.Server, *mock.MockAnnotationService) { + ctrlr := gomock.NewController(t) + svc := mock.NewMockAnnotationService(ctrlr) + server := NewAnnotationHandler(zaptest.NewLogger(t), svc) + return httptest.NewServer(server), svc +} + +func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request { + dat, err := json.Marshal(body) + require.NoError(t, err) + + req, err := http.NewRequest(method, path, bytes.NewBuffer(dat)) + require.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + return req +} + +func doTestRequest(t *testing.T, req *http.Request, wantCode int, needJSON bool) *http.Response { + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, wantCode, res.StatusCode) + if needJSON { + require.Equal(t, "application/json; charset=utf-8", res.Header.Get("Content-Type")) + } + return res +} diff --git a/annotations/transport/http.go b/annotations/transport/http.go new file mode 100644 index 0000000000..f603e9b7b4 --- /dev/null +++ b/annotations/transport/http.go @@ -0,0 +1,104 @@ +package transport + +import ( + "net/http" + "time" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform/errors" + kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" + "go.uber.org/zap" +) + +const ( + // this is the base api prefix, since the annotations system mounts handlers at + // both the ../annotations and ../streams paths. + prefixAnnotations = "/api/v2private" +) + +var ( + errBadOrg = &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid or missing org id", + } + + errBadAnnotationId = &errors.Error{ + Code: errors.EInvalid, + Msg: "annotation id is invalid", + } + + errBadStreamId = &errors.Error{ + Code: errors.EInvalid, + Msg: "stream id is invalid", + } + + errBadStreamName = &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid stream name", + } +) + +// AnnotationsHandler is the handler for the annotation service +type AnnotationHandler struct { + chi.Router + + log *zap.Logger + api *kithttp.API + + annotationService influxdb.AnnotationService +} + +func NewAnnotationHandler(log *zap.Logger, annotationService influxdb.AnnotationService) *AnnotationHandler { + h := &AnnotationHandler{ + log: log, + api: kithttp.NewAPI(kithttp.WithLog(log)), + annotationService: annotationService, + } + + r := chi.NewRouter() + r.Use( + middleware.Recoverer, + middleware.RequestID, + middleware.RealIP, + ) + + r.Mount("/annotations", h.annotationsRouter()) + r.Mount("/streams", h.streamsRouter()) + h.Router = r + + return h +} + +func (h *AnnotationHandler) Prefix() string { + return prefixAnnotations +} + +// tFromReq and tStringToPointer are used in handlers to extract time values from query parameters. +// pointers to time.Time structs are used, since the JSON responses may omit empty (nil pointer) times. +func tFromReq(r *http.Request) (*time.Time, *time.Time, error) { + st, err := tStringToPointer(r.URL.Query().Get("startTime")) + if err != nil { + return nil, nil, err + } + + et, err := tStringToPointer(r.URL.Query().Get("endTime")) + if err != nil { + return nil, nil, err + } + + return st, et, nil +} + +func tStringToPointer(s string) (*time.Time, error) { + if s == "" { + return nil, nil + } + + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return nil, err + } + return &t, nil +} diff --git a/annotations/transport/streams_router.go b/annotations/transport/streams_router.go new file mode 100644 index 0000000000..470fd949ed --- /dev/null +++ b/annotations/transport/streams_router.go @@ -0,0 +1,178 @@ +package transport + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/go-chi/chi" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" +) + +func (h *AnnotationHandler) streamsRouter() http.Handler { + r := chi.NewRouter() + + r.Put("/", h.handleCreateOrUpdateStream) + r.Get("/", h.handleGetStreams) + r.Delete("/", h.handleDeleteStreams) + + r.Route("/{id}", func(r chi.Router) { + r.Delete("/", h.handleDeleteStream) + r.Put("/", h.handleUpdateStreamByID) + }) + + return r +} + +func (h *AnnotationHandler) handleCreateOrUpdateStream(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + u, err := decodeCreateOrUpdateStreamRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + s, err := h.annotationService.CreateOrUpdateStream(ctx, *u) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, s) +} + +func (h *AnnotationHandler) handleGetStreams(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + o, err := platform.IDFromString(r.URL.Query().Get("orgID")) + if err != nil { + h.api.Err(w, r, errBadOrg) + return + } + + f, err := decodeListStreamsRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + l, err := h.annotationService.ListStreams(ctx, *o, *f) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, l) +} + +// Delete stream(s) by name, capable of handling a list of names +func (h *AnnotationHandler) handleDeleteStreams(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + f, err := decodeDeleteStreamsRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + // delete all of the streams according to the filter. annotations associated with the stream + // will be deleted by the ON DELETE CASCADE relationship between streams and annotations. + if err = h.annotationService.DeleteStreams(ctx, *f); err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusNoContent, nil) +} + +// Delete a single stream by ID +func (h *AnnotationHandler) handleDeleteStream(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadAnnotationId) + return + } + + // as in the handleDeleteStreams method above, deleting a stream will delete annotations + // associated with it due to the ON DELETE CASCADE relationship between the two + if err := h.annotationService.DeleteStreamByID(ctx, *id); err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusNoContent, nil) +} + +func (h *AnnotationHandler) handleUpdateStreamByID(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadAnnotationId) + return + } + + u, err := decodeCreateOrUpdateStreamRequest(r) + if err != nil { + h.api.Err(w, r, err) + return + } + + s, err := h.annotationService.UpdateStream(ctx, *id, *u) + if err != nil { + h.api.Err(w, r, err) + return + } + + h.api.Respond(w, r, http.StatusOK, s) +} + +func decodeCreateOrUpdateStreamRequest(r *http.Request) (*influxdb.Stream, error) { + s := influxdb.Stream{} + + if err := json.NewDecoder(r.Body).Decode(&s); err != nil { + return nil, err + } + + if err := s.Validate(false); err != nil { + return nil, err + } + + return &s, nil +} + +func decodeListStreamsRequest(r *http.Request) (*influxdb.StreamListFilter, error) { + startTime, endTime, err := tFromReq(r) + if err != nil { + return nil, err + } + + f := &influxdb.StreamListFilter{ + StreamIncludes: r.URL.Query()["streamIncludes"], + BasicFilter: influxdb.BasicFilter{ + EndTime: endTime, + StartTime: startTime, + }, + } + + if err := f.Validate(time.Now); err != nil { + return nil, err + } + return f, nil +} + +func decodeDeleteStreamsRequest(r *http.Request) (*influxdb.BasicStream, error) { + f := &influxdb.BasicStream{ + Names: r.URL.Query()["stream"], + } + + if !f.IsValid() { + return nil, errBadStreamName + } + + return f, nil +} diff --git a/annotations/transport/streams_router_test.go b/annotations/transport/streams_router_test.go new file mode 100644 index 0000000000..8b5893ce4a --- /dev/null +++ b/annotations/transport/streams_router_test.go @@ -0,0 +1,174 @@ +package transport + +import ( + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2" + influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/stretchr/testify/require" +) + +var ( + testCreateStream = influxdb.Stream{ + Name: "test stream", + } + + testReadStream1 = &influxdb.ReadStream{ + ID: *influxdbtesting.IDPtr(1), + Name: "test stream 1", + CreatedAt: now, + UpdatedAt: now, + } + + testReadStream2 = &influxdb.ReadStream{ + ID: *influxdbtesting.IDPtr(2), + Name: "test stream 2", + CreatedAt: now, + UpdatedAt: now, + } +) + +func TestStreamsRouter(t *testing.T) { + t.Parallel() + + t.Run("create or update stream happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "PUT", ts.URL+"/streams", testCreateStream) + + q := req.URL.Query() + q.Add("orgID", orgStr) + req.URL.RawQuery = q.Encode() + + svc.EXPECT(). + CreateOrUpdateStream(gomock.Any(), testCreateStream). + Return(testReadStream1, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := &influxdb.ReadStream{} + err := json.NewDecoder(res.Body).Decode(got) + require.NoError(t, err) + require.Equal(t, testReadStream1, got) + }) + + t.Run("get streams happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL+"/streams", nil) + + q := req.URL.Query() + q.Add("orgID", orgStr) + q.Add("endTime", now.Format(time.RFC3339)) + q.Add("streamIncludes", "stream1") + q.Add("streamIncludes", "stream2") + req.URL.RawQuery = q.Encode() + + want := []influxdb.ReadStream{*testReadStream1, *testReadStream2} + + svc.EXPECT(). + ListStreams(gomock.Any(), *orgID, influxdb.StreamListFilter{ + StreamIncludes: []string{"stream1", "stream2"}, + BasicFilter: influxdb.BasicFilter{ + StartTime: &time.Time{}, + EndTime: &now, + }, + }). + Return(want, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := []influxdb.ReadStream{} + err := json.NewDecoder(res.Body).Decode(&got) + require.NoError(t, err) + require.ElementsMatch(t, want, got) + }) + + t.Run("delete streams (by name) happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "DELETE", ts.URL+"/streams", nil) + q := req.URL.Query() + q.Add("stream", "stream1") + q.Add("stream", "stream2") + req.URL.RawQuery = q.Encode() + + svc.EXPECT(). + DeleteStreams(gomock.Any(), influxdb.BasicStream{ + Names: []string{"stream1", "stream2"}, + }). + Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("delete stream happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "DELETE", ts.URL+"/streams/"+idStr, nil) + + svc.EXPECT(). + DeleteStreamByID(gomock.Any(), *id). + Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("update stream by id happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "PUT", ts.URL+"/streams/"+idStr, testCreateStream) + + svc.EXPECT(). + UpdateStream(gomock.Any(), *id, testCreateStream). + Return(testReadStream1, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + got := &influxdb.ReadStream{} + err := json.NewDecoder(res.Body).Decode(got) + require.NoError(t, err) + require.Equal(t, testReadStream1, got) + }) + + t.Run("invalid org ids return 400 when required", func(t *testing.T) { + methods := []string{"GET"} + + for _, m := range methods { + t.Run(m, func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, m, ts.URL+"/streams", nil) + q := req.URL.Query() + q.Add("orgID", "badid") + req.URL.RawQuery = q.Encode() + + doTestRequest(t, req, http.StatusBadRequest, false) + }) + } + }) + + t.Run("invalid stream ids return 400 when required", func(t *testing.T) { + methods := []string{"DELETE", "PUT"} + + for _, m := range methods { + t.Run(m, func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, m, ts.URL+"/streams/badID", nil) + doTestRequest(t, req, http.StatusBadRequest, false) + }) + } + }) +} diff --git a/mock/annotation_service.go b/mock/annotation_service.go index 9562591957..c83eda5269 100644 --- a/mock/annotation_service.go +++ b/mock/annotation_service.go @@ -52,32 +52,32 @@ func (mr *MockAnnotationServiceMockRecorder) CreateAnnotations(ctx, orgID, creat } // CreateOrUpdateStream mocks base method. -func (m *MockAnnotationService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { +func (m *MockAnnotationService) CreateOrUpdateStream(ctx context.Context, stream influxdb.Stream) (*influxdb.ReadStream, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateOrUpdateStream", ctx, orgID, stream) + ret := m.ctrl.Call(m, "CreateOrUpdateStream", ctx, stream) ret0, _ := ret[0].(*influxdb.ReadStream) ret1, _ := ret[1].(error) return ret0, ret1 } // CreateOrUpdateStream indicates an expected call of CreateOrUpdateStream. -func (mr *MockAnnotationServiceMockRecorder) CreateOrUpdateStream(ctx, orgID, stream interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) CreateOrUpdateStream(ctx, stream interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).CreateOrUpdateStream), ctx, orgID, stream) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).CreateOrUpdateStream), ctx, stream) } // DeleteAnnotation mocks base method. -func (m *MockAnnotationService) DeleteAnnotation(ctx context.Context, orgID, id platform.ID) error { +func (m *MockAnnotationService) DeleteAnnotation(ctx context.Context, id platform.ID) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteAnnotation", ctx, orgID, id) + ret := m.ctrl.Call(m, "DeleteAnnotation", ctx, id) ret0, _ := ret[0].(error) return ret0 } // DeleteAnnotation indicates an expected call of DeleteAnnotation. -func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotation(ctx, orgID, id interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotation(ctx, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotation), ctx, orgID, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotation), ctx, id) } // DeleteAnnotations mocks base method. @@ -94,47 +94,47 @@ func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotations(ctx, orgID, delet return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotations", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotations), ctx, orgID, delete) } -// DeleteStream mocks base method. -func (m *MockAnnotationService) DeleteStream(ctx context.Context, orgID platform.ID, streamName string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteStream", ctx, orgID, streamName) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteStream indicates an expected call of DeleteStream. -func (mr *MockAnnotationServiceMockRecorder) DeleteStream(ctx, orgID, streamName interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStream", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStream), ctx, orgID, streamName) -} - // DeleteStreamByID mocks base method. -func (m *MockAnnotationService) DeleteStreamByID(ctx context.Context, orgID, id platform.ID) error { +func (m *MockAnnotationService) DeleteStreamByID(ctx context.Context, id platform.ID) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteStreamByID", ctx, orgID, id) + ret := m.ctrl.Call(m, "DeleteStreamByID", ctx, id) ret0, _ := ret[0].(error) return ret0 } // DeleteStreamByID indicates an expected call of DeleteStreamByID. -func (mr *MockAnnotationServiceMockRecorder) DeleteStreamByID(ctx, orgID, id interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) DeleteStreamByID(ctx, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreamByID", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreamByID), ctx, orgID, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreamByID", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreamByID), ctx, id) +} + +// DeleteStreams mocks base method. +func (m *MockAnnotationService) DeleteStreams(ctx context.Context, delete influxdb.BasicStream) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteStreams", ctx, delete) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteStreams indicates an expected call of DeleteStreams. +func (mr *MockAnnotationServiceMockRecorder) DeleteStreams(ctx, delete interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreams", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreams), ctx, delete) } // GetAnnotation mocks base method. -func (m *MockAnnotationService) GetAnnotation(ctx context.Context, orgID, id platform.ID) (*influxdb.AnnotationEvent, error) { +func (m *MockAnnotationService) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.AnnotationEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAnnotation", ctx, orgID, id) + ret := m.ctrl.Call(m, "GetAnnotation", ctx, id) ret0, _ := ret[0].(*influxdb.AnnotationEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // GetAnnotation indicates an expected call of GetAnnotation. -func (mr *MockAnnotationServiceMockRecorder) GetAnnotation(ctx, orgID, id interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) GetAnnotation(ctx, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).GetAnnotation), ctx, orgID, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).GetAnnotation), ctx, id) } // ListAnnotations mocks base method. @@ -168,31 +168,31 @@ func (mr *MockAnnotationServiceMockRecorder) ListStreams(ctx, orgID, filter inte } // UpdateAnnotation mocks base method. -func (m *MockAnnotationService) UpdateAnnotation(ctx context.Context, orgID, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) { +func (m *MockAnnotationService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateAnnotation", ctx, orgID, id, update) + ret := m.ctrl.Call(m, "UpdateAnnotation", ctx, id, update) ret0, _ := ret[0].(*influxdb.AnnotationEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // UpdateAnnotation indicates an expected call of UpdateAnnotation. -func (mr *MockAnnotationServiceMockRecorder) UpdateAnnotation(ctx, orgID, id, update interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) UpdateAnnotation(ctx, id, update interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).UpdateAnnotation), ctx, orgID, id, update) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).UpdateAnnotation), ctx, id, update) } // UpdateStream mocks base method. -func (m *MockAnnotationService) UpdateStream(ctx context.Context, orgID, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { +func (m *MockAnnotationService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateStream", ctx, orgID, id, stream) + ret := m.ctrl.Call(m, "UpdateStream", ctx, id, stream) ret0, _ := ret[0].(*influxdb.ReadStream) ret1, _ := ret[1].(error) return ret0, ret1 } // UpdateStream indicates an expected call of UpdateStream. -func (mr *MockAnnotationServiceMockRecorder) UpdateStream(ctx, orgID, id, stream interface{}) *gomock.Call { +func (mr *MockAnnotationServiceMockRecorder) UpdateStream(ctx, id, stream interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).UpdateStream), ctx, orgID, id, stream) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).UpdateStream), ctx, id, stream) }