feat(annotations): annotations api handlers (#21645)

* feat(annotations): annotations handler; annotations & streams router

* chore: fix typos & clarify comments
pull/21648/head
William Baker 2021-06-09 16:20:22 -04:00 committed by GitHub
parent 9b02820a0f
commit fa31037d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1010 additions and 46 deletions

View File

@ -70,24 +70,24 @@ type AnnotationService interface {
// ListAnnotations lists all annotations matching the filter.
ListAnnotations(ctx context.Context, orgID platform.ID, filter AnnotationListFilter) (ReadAnnotations, error)
// GetAnnotation gets an annotation by id.
GetAnnotation(ctx context.Context, orgID, id platform.ID) (*AnnotationEvent, error)
GetAnnotation(ctx context.Context, id platform.ID) (*AnnotationEvent, error)
// DeleteAnnotations deletes annotations matching the filter.
DeleteAnnotations(ctx context.Context, orgID platform.ID, delete AnnotationDeleteFilter) error
// DeleteAnnotation deletes an annotation by id.
DeleteAnnotation(ctx context.Context, orgID, id platform.ID) error
DeleteAnnotation(ctx context.Context, id platform.ID) error
// UpdateAnnotation updates an annotation.
UpdateAnnotation(ctx context.Context, orgID, id platform.ID, update AnnotationCreate) (*AnnotationEvent, error)
UpdateAnnotation(ctx context.Context, id platform.ID, update AnnotationCreate) (*AnnotationEvent, error)
// ListStreams lists all streams matching the filter.
ListStreams(ctx context.Context, orgID platform.ID, filter StreamListFilter) ([]ReadStream, error)
// CreateOrUpdateStream creates or updates the matching stream by name.
CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream Stream) (*ReadStream, error)
CreateOrUpdateStream(ctx context.Context, stream Stream) (*ReadStream, error)
// UpdateStream updates the stream by the ID.
UpdateStream(ctx context.Context, orgID, id platform.ID, stream Stream) (*ReadStream, error)
// DeleteStream deletes the stream metadata by name.
DeleteStream(ctx context.Context, orgID platform.ID, streamName string) error
UpdateStream(ctx context.Context, id platform.ID, stream Stream) (*ReadStream, error)
// DeleteStreams deletes one or more streams by name.
DeleteStreams(ctx context.Context, delete BasicStream) error
// DeleteStreamByID deletes the stream metadata by id.
DeleteStreamByID(ctx context.Context, orgID, id platform.ID) error
DeleteStreamByID(ctx context.Context, id platform.ID) error
}
// AnnotationEvent contains fields for annotating an event.

View File

@ -0,0 +1,236 @@
package transport
import (
"encoding/json"
"net/http"
"time"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
)
func (h *AnnotationHandler) annotationsRouter() http.Handler {
r := chi.NewRouter()
r.Post("/", h.handleCreateAnnotations)
r.Get("/", h.handleGetAnnotations)
r.Delete("/", h.handleDeleteAnnotations)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.handleGetAnnotation)
r.Delete("/", h.handleDeleteAnnotation)
r.Put("/", h.handleUpdateAnnotation)
})
return r
}
func (h *AnnotationHandler) handleCreateAnnotations(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
o, err := platform.IDFromString(r.URL.Query().Get("orgID"))
if err != nil {
h.api.Err(w, r, errBadOrg)
return
}
c, err := decodeCreateAnnotationsRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
l, err := h.annotationService.CreateAnnotations(ctx, *o, c)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, l)
}
func (h *AnnotationHandler) handleGetAnnotations(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
o, err := platform.IDFromString(r.URL.Query().Get("orgID"))
if err != nil {
h.api.Err(w, r, errBadOrg)
return
}
f, err := decodeListAnnotationsRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
l, err := h.annotationService.ListAnnotations(ctx, *o, *f)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, l)
}
func (h *AnnotationHandler) handleDeleteAnnotations(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
o, err := platform.IDFromString(r.URL.Query().Get("orgID"))
if err != nil {
h.api.Err(w, r, errBadOrg)
return
}
f, err := decodeDeleteAnnotationsRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
if err = h.annotationService.DeleteAnnotations(ctx, *o, *f); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}
func (h *AnnotationHandler) handleGetAnnotation(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadAnnotationId)
return
}
a, err := h.annotationService.GetAnnotation(ctx, *id)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, a)
}
func (h *AnnotationHandler) handleDeleteAnnotation(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadAnnotationId)
return
}
if err := h.annotationService.DeleteAnnotation(ctx, *id); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}
func (h *AnnotationHandler) handleUpdateAnnotation(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadAnnotationId)
return
}
u, err := decodeUpdateAnnotationRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
a, err := h.annotationService.UpdateAnnotation(ctx, *id, *u)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, a)
}
func decodeCreateAnnotationsRequest(r *http.Request) ([]influxdb.AnnotationCreate, error) {
cs := []influxdb.AnnotationCreate{}
if err := json.NewDecoder(r.Body).Decode(&cs); err != nil {
return nil, err
}
for _, c := range cs {
if err := c.Validate(time.Now); err != nil {
return nil, err
}
}
return cs, nil
}
func decodeListAnnotationsRequest(r *http.Request) (*influxdb.AnnotationListFilter, error) {
startTime, endTime, err := tFromReq(r)
if err != nil {
return nil, err
}
f := &influxdb.AnnotationListFilter{
StreamIncludes: r.URL.Query()["streamIncludes"],
BasicFilter: influxdb.BasicFilter{
EndTime: endTime,
StartTime: startTime,
},
}
f.SetStickerIncludes(r.URL.Query())
if err := f.Validate(time.Now); err != nil {
return nil, err
}
return f, nil
}
func decodeDeleteAnnotationsRequest(r *http.Request) (*influxdb.AnnotationDeleteFilter, error) {
// Try to get a stream ID from the query params. The stream ID is not required,
// so if one is not set we can leave streamID as the zero value.
var streamID platform.ID
if qid := chi.URLParam(r, "streamID"); qid != "" {
id, err := platform.IDFromString(qid)
// if a streamID parameter was provided but is not valid, return an error
if err != nil {
return nil, errBadStreamId
}
streamID = *id
}
startTime, endTime, err := tFromReq(r)
if err != nil {
return nil, err
}
f := &influxdb.AnnotationDeleteFilter{
StreamTag: r.URL.Query().Get("stream"),
StreamID: streamID,
EndTime: endTime,
StartTime: startTime,
}
f.SetStickers(r.URL.Query())
if err := f.Validate(); err != nil {
return nil, err
}
return f, nil
}
func decodeUpdateAnnotationRequest(r *http.Request) (*influxdb.AnnotationCreate, error) {
u := &influxdb.AnnotationCreate{}
if err := json.NewDecoder(r.Body).Decode(u); err != nil {
return nil, err
} else if err := u.Validate(time.Now); err != nil {
return nil, err
}
return u, nil
}

View File

@ -0,0 +1,218 @@
package transport
import (
"encoding/json"
"net/http"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/stretchr/testify/require"
)
var (
testCreateAnnotation = influxdb.AnnotationCreate{
StreamTag: "sometag",
Summary: "testing the api",
EndTime: &now,
StartTime: &now,
}
testEvent = influxdb.AnnotationEvent{
ID: *id,
AnnotationCreate: testCreateAnnotation,
}
testReadAnnotation1 = influxdb.ReadAnnotation{
ID: *influxdbtesting.IDPtr(1),
}
testReadAnnotation2 = influxdb.ReadAnnotation{
ID: *influxdbtesting.IDPtr(2),
}
)
func TestAnnotationRouter(t *testing.T) {
t.Parallel()
t.Run("get annotations happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL+"/annotations", nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("endTime", now.Format(time.RFC3339))
q.Add("stickerIncludes[product]", "oss")
q.Add("stickerIncludes[env]", "dev")
q.Add("streamIncludes", "stream1")
q.Add("streamIncludes", "stream2")
req.URL.RawQuery = q.Encode()
want := []influxdb.AnnotationList{
{
StreamTag: "stream1",
Annotations: []influxdb.ReadAnnotation{testReadAnnotation1},
},
{
StreamTag: "stream2",
Annotations: []influxdb.ReadAnnotation{testReadAnnotation2},
},
}
svc.EXPECT().
ListAnnotations(gomock.Any(), *orgID, influxdb.AnnotationListFilter{
StickerIncludes: map[string]string{"product": "oss", "env": "dev"},
StreamIncludes: []string{"stream1", "stream2"},
BasicFilter: influxdb.BasicFilter{
StartTime: &time.Time{},
EndTime: &now,
},
}).
Return(influxdb.ReadAnnotations{
"stream1": []influxdb.ReadAnnotation{testReadAnnotation1},
"stream2": []influxdb.ReadAnnotation{testReadAnnotation2},
}, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := []influxdb.AnnotationList{}
err := json.NewDecoder(res.Body).Decode(&got)
require.NoError(t, err)
require.ElementsMatch(t, want, got)
})
t.Run("create annotations happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
createAnnotations := []influxdb.AnnotationCreate{testCreateAnnotation}
req := newTestRequest(t, "POST", ts.URL+"/annotations", createAnnotations)
q := req.URL.Query()
q.Add("orgID", orgStr)
req.URL.RawQuery = q.Encode()
want := []influxdb.AnnotationEvent{testEvent}
svc.EXPECT().
CreateAnnotations(gomock.Any(), *orgID, createAnnotations).
Return(want, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := []influxdb.AnnotationEvent{}
err := json.NewDecoder(res.Body).Decode(&got)
require.NoError(t, err)
require.Equal(t, want, got)
})
t.Run("delete annotations happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/annotations", nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("stream", "someTag")
q.Add("startTime", now.Format(time.RFC3339))
q.Add("endTime", later.Format(time.RFC3339))
req.URL.RawQuery = q.Encode()
svc.EXPECT().
DeleteAnnotations(gomock.Any(), *orgID, influxdb.AnnotationDeleteFilter{
StreamTag: "someTag",
StartTime: &now,
EndTime: &later,
Stickers: map[string]string{},
}).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("get annotation happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL+"/annotations/"+idStr, nil)
svc.EXPECT().
GetAnnotation(gomock.Any(), *id).
Return(&testEvent, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.AnnotationEvent{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, &testEvent, got)
})
t.Run("delete annotation happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/annotations/"+idStr, nil)
svc.EXPECT().
DeleteAnnotation(gomock.Any(), *id).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("update annotation happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "PUT", ts.URL+"/annotations/"+idStr, testCreateAnnotation)
svc.EXPECT().
UpdateAnnotation(gomock.Any(), *id, testCreateAnnotation).
Return(&testEvent, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.AnnotationEvent{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, &testEvent, got)
})
t.Run("invalid org ids return 400 when required", func(t *testing.T) {
methods := []string{"POST", "GET", "DELETE"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/annotations", nil)
q := req.URL.Query()
q.Add("orgID", "badid")
req.URL.RawQuery = q.Encode()
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
t.Run("invalid annotation ids return 400 when required", func(t *testing.T) {
methods := []string{"GET", "DELETE", "PUT"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/annotations/badID", nil)
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
}

View File

@ -0,0 +1,54 @@
package transport
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
var (
orgStr = "1234123412341234"
orgID, _ = platform.IDFromString(orgStr)
idStr = "4321432143214321"
id, _ = platform.IDFromString(idStr)
now = time.Now().UTC().Truncate(time.Second)
later = now.Add(5 * time.Minute)
)
func newTestServer(t *testing.T) (*httptest.Server, *mock.MockAnnotationService) {
ctrlr := gomock.NewController(t)
svc := mock.NewMockAnnotationService(ctrlr)
server := NewAnnotationHandler(zaptest.NewLogger(t), svc)
return httptest.NewServer(server), svc
}
func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request {
dat, err := json.Marshal(body)
require.NoError(t, err)
req, err := http.NewRequest(method, path, bytes.NewBuffer(dat))
require.NoError(t, err)
req.Header.Add("Content-Type", "application/json")
return req
}
func doTestRequest(t *testing.T, req *http.Request, wantCode int, needJSON bool) *http.Response {
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, wantCode, res.StatusCode)
if needJSON {
require.Equal(t, "application/json; charset=utf-8", res.Header.Get("Content-Type"))
}
return res
}

View File

@ -0,0 +1,104 @@
package transport
import (
"net/http"
"time"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"go.uber.org/zap"
)
const (
// this is the base api prefix, since the annotations system mounts handlers at
// both the ../annotations and ../streams paths.
prefixAnnotations = "/api/v2private"
)
var (
errBadOrg = &errors.Error{
Code: errors.EInvalid,
Msg: "invalid or missing org id",
}
errBadAnnotationId = &errors.Error{
Code: errors.EInvalid,
Msg: "annotation id is invalid",
}
errBadStreamId = &errors.Error{
Code: errors.EInvalid,
Msg: "stream id is invalid",
}
errBadStreamName = &errors.Error{
Code: errors.EInvalid,
Msg: "invalid stream name",
}
)
// AnnotationsHandler is the handler for the annotation service
type AnnotationHandler struct {
chi.Router
log *zap.Logger
api *kithttp.API
annotationService influxdb.AnnotationService
}
func NewAnnotationHandler(log *zap.Logger, annotationService influxdb.AnnotationService) *AnnotationHandler {
h := &AnnotationHandler{
log: log,
api: kithttp.NewAPI(kithttp.WithLog(log)),
annotationService: annotationService,
}
r := chi.NewRouter()
r.Use(
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
)
r.Mount("/annotations", h.annotationsRouter())
r.Mount("/streams", h.streamsRouter())
h.Router = r
return h
}
func (h *AnnotationHandler) Prefix() string {
return prefixAnnotations
}
// tFromReq and tStringToPointer are used in handlers to extract time values from query parameters.
// pointers to time.Time structs are used, since the JSON responses may omit empty (nil pointer) times.
func tFromReq(r *http.Request) (*time.Time, *time.Time, error) {
st, err := tStringToPointer(r.URL.Query().Get("startTime"))
if err != nil {
return nil, nil, err
}
et, err := tStringToPointer(r.URL.Query().Get("endTime"))
if err != nil {
return nil, nil, err
}
return st, et, nil
}
func tStringToPointer(s string) (*time.Time, error) {
if s == "" {
return nil, nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil, err
}
return &t, nil
}

View File

@ -0,0 +1,178 @@
package transport
import (
"encoding/json"
"net/http"
"time"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
)
func (h *AnnotationHandler) streamsRouter() http.Handler {
r := chi.NewRouter()
r.Put("/", h.handleCreateOrUpdateStream)
r.Get("/", h.handleGetStreams)
r.Delete("/", h.handleDeleteStreams)
r.Route("/{id}", func(r chi.Router) {
r.Delete("/", h.handleDeleteStream)
r.Put("/", h.handleUpdateStreamByID)
})
return r
}
func (h *AnnotationHandler) handleCreateOrUpdateStream(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
u, err := decodeCreateOrUpdateStreamRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
s, err := h.annotationService.CreateOrUpdateStream(ctx, *u)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, s)
}
func (h *AnnotationHandler) handleGetStreams(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
o, err := platform.IDFromString(r.URL.Query().Get("orgID"))
if err != nil {
h.api.Err(w, r, errBadOrg)
return
}
f, err := decodeListStreamsRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
l, err := h.annotationService.ListStreams(ctx, *o, *f)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, l)
}
// Delete stream(s) by name, capable of handling a list of names
func (h *AnnotationHandler) handleDeleteStreams(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
f, err := decodeDeleteStreamsRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
// delete all of the streams according to the filter. annotations associated with the stream
// will be deleted by the ON DELETE CASCADE relationship between streams and annotations.
if err = h.annotationService.DeleteStreams(ctx, *f); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}
// Delete a single stream by ID
func (h *AnnotationHandler) handleDeleteStream(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadAnnotationId)
return
}
// as in the handleDeleteStreams method above, deleting a stream will delete annotations
// associated with it due to the ON DELETE CASCADE relationship between the two
if err := h.annotationService.DeleteStreamByID(ctx, *id); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}
func (h *AnnotationHandler) handleUpdateStreamByID(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadAnnotationId)
return
}
u, err := decodeCreateOrUpdateStreamRequest(r)
if err != nil {
h.api.Err(w, r, err)
return
}
s, err := h.annotationService.UpdateStream(ctx, *id, *u)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, s)
}
func decodeCreateOrUpdateStreamRequest(r *http.Request) (*influxdb.Stream, error) {
s := influxdb.Stream{}
if err := json.NewDecoder(r.Body).Decode(&s); err != nil {
return nil, err
}
if err := s.Validate(false); err != nil {
return nil, err
}
return &s, nil
}
func decodeListStreamsRequest(r *http.Request) (*influxdb.StreamListFilter, error) {
startTime, endTime, err := tFromReq(r)
if err != nil {
return nil, err
}
f := &influxdb.StreamListFilter{
StreamIncludes: r.URL.Query()["streamIncludes"],
BasicFilter: influxdb.BasicFilter{
EndTime: endTime,
StartTime: startTime,
},
}
if err := f.Validate(time.Now); err != nil {
return nil, err
}
return f, nil
}
func decodeDeleteStreamsRequest(r *http.Request) (*influxdb.BasicStream, error) {
f := &influxdb.BasicStream{
Names: r.URL.Query()["stream"],
}
if !f.IsValid() {
return nil, errBadStreamName
}
return f, nil
}

View File

@ -0,0 +1,174 @@
package transport
import (
"encoding/json"
"net/http"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/stretchr/testify/require"
)
var (
testCreateStream = influxdb.Stream{
Name: "test stream",
}
testReadStream1 = &influxdb.ReadStream{
ID: *influxdbtesting.IDPtr(1),
Name: "test stream 1",
CreatedAt: now,
UpdatedAt: now,
}
testReadStream2 = &influxdb.ReadStream{
ID: *influxdbtesting.IDPtr(2),
Name: "test stream 2",
CreatedAt: now,
UpdatedAt: now,
}
)
func TestStreamsRouter(t *testing.T) {
t.Parallel()
t.Run("create or update stream happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "PUT", ts.URL+"/streams", testCreateStream)
q := req.URL.Query()
q.Add("orgID", orgStr)
req.URL.RawQuery = q.Encode()
svc.EXPECT().
CreateOrUpdateStream(gomock.Any(), testCreateStream).
Return(testReadStream1, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, testReadStream1, got)
})
t.Run("get streams happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("endTime", now.Format(time.RFC3339))
q.Add("streamIncludes", "stream1")
q.Add("streamIncludes", "stream2")
req.URL.RawQuery = q.Encode()
want := []influxdb.ReadStream{*testReadStream1, *testReadStream2}
svc.EXPECT().
ListStreams(gomock.Any(), *orgID, influxdb.StreamListFilter{
StreamIncludes: []string{"stream1", "stream2"},
BasicFilter: influxdb.BasicFilter{
StartTime: &time.Time{},
EndTime: &now,
},
}).
Return(want, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := []influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(&got)
require.NoError(t, err)
require.ElementsMatch(t, want, got)
})
t.Run("delete streams (by name) happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("stream", "stream1")
q.Add("stream", "stream2")
req.URL.RawQuery = q.Encode()
svc.EXPECT().
DeleteStreams(gomock.Any(), influxdb.BasicStream{
Names: []string{"stream1", "stream2"},
}).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("delete stream happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/streams/"+idStr, nil)
svc.EXPECT().
DeleteStreamByID(gomock.Any(), *id).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("update stream by id happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "PUT", ts.URL+"/streams/"+idStr, testCreateStream)
svc.EXPECT().
UpdateStream(gomock.Any(), *id, testCreateStream).
Return(testReadStream1, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, testReadStream1, got)
})
t.Run("invalid org ids return 400 when required", func(t *testing.T) {
methods := []string{"GET"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("orgID", "badid")
req.URL.RawQuery = q.Encode()
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
t.Run("invalid stream ids return 400 when required", func(t *testing.T) {
methods := []string{"DELETE", "PUT"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/streams/badID", nil)
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
}

View File

@ -52,32 +52,32 @@ func (mr *MockAnnotationServiceMockRecorder) CreateAnnotations(ctx, orgID, creat
}
// CreateOrUpdateStream mocks base method.
func (m *MockAnnotationService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
func (m *MockAnnotationService) CreateOrUpdateStream(ctx context.Context, stream influxdb.Stream) (*influxdb.ReadStream, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateOrUpdateStream", ctx, orgID, stream)
ret := m.ctrl.Call(m, "CreateOrUpdateStream", ctx, stream)
ret0, _ := ret[0].(*influxdb.ReadStream)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateOrUpdateStream indicates an expected call of CreateOrUpdateStream.
func (mr *MockAnnotationServiceMockRecorder) CreateOrUpdateStream(ctx, orgID, stream interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) CreateOrUpdateStream(ctx, stream interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).CreateOrUpdateStream), ctx, orgID, stream)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOrUpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).CreateOrUpdateStream), ctx, stream)
}
// DeleteAnnotation mocks base method.
func (m *MockAnnotationService) DeleteAnnotation(ctx context.Context, orgID, id platform.ID) error {
func (m *MockAnnotationService) DeleteAnnotation(ctx context.Context, id platform.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteAnnotation", ctx, orgID, id)
ret := m.ctrl.Call(m, "DeleteAnnotation", ctx, id)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteAnnotation indicates an expected call of DeleteAnnotation.
func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotation(ctx, orgID, id interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotation(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotation), ctx, orgID, id)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotation), ctx, id)
}
// DeleteAnnotations mocks base method.
@ -94,47 +94,47 @@ func (mr *MockAnnotationServiceMockRecorder) DeleteAnnotations(ctx, orgID, delet
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAnnotations", reflect.TypeOf((*MockAnnotationService)(nil).DeleteAnnotations), ctx, orgID, delete)
}
// DeleteStream mocks base method.
func (m *MockAnnotationService) DeleteStream(ctx context.Context, orgID platform.ID, streamName string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteStream", ctx, orgID, streamName)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteStream indicates an expected call of DeleteStream.
func (mr *MockAnnotationServiceMockRecorder) DeleteStream(ctx, orgID, streamName interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStream", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStream), ctx, orgID, streamName)
}
// DeleteStreamByID mocks base method.
func (m *MockAnnotationService) DeleteStreamByID(ctx context.Context, orgID, id platform.ID) error {
func (m *MockAnnotationService) DeleteStreamByID(ctx context.Context, id platform.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteStreamByID", ctx, orgID, id)
ret := m.ctrl.Call(m, "DeleteStreamByID", ctx, id)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteStreamByID indicates an expected call of DeleteStreamByID.
func (mr *MockAnnotationServiceMockRecorder) DeleteStreamByID(ctx, orgID, id interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) DeleteStreamByID(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreamByID", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreamByID), ctx, orgID, id)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreamByID", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreamByID), ctx, id)
}
// DeleteStreams mocks base method.
func (m *MockAnnotationService) DeleteStreams(ctx context.Context, delete influxdb.BasicStream) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteStreams", ctx, delete)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteStreams indicates an expected call of DeleteStreams.
func (mr *MockAnnotationServiceMockRecorder) DeleteStreams(ctx, delete interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteStreams", reflect.TypeOf((*MockAnnotationService)(nil).DeleteStreams), ctx, delete)
}
// GetAnnotation mocks base method.
func (m *MockAnnotationService) GetAnnotation(ctx context.Context, orgID, id platform.ID) (*influxdb.AnnotationEvent, error) {
func (m *MockAnnotationService) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.AnnotationEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAnnotation", ctx, orgID, id)
ret := m.ctrl.Call(m, "GetAnnotation", ctx, id)
ret0, _ := ret[0].(*influxdb.AnnotationEvent)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAnnotation indicates an expected call of GetAnnotation.
func (mr *MockAnnotationServiceMockRecorder) GetAnnotation(ctx, orgID, id interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) GetAnnotation(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).GetAnnotation), ctx, orgID, id)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).GetAnnotation), ctx, id)
}
// ListAnnotations mocks base method.
@ -168,31 +168,31 @@ func (mr *MockAnnotationServiceMockRecorder) ListStreams(ctx, orgID, filter inte
}
// UpdateAnnotation mocks base method.
func (m *MockAnnotationService) UpdateAnnotation(ctx context.Context, orgID, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) {
func (m *MockAnnotationService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateAnnotation", ctx, orgID, id, update)
ret := m.ctrl.Call(m, "UpdateAnnotation", ctx, id, update)
ret0, _ := ret[0].(*influxdb.AnnotationEvent)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// UpdateAnnotation indicates an expected call of UpdateAnnotation.
func (mr *MockAnnotationServiceMockRecorder) UpdateAnnotation(ctx, orgID, id, update interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) UpdateAnnotation(ctx, id, update interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).UpdateAnnotation), ctx, orgID, id, update)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAnnotation", reflect.TypeOf((*MockAnnotationService)(nil).UpdateAnnotation), ctx, id, update)
}
// UpdateStream mocks base method.
func (m *MockAnnotationService) UpdateStream(ctx context.Context, orgID, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
func (m *MockAnnotationService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateStream", ctx, orgID, id, stream)
ret := m.ctrl.Call(m, "UpdateStream", ctx, id, stream)
ret0, _ := ret[0].(*influxdb.ReadStream)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// UpdateStream indicates an expected call of UpdateStream.
func (mr *MockAnnotationServiceMockRecorder) UpdateStream(ctx, orgID, id, stream interface{}) *gomock.Call {
func (mr *MockAnnotationServiceMockRecorder) UpdateStream(ctx, id, stream interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).UpdateStream), ctx, orgID, id, stream)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateStream", reflect.TypeOf((*MockAnnotationService)(nil).UpdateStream), ctx, id, stream)
}