From 1935c13c1651e0b63f5a7f813c6178773e565043 Mon Sep 17 00:00:00 2001 From: William Baker Date: Tue, 15 Jun 2021 18:36:11 -0400 Subject: [PATCH] feat(annotations): storage service (#21690) * feat(annotations): storage service * feat: stickers are in db as array * chore: fix some unintended diffs * fix: fixes from review * fix: specific table name for json_each * fix: update primary keys and constraints * fix: fix schema * feat: stream name updates are reflected in annotations via FK --- .goreleaser.yml | 2 +- Makefile | 7 +- annotation.go | 140 ++- annotation_test.go | 38 +- annotations/service.go | 578 ++++++++++ annotations/service_test.go | 1002 +++++++++++++++++ annotations/transport/annotations_router.go | 29 +- .../transport/annotations_router_test.go | 38 +- annotations/transport/http.go | 8 - go.mod | 1 + go.sum | 6 + .../0002_create_annotations_tables.sql | 34 + 12 files changed, 1789 insertions(+), 94 deletions(-) create mode 100644 annotations/service.go create mode 100644 annotations/service_test.go create mode 100644 sqlite/migrations/0002_create_annotations_tables.sql diff --git a/.goreleaser.yml b/.goreleaser.yml index 0cd76b7a58..897eeeffba 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -42,7 +42,7 @@ builds: goarch: arm64 main: ./cmd/influxd/ flags: - - -tags=assets{{if eq .Os "linux"}},osusergo,netgo,static_build{{if not (eq .Arch "amd64")}},noasm{{end}}{{end}} + - -tags=assets,sqlite_foreign_keys,sqlite_json{{if eq .Os "linux"}},osusergo,netgo,static_build{{if not (eq .Arch "amd64")}},noasm{{end}}{{end}} - -buildmode={{if eq .Os "windows"}}exe{{else}}pie{{end}} env: - GO111MODULE=on diff --git a/Makefile b/Makefile index 20184deae8..4b8e42c3ac 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,11 @@ else GO_BUILD_TAGS := assets,noasm endif -GO_TEST_ARGS := -tags '$(GO_TEST_TAGS)' -GO_BUILD_ARGS := -tags '$(GO_BUILD_TAGS)' +# Tags used for builds and tests on all architectures +COMMON_TAGS := sqlite_foreign_keys,sqlite_json + +GO_TEST_ARGS := -tags '$(COMMON_TAGS),$(GO_TEST_TAGS)' +GO_BUILD_ARGS := -tags '$(COMMON_TAGS),$(GO_BUILD_TAGS)' ifeq ($(OS), Windows_NT) VERSION := $(shell git describe --exact-match --tags 2>nil) diff --git a/annotation.go b/annotation.go index 3056a0d44f..169b4f3761 100644 --- a/annotation.go +++ b/annotation.go @@ -2,7 +2,9 @@ package influxdb import ( "context" + "database/sql/driver" "encoding/json" + "fmt" "regexp" "strings" "time" @@ -63,6 +65,27 @@ var ( } ) +func invalidStickerError(s string) error { + return &errors.Error{ + Code: errors.EInternal, + Msg: fmt.Sprintf("invalid sticker: %q", s), + } +} + +func stickerSliceToMap(stickers []string) (map[string]string, error) { + stickerMap := map[string]string{} + + for i := range stickers { + sticks := strings.SplitN(stickers[i], "=", 2) + if len(sticks) < 2 { + return nil, invalidStickerError(stickers[i]) + } + stickerMap[sticks[0]] = sticks[1] + } + + return stickerMap, nil +} + // Service is the service contract for Annotations type AnnotationService interface { // CreateAnnotations creates annotations. @@ -101,26 +124,107 @@ type AnnotationEvent struct { // AnnotationCreate contains user providable fields for annotating an event. type AnnotationCreate struct { - StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events. - Summary string `json:"summary"` // Summary is the only field required to annotate an event. - Message string `json:"message,omitempty"` // Message provides more details about the event being annotated. - Stickers map[string]string `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event. - EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set. - StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set. + StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events. + Summary string `json:"summary"` // Summary is the only field required to annotate an event. + Message string `json:"message,omitempty"` // Message provides more details about the event being annotated. + Stickers AnnotationStickers `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event. + EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set. + StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set. } // StoredAnnotation represents annotation data to be stored in the database. type StoredAnnotation struct { - ID platform.ID `db:"id"` // ID is the annotation's id. - OrgID platform.ID `db:"org_id"` // OrgID is the annotations's owning organization. - StreamID platform.ID `db:"stream_id"` // StreamID is the id of a stream. - StreamTag string `db:"name"` // StreamTag is the name of a stream (when selecting with join of streams). - Summary string `db:"summary"` // Summary is the summary of the annotated event. - Message string `db:"message"` // Message is a longer description of the annotated event. - Stickers []string `db:"stickers"` // Stickers are additional labels to group annotations by. - Duration string `db:"duration"` // Duration is the time range (with zone) of an annotated event. - Lower string `db:"lower"` // Lower is the time an annotated event begins. - Upper string `db:"upper"` // Upper is the time an annotated event ends. + ID platform.ID `db:"id"` // ID is the annotation's id. + OrgID platform.ID `db:"org_id"` // OrgID is the annotations's owning organization. + StreamID platform.ID `db:"stream_id"` // StreamID is the id of a stream. + StreamTag string `db:"stream"` // StreamTag is the name of a stream (when selecting with join of streams). + Summary string `db:"summary"` // Summary is the summary of the annotated event. + Message string `db:"message"` // Message is a longer description of the annotated event. + Stickers AnnotationStickers `db:"stickers"` // Stickers are additional labels to group annotations by. + Duration string `db:"duration"` // Duration is the time range (with zone) of an annotated event. + Lower string `db:"lower"` // Lower is the time an annotated event begins. + Upper string `db:"upper"` // Upper is the time an annotated event ends. +} + +// ToCreate is a utility method for converting a StoredAnnotation to an AnnotationCreate type +func (s StoredAnnotation) ToCreate() (*AnnotationCreate, error) { + et, err := time.Parse(time.RFC3339Nano, s.Upper) + if err != nil { + return nil, err + } + + st, err := time.Parse(time.RFC3339Nano, s.Lower) + if err != nil { + return nil, err + } + + return &AnnotationCreate{ + StreamTag: s.StreamTag, + Summary: s.Summary, + Message: s.Message, + Stickers: s.Stickers, + EndTime: &et, + StartTime: &st, + }, nil +} + +// ToEvent is a utility method for converting a StoredAnnotation to an AnnotationEvent type +func (s StoredAnnotation) ToEvent() (*AnnotationEvent, error) { + c, err := s.ToCreate() + if err != nil { + return nil, err + } + + return &AnnotationEvent{ + ID: s.ID, + AnnotationCreate: *c, + }, nil +} + +type AnnotationStickers map[string]string + +// Value implements the database/sql Valuer interface for adding AnnotationStickers to the database +// Stickers are stored in the database as a slice of strings like "[key=val]" +// They are encoded into a JSON string for storing into the database, and the JSON sqlite extension is +// able to manipulate them like an object. +func (a AnnotationStickers) Value() (driver.Value, error) { + stickSlice := make([]string, 0, len(a)) + + for k, v := range a { + stickSlice = append(stickSlice, fmt.Sprintf("%s=%s", k, v)) + } + + sticks, err := json.Marshal(stickSlice) + if err != nil { + return nil, err + } + + return string(sticks), nil +} + +// Scan implements the database/sql Scanner interface for retrieving AnnotationStickers from the database +// The string is decoded into a slice of strings, which are then converted back into a map +func (a *AnnotationStickers) Scan(value interface{}) error { + vString, ok := value.(string) + if !ok { + return &errors.Error{ + Code: errors.EInternal, + Msg: "could not load stickers from sqlite", + } + } + + var stickSlice []string + if err := json.NewDecoder(strings.NewReader(vString)).Decode(&stickSlice); err != nil { + return err + } + + stickMap, err := stickerSliceToMap(stickSlice) + if err != nil { + return nil + } + + *a = stickMap + return nil } // Validate validates the creation object. @@ -254,8 +358,8 @@ type ReadAnnotation struct { // AnnotationListFilter is a selection filter for listing annotations. type AnnotationListFilter struct { - StickerIncludes map[string]string `json:"stickerIncludes,omitempty"` // StickerIncludes allows the user to filter annotated events based on it's sticker. - StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter annotated events by stream. + StickerIncludes AnnotationStickers `json:"stickerIncludes,omitempty"` // StickerIncludes allows the user to filter annotated events based on it's sticker. + StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter annotated events by stream. BasicFilter } diff --git a/annotation_test.go b/annotation_test.go index 8e45ef9e2e..e8c48dafc2 100644 --- a/annotation_test.go +++ b/annotation_test.go @@ -479,7 +479,7 @@ func TestSetStickerIncludes(t *testing.T) { type tst struct { name string input map[string][]string - expected map[string]string + expected AnnotationStickers } tests := []tst{ @@ -554,3 +554,39 @@ func TestSetStickers(t *testing.T) { }) } } + +func TestStickerSliceToMap(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + stickers []string + want map[string]string + wantErr error + }{ + { + "good stickers", + []string{"good1=val1", "good2=val2"}, + map[string]string{"good1": "val1", "good2": "val2"}, + nil, + }, + { + "bad stickers", + []string{"this is an invalid sticker", "shouldbe=likethis"}, + nil, + invalidStickerError("this is an invalid sticker"), + }, + { + "no stickers", + []string{}, + map[string]string{}, + nil, + }, + } + + for _, tt := range tests { + got, err := stickerSliceToMap(tt.stickers) + require.Equal(t, tt.want, got) + require.Equal(t, tt.wantErr, err) + } +} diff --git a/annotations/service.go b/annotations/service.go new file mode 100644 index 0000000000..7b8adfc87c --- /dev/null +++ b/annotations/service.go @@ -0,0 +1,578 @@ +package annotations + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + sq "github.com/Masterminds/squirrel" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/influxdata/influxdb/v2/snowflake" + "github.com/influxdata/influxdb/v2/sqlite" + "go.uber.org/zap" +) + +var ( + errAnnotationNotFound = &ierrors.Error{ + Code: ierrors.EInvalid, + Msg: "annotation not found", + } + errStreamNotFound = &ierrors.Error{ + Code: ierrors.EInvalid, + Msg: "stream not found", + } +) + +var _ influxdb.AnnotationService = (*Service)(nil) + +type Service struct { + store *sqlite.SqlStore + log *zap.Logger + idGenerator platform.IDGenerator +} + +func NewService(logger *zap.Logger, store *sqlite.SqlStore) *Service { + return &Service{ + store: store, + log: logger, + idGenerator: snowflake.NewIDGenerator(), + } +} + +// CreateAnnotations creates annotations in the database for the provided orgID as defined by the provided list +// Streams corresponding to the StreamTag property of each annotation are created if they don't already exist +// as part of a transaction +func (s *Service) CreateAnnotations(ctx context.Context, orgID platform.ID, creates []influxdb.AnnotationCreate) ([]influxdb.AnnotationEvent, error) { + // Guard clause - an empty list was provided for some reason, immediately return an empty result + // set without doing the transaction + if len(creates) == 0 { + return []influxdb.AnnotationEvent{}, nil + } + + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + // store a unique list of stream names first. the invalid ID is a placeholder for the real id, + // which will be obtained separately + streamNamesIDs := make(map[string]platform.ID) + for _, c := range creates { + streamNamesIDs[c.StreamTag] = platform.InvalidID() + } + + // streamIDsNames is used for re-populating the resulting list of annotations with the stream names + // from the stream IDs returned from the database + streamIDsNames := make(map[platform.ID]string) + + tx, err := s.store.DB.BeginTxx(ctx, nil) + if err != nil { + tx.Rollback() + return nil, err + } + + // upsert each stream individually. a possible enhancement might be to do this as a single batched query + // it is unlikely that this would offer much benefit since there is currently no mechanism for creating large numbers + // of annotations simultaneously + now := time.Now() + for name := range streamNamesIDs { + query, args, err := newUpsertStreamQuery(orgID, s.idGenerator.ID(), now, influxdb.Stream{Name: name}) + if err != nil { + tx.Rollback() + return nil, err + } + + var streamID platform.ID + if err = tx.GetContext(ctx, &streamID, query, args...); err != nil { + tx.Rollback() + return nil, err + } + + streamNamesIDs[name] = streamID + streamIDsNames[streamID] = name + } + + // bulk insert for the creates. this also is unlikely to offer much performance benefit, but since the query + // is only used here it is easy enough to form to bulk query. + q := sq.Insert("annotations"). + Columns("id", "org_id", "stream_id", "summary", "message", "stickers", "duration", "lower", "upper"). + Suffix("RETURNING *") + + for _, create := range creates { + // double check that we have a valid name for this stream tag - error if we don't. this should never be an error. + streamID, ok := streamNamesIDs[create.StreamTag] + if !ok { + tx.Rollback() + return nil, &ierrors.Error{ + Code: ierrors.EInternal, + Msg: fmt.Sprintf("unable to find id for stream %q", create.StreamTag), + } + } + + // add the row to the query + newID := s.idGenerator.ID() + lower := create.StartTime.Format(time.RFC3339Nano) + upper := create.EndTime.Format(time.RFC3339Nano) + duration := timesToDuration(*create.StartTime, *create.EndTime) + q = q.Values(newID, orgID, streamID, create.Summary, create.Message, create.Stickers, duration, lower, upper) + } + + // get the query string and args list for the bulk insert + query, args, err := q.ToSql() + if err != nil { + tx.Rollback() + return nil, err + } + + // run the bulk insert and store the result + var res []*influxdb.StoredAnnotation + if err := tx.SelectContext(ctx, &res, query, args...); err != nil { + tx.Rollback() + return nil, err + } + + if err = tx.Commit(); err != nil { + return nil, err + } + + // add the stream names to the list of results + for _, a := range res { + a.StreamTag = streamIDsNames[a.StreamID] + } + + // convert the StoredAnnotation structs to AnnotationEvent structs before returning + return storedAnnotationsToEvents(res) +} + +// ListAnnotations returns a list of annotations from the database matching the filter +// For time range matching, sqlite is able to compare times with millisecond accuracy +func (s *Service) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) ([]influxdb.StoredAnnotation, error) { + // we need to explicitly format time strings here and elsewhere to ensure they are + // interpreted by the database consistently + sf := filter.StartTime.Format(time.RFC3339Nano) + ef := filter.EndTime.Format(time.RFC3339Nano) + + q := sq.Select("annotations.*", "streams.name AS stream"). + Distinct(). + From("annotations, json_each(annotations.stickers) AS json"). + InnerJoin("streams ON annotations.stream_id = streams.id"). + Where(sq.Eq{"annotations.org_id": orgID}). + Where(sq.GtOrEq{"lower": sf}). + Where(sq.LtOrEq{"upper": ef}) + + // Add stream name filters to the query + if len(filter.StreamIncludes) > 0 { + q = q.Where(sq.Eq{"stream": filter.StreamIncludes}) + } + + // Add sticker filters to the query + for k, v := range filter.StickerIncludes { + q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) + } + + sql, args, err := q.ToSql() + if err != nil { + return nil, err + } + + ans := []influxdb.StoredAnnotation{} + if err := s.store.DB.SelectContext(ctx, &ans, sql, args...); err != nil { + return nil, err + } + + return ans, nil +} + +// GetAnnotation gets a single annotation by ID +func (s *Service) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.StoredAnnotation, error) { + q := sq.Select("annotations.*, streams.name AS stream"). + From("annotations"). + InnerJoin("streams ON annotations.stream_id = streams.id"). + Where(sq.Eq{"annotations.id": id}) + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + var a influxdb.StoredAnnotation + if err := s.store.DB.GetContext(ctx, &a, query, args...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, errAnnotationNotFound + } + + return nil, err + } + + return &a, nil +} + +// DeleteAnnotations deletes multiple annotations according to the provided filter +func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) error { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + sf := delete.StartTime.Format(time.RFC3339Nano) + ef := delete.EndTime.Format(time.RFC3339Nano) + + // This is a subquery that will be as part of a DELETE FROM ... WHERE id IN (subquery) + // A subquery is used because the json_each virtual table can only be used in a SELECT + subQ := sq.Select("annotations.id"). + Distinct(). + From("annotations, json_each(annotations.stickers) AS json"). + InnerJoin("streams ON annotations.stream_id = streams.id"). + Where(sq.Eq{"annotations.org_id": orgID}). + Where(sq.GtOrEq{"lower": sf}). + Where(sq.LtOrEq{"upper": ef}) + + // Add the stream name filter to the subquery (if present) + if len(delete.StreamTag) > 0 { + subQ = subQ.Where(sq.Eq{"streams.name": delete.StreamTag}) + } + + // Add the stream ID filter to the subquery (if present) + if delete.StreamID.Valid() { + subQ = subQ.Where(sq.Eq{"stream_id": delete.StreamID}) + } + + // Add any sticker filters to the subquery + for k, v := range delete.Stickers { + subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}}) + } + + // Parse the subquery into a string and list of args + subQuery, subArgs, err := subQ.ToSql() + if err != nil { + return err + } + // Convert the subquery into a sq.Sqlizer so that it can be used in the actual DELETE + // operation. This is a bit of a hack since squirrel doesn't have great support for subqueries + // outside of SELECT statements + subExpr := sq.Expr("("+subQuery+")", subArgs...) + + q := sq. + Delete("annotations"). + Suffix("WHERE annotations.id IN"). + SuffixExpr(subExpr) + + query, args, err := q.ToSql() + + if err != nil { + return err + } + + if _, err := s.store.DB.ExecContext(ctx, query, args...); err != nil { + return err + } + + return nil +} + +// DeleteAnnoation deletes a single annotation by ID +func (s *Service) DeleteAnnotation(ctx context.Context, id platform.ID) error { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + q := sq.Delete("annotations"). + Where(sq.Eq{"id": id}). + Suffix("RETURNING id") + + query, args, err := q.ToSql() + if err != nil { + return err + } + + var d platform.ID + if err := s.store.DB.GetContext(ctx, &d, query, args...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return errAnnotationNotFound + } + + return err + } + + return nil +} + +// UpdateAnnotation updates a single annotation by ID +// In a similar fashion as CreateAnnotations, if the StreamTag in the update request does not exist, +// a stream will be created as part of a transaction with the update operation +func (s *Service) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) { + // get the full data for this annotation first so we can get its orgID + // this will ensure that the annotation already exists before starting the transaction + ann, err := s.GetAnnotation(ctx, id) + if err != nil { + return nil, err + } + + now := time.Now() + + // get a write lock on the database before starting the transaction to create/update the stream + // while simultaneously updating the annotation + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + tx, err := s.store.DB.BeginTxx(ctx, nil) + if err != nil { + tx.Rollback() + return nil, err + } + + query, args, err := newUpsertStreamQuery(ann.OrgID, s.idGenerator.ID(), now, influxdb.Stream{Name: update.StreamTag}) + if err != nil { + tx.Rollback() + return nil, err + } + + var streamID platform.ID + if err = tx.GetContext(ctx, &streamID, query, args...); err != nil { + tx.Rollback() + return nil, err + } + + q := sq.Update("annotations"). + SetMap(sq.Eq{ + "stream_id": streamID, + "summary": update.Summary, + "message": update.Message, + "stickers": update.Stickers, + "duration": timesToDuration(*update.StartTime, *update.EndTime), + "lower": update.StartTime.Format(time.RFC3339Nano), + "upper": update.EndTime.Format(time.RFC3339Nano), + }). + Where(sq.Eq{"id": id}). + Suffix("RETURNING *") + + query, args, err = q.ToSql() + if err != nil { + return nil, err + } + + var st influxdb.StoredAnnotation + err = tx.GetContext(ctx, &st, query, args...) + if err != nil { + tx.Rollback() + return nil, err + } + + if err = tx.Commit(); err != nil { + return nil, err + } + + // add the stream name to the result. we know that this StreamTag value was updated to the + // stream via the transaction having completed successfully. + st.StreamTag = update.StreamTag + + return st.ToEvent() +} + +// ListStreams returns a list of streams matching the filter for the provided orgID. +func (s *Service) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) ([]influxdb.StoredStream, error) { + q := sq.Select("id", "org_id", "name", "description", "created_at", "updated_at"). + From("streams"). + Where(sq.Eq{"org_id": orgID}) + + // Add stream name filters to the query + if len(filter.StreamIncludes) > 0 { + q = q.Where(sq.Eq{"name": filter.StreamIncludes}) + } + + sql, args, err := q.ToSql() + if err != nil { + return nil, err + } + + sts := []influxdb.StoredStream{} + err = s.store.DB.SelectContext(ctx, &sts, sql, args...) + if err != nil { + return nil, err + } + + return sts, nil +} + +// GetStream gets a single stream by ID +func (s *Service) GetStream(ctx context.Context, id platform.ID) (*influxdb.StoredStream, error) { + q := sq.Select("id", "org_id", "name", "description", "created_at", "updated_at"). + From("streams"). + Where(sq.Eq{"id": id}) + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + var st influxdb.StoredStream + if err := s.store.DB.GetContext(ctx, &st, query, args...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, errStreamNotFound + } + + return nil, err + } + + return &st, nil +} + +// CreateOrUpdateStream creates a new stream, or updates the description of an existing stream. +// Doesn't support updating a stream desctription to "". For that use the UpdateStream method. +func (s *Service) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + newID := s.idGenerator.ID() + now := time.Now() + query, args, err := newUpsertStreamQuery(orgID, newID, now, stream) + if err != nil { + return nil, err + } + + var id platform.ID + if err = s.store.DB.GetContext(ctx, &id, query, args...); err != nil { + return nil, err + } + + // do a separate query to read the stream back from the database and return it. + // this is necessary because the sqlite driver does not support scanning time values from + // a RETURNING clause back into time.Time + return s.getReadStream(ctx, id) +} + +// UpdateStream updates a stream name and/or a description. It is strictly used for updating an existing stream. +func (s *Service) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + q := sq.Update("streams"). + SetMap(sq.Eq{ + "name": stream.Name, + "description": stream.Description, + "updated_at": sq.Expr(`datetime('now')`), + }). + Where(sq.Eq{"id": id}). + Suffix(`RETURNING id`) + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + var newID platform.ID + err = s.store.DB.GetContext(ctx, &newID, query, args...) + if err != nil { + if err == sql.ErrNoRows { + return nil, errStreamNotFound + } + + return nil, err + } + + // do a separate query to read the stream back from the database and return it. + // this is necessary because the sqlite driver does not support scanning time values from + // a RETURNING clause back into time.Time + return s.getReadStream(ctx, newID) +} + +// DeleteStreams is used for deleting multiple streams by name +func (s *Service) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) error { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + q := sq.Delete("streams"). + Where(sq.Eq{"org_id": orgID}). + Where(sq.Eq{"name": delete.Names}) + + query, args, err := q.ToSql() + if err != nil { + return err + } + + _, err = s.store.DB.ExecContext(ctx, query, args...) + if err != nil { + return err + } + + return nil +} + +// DeleteStreamByID deletes a single stream by ID. Returns an error if the ID could not be found. +func (s *Service) DeleteStreamByID(ctx context.Context, id platform.ID) error { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + q := sq.Delete("streams"). + Where(sq.Eq{"id": id}). + Suffix("RETURNING id") + + query, args, err := q.ToSql() + if err != nil { + return err + } + + var d platform.ID + if err := s.store.DB.GetContext(ctx, &d, query, args...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return errStreamNotFound + } + + return err + } + + return nil +} + +func newUpsertStreamQuery(orgID, newID platform.ID, t time.Time, stream influxdb.Stream) (string, []interface{}, error) { + q := sq.Insert("streams"). + Columns("id", "org_id", "name", "description", "created_at", "updated_at"). + Values(newID, orgID, stream.Name, stream.Description, t, t). + Suffix(`ON CONFLICT(org_id, name) DO UPDATE + SET + updated_at = excluded.updated_at, + description = IIF(length(excluded.description) = 0, description, excluded.description)`). + Suffix("RETURNING id") + + return q.ToSql() +} + +// getReadStream is a helper which should only be called when the stream has been verified to exist +// via an update or insert. +func (s *Service) getReadStream(ctx context.Context, id platform.ID) (*influxdb.ReadStream, error) { + q := sq.Select("id", "name", "description", "created_at", "updated_at"). + From("streams"). + Where(sq.Eq{"id": id}) + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + r := &influxdb.ReadStream{} + if err := s.store.DB.GetContext(ctx, r, query, args...); err != nil { + return nil, err + } + + return r, nil +} + +func storedAnnotationsToEvents(stored []*influxdb.StoredAnnotation) ([]influxdb.AnnotationEvent, error) { + events := make([]influxdb.AnnotationEvent, 0, len(stored)) + for _, s := range stored { + c, err := s.ToCreate() + if err != nil { + return nil, err + } + + events = append(events, influxdb.AnnotationEvent{ + ID: s.ID, + AnnotationCreate: *c, + }) + } + + return events, nil +} + +func timesToDuration(l, u time.Time) string { + return fmt.Sprintf("[%s, %s]", l.Format(time.RFC3339Nano), u.Format(time.RFC3339Nano)) +} diff --git a/annotations/service_test.go b/annotations/service_test.go new file mode 100644 index 0000000000..f51e0512c5 --- /dev/null +++ b/annotations/service_test.go @@ -0,0 +1,1002 @@ +// +build sqlite_json +// +build sqlite_foreign_keys + +package annotations + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/snowflake" + "github.com/influxdata/influxdb/v2/sqlite" + "github.com/influxdata/influxdb/v2/sqlite/migrations" + influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +var ( + idGen = snowflake.NewIDGenerator() +) + +func TestAnnotationsCRUD(t *testing.T) { + t.Parallel() + + // intialize some variables that can be shared across tests + // the timeline for the 3 test annotations start & end times is visualized below. + // now + // v + // |---|---|---|---| + // ^ ^ ^ ^ ^ + // st1 et1 + // st2 et2 + // st3 et3 + + et1 := time.Now().UTC() + st1 := et1.Add(-10 * time.Minute) + + et2 := et1.Add(-5 * time.Minute) + st2 := et2.Add(-10 * time.Minute) + + et3 := et1.Add(-10 * time.Minute) + st3 := et2.Add(-15 * time.Minute) + + // used for tests involving time filters + earlierEt1 := et1.Add(-1 * time.Millisecond) + laterSt3 := st3.Add(1 * time.Millisecond) + beforeAny := st3.Add(-1 * time.Millisecond) + afterAny := et1.Add(1 * time.Millisecond) + + orgID := *influxdbtesting.IDPtr(1) + otherOrgID := *influxdbtesting.IDPtr(2) + ctx := context.Background() + + s1 := influxdb.StoredAnnotation{ + OrgID: orgID, + StreamTag: "stream1", + Summary: "summary1", + Message: "message1", + Stickers: map[string]string{"stick1": "val1", "stick2": "val2"}, + Duration: timesToDuration(st1, et1), + Lower: st1.Format(time.RFC3339Nano), + Upper: et1.Format(time.RFC3339Nano), + } + + c1, err := s1.ToCreate() + require.NoError(t, err) + + s2 := influxdb.StoredAnnotation{ + OrgID: orgID, + StreamTag: "stream2", + Summary: "summary2", + Message: "message2", + Stickers: map[string]string{"stick2": "val2", "stick3": "val3", "stick4": "val4"}, + Duration: timesToDuration(st2, et2), + Lower: st2.Format(time.RFC3339Nano), + Upper: et2.Format(time.RFC3339Nano), + } + + c2, err := s2.ToCreate() + require.NoError(t, err) + + s3 := influxdb.StoredAnnotation{ + OrgID: orgID, + StreamTag: "stream2", + Summary: "summary3", + Message: "message3", + Stickers: map[string]string{"stick1": "val2"}, + Duration: timesToDuration(st3, et3), + Lower: st3.Format(time.RFC3339Nano), + Upper: et3.Format(time.RFC3339Nano), + } + + c3, err := s3.ToCreate() + require.NoError(t, err) + + // helper function for setting up the database with data that can be used for tests + // that involve querying the database. uses the annotations objects initialized above + // via the closure. + populateAnnotationsData := func(t *testing.T, svc *Service) []influxdb.AnnotationEvent { + t.Helper() + + got, err := svc.CreateAnnotations(ctx, orgID, []influxdb.AnnotationCreate{*c1, *c2, *c3}) + require.NoError(t, err) + assertAnnotationEvents(t, got, []influxdb.AnnotationEvent{ + {AnnotationCreate: *c1}, + {AnnotationCreate: *c2}, + {AnnotationCreate: *c3}, + }) + + return got + } + + t.Run("create annotations", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + + tests := []struct { + name string + creates []influxdb.AnnotationCreate + want []influxdb.AnnotationEvent + }{ + { + "empty creates list returns empty events list", + []influxdb.AnnotationCreate{}, + []influxdb.AnnotationEvent{}, + }, + { + "creates annotations successfully", + []influxdb.AnnotationCreate{*c1, *c2, *c3}, + []influxdb.AnnotationEvent{ + {AnnotationCreate: *c1}, + {AnnotationCreate: *c2}, + {AnnotationCreate: *c3}, + }, + }, + } + + for _, tt := range tests { + got, err := svc.CreateAnnotations(ctx, orgID, tt.creates) + require.NoError(t, err) + assertAnnotationEvents(t, got, tt.want) + } + }) + + t.Run("select with filters", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + populateAnnotationsData(t, svc) + + tests := []struct { + name string + orgID platform.ID + f influxdb.AnnotationListFilter + want []influxdb.StoredAnnotation + }{ + { + "time filter is inclusive - gets all", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &st3, + EndTime: &et1, + }, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, + }, + { + "doesn't get results for other org", + otherOrgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &st3, + EndTime: &et1, + }, + }, + []influxdb.StoredAnnotation{}, + }, + { + "end time will filter out annotations", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &st3, + EndTime: &earlierEt1, + }, + }, + []influxdb.StoredAnnotation{s2, s3}, + }, + { + "start time will filter out annotations", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &laterSt3, + EndTime: &et1, + }, + }, + []influxdb.StoredAnnotation{s1, s2}, + }, + { + "time can filter out all annotations if it's too soon", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &beforeAny, + EndTime: &beforeAny, + }, + }, + []influxdb.StoredAnnotation{}, + }, + { + "time can filter out all annotations if it's too late", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &afterAny, + EndTime: &afterAny, + }, + }, + []influxdb.StoredAnnotation{}, + }, + { + "time can filter out all annotations if it's too narrow", + orgID, + influxdb.AnnotationListFilter{ + BasicFilter: influxdb.BasicFilter{ + StartTime: &laterSt3, + EndTime: &et3, + }, + }, + []influxdb.StoredAnnotation{}, + }, + { + "can filter by stickers - one sticker matches one", + orgID, + influxdb.AnnotationListFilter{ + StickerIncludes: map[string]string{"stick1": "val2"}, + }, + []influxdb.StoredAnnotation{s3}, + }, + { + "can filter by stickers - one sticker matches multiple", + orgID, + influxdb.AnnotationListFilter{ + StickerIncludes: map[string]string{"stick2": "val2"}, + }, + []influxdb.StoredAnnotation{s1, s2}, + }, + { + "can filter by stickers - matching key but wrong value", + orgID, + influxdb.AnnotationListFilter{ + StickerIncludes: map[string]string{"stick2": "val3"}, + }, + []influxdb.StoredAnnotation{}, + }, + { + "can filter by stream - matches one", + orgID, + influxdb.AnnotationListFilter{ + StreamIncludes: []string{"stream1"}, + }, + []influxdb.StoredAnnotation{s1}, + }, + { + "can filter by stream - matches multiple", + orgID, + influxdb.AnnotationListFilter{ + StreamIncludes: []string{"stream2"}, + }, + []influxdb.StoredAnnotation{s2, s3}, + }, + { + "can filter by stream - no match", + orgID, + influxdb.AnnotationListFilter{ + StreamIncludes: []string{"badStream"}, + }, + []influxdb.StoredAnnotation{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.f.Validate(time.Now) + got, err := svc.ListAnnotations(ctx, tt.orgID, tt.f) + require.NoError(t, err) + assertStoredAnnotations(t, got, tt.want) + }) + } + }) + + t.Run("get by id", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + anns := populateAnnotationsData(t, svc) + + tests := []struct { + name string + id platform.ID + want *influxdb.AnnotationEvent + wantErr error + }{ + { + "gets the first one by id", + anns[0].ID, + &anns[0], + nil, + }, + { + "gets the second one by id", + anns[1].ID, + &anns[1], + nil, + }, + { + "has the correct error if not found", + idGen.ID(), + nil, + errAnnotationNotFound, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := svc.GetAnnotation(ctx, tt.id) + require.Equal(t, tt.wantErr, err) + + if tt.want == nil { + require.Nil(t, got) + } else { + e, err := got.ToEvent() + require.NoError(t, err) + require.Equal(t, tt.want, e) + } + }) + } + }) + + t.Run("delete multiple with a filter", func(t *testing.T) { + t.Run("delete by stream id", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + populateAnnotationsData(t, svc) + + ctx := context.Background() + + lf := influxdb.AnnotationListFilter{BasicFilter: influxdb.BasicFilter{}} + lf.Validate(time.Now) + ans, err := svc.ListAnnotations(ctx, orgID, lf) + require.NoError(t, err) + + annID1 := ans[0].ID + streamID1 := ans[0].StreamID + st1, err := time.Parse(time.RFC3339Nano, ans[0].Lower) + require.NoError(t, err) + et1, err := time.Parse(time.RFC3339Nano, ans[0].Upper) + require.NoError(t, err) + + streamID2 := ans[1].StreamID + st2, err := time.Parse(time.RFC3339Nano, ans[1].Lower) + require.NoError(t, err) + et2, err := time.Parse(time.RFC3339Nano, ans[1].Upper) + require.NoError(t, err) + + tests := []struct { + name string + deleteOrgID platform.ID + id platform.ID + filter influxdb.AnnotationDeleteFilter + shouldDelete bool + }{ + { + "matches stream id but not time range", + orgID, + annID1, + influxdb.AnnotationDeleteFilter{ + StreamID: streamID1, + StartTime: &st2, + EndTime: &et2, + }, + false, + }, + { + "matches time range but not stream id", + orgID, + annID1, + influxdb.AnnotationDeleteFilter{ + StreamID: streamID2, + StartTime: &st1, + EndTime: &et1, + }, + false, + }, + { + "doesn't delete for other org", + otherOrgID, + annID1, + influxdb.AnnotationDeleteFilter{ + StreamID: streamID1, + StartTime: &st1, + EndTime: &et1, + }, + false, + }, + { + "matches stream id and time range", + orgID, + annID1, + influxdb.AnnotationDeleteFilter{ + StreamID: streamID1, + StartTime: &st1, + EndTime: &et1, + }, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := svc.DeleteAnnotations(ctx, tt.deleteOrgID, tt.filter) + require.NoError(t, err) + + lf := influxdb.AnnotationListFilter{BasicFilter: influxdb.BasicFilter{}} + lf.Validate(time.Now) + list, err := svc.ListAnnotations(ctx, orgID, lf) + require.NoError(t, err) + get, getErr := svc.GetAnnotation(ctx, tt.id) + + if tt.shouldDelete { + require.Equal(t, 2, len(list)) + require.Nil(t, get) + require.Equal(t, errAnnotationNotFound, getErr) + } else { + require.Equal(t, 3, len(list)) + require.NoError(t, getErr) + require.Equal(t, *get, ans[0]) + } + }) + } + }) + + t.Run("delete with non-id filters", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + populateAnnotationsData(t, svc) + + tests := []struct { + name string + deleteOrgID platform.ID + filter influxdb.AnnotationDeleteFilter + wantList []influxdb.StoredAnnotation + }{ + { + "matches stream tag but not time range", + orgID, + influxdb.AnnotationDeleteFilter{ + StreamTag: "stream1", + StartTime: &st1, + EndTime: &earlierEt1, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, + }, + { + "matches stream tag and time range", + orgID, + influxdb.AnnotationDeleteFilter{ + StreamTag: "stream1", + StartTime: &st1, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s2, s3}, + }, + { + "matches stream tag for multiple", + orgID, + influxdb.AnnotationDeleteFilter{ + StreamTag: "stream2", + StartTime: &st3, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s1}, + }, + { + "matches stream tag but wrong org", + otherOrgID, + influxdb.AnnotationDeleteFilter{ + StreamTag: "stream1", + StartTime: &st1, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, + }, + + { + "matches stickers but not time range", + orgID, + influxdb.AnnotationDeleteFilter{ + Stickers: map[string]string{"stick1": "val1"}, + StartTime: &st1, + EndTime: &earlierEt1, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, + }, + { + "matches stickers and time range", + orgID, + influxdb.AnnotationDeleteFilter{ + Stickers: map[string]string{"stick1": "val1"}, + StartTime: &st1, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s2, s3}, + }, + { + "matches stickers for multiple", + orgID, + influxdb.AnnotationDeleteFilter{ + Stickers: map[string]string{"stick2": "val2"}, + StartTime: &st2, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s3}, + }, + { + "matches stickers but wrong org", + otherOrgID, + influxdb.AnnotationDeleteFilter{ + Stickers: map[string]string{"stick1": "val1"}, + StartTime: &st1, + EndTime: &et1, + }, + []influxdb.StoredAnnotation{s1, s2, s3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + populateAnnotationsData(t, svc) + + err := svc.DeleteAnnotations(ctx, tt.deleteOrgID, tt.filter) + require.NoError(t, err) + + f := influxdb.AnnotationListFilter{} + f.Validate(time.Now) + list, err := svc.ListAnnotations(ctx, orgID, f) + require.NoError(t, err) + assertStoredAnnotations(t, tt.wantList, list) + }) + } + }) + }) + + t.Run("delete a single annotation by id", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + ans := populateAnnotationsData(t, svc) + + tests := []struct { + name string + id platform.ID + shouldDelete bool + }{ + { + "has the correct error if not found", + idGen.ID(), + false, + }, + { + "deletes the first one by id", + ans[0].ID, + true, + }, + { + "deletes the second one by id", + ans[1].ID, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := svc.DeleteAnnotation(ctx, tt.id) + + if tt.shouldDelete { + require.NoError(t, err) + } else { + require.Equal(t, errAnnotationNotFound, err) + } + + got, err := svc.GetAnnotation(ctx, tt.id) + require.Equal(t, errAnnotationNotFound, err) + require.Nil(t, got) + }) + } + }) + + t.Run("update a single annotation by id", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + ans := populateAnnotationsData(t, svc) + + updatedTime := time.Time{}.Add(time.Minute) + + tests := []struct { + name string + id platform.ID + update influxdb.AnnotationCreate + wantErr error + }{ + { + "has the correct error if not found", + idGen.ID(), + influxdb.AnnotationCreate{ + StreamTag: "updated tag", + Summary: "updated summary", + Message: "updated message", + Stickers: map[string]string{"updated": "sticker"}, + EndTime: &updatedTime, + StartTime: &updatedTime, + }, + errAnnotationNotFound, + }, + { + "updates the first one by id", + ans[0].ID, + influxdb.AnnotationCreate{ + StreamTag: "updated tag", + Summary: "updated summary", + Message: "updated message", + Stickers: map[string]string{"updated": "sticker"}, + EndTime: &updatedTime, + StartTime: &updatedTime, + }, + nil, + }, + { + "updates the second one by id", + ans[1].ID, + influxdb.AnnotationCreate{ + StreamTag: "updated tag2", + Summary: "updated summary2", + Message: "updated message2", + Stickers: map[string]string{"updated2": "sticker2"}, + EndTime: &updatedTime, + StartTime: &updatedTime, + }, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + want := &influxdb.AnnotationEvent{ID: tt.id, AnnotationCreate: tt.update} + if tt.wantErr != nil { + want = nil + } + + got, err := svc.UpdateAnnotation(ctx, tt.id, tt.update) + require.Equal(t, tt.wantErr, err) + require.Equal(t, want, got) + + if tt.wantErr == nil { + new, err := svc.GetAnnotation(ctx, tt.id) + require.NoError(t, err) + e, err := new.ToEvent() + require.NoError(t, err) + require.Equal(t, got, e) + } + }) + } + }) + + t.Run("deleted streams cascade to deleted annotations", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + + ctx := context.Background() + ans := populateAnnotationsData(t, svc) + sort.Slice(ans, func(i, j int) bool { + return ans[i].StreamTag < ans[j].StreamTag + }) + + // annotations s2 and s3 have the stream tag of "stream2", so get the id of that stream + id := ans[1].ID + a, err := svc.GetAnnotation(ctx, id) + require.NoError(t, err) + streamID := a.StreamID + + // delete the stream + err = svc.DeleteStreamByID(ctx, streamID) + require.NoError(t, err) + + // s1 should still be there + s1, err := svc.GetAnnotation(ctx, ans[0].ID) + require.NoError(t, err) + + // both s2 and s3 should now be deleted + f := influxdb.AnnotationListFilter{} + f.Validate(time.Now) + remaining, err := svc.ListAnnotations(ctx, orgID, f) + require.NoError(t, err) + require.Equal(t, []influxdb.StoredAnnotation{*s1}, remaining) + }) + + t.Run("renamed streams are reflected in subsequent annotation queries", func(t *testing.T) { + svc, clean := newTestService(t) + defer clean(t) + + ctx := context.Background() + populateAnnotationsData(t, svc) + + // get all the annotations with the tag "stream2" + f := influxdb.AnnotationListFilter{StreamIncludes: []string{"stream2"}} + f.Validate(time.Now) + originalList, err := svc.ListAnnotations(ctx, orgID, f) + require.NoError(t, err) + assertStoredAnnotations(t, []influxdb.StoredAnnotation{s2, s3}, originalList) + + // check that the original list has the right stream tag for all annotations + for _, a := range originalList { + require.Equal(t, "stream2", a.StreamTag) + } + + // update the name for stream2 + streamID := originalList[0].StreamID + _, err = svc.UpdateStream(ctx, streamID, influxdb.Stream{Name: "new name", Description: "new desc"}) + require.NoError(t, err) + + // get all the annotations with the new tag + f = influxdb.AnnotationListFilter{StreamIncludes: []string{"new name"}} + f.Validate(time.Now) + newList, err := svc.ListAnnotations(ctx, orgID, f) + require.NoError(t, err) + + // check that the new list has the right stream tag for all annotations + for _, a := range newList { + require.Equal(t, "new name", a.StreamTag) + } + + // verify that the new list of annotations is the same as the original except for the stream name change + require.Equal(t, len(originalList), len(newList)) + + sort.Slice(originalList, func(i, j int) bool { + return originalList[i].ID < originalList[j].ID + }) + + sort.Slice(newList, func(i, j int) bool { + return originalList[i].ID < originalList[j].ID + }) + + for i := range newList { + originalList[i].StreamTag = "new name" + require.Equal(t, originalList[i], newList[i]) + } + }) +} + +func TestStreamsCRUDSingle(t *testing.T) { + t.Parallel() + + svc, clean := newTestService(t) + defer clean(t) + + ctx := context.Background() + orgID := *influxdbtesting.IDPtr(1) + + stream := influxdb.Stream{ + Name: "testName", + Description: "original description", + } + + var err error + var s1, s2, s3 *influxdb.ReadStream + + t.Run("create a single stream", func(t *testing.T) { + s1, err = svc.CreateOrUpdateStream(ctx, orgID, stream) + require.NoError(t, err) + require.Equal(t, stream.Name, s1.Name) + require.Equal(t, stream.Description, s1.Description) + }) + + t.Run("stream updates", func(t *testing.T) { + u1 := influxdb.Stream{ + Name: "testName", + Description: "updated description", + } + + u2 := influxdb.Stream{ + Name: "otherName", + Description: "other description", + } + + t.Run("updating an existing stream with CreateOrUpdateStream does not change id but does change description", func(t *testing.T) { + s2, err = svc.CreateOrUpdateStream(ctx, orgID, u1) + require.NoError(t, err) + require.Equal(t, stream.Name, s2.Name) + require.Equal(t, u1.Description, s2.Description) + require.Equal(t, s1.ID, s2.ID) + }) + + t.Run("updating a non-existant stream with UpdateStream returns not found error", func(t *testing.T) { + readGot, err := svc.UpdateStream(ctx, idGen.ID(), u2) + require.Nil(t, readGot) + require.Equal(t, errStreamNotFound, err) + }) + + t.Run("updating an existing stream with UpdateStream changes both name & description", func(t *testing.T) { + s3, err = svc.UpdateStream(ctx, s2.ID, u2) + require.NoError(t, err) + require.Equal(t, s2.ID, s3.ID) + require.Equal(t, u2.Name, s3.Name) + require.Equal(t, u2.Description, s3.Description) + }) + }) + + t.Run("getting a stream", func(t *testing.T) { + t.Run("non-existant stream returns a not found error", func(t *testing.T) { + storedGot, err := svc.GetStream(ctx, idGen.ID()) + require.Nil(t, storedGot) + require.Equal(t, errStreamNotFound, err) + }) + + t.Run("existing stream returns without error", func(t *testing.T) { + storedGot, err := svc.GetStream(ctx, s3.ID) + require.NoError(t, err) + require.Equal(t, s3.Name, storedGot.Name) + require.Equal(t, s3.Description, storedGot.Description) + }) + }) + + t.Run("deleting a stream", func(t *testing.T) { + t.Run("non-existant stream returns a not found error", func(t *testing.T) { + err := svc.DeleteStreamByID(ctx, idGen.ID()) + require.Equal(t, errStreamNotFound, err) + }) + + t.Run("deletes an existing stream without error", func(t *testing.T) { + err := svc.DeleteStreamByID(ctx, s1.ID) + require.NoError(t, err) + + storedGot, err := svc.GetStream(ctx, s1.ID) + require.Nil(t, storedGot) + require.Equal(t, err, errStreamNotFound) + }) + }) +} + +func TestStreamsCRUDMany(t *testing.T) { + t.Parallel() + + svc, clean := newTestService(t) + defer clean(t) + + ctx := context.Background() + + orgID1 := influxdbtesting.IDPtr(1) + orgID2 := influxdbtesting.IDPtr(2) + orgID3 := influxdbtesting.IDPtr(3) + + // populate the database with some streams for testing delete and select many + combos := map[platform.ID][]string{ + *orgID1: {"org1_s1", "org1_s2", "org1_s3", "org1_s4"}, + *orgID2: {"org2_s1"}, + *orgID3: {"org3_s1", "org3_s2"}, + } + + for orgID, streams := range combos { + for _, s := range streams { + _, err := svc.CreateOrUpdateStream(ctx, orgID, influxdb.Stream{ + Name: s, + }) + require.NoError(t, err) + } + } + + t.Run("all streams can be listed for each org if passing an empty list", func(t *testing.T) { + for orgID, streams := range combos { + got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{ + StreamIncludes: []string{}, + }) + require.NoError(t, err) + assertStreamNames(t, streams, got) + } + }) + + t.Run("can select specific streams and get only those for that org", func(t *testing.T) { + for orgID, streams := range combos { + got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{ + StreamIncludes: streams, + }) + require.NoError(t, err) + assertStreamNames(t, streams, got) + } + }) + + t.Run("can delete a single stream with DeleteStreams, but does not delete streams for other org", func(t *testing.T) { + err := svc.DeleteStreams(ctx, *orgID1, influxdb.BasicStream{ + Names: []string{"org1_s1", "org2_s1"}, + }) + require.NoError(t, err) + + got, err := svc.ListStreams(ctx, *orgID1, influxdb.StreamListFilter{ + StreamIncludes: []string{}, + }) + require.NoError(t, err) + assertStreamNames(t, []string{"org1_s2", "org1_s3", "org1_s4"}, got) + + got, err = svc.ListStreams(ctx, *orgID2, influxdb.StreamListFilter{ + StreamIncludes: []string{}, + }) + require.NoError(t, err) + assertStreamNames(t, []string{"org2_s1"}, got) + }) + + t.Run("can delete all streams for all orgs", func(t *testing.T) { + for orgID, streams := range combos { + err := svc.DeleteStreams(ctx, orgID, influxdb.BasicStream{ + Names: streams, + }) + require.NoError(t, err) + + got, err := svc.ListStreams(ctx, orgID, influxdb.StreamListFilter{ + StreamIncludes: []string{}, + }) + require.NoError(t, err) + require.Equal(t, []influxdb.StoredStream{}, got) + } + }) +} + +func assertAnnotationEvents(t *testing.T, got, want []influxdb.AnnotationEvent) { + t.Helper() + + require.Equal(t, len(want), len(got)) + + sort.Slice(want, func(i, j int) bool { + return want[i].StreamTag < want[j].StreamTag + }) + + sort.Slice(got, func(i, j int) bool { + return got[i].StreamTag < got[j].StreamTag + }) + + for idx, w := range want { + w.ID = got[idx].ID + require.Equal(t, w, got[idx]) + } +} + +// should make these are lists similar +func assertStoredAnnotations(t *testing.T, got, want []influxdb.StoredAnnotation) { + t.Helper() + + require.Equal(t, len(want), len(got)) + + sort.Slice(want, func(i, j int) bool { + return want[i].ID < want[j].ID + }) + + sort.Slice(got, func(i, j int) bool { + return got[i].ID < got[j].ID + }) + + for idx, w := range want { + w.ID = got[idx].ID + w.StreamID = got[idx].StreamID + require.Equal(t, w, got[idx]) + } +} + +func assertStreamNames(t *testing.T, want []string, got []influxdb.StoredStream) { + t.Helper() + + storedNames := make([]string, len(got)) + for i, s := range got { + storedNames[i] = s.Name + } + + require.ElementsMatch(t, want, storedNames) +} + +func newTestService(t *testing.T) (*Service, func(t *testing.T)) { + t.Helper() + + store, clean := sqlite.NewTestStore(t) + ctx := context.Background() + + sqliteMigrator := sqlite.NewMigrator(store, zap.NewNop()) + err := sqliteMigrator.Up(ctx, migrations.All) + require.NoError(t, err) + + svc := NewService(zap.NewNop(), store) + + return svc, clean +} diff --git a/annotations/transport/annotations_router.go b/annotations/transport/annotations_router.go index 63f8095dd9..e2f6b09604 100644 --- a/annotations/transport/annotations_router.go +++ b/annotations/transport/annotations_router.go @@ -3,7 +3,6 @@ package transport import ( "encoding/json" "net/http" - "strings" "time" "github.com/go-chi/chi" @@ -252,16 +251,11 @@ func storedAnnotationsToReadAnnotations(s []influxdb.StoredAnnotation) (influxdb r := influxdb.ReadAnnotations{} for _, val := range s { - stickers, err := stickerSliceToMap(val.Stickers) - if err != nil { - return nil, err - } - r[val.StreamTag] = append(r[val.StreamTag], influxdb.ReadAnnotation{ ID: val.ID, Summary: val.Summary, Message: val.Message, - Stickers: stickers, + Stickers: val.Stickers, StartTime: val.Lower, EndTime: val.Upper, }) @@ -281,34 +275,15 @@ func storedAnnotationToEvent(s *influxdb.StoredAnnotation) (*influxdb.Annotation return nil, err } - stickers, err := stickerSliceToMap(s.Stickers) - if err != nil { - return nil, err - } - return &influxdb.AnnotationEvent{ ID: s.ID, AnnotationCreate: influxdb.AnnotationCreate{ StreamTag: s.StreamTag, Summary: s.Summary, Message: s.Message, - Stickers: stickers, + Stickers: s.Stickers, EndTime: et, StartTime: st, }, }, nil } - -func stickerSliceToMap(stickers []string) (map[string]string, error) { - stickerMap := map[string]string{} - - for i := range stickers { - sticks := strings.SplitN(stickers[i], "=", 2) - if len(sticks) < 2 { - return nil, invalidStickerError(stickers[i]) - } - stickerMap[sticks[0]] = sticks[1] - } - - return stickerMap, nil -} diff --git a/annotations/transport/annotations_router_test.go b/annotations/transport/annotations_router_test.go index 5231c5e0de..0b0d89a042 100644 --- a/annotations/transport/annotations_router_test.go +++ b/annotations/transport/annotations_router_test.go @@ -42,7 +42,7 @@ var ( StreamTag: "sometag", Summary: "testing the api", Message: "stored annotation message", - Stickers: []string{"val1=sticker1", "val2=sticker2"}, + Stickers: map[string]string{"val1": "sticker1", "val2": "sticker2"}, Lower: now.Format(time.RFC3339), Upper: now.Format(time.RFC3339), } @@ -265,39 +265,3 @@ func TestStoredAnnotationToEvent(t *testing.T) { require.NoError(t, err) require.Equal(t, got, &testEvent) } - -func TestStickerSliceToMap(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - stickers []string - want map[string]string - wantErr error - }{ - { - "good stickers", - []string{"good1=val1", "good2=val2"}, - map[string]string{"good1": "val1", "good2": "val2"}, - nil, - }, - { - "bad stickers", - []string{"this is an invalid sticker", "shouldbe=likethis"}, - nil, - invalidStickerError("this is an invalid sticker"), - }, - { - "no stickers", - []string{}, - map[string]string{}, - nil, - }, - } - - for _, tt := range tests { - got, err := stickerSliceToMap(tt.stickers) - require.Equal(t, tt.want, got) - require.Equal(t, tt.wantErr, err) - } -} diff --git a/annotations/transport/http.go b/annotations/transport/http.go index 857472abfa..f603e9b7b4 100644 --- a/annotations/transport/http.go +++ b/annotations/transport/http.go @@ -1,7 +1,6 @@ package transport import ( - "fmt" "net/http" "time" @@ -41,13 +40,6 @@ var ( } ) -func invalidStickerError(s string) error { - return &errors.Error{ - Code: errors.EInternal, - Msg: fmt.Sprintf("invalid sticker: %q", s), - } -} - // AnnotationsHandler is the handler for the annotation service type AnnotationHandler struct { chi.Router diff --git a/go.mod b/go.mod index 0d82782799..99d2a4f519 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/BurntSushi/toml v0.3.1 + github.com/Masterminds/squirrel v1.5.0 github.com/NYTimes/gziphandler v1.0.1 github.com/RoaringBitmap/roaring v0.4.16 github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 diff --git a/go.sum b/go.sum index 6d270ad2c7..06bc578778 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITg github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/sprig v2.16.0+incompatible h1:QZbMUPxRQ50EKAq3LFMnxddMu88/EUUG3qmxwtDmPsY= github.com/Masterminds/sprig v2.16.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= +github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8= +github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0= @@ -384,6 +386,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= diff --git a/sqlite/migrations/0002_create_annotations_tables.sql b/sqlite/migrations/0002_create_annotations_tables.sql new file mode 100644 index 0000000000..8604f48990 --- /dev/null +++ b/sqlite/migrations/0002_create_annotations_tables.sql @@ -0,0 +1,34 @@ +-- The user_version should match the "000X" from the file name +-- Ex: 0001_create_notebooks_table should have a user_verison of 1 +PRAGMA user_version=2; + +-- Create the initial table to store streams +CREATE TABLE streams ( + id VARCHAR(16) PRIMARY KEY, + org_id VARCHAR(16) NOT NULL, + name TEXT NOT NULL, + description TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + + CONSTRAINT streams_uniq_orgid_name UNIQUE (org_id, name) +); + +-- Create the initial table to store annotations +CREATE TABLE annotations ( + id VARCHAR(16) PRIMARY KEY, + org_id VARCHAR(16) NOT NULL, + stream_id VARCHAR(16) NOT NULL, + summary TEXT NOT NULL, + message TEXT NOT NULL, + stickers TEXT NOT NULL, + duration TEXT NOT NULL, + lower TIMESTAMP NOT NULL, + upper TIMESTAMP NOT NULL, + + FOREIGN KEY (stream_id) REFERENCES streams(id) ON DELETE CASCADE +); + +-- Create indexes for stream_id and stickers to support fast queries +CREATE INDEX idx_annotations_stream ON annotations (stream_id); +CREATE INDEX idx_annotations_stickers ON annotations (stickers);