diff --git a/annotations/service.go b/annotations/service.go index 7b8adfc87c..25c7f95718 100644 --- a/annotations/service.go +++ b/annotations/service.go @@ -156,22 +156,32 @@ func (s *Service) ListAnnotations(ctx context.Context, orgID platform.ID, filter q := sq.Select("annotations.*", "streams.name AS stream"). Distinct(). - From("annotations, json_each(annotations.stickers) AS json"). InnerJoin("streams ON annotations.stream_id = streams.id"). Where(sq.Eq{"annotations.org_id": orgID}). Where(sq.GtOrEq{"lower": sf}). Where(sq.LtOrEq{"upper": ef}) + // If the filter contains stickers, use the json_each table value function to break out + // rows with the sticker array values. If the filter does not contain stickers, using + // the json_each TVF would exclude annotations with an empty array of stickers, so select + // from the annotations table only. This allows a filter with no sticker constraints to + // return annotations that don't have any stickers. + if len(filter.StickerIncludes) > 0 { + q = q.From("annotations, json_each(annotations.stickers) AS json") + + // Add sticker filters to the query + for k, v := range filter.StickerIncludes { + q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) + } + } else { + q = q.From("annotations") + } + // Add stream name filters to the query if len(filter.StreamIncludes) > 0 { q = q.Where(sq.Eq{"stream": filter.StreamIncludes}) } - // Add sticker filters to the query - for k, v := range filter.StickerIncludes { - q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) - } - sql, args, err := q.ToSql() if err != nil { return nil, err @@ -221,12 +231,27 @@ func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, dele // A subquery is used because the json_each virtual table can only be used in a SELECT subQ := sq.Select("annotations.id"). Distinct(). - From("annotations, json_each(annotations.stickers) AS json"). InnerJoin("streams ON annotations.stream_id = streams.id"). Where(sq.Eq{"annotations.org_id": orgID}). Where(sq.GtOrEq{"lower": sf}). Where(sq.LtOrEq{"upper": ef}) + // If the filter contains stickers, use the json_each table value function to break out + // rows with the sticker array values. If the filter does not contain stickers, using + // the json_each TVF would exclude annotations with an empty array of stickers, so select + // from the annotations table only. This allows a filter with no sticker constraints to + // delete annotations that don't have any stickers. + if len(delete.Stickers) > 0 { + subQ = subQ.From("annotations, json_each(annotations.stickers) AS json") + + // Add sticker filters to the subquery + for k, v := range delete.Stickers { + subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) + } + } else { + subQ = subQ.From("annotations") + } + // Add the stream name filter to the subquery (if present) if len(delete.StreamTag) > 0 { subQ = subQ.Where(sq.Eq{"streams.name": delete.StreamTag}) @@ -237,16 +262,12 @@ func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, dele subQ = subQ.Where(sq.Eq{"stream_id": delete.StreamID}) } - // Add any sticker filters to the subquery - for k, v := range delete.Stickers { - subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) - } - // Parse the subquery into a string and list of args subQuery, subArgs, err := subQ.ToSql() if err != nil { return err } + // Convert the subquery into a sq.Sqlizer so that it can be used in the actual DELETE // operation. This is a bit of a hack since squirrel doesn't have great support for subqueries // outside of SELECT statements diff --git a/annotations/service_test.go b/annotations/service_test.go index f51e0512c5..169c68aded 100644 --- a/annotations/service_test.go +++ b/annotations/service_test.go @@ -35,6 +35,7 @@ func TestAnnotationsCRUD(t *testing.T) { // st1 et1 // st2 et2 // st3 et3 + // st4 et4 et1 := time.Now().UTC() st1 := et1.Add(-10 * time.Minute) @@ -45,6 +46,9 @@ func TestAnnotationsCRUD(t *testing.T) { et3 := et1.Add(-10 * time.Minute) st3 := et2.Add(-15 * time.Minute) + et4 := et3 + st4 := st3 + // used for tests involving time filters earlierEt1 := et1.Add(-1 * time.Millisecond) laterSt3 := st3.Add(1 * time.Millisecond) @@ -97,18 +101,34 @@ func TestAnnotationsCRUD(t *testing.T) { c3, err := s3.ToCreate() require.NoError(t, err) + // s4 is an annotation without any stickers, with the same start/end time as s3 + s4 := influxdb.StoredAnnotation{ + OrgID: orgID, + StreamTag: "stream4", + Summary: "summary4", + Message: "message4", + Stickers: map[string]string{}, + Duration: timesToDuration(st4, et4), + Lower: st3.Format(time.RFC3339Nano), + Upper: et3.Format(time.RFC3339Nano), + } + + c4, err := s4.ToCreate() + require.NoError(t, err) + // helper function for setting up the database with data that can be used for tests // that involve querying the database. uses the annotations objects initialized above // via the closure. populateAnnotationsData := func(t *testing.T, svc *Service) []influxdb.AnnotationEvent { t.Helper() - got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3}) + got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3, *c4}) require.NoError(t, err) assertAnnotationEvents(t, got, []influxdb.AnnotationEvent{ {AnnotationCreate: *c1}, {AnnotationCreate: *c2}, {AnnotationCreate: *c3}, + {AnnotationCreate: *c4}, }) return got @@ -130,19 +150,22 @@ func TestAnnotationsCRUD(t *testing.T) { }, { "creates annotations successfully", - []influxdb.AnnotationCreate{*c1, *c2, *c3}, + []influxdb.AnnotationCreate{*c1, *c2, *c3, *c4}, []influxdb.AnnotationEvent{ {AnnotationCreate: *c1}, {AnnotationCreate: *c2}, {AnnotationCreate: *c3}, + {AnnotationCreate: *c4}, }, }, } for _, tt := range tests { - got, err := svc.CreateAnnotations(ctx, orgID, tt.creates) - require.NoError(t, err) - assertAnnotationEvents(t, got, tt.want) + t.Run(tt.name, func(t *testing.T) { + got, err := svc.CreateAnnotations(ctx, orgID, tt.creates) + require.NoError(t, err) + assertAnnotationEvents(t, got, tt.want) + }) } }) @@ -166,7 +189,7 @@ func TestAnnotationsCRUD(t *testing.T) { EndTime: &et1, }, }, - []influxdb.StoredAnnotation{s1, s2, s3}, + []influxdb.StoredAnnotation{s1, s2, s3, s4}, }, { "doesn't get results for other org", @@ -188,7 +211,7 @@ func TestAnnotationsCRUD(t *testing.T) { EndTime: &earlierEt1, }, }, - []influxdb.StoredAnnotation{s2, s3}, + []influxdb.StoredAnnotation{s2, s3, s4}, }, { "start time will filter out annotations", @@ -432,11 +455,11 @@ func TestAnnotationsCRUD(t *testing.T) { get, getErr := svc.GetAnnotation(ctx, tt.id) if tt.shouldDelete { - require.Equal(t, 2, len(list)) + require.Equal(t, 3, len(list)) require.Nil(t, get) require.Equal(t, errAnnotationNotFound, getErr) } else { - require.Equal(t, 3, len(list)) + require.Equal(t, 4, len(list)) require.NoError(t, getErr) require.Equal(t, *get, ans[0]) } @@ -463,7 +486,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &earlierEt1, }, - []influxdb.StoredAnnotation{s1, s2, s3}, + []influxdb.StoredAnnotation{s1, s2, s3, s4}, }, { "matches stream tag and time range", @@ -473,7 +496,17 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &et1, }, - []influxdb.StoredAnnotation{s2, s3}, + []influxdb.StoredAnnotation{s2, s3, s4}, + }, + { + "matches stream tag and time range for item with no stickers", + orgID, + influxdb.AnnotationDeleteFilter{ + StreamTag: "stream4", + StartTime: &st4, + EndTime: &et4, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, }, { "matches stream tag for multiple", @@ -483,7 +516,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st3, EndTime: &et1, }, - []influxdb.StoredAnnotation{s1}, + []influxdb.StoredAnnotation{s1, s4}, }, { "matches stream tag but wrong org", @@ -493,7 +526,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &et1, }, - []influxdb.StoredAnnotation{s1, s2, s3}, + []influxdb.StoredAnnotation{s1, s2, s3, s4}, }, { @@ -504,7 +537,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &earlierEt1, }, - []influxdb.StoredAnnotation{s1, s2, s3}, + []influxdb.StoredAnnotation{s1, s2, s3, s4}, }, { "matches stickers and time range", @@ -514,7 +547,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &et1, }, - []influxdb.StoredAnnotation{s2, s3}, + []influxdb.StoredAnnotation{s2, s3, s4}, }, { "matches stickers for multiple", @@ -524,7 +557,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st2, EndTime: &et1, }, - []influxdb.StoredAnnotation{s3}, + []influxdb.StoredAnnotation{s3, s4}, }, { "matches stickers but wrong org", @@ -534,7 +567,7 @@ func TestAnnotationsCRUD(t *testing.T) { StartTime: &st1, EndTime: &et1, }, - []influxdb.StoredAnnotation{s1, s2, s3}, + []influxdb.StoredAnnotation{s1, s2, s3, s4}, }, } @@ -551,7 +584,7 @@ func TestAnnotationsCRUD(t *testing.T) { f.Validate(time.Now) list, err := svc.ListAnnotations(ctx, orgID, f) require.NoError(t, err) - assertStoredAnnotations(t, tt.wantList, list) + assertStoredAnnotations(t, list, tt.wantList) }) } }) @@ -697,16 +730,18 @@ func TestAnnotationsCRUD(t *testing.T) { err = svc.DeleteStreamByID(ctx, streamID) require.NoError(t, err) - // s1 should still be there + // s1 and s4 should still be there s1, err := svc.GetAnnotation(ctx, ans[0].ID) require.NoError(t, err) + s4, err := svc.GetAnnotation(ctx, ans[3].ID) + require.NoError(t, err) // both s2 and s3 should now be deleted f := influxdb.AnnotationListFilter{} f.Validate(time.Now) remaining, err := svc.ListAnnotations(ctx, orgID, f) require.NoError(t, err) - require.Equal(t, []influxdb.StoredAnnotation{*s1}, remaining) + require.Equal(t, []influxdb.StoredAnnotation{*s1, *s4}, remaining) }) t.Run("renamed streams are reflected in subsequent annotation queries", func(t *testing.T) { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 7318a3edc5..1c2d68fa3d 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -17,6 +17,8 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/dependencies/testing" platform "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/annotations" + annotationTransport "github.com/influxdata/influxdb/v2/annotations/transport" "github.com/influxdata/influxdb/v2/authorization" "github.com/influxdata/influxdb/v2/authorizer" "github.com/influxdata/influxdb/v2/backup" @@ -954,6 +956,15 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { authorizer.NewNotebookService(notebookSvc), ) + annotationSvc := annotations.NewService( + m.log.With(zap.String("service", "annotations")), + m.sqlStore, + ) + annotationServer := annotationTransport.NewAnnotationHandler( + m.log.With(zap.String("handler", "annotations")), + authorizer.NewAnnotationService(annotationSvc), + ) + platformHandler := http.NewPlatformHandler( m.apibackend, http.WithResourceHandler(stacksHTTPServer), @@ -970,6 +981,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { http.WithResourceHandler(v1AuthHTTPServer), http.WithResourceHandler(dashboardServer), http.WithResourceHandler(notebookServer), + http.WithResourceHandler(annotationServer), ) httpLogger := m.log.With(zap.String("service", "http"))