1042 lines
26 KiB
Go
1042 lines
26 KiB
Go
//go:build sqlite_json && sqlite_foreign_keys
|
|
|
|
package annotations
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/influxdata/influxdb/v2/snowflake"
|
|
"github.com/influxdata/influxdb/v2/sqlite"
|
|
"github.com/influxdata/influxdb/v2/sqlite/migrations"
|
|
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
idGen = snowflake.NewIDGenerator()
|
|
)
|
|
|
|
func TestAnnotationsCRUD(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// intialize some variables that can be shared across tests
|
|
// the timeline for the 3 test annotations start & end times is visualized below.
|
|
// now
|
|
// v
|
|
// |---|---|---|---|
|
|
// ^ ^ ^ ^ ^
|
|
// st1 et1
|
|
// st2 et2
|
|
// st3 et3
|
|
// st4 et4
|
|
|
|
et1 := time.Now().UTC()
|
|
st1 := et1.Add(-10 * time.Minute)
|
|
|
|
et2 := et1.Add(-5 * time.Minute)
|
|
st2 := et2.Add(-10 * time.Minute)
|
|
|
|
et3 := et1.Add(-10 * time.Minute)
|
|
st3 := et2.Add(-15 * time.Minute)
|
|
|
|
et4 := et3
|
|
st4 := st3
|
|
|
|
// used for tests involving time filters
|
|
earlierEt1 := et1.Add(-1 * time.Millisecond)
|
|
laterSt3 := st3.Add(1 * time.Millisecond)
|
|
beforeAny := st3.Add(-1 * time.Millisecond)
|
|
afterAny := et1.Add(1 * time.Millisecond)
|
|
|
|
orgID := *influxdbtesting.IDPtr(1)
|
|
otherOrgID := *influxdbtesting.IDPtr(2)
|
|
ctx := context.Background()
|
|
|
|
s1 := influxdb.StoredAnnotation{
|
|
OrgID: orgID,
|
|
StreamTag: "stream1",
|
|
Summary: "summary1",
|
|
Message: "message1",
|
|
Stickers: map[string]string{"stick1": "val1", "stick2": "val2"},
|
|
Duration: timesToDuration(st1, et1),
|
|
Lower: st1.Format(time.RFC3339Nano),
|
|
Upper: et1.Format(time.RFC3339Nano),
|
|
}
|
|
|
|
c1, err := s1.ToCreate()
|
|
require.NoError(t, err)
|
|
|
|
s2 := influxdb.StoredAnnotation{
|
|
OrgID: orgID,
|
|
StreamTag: "stream2",
|
|
Summary: "summary2",
|
|
Message: "message2",
|
|
Stickers: map[string]string{"stick2": "val2", "stick3": "val3", "stick4": "val4"},
|
|
Duration: timesToDuration(st2, et2),
|
|
Lower: st2.Format(time.RFC3339Nano),
|
|
Upper: et2.Format(time.RFC3339Nano),
|
|
}
|
|
|
|
c2, err := s2.ToCreate()
|
|
require.NoError(t, err)
|
|
|
|
s3 := influxdb.StoredAnnotation{
|
|
OrgID: orgID,
|
|
StreamTag: "stream2",
|
|
Summary: "summary3",
|
|
Message: "message3",
|
|
Stickers: map[string]string{"stick1": "val2"},
|
|
Duration: timesToDuration(st3, et3),
|
|
Lower: st3.Format(time.RFC3339Nano),
|
|
Upper: et3.Format(time.RFC3339Nano),
|
|
}
|
|
|
|
c3, err := s3.ToCreate()
|
|
require.NoError(t, err)
|
|
|
|
// s4 is an annotation without any stickers, with the same start/end time as s3
|
|
s4 := influxdb.StoredAnnotation{
|
|
OrgID: orgID,
|
|
StreamTag: "stream4",
|
|
Summary: "summary4",
|
|
Message: "message4",
|
|
Stickers: map[string]string{},
|
|
Duration: timesToDuration(st4, et4),
|
|
Lower: st3.Format(time.RFC3339Nano),
|
|
Upper: et3.Format(time.RFC3339Nano),
|
|
}
|
|
|
|
c4, err := s4.ToCreate()
|
|
require.NoError(t, err)
|
|
|
|
// helper function for setting up the database with data that can be used for tests
|
|
// that involve querying the database. uses the annotations objects initialized above
|
|
// via the closure.
|
|
populateAnnotationsData := func(t *testing.T, svc *Service) []influxdb.AnnotationEvent {
|
|
t.Helper()
|
|
|
|
got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3, *c4})
|
|
require.NoError(t, err)
|
|
assertAnnotationEvents(t, got, []influxdb.AnnotationEvent{
|
|
{AnnotationCreate: *c1},
|
|
{AnnotationCreate: *c2},
|
|
{AnnotationCreate: *c3},
|
|
{AnnotationCreate: *c4},
|
|
})
|
|
|
|
return got
|
|
}
|
|
|
|
t.Run("create annotations", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
|
|
tests := []struct {
|
|
name string
|
|
creates []influxdb.AnnotationCreate
|
|
want []influxdb.AnnotationEvent
|
|
}{
|
|
{
|
|
"empty creates list returns empty events list",
|
|
[]influxdb.AnnotationCreate{},
|
|
[]influxdb.AnnotationEvent{},
|
|
},
|
|
{
|
|
"creates annotations successfully",
|
|
[]influxdb.AnnotationCreate{*c1, *c2, *c3, *c4},
|
|
[]influxdb.AnnotationEvent{
|
|
{AnnotationCreate: *c1},
|
|
{AnnotationCreate: *c2},
|
|
{AnnotationCreate: *c3},
|
|
{AnnotationCreate: *c4},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got, err := svc.CreateAnnotations(ctx, orgID, tt.creates)
|
|
require.NoError(t, err)
|
|
assertAnnotationEvents(t, got, tt.want)
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("select with filters", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
populateAnnotationsData(t, svc)
|
|
|
|
tests := []struct {
|
|
name string
|
|
orgID platform.ID
|
|
f influxdb.AnnotationListFilter
|
|
want []influxdb.StoredAnnotation
|
|
skip string // link to issue and/or reason
|
|
}{
|
|
{
|
|
name: "time filter is inclusive - gets all",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &st3,
|
|
EndTime: &et1,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s1, s2, s3, s4},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "doesn't get results for other org",
|
|
orgID: otherOrgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &st3,
|
|
EndTime: &et1,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "end time will filter out annotations",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &st3,
|
|
EndTime: &earlierEt1,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s2, s3, s4},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "start time will filter out annotations",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &laterSt3,
|
|
EndTime: &et1,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s1, s2},
|
|
skip: "https://github.com/influxdata/influxdb/issues/23272",
|
|
},
|
|
{
|
|
name: "time can filter out all annotations if it's too soon",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &beforeAny,
|
|
EndTime: &beforeAny,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "https://github.com/influxdata/influxdb/issues/23272",
|
|
},
|
|
{
|
|
name: "time can filter out all annotations if it's too late",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &afterAny,
|
|
EndTime: &afterAny,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "time can filter out all annotations if it's too narrow",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
BasicFilter: influxdb.BasicFilter{
|
|
StartTime: &laterSt3,
|
|
EndTime: &et3,
|
|
},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stickers - one sticker matches one",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StickerIncludes: map[string]string{"stick1": "val2"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s3},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stickers - one sticker matches multiple",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StickerIncludes: map[string]string{"stick2": "val2"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s1, s2},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stickers - matching key but wrong value",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StickerIncludes: map[string]string{"stick2": "val3"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stream - matches one",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StreamIncludes: []string{"stream1"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s1},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stream - matches multiple",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StreamIncludes: []string{"stream2"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{s2, s3},
|
|
skip: "",
|
|
},
|
|
{
|
|
name: "can filter by stream - no match",
|
|
orgID: orgID,
|
|
f: influxdb.AnnotationListFilter{
|
|
StreamIncludes: []string{"badStream"},
|
|
},
|
|
want: []influxdb.StoredAnnotation{},
|
|
skip: "",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
if tt.skip != "" {
|
|
t.Skip(tt.skip)
|
|
}
|
|
tt.f.Validate(time.Now)
|
|
got, err := svc.ListAnnotations(ctx, tt.orgID, tt.f)
|
|
require.NoError(t, err)
|
|
assertStoredAnnotations(t, got, tt.want)
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("get by id", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
anns := populateAnnotationsData(t, svc)
|
|
|
|
tests := []struct {
|
|
name string
|
|
id platform.ID
|
|
want *influxdb.AnnotationEvent
|
|
wantErr error
|
|
}{
|
|
{
|
|
"gets the first one by id",
|
|
anns[0].ID,
|
|
&anns[0],
|
|
nil,
|
|
},
|
|
{
|
|
"gets the second one by id",
|
|
anns[1].ID,
|
|
&anns[1],
|
|
nil,
|
|
},
|
|
{
|
|
"has the correct error if not found",
|
|
idGen.ID(),
|
|
nil,
|
|
errAnnotationNotFound,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got, err := svc.GetAnnotation(ctx, tt.id)
|
|
require.Equal(t, tt.wantErr, err)
|
|
|
|
if tt.want == nil {
|
|
require.Nil(t, got)
|
|
} else {
|
|
e, err := got.ToEvent()
|
|
require.NoError(t, err)
|
|
require.Equal(t, tt.want, e)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("delete multiple with a filter", func(t *testing.T) {
|
|
t.Run("delete by stream id", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
populateAnnotationsData(t, svc)
|
|
|
|
ctx := context.Background()
|
|
|
|
lf := influxdb.AnnotationListFilter{BasicFilter: influxdb.BasicFilter{}}
|
|
lf.Validate(time.Now)
|
|
ans, err := svc.ListAnnotations(ctx, orgID, lf)
|
|
require.NoError(t, err)
|
|
|
|
annID1 := ans[0].ID
|
|
streamID1 := ans[0].StreamID
|
|
st1, err := time.Parse(time.RFC3339Nano, ans[0].Lower)
|
|
require.NoError(t, err)
|
|
et1, err := time.Parse(time.RFC3339Nano, ans[0].Upper)
|
|
require.NoError(t, err)
|
|
|
|
streamID2 := ans[1].StreamID
|
|
st2, err := time.Parse(time.RFC3339Nano, ans[1].Lower)
|
|
require.NoError(t, err)
|
|
et2, err := time.Parse(time.RFC3339Nano, ans[1].Upper)
|
|
require.NoError(t, err)
|
|
|
|
tests := []struct {
|
|
name string
|
|
deleteOrgID platform.ID
|
|
id platform.ID
|
|
filter influxdb.AnnotationDeleteFilter
|
|
shouldDelete bool
|
|
}{
|
|
{
|
|
"matches stream id but not time range",
|
|
orgID,
|
|
annID1,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamID: streamID1,
|
|
StartTime: &st2,
|
|
EndTime: &et2,
|
|
},
|
|
false,
|
|
},
|
|
{
|
|
"matches time range but not stream id",
|
|
orgID,
|
|
annID1,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamID: streamID2,
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
false,
|
|
},
|
|
{
|
|
"doesn't delete for other org",
|
|
otherOrgID,
|
|
annID1,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamID: streamID1,
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
false,
|
|
},
|
|
{
|
|
"matches stream id and time range",
|
|
orgID,
|
|
annID1,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamID: streamID1,
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := svc.DeleteAnnotations(ctx, tt.deleteOrgID, tt.filter)
|
|
require.NoError(t, err)
|
|
|
|
lf := influxdb.AnnotationListFilter{BasicFilter: influxdb.BasicFilter{}}
|
|
lf.Validate(time.Now)
|
|
list, err := svc.ListAnnotations(ctx, orgID, lf)
|
|
require.NoError(t, err)
|
|
get, getErr := svc.GetAnnotation(ctx, tt.id)
|
|
|
|
if tt.shouldDelete {
|
|
require.Equal(t, 3, len(list))
|
|
require.Nil(t, get)
|
|
require.Equal(t, errAnnotationNotFound, getErr)
|
|
} else {
|
|
require.Equal(t, 4, len(list))
|
|
require.NoError(t, getErr)
|
|
require.Equal(t, *get, ans[0])
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("delete with non-id filters", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
populateAnnotationsData(t, svc)
|
|
|
|
tests := []struct {
|
|
name string
|
|
deleteOrgID platform.ID
|
|
filter influxdb.AnnotationDeleteFilter
|
|
wantList []influxdb.StoredAnnotation
|
|
}{
|
|
{
|
|
"matches stream tag but not time range",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamTag: "stream1",
|
|
StartTime: &st1,
|
|
EndTime: &earlierEt1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s2, s3, s4},
|
|
},
|
|
{
|
|
"matches stream tag and time range",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamTag: "stream1",
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s2, s3, s4},
|
|
},
|
|
{
|
|
"matches stream tag and time range for item with no stickers",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamTag: "stream4",
|
|
StartTime: &st4,
|
|
EndTime: &et4,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s2, s3},
|
|
},
|
|
{
|
|
"matches stream tag for multiple",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamTag: "stream2",
|
|
StartTime: &st3,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s4},
|
|
},
|
|
{
|
|
"matches stream tag but wrong org",
|
|
otherOrgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
StreamTag: "stream1",
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s2, s3, s4},
|
|
},
|
|
|
|
{
|
|
"matches stickers but not time range",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
Stickers: map[string]string{"stick1": "val1"},
|
|
StartTime: &st1,
|
|
EndTime: &earlierEt1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s2, s3, s4},
|
|
},
|
|
{
|
|
"matches stickers and time range",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
Stickers: map[string]string{"stick1": "val1"},
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s2, s3, s4},
|
|
},
|
|
{
|
|
"matches stickers for multiple",
|
|
orgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
Stickers: map[string]string{"stick2": "val2"},
|
|
StartTime: &st2,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s3, s4},
|
|
},
|
|
{
|
|
"matches stickers but wrong org",
|
|
otherOrgID,
|
|
influxdb.AnnotationDeleteFilter{
|
|
Stickers: map[string]string{"stick1": "val1"},
|
|
StartTime: &st1,
|
|
EndTime: &et1,
|
|
},
|
|
[]influxdb.StoredAnnotation{s1, s2, s3, s4},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
populateAnnotationsData(t, svc)
|
|
|
|
err := svc.DeleteAnnotations(ctx, tt.deleteOrgID, tt.filter)
|
|
require.NoError(t, err)
|
|
|
|
f := influxdb.AnnotationListFilter{}
|
|
f.Validate(time.Now)
|
|
list, err := svc.ListAnnotations(ctx, orgID, f)
|
|
require.NoError(t, err)
|
|
assertStoredAnnotations(t, list, tt.wantList)
|
|
})
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("delete a single annotation by id", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
ans := populateAnnotationsData(t, svc)
|
|
|
|
tests := []struct {
|
|
name string
|
|
id platform.ID
|
|
shouldDelete bool
|
|
}{
|
|
{
|
|
"has the correct error if not found",
|
|
idGen.ID(),
|
|
false,
|
|
},
|
|
{
|
|
"deletes the first one by id",
|
|
ans[0].ID,
|
|
true,
|
|
},
|
|
{
|
|
"deletes the second one by id",
|
|
ans[1].ID,
|
|
true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := svc.DeleteAnnotation(ctx, tt.id)
|
|
|
|
if tt.shouldDelete {
|
|
require.NoError(t, err)
|
|
} else {
|
|
require.Equal(t, errAnnotationNotFound, err)
|
|
}
|
|
|
|
got, err := svc.GetAnnotation(ctx, tt.id)
|
|
require.Equal(t, errAnnotationNotFound, err)
|
|
require.Nil(t, got)
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("update a single annotation by id", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
ans := populateAnnotationsData(t, svc)
|
|
|
|
updatedTime := time.Time{}.Add(time.Minute)
|
|
|
|
tests := []struct {
|
|
name string
|
|
id platform.ID
|
|
update influxdb.AnnotationCreate
|
|
wantErr error
|
|
}{
|
|
{
|
|
"has the correct error if not found",
|
|
idGen.ID(),
|
|
influxdb.AnnotationCreate{
|
|
StreamTag: "updated tag",
|
|
Summary: "updated summary",
|
|
Message: "updated message",
|
|
Stickers: map[string]string{"updated": "sticker"},
|
|
EndTime: &updatedTime,
|
|
StartTime: &updatedTime,
|
|
},
|
|
errAnnotationNotFound,
|
|
},
|
|
{
|
|
"updates the first one by id",
|
|
ans[0].ID,
|
|
influxdb.AnnotationCreate{
|
|
StreamTag: "updated tag",
|
|
Summary: "updated summary",
|
|
Message: "updated message",
|
|
Stickers: map[string]string{"updated": "sticker"},
|
|
EndTime: &updatedTime,
|
|
StartTime: &updatedTime,
|
|
},
|
|
nil,
|
|
},
|
|
{
|
|
"updates the second one by id",
|
|
ans[1].ID,
|
|
influxdb.AnnotationCreate{
|
|
StreamTag: "updated tag2",
|
|
Summary: "updated summary2",
|
|
Message: "updated message2",
|
|
Stickers: map[string]string{"updated2": "sticker2"},
|
|
EndTime: &updatedTime,
|
|
StartTime: &updatedTime,
|
|
},
|
|
nil,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
want := &influxdb.AnnotationEvent{ID: tt.id, AnnotationCreate: tt.update}
|
|
if tt.wantErr != nil {
|
|
want = nil
|
|
}
|
|
|
|
got, err := svc.UpdateAnnotation(ctx, tt.id, tt.update)
|
|
require.Equal(t, tt.wantErr, err)
|
|
require.Equal(t, want, got)
|
|
|
|
if tt.wantErr == nil {
|
|
new, err := svc.GetAnnotation(ctx, tt.id)
|
|
require.NoError(t, err)
|
|
e, err := new.ToEvent()
|
|
require.NoError(t, err)
|
|
require.Equal(t, got, e)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("deleted streams cascade to deleted annotations", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
|
|
ctx := context.Background()
|
|
ans := populateAnnotationsData(t, svc)
|
|
sort.Slice(ans, func(i, j int) bool {
|
|
return ans[i].StreamTag < ans[j].StreamTag
|
|
})
|
|
|
|
// annotations s2 and s3 have the stream tag of "stream2", so get the id of that stream
|
|
id := ans[1].ID
|
|
a, err := svc.GetAnnotation(ctx, id)
|
|
require.NoError(t, err)
|
|
streamID := a.StreamID
|
|
|
|
// delete the stream
|
|
err = svc.DeleteStreamByID(ctx, streamID)
|
|
require.NoError(t, err)
|
|
|
|
// s1 and s4 should still be there
|
|
s1, err := svc.GetAnnotation(ctx, ans[0].ID)
|
|
require.NoError(t, err)
|
|
s4, err := svc.GetAnnotation(ctx, ans[3].ID)
|
|
require.NoError(t, err)
|
|
|
|
// both s2 and s3 should now be deleted
|
|
f := influxdb.AnnotationListFilter{}
|
|
f.Validate(time.Now)
|
|
remaining, err := svc.ListAnnotations(ctx, orgID, f)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []influxdb.StoredAnnotation{*s1, *s4}, remaining)
|
|
})
|
|
|
|
t.Run("renamed streams are reflected in subsequent annotation queries", func(t *testing.T) {
|
|
svc := newTestService(t)
|
|
|
|
ctx := context.Background()
|
|
populateAnnotationsData(t, svc)
|
|
|
|
// get all the annotations with the tag "stream2"
|
|
f := influxdb.AnnotationListFilter{StreamIncludes: []string{"stream2"}}
|
|
f.Validate(time.Now)
|
|
originalList, err := svc.ListAnnotations(ctx, orgID, f)
|
|
require.NoError(t, err)
|
|
assertStoredAnnotations(t, []influxdb.StoredAnnotation{s2, s3}, originalList)
|
|
|
|
// check that the original list has the right stream tag for all annotations
|
|
for _, a := range originalList {
|
|
require.Equal(t, "stream2", a.StreamTag)
|
|
}
|
|
|
|
// update the name for stream2
|
|
streamID := originalList[0].StreamID
|
|
_, err = svc.UpdateStream(ctx, streamID, influxdb.Stream{Name: "new name", Description: "new desc"})
|
|
require.NoError(t, err)
|
|
|
|
// get all the annotations with the new tag
|
|
f = influxdb.AnnotationListFilter{StreamIncludes: []string{"new name"}}
|
|
f.Validate(time.Now)
|
|
newList, err := svc.ListAnnotations(ctx, orgID, f)
|
|
require.NoError(t, err)
|
|
|
|
// check that the new list has the right stream tag for all annotations
|
|
for _, a := range newList {
|
|
require.Equal(t, "new name", a.StreamTag)
|
|
}
|
|
|
|
// verify that the new list of annotations is the same as the original except for the stream name change
|
|
require.Equal(t, len(originalList), len(newList))
|
|
|
|
sort.Slice(originalList, func(i, j int) bool {
|
|
return originalList[i].ID < originalList[j].ID
|
|
})
|
|
|
|
sort.Slice(newList, func(i, j int) bool {
|
|
return originalList[i].ID < originalList[j].ID
|
|
})
|
|
|
|
for i := range newList {
|
|
originalList[i].StreamTag = "new name"
|
|
require.Equal(t, originalList[i], newList[i])
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestStreamsCRUDSingle(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
svc := newTestService(t)
|
|
|
|
ctx := context.Background()
|
|
orgID := *influxdbtesting.IDPtr(1)
|
|
|
|
stream := influxdb.Stream{
|
|
Name: "testName",
|
|
Description: "original description",
|
|
}
|
|
|
|
var err error
|
|
var s1, s2, s3 *influxdb.ReadStream
|
|
|
|
t.Run("create a single stream", func(t *testing.T) {
|
|
s1, err = svc.CreateOrUpdateStream(ctx, orgID, stream)
|
|
require.NoError(t, err)
|
|
require.Equal(t, stream.Name, s1.Name)
|
|
require.Equal(t, stream.Description, s1.Description)
|
|
})
|
|
|
|
t.Run("stream updates", func(t *testing.T) {
|
|
u1 := influxdb.Stream{
|
|
Name: "testName",
|
|
Description: "updated description",
|
|
}
|
|
|
|
u2 := influxdb.Stream{
|
|
Name: "otherName",
|
|
Description: "other description",
|
|
}
|
|
|
|
t.Run("updating an existing stream with CreateOrUpdateStream does not change id but does change description", func(t *testing.T) {
|
|
s2, err = svc.CreateOrUpdateStream(ctx, orgID, u1)
|
|
require.NoError(t, err)
|
|
require.Equal(t, stream.Name, s2.Name)
|
|
require.Equal(t, u1.Description, s2.Description)
|
|
require.Equal(t, s1.ID, s2.ID)
|
|
})
|
|
|
|
t.Run("updating a non-existant stream with UpdateStream returns not found error", func(t *testing.T) {
|
|
readGot, err := svc.UpdateStream(ctx, idGen.ID(), u2)
|
|
require.Nil(t, readGot)
|
|
require.Equal(t, errStreamNotFound, err)
|
|
})
|
|
|
|
t.Run("updating an existing stream with UpdateStream changes both name & description", func(t *testing.T) {
|
|
s3, err = svc.UpdateStream(ctx, s2.ID, u2)
|
|
require.NoError(t, err)
|
|
require.Equal(t, s2.ID, s3.ID)
|
|
require.Equal(t, u2.Name, s3.Name)
|
|
require.Equal(t, u2.Description, s3.Description)
|
|
})
|
|
})
|
|
|
|
t.Run("getting a stream", func(t *testing.T) {
|
|
t.Run("non-existant stream returns a not found error", func(t *testing.T) {
|
|
storedGot, err := svc.GetStream(ctx, idGen.ID())
|
|
require.Nil(t, storedGot)
|
|
require.Equal(t, errStreamNotFound, err)
|
|
})
|
|
|
|
t.Run("existing stream returns without error", func(t *testing.T) {
|
|
storedGot, err := svc.GetStream(ctx, s3.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, s3.Name, storedGot.Name)
|
|
require.Equal(t, s3.Description, storedGot.Description)
|
|
})
|
|
})
|
|
|
|
t.Run("deleting a stream", func(t *testing.T) {
|
|
t.Run("non-existant stream returns a not found error", func(t *testing.T) {
|
|
err := svc.DeleteStreamByID(ctx, idGen.ID())
|
|
require.Equal(t, errStreamNotFound, err)
|
|
})
|
|
|
|
t.Run("deletes an existing stream without error", func(t *testing.T) {
|
|
err := svc.DeleteStreamByID(ctx, s1.ID)
|
|
require.NoError(t, err)
|
|
|
|
storedGot, err := svc.GetStream(ctx, s1.ID)
|
|
require.Nil(t, storedGot)
|
|
require.Equal(t, err, errStreamNotFound)
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestStreamsCRUDMany(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
svc := newTestService(t)
|
|
|
|
ctx := context.Background()
|
|
|
|
orgID1 := influxdbtesting.IDPtr(1)
|
|
orgID2 := influxdbtesting.IDPtr(2)
|
|
orgID3 := influxdbtesting.IDPtr(3)
|
|
|
|
// populate the database with some streams for testing delete and select many
|
|
combos := map[platform.ID][]string{
|
|
*orgID1: {"org1_s1", "org1_s2", "org1_s3", "org1_s4"},
|
|
*orgID2: {"org2_s1"},
|
|
*orgID3: {"org3_s1", "org3_s2"},
|
|
}
|
|
|
|
for orgID, streams := range combos {
|
|
for _, s := range streams {
|
|
_, err := svc.CreateOrUpdateStream(ctx, orgID, influxdb.Stream{
|
|
Name: s,
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
t.Run("all streams can be listed for each org if passing an empty list", func(t *testing.T) {
|
|
for orgID, streams := range combos {
|
|
got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{
|
|
StreamIncludes: []string{},
|
|
})
|
|
require.NoError(t, err)
|
|
assertStreamNames(t, streams, got)
|
|
}
|
|
})
|
|
|
|
t.Run("can select specific streams and get only those for that org", func(t *testing.T) {
|
|
for orgID, streams := range combos {
|
|
got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{
|
|
StreamIncludes: streams,
|
|
})
|
|
require.NoError(t, err)
|
|
assertStreamNames(t, streams, got)
|
|
}
|
|
})
|
|
|
|
t.Run("can delete a single stream with DeleteStreams, but does not delete streams for other org", func(t *testing.T) {
|
|
err := svc.DeleteStreams(ctx, *orgID1, influxdb.BasicStream{
|
|
Names: []string{"org1_s1", "org2_s1"},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
got, err := svc.ListStreams(ctx, *orgID1, influxdb.StreamListFilter{
|
|
StreamIncludes: []string{},
|
|
})
|
|
require.NoError(t, err)
|
|
assertStreamNames(t, []string{"org1_s2", "org1_s3", "org1_s4"}, got)
|
|
|
|
got, err = svc.ListStreams(ctx, *orgID2, influxdb.StreamListFilter{
|
|
StreamIncludes: []string{},
|
|
})
|
|
require.NoError(t, err)
|
|
assertStreamNames(t, []string{"org2_s1"}, got)
|
|
})
|
|
|
|
t.Run("can delete all streams for all orgs", func(t *testing.T) {
|
|
for orgID, streams := range combos {
|
|
err := svc.DeleteStreams(ctx, orgID, influxdb.BasicStream{
|
|
Names: streams,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{
|
|
StreamIncludes: []string{},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, []influxdb.StoredStream{}, got)
|
|
}
|
|
})
|
|
}
|
|
|
|
func assertAnnotationEvents(t *testing.T, got, want []influxdb.AnnotationEvent) {
|
|
t.Helper()
|
|
|
|
require.Equal(t, len(want), len(got))
|
|
|
|
sort.Slice(want, func(i, j int) bool {
|
|
return want[i].StreamTag < want[j].StreamTag
|
|
})
|
|
|
|
sort.Slice(got, func(i, j int) bool {
|
|
return got[i].StreamTag < got[j].StreamTag
|
|
})
|
|
|
|
for idx, w := range want {
|
|
w.ID = got[idx].ID
|
|
require.Equal(t, w, got[idx])
|
|
}
|
|
}
|
|
|
|
// should make these are lists similar
|
|
func assertStoredAnnotations(t *testing.T, got, want []influxdb.StoredAnnotation) {
|
|
t.Helper()
|
|
|
|
require.Equal(t, len(want), len(got))
|
|
|
|
sort.Slice(want, func(i, j int) bool {
|
|
return want[i].ID < want[j].ID
|
|
})
|
|
|
|
sort.Slice(got, func(i, j int) bool {
|
|
return got[i].ID < got[j].ID
|
|
})
|
|
|
|
for idx, w := range want {
|
|
w.ID = got[idx].ID
|
|
w.StreamID = got[idx].StreamID
|
|
require.Equal(t, w, got[idx])
|
|
}
|
|
}
|
|
|
|
func assertStreamNames(t *testing.T, want []string, got []influxdb.StoredStream) {
|
|
t.Helper()
|
|
|
|
storedNames := make([]string, len(got))
|
|
for i, s := range got {
|
|
storedNames[i] = s.Name
|
|
}
|
|
|
|
require.ElementsMatch(t, want, storedNames)
|
|
}
|
|
|
|
func newTestService(t *testing.T) *Service {
|
|
t.Helper()
|
|
|
|
store := sqlite.NewTestStore(t)
|
|
ctx := context.Background()
|
|
|
|
sqliteMigrator := sqlite.NewMigrator(store, zap.NewNop())
|
|
err := sqliteMigrator.Up(ctx, migrations.AllUp)
|
|
require.NoError(t, err)
|
|
|
|
svc := NewService(store)
|
|
|
|
return svc
|
|
}
|