feat: enable annotations backend (#21702)

* feat: enable annotations backend
pull/21709/head
William Baker 2021-06-16 12:41:23 -04:00 committed by GitHub
parent 2a4dc9e356
commit 235366a603
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 32 deletions

View File

@ -156,22 +156,32 @@ func (s *Service) ListAnnotations(ctx context.Context, orgID platform.ID, filter
q := sq.Select("annotations.*", "streams.name AS stream"). q := sq.Select("annotations.*", "streams.name AS stream").
Distinct(). Distinct().
From("annotations, json_each(annotations.stickers) AS json").
InnerJoin("streams ON annotations.stream_id = streams.id"). InnerJoin("streams ON annotations.stream_id = streams.id").
Where(sq.Eq{"annotations.org_id": orgID}). Where(sq.Eq{"annotations.org_id": orgID}).
Where(sq.GtOrEq{"lower": sf}). Where(sq.GtOrEq{"lower": sf}).
Where(sq.LtOrEq{"upper": ef}) Where(sq.LtOrEq{"upper": ef})
// If the filter contains stickers, use the json_each table value function to break out
// rows with the sticker array values. If the filter does not contain stickers, using
// the json_each TVF would exclude annotations with an empty array of stickers, so select
// from the annotations table only. This allows a filter with no sticker constraints to
// return annotations that don't have any stickers.
if len(filter.StickerIncludes) > 0 {
q = q.From("annotations, json_each(annotations.stickers) AS json")
// Add sticker filters to the query
for k, v := range filter.StickerIncludes {
q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
} else {
q = q.From("annotations")
}
// Add stream name filters to the query // Add stream name filters to the query
if len(filter.StreamIncludes) > 0 { if len(filter.StreamIncludes) > 0 {
q = q.Where(sq.Eq{"stream": filter.StreamIncludes}) q = q.Where(sq.Eq{"stream": filter.StreamIncludes})
} }
// Add sticker filters to the query
for k, v := range filter.StickerIncludes {
q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
sql, args, err := q.ToSql() sql, args, err := q.ToSql()
if err != nil { if err != nil {
return nil, err return nil, err
@ -221,12 +231,27 @@ func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, dele
// A subquery is used because the json_each virtual table can only be used in a SELECT // A subquery is used because the json_each virtual table can only be used in a SELECT
subQ := sq.Select("annotations.id"). subQ := sq.Select("annotations.id").
Distinct(). Distinct().
From("annotations, json_each(annotations.stickers) AS json").
InnerJoin("streams ON annotations.stream_id = streams.id"). InnerJoin("streams ON annotations.stream_id = streams.id").
Where(sq.Eq{"annotations.org_id": orgID}). Where(sq.Eq{"annotations.org_id": orgID}).
Where(sq.GtOrEq{"lower": sf}). Where(sq.GtOrEq{"lower": sf}).
Where(sq.LtOrEq{"upper": ef}) Where(sq.LtOrEq{"upper": ef})
// If the filter contains stickers, use the json_each table value function to break out
// rows with the sticker array values. If the filter does not contain stickers, using
// the json_each TVF would exclude annotations with an empty array of stickers, so select
// from the annotations table only. This allows a filter with no sticker constraints to
// delete annotations that don't have any stickers.
if len(delete.Stickers) > 0 {
subQ = subQ.From("annotations, json_each(annotations.stickers) AS json")
// Add sticker filters to the subquery
for k, v := range delete.Stickers {
subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
} else {
subQ = subQ.From("annotations")
}
// Add the stream name filter to the subquery (if present) // Add the stream name filter to the subquery (if present)
if len(delete.StreamTag) > 0 { if len(delete.StreamTag) > 0 {
subQ = subQ.Where(sq.Eq{"streams.name": delete.StreamTag}) subQ = subQ.Where(sq.Eq{"streams.name": delete.StreamTag})
@ -237,16 +262,12 @@ func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, dele
subQ = subQ.Where(sq.Eq{"stream_id": delete.StreamID}) subQ = subQ.Where(sq.Eq{"stream_id": delete.StreamID})
} }
// Add any sticker filters to the subquery
for k, v := range delete.Stickers {
subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
// Parse the subquery into a string and list of args // Parse the subquery into a string and list of args
subQuery, subArgs, err := subQ.ToSql() subQuery, subArgs, err := subQ.ToSql()
if err != nil { if err != nil {
return err return err
} }
// Convert the subquery into a sq.Sqlizer so that it can be used in the actual DELETE // Convert the subquery into a sq.Sqlizer so that it can be used in the actual DELETE
// operation. This is a bit of a hack since squirrel doesn't have great support for subqueries // operation. This is a bit of a hack since squirrel doesn't have great support for subqueries
// outside of SELECT statements // outside of SELECT statements

View File

@ -35,6 +35,7 @@ func TestAnnotationsCRUD(t *testing.T) {
// st1 et1 // st1 et1
// st2 et2 // st2 et2
// st3 et3 // st3 et3
// st4 et4
et1 := time.Now().UTC() et1 := time.Now().UTC()
st1 := et1.Add(-10 * time.Minute) st1 := et1.Add(-10 * time.Minute)
@ -45,6 +46,9 @@ func TestAnnotationsCRUD(t *testing.T) {
et3 := et1.Add(-10 * time.Minute) et3 := et1.Add(-10 * time.Minute)
st3 := et2.Add(-15 * time.Minute) st3 := et2.Add(-15 * time.Minute)
et4 := et3
st4 := st3
// used for tests involving time filters // used for tests involving time filters
earlierEt1 := et1.Add(-1 * time.Millisecond) earlierEt1 := et1.Add(-1 * time.Millisecond)
laterSt3 := st3.Add(1 * time.Millisecond) laterSt3 := st3.Add(1 * time.Millisecond)
@ -97,18 +101,34 @@ func TestAnnotationsCRUD(t *testing.T) {
c3, err := s3.ToCreate() c3, err := s3.ToCreate()
require.NoError(t, err) require.NoError(t, err)
// s4 is an annotation without any stickers, with the same start/end time as s3
s4 := influxdb.StoredAnnotation{
OrgID: orgID,
StreamTag: "stream4",
Summary: "summary4",
Message: "message4",
Stickers: map[string]string{},
Duration: timesToDuration(st4, et4),
Lower: st3.Format(time.RFC3339Nano),
Upper: et3.Format(time.RFC3339Nano),
}
c4, err := s4.ToCreate()
require.NoError(t, err)
// helper function for setting up the database with data that can be used for tests // helper function for setting up the database with data that can be used for tests
// that involve querying the database. uses the annotations objects initialized above // that involve querying the database. uses the annotations objects initialized above
// via the closure. // via the closure.
populateAnnotationsData := func(t *testing.T, svc *Service) []influxdb.AnnotationEvent { populateAnnotationsData := func(t *testing.T, svc *Service) []influxdb.AnnotationEvent {
t.Helper() t.Helper()
got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3}) got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3, *c4})
require.NoError(t, err) require.NoError(t, err)
assertAnnotationEvents(t, got, []influxdb.AnnotationEvent{ assertAnnotationEvents(t, got, []influxdb.AnnotationEvent{
{AnnotationCreate: *c1}, {AnnotationCreate: *c1},
{AnnotationCreate: *c2}, {AnnotationCreate: *c2},
{AnnotationCreate: *c3}, {AnnotationCreate: *c3},
{AnnotationCreate: *c4},
}) })
return got return got
@ -130,19 +150,22 @@ func TestAnnotationsCRUD(t *testing.T) {
}, },
{ {
"creates annotations successfully", "creates annotations successfully",
[]influxdb.AnnotationCreate{*c1, *c2, *c3}, []influxdb.AnnotationCreate{*c1, *c2, *c3, *c4},
[]influxdb.AnnotationEvent{ []influxdb.AnnotationEvent{
{AnnotationCreate: *c1}, {AnnotationCreate: *c1},
{AnnotationCreate: *c2}, {AnnotationCreate: *c2},
{AnnotationCreate: *c3}, {AnnotationCreate: *c3},
{AnnotationCreate: *c4},
}, },
}, },
} }
for _, tt := range tests { for _, tt := range tests {
got, err := svc.CreateAnnotations(ctx, orgID, tt.creates) t.Run(tt.name, func(t *testing.T) {
require.NoError(t, err) got, err := svc.CreateAnnotations(ctx, orgID, tt.creates)
assertAnnotationEvents(t, got, tt.want) require.NoError(t, err)
assertAnnotationEvents(t, got, tt.want)
})
} }
}) })
@ -166,7 +189,7 @@ func TestAnnotationsCRUD(t *testing.T) {
EndTime: &et1, EndTime: &et1,
}, },
}, },
[]influxdb.StoredAnnotation{s1, s2, s3}, []influxdb.StoredAnnotation{s1, s2, s3, s4},
}, },
{ {
"doesn't get results for other org", "doesn't get results for other org",
@ -188,7 +211,7 @@ func TestAnnotationsCRUD(t *testing.T) {
EndTime: &earlierEt1, EndTime: &earlierEt1,
}, },
}, },
[]influxdb.StoredAnnotation{s2, s3}, []influxdb.StoredAnnotation{s2, s3, s4},
}, },
{ {
"start time will filter out annotations", "start time will filter out annotations",
@ -432,11 +455,11 @@ func TestAnnotationsCRUD(t *testing.T) {
get, getErr := svc.GetAnnotation(ctx, tt.id) get, getErr := svc.GetAnnotation(ctx, tt.id)
if tt.shouldDelete { if tt.shouldDelete {
require.Equal(t, 2, len(list)) require.Equal(t, 3, len(list))
require.Nil(t, get) require.Nil(t, get)
require.Equal(t, errAnnotationNotFound, getErr) require.Equal(t, errAnnotationNotFound, getErr)
} else { } else {
require.Equal(t, 3, len(list)) require.Equal(t, 4, len(list))
require.NoError(t, getErr) require.NoError(t, getErr)
require.Equal(t, *get, ans[0]) require.Equal(t, *get, ans[0])
} }
@ -463,7 +486,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &earlierEt1, EndTime: &earlierEt1,
}, },
[]influxdb.StoredAnnotation{s1, s2, s3}, []influxdb.StoredAnnotation{s1, s2, s3, s4},
}, },
{ {
"matches stream tag and time range", "matches stream tag and time range",
@ -473,7 +496,17 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s2, s3}, []influxdb.StoredAnnotation{s2, s3, s4},
},
{
"matches stream tag and time range for item with no stickers",
orgID,
influxdb.AnnotationDeleteFilter{
StreamTag: "stream4",
StartTime: &st4,
EndTime: &et4,
},
[]influxdb.StoredAnnotation{s1, s2, s3},
}, },
{ {
"matches stream tag for multiple", "matches stream tag for multiple",
@ -483,7 +516,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st3, StartTime: &st3,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s1}, []influxdb.StoredAnnotation{s1, s4},
}, },
{ {
"matches stream tag but wrong org", "matches stream tag but wrong org",
@ -493,7 +526,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s1, s2, s3}, []influxdb.StoredAnnotation{s1, s2, s3, s4},
}, },
{ {
@ -504,7 +537,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &earlierEt1, EndTime: &earlierEt1,
}, },
[]influxdb.StoredAnnotation{s1, s2, s3}, []influxdb.StoredAnnotation{s1, s2, s3, s4},
}, },
{ {
"matches stickers and time range", "matches stickers and time range",
@ -514,7 +547,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s2, s3}, []influxdb.StoredAnnotation{s2, s3, s4},
}, },
{ {
"matches stickers for multiple", "matches stickers for multiple",
@ -524,7 +557,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st2, StartTime: &st2,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s3}, []influxdb.StoredAnnotation{s3, s4},
}, },
{ {
"matches stickers but wrong org", "matches stickers but wrong org",
@ -534,7 +567,7 @@ func TestAnnotationsCRUD(t *testing.T) {
StartTime: &st1, StartTime: &st1,
EndTime: &et1, EndTime: &et1,
}, },
[]influxdb.StoredAnnotation{s1, s2, s3}, []influxdb.StoredAnnotation{s1, s2, s3, s4},
}, },
} }
@ -551,7 +584,7 @@ func TestAnnotationsCRUD(t *testing.T) {
f.Validate(time.Now) f.Validate(time.Now)
list, err := svc.ListAnnotations(ctx, orgID, f) list, err := svc.ListAnnotations(ctx, orgID, f)
require.NoError(t, err) require.NoError(t, err)
assertStoredAnnotations(t, tt.wantList, list) assertStoredAnnotations(t, list, tt.wantList)
}) })
} }
}) })
@ -697,16 +730,18 @@ func TestAnnotationsCRUD(t *testing.T) {
err = svc.DeleteStreamByID(ctx, streamID) err = svc.DeleteStreamByID(ctx, streamID)
require.NoError(t, err) require.NoError(t, err)
// s1 should still be there // s1 and s4 should still be there
s1, err := svc.GetAnnotation(ctx, ans[0].ID) s1, err := svc.GetAnnotation(ctx, ans[0].ID)
require.NoError(t, err) require.NoError(t, err)
s4, err := svc.GetAnnotation(ctx, ans[3].ID)
require.NoError(t, err)
// both s2 and s3 should now be deleted // both s2 and s3 should now be deleted
f := influxdb.AnnotationListFilter{} f := influxdb.AnnotationListFilter{}
f.Validate(time.Now) f.Validate(time.Now)
remaining, err := svc.ListAnnotations(ctx, orgID, f) remaining, err := svc.ListAnnotations(ctx, orgID, f)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []influxdb.StoredAnnotation{*s1}, remaining) require.Equal(t, []influxdb.StoredAnnotation{*s1, *s4}, remaining)
}) })
t.Run("renamed streams are reflected in subsequent annotation queries", func(t *testing.T) { t.Run("renamed streams are reflected in subsequent annotation queries", func(t *testing.T) {

View File

@ -17,6 +17,8 @@ import (
"github.com/influxdata/flux" "github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/testing" "github.com/influxdata/flux/dependencies/testing"
platform "github.com/influxdata/influxdb/v2" platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/annotations"
annotationTransport "github.com/influxdata/influxdb/v2/annotations/transport"
"github.com/influxdata/influxdb/v2/authorization" "github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/authorizer" "github.com/influxdata/influxdb/v2/authorizer"
"github.com/influxdata/influxdb/v2/backup" "github.com/influxdata/influxdb/v2/backup"
@ -954,6 +956,15 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
authorizer.NewNotebookService(notebookSvc), authorizer.NewNotebookService(notebookSvc),
) )
annotationSvc := annotations.NewService(
m.log.With(zap.String("service", "annotations")),
m.sqlStore,
)
annotationServer := annotationTransport.NewAnnotationHandler(
m.log.With(zap.String("handler", "annotations")),
authorizer.NewAnnotationService(annotationSvc),
)
platformHandler := http.NewPlatformHandler( platformHandler := http.NewPlatformHandler(
m.apibackend, m.apibackend,
http.WithResourceHandler(stacksHTTPServer), http.WithResourceHandler(stacksHTTPServer),
@ -970,6 +981,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
http.WithResourceHandler(v1AuthHTTPServer), http.WithResourceHandler(v1AuthHTTPServer),
http.WithResourceHandler(dashboardServer), http.WithResourceHandler(dashboardServer),
http.WithResourceHandler(notebookServer), http.WithResourceHandler(notebookServer),
http.WithResourceHandler(annotationServer),
) )
httpLogger := m.log.With(zap.String("service", "http")) httpLogger := m.log.With(zap.String("service", "http"))