influxdb/annotations/transport/streams_router_test.go

199 lines
5.0 KiB
Go

package transport
import (
"encoding/json"
"net/http"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/stretchr/testify/require"
)
var (
testCreateStream = influxdb.Stream{
Name: "test stream",
}
testReadStream1 = &influxdb.ReadStream{
ID: *influxdbtesting.IDPtr(1),
Name: "test stream 1",
CreatedAt: now,
UpdatedAt: now,
}
testReadStream2 = &influxdb.ReadStream{
ID: *influxdbtesting.IDPtr(2),
Name: "test stream 2",
CreatedAt: now,
UpdatedAt: now,
}
testStoredStream1 = influxdb.StoredStream{
ID: testReadStream1.ID,
OrgID: *orgID,
Name: testReadStream1.Name,
Description: testReadStream1.Description,
CreatedAt: testReadStream1.CreatedAt,
UpdatedAt: testReadStream1.UpdatedAt,
}
testStoredStream2 = influxdb.StoredStream{
ID: testReadStream2.ID,
OrgID: *orgID,
Name: testReadStream2.Name,
Description: testReadStream2.Description,
CreatedAt: testReadStream2.CreatedAt,
UpdatedAt: testReadStream2.UpdatedAt,
}
)
func TestStreamsRouter(t *testing.T) {
t.Parallel()
t.Run("create or update stream happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "PUT", ts.URL+"/streams", testCreateStream)
q := req.URL.Query()
q.Add("orgID", orgStr)
req.URL.RawQuery = q.Encode()
svc.EXPECT().
CreateOrUpdateStream(gomock.Any(), *orgID, testCreateStream).
Return(testReadStream1, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, testReadStream1, got)
})
t.Run("get streams happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("endTime", now.Format(time.RFC3339))
q.Add("streamIncludes", "stream1")
q.Add("streamIncludes", "stream2")
req.URL.RawQuery = q.Encode()
svc.EXPECT().
ListStreams(gomock.Any(), *orgID, influxdb.StreamListFilter{
StreamIncludes: []string{"stream1", "stream2"},
BasicFilter: influxdb.BasicFilter{
StartTime: &time.Time{},
EndTime: &now,
},
}).
Return([]influxdb.StoredStream{testStoredStream1, testStoredStream2}, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := []influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(&got)
require.NoError(t, err)
require.ElementsMatch(t, []influxdb.ReadStream{*testReadStream1, *testReadStream2}, got)
})
t.Run("delete streams (by name) happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("stream", "stream1")
q.Add("stream", "stream2")
req.URL.RawQuery = q.Encode()
svc.EXPECT().
DeleteStreams(gomock.Any(), *orgID, influxdb.BasicStream{
Names: []string{"stream1", "stream2"},
}).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("delete stream happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/streams/"+idStr, nil)
svc.EXPECT().
DeleteStreamByID(gomock.Any(), *id).
Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("update stream by id happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "PUT", ts.URL+"/streams/"+idStr, testCreateStream)
svc.EXPECT().
UpdateStream(gomock.Any(), *id, testCreateStream).
Return(testReadStream1, nil)
res := doTestRequest(t, req, http.StatusOK, true)
got := &influxdb.ReadStream{}
err := json.NewDecoder(res.Body).Decode(got)
require.NoError(t, err)
require.Equal(t, testReadStream1, got)
})
t.Run("invalid org ids return 400 when required", func(t *testing.T) {
methods := []string{"GET", "PUT", "DELETE"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/streams", nil)
q := req.URL.Query()
q.Add("orgID", "badid")
req.URL.RawQuery = q.Encode()
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
t.Run("invalid stream ids return 400 when required", func(t *testing.T) {
methods := []string{"DELETE", "PUT"}
for _, m := range methods {
t.Run(m, func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, m, ts.URL+"/streams/badID", nil)
doTestRequest(t, req, http.StatusBadRequest, false)
})
}
})
}
func TestStoredStreamsToReadStreams(t *testing.T) {
t.Parallel()
got := storedStreamsToReadStreams([]influxdb.StoredStream{testStoredStream1, testStoredStream2})
require.Equal(t, got, []influxdb.ReadStream{*testReadStream1, *testReadStream2})
}