feat(annotations): storage service (#21690)

* feat(annotations): storage service

* feat: stickers are in db as array

* chore: fix some unintended diffs

* fix: fixes from review

* fix: specific table name for json_each

* fix: update primary keys and constraints

* fix: fix schema

* feat: stream name updates are reflected in annotations via FK
pull/21699/head
William Baker 2021-06-15 18:36:11 -04:00 committed by GitHub
parent 56833b772b
commit 1935c13c16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1789 additions and 94 deletions

View File

@ -42,7 +42,7 @@ builds:
goarch: arm64 goarch: arm64
main: ./cmd/influxd/ main: ./cmd/influxd/
flags: flags:
- -tags=assets{{if eq .Os "linux"}},osusergo,netgo,static_build{{if not (eq .Arch "amd64")}},noasm{{end}}{{end}} - -tags=assets,sqlite_foreign_keys,sqlite_json{{if eq .Os "linux"}},osusergo,netgo,static_build{{if not (eq .Arch "amd64")}},noasm{{end}}{{end}}
- -buildmode={{if eq .Os "windows"}}exe{{else}}pie{{end}} - -buildmode={{if eq .Os "windows"}}exe{{else}}pie{{end}}
env: env:
- GO111MODULE=on - GO111MODULE=on

View File

@ -30,8 +30,11 @@ else
GO_BUILD_TAGS := assets,noasm GO_BUILD_TAGS := assets,noasm
endif endif
GO_TEST_ARGS := -tags '$(GO_TEST_TAGS)' # Tags used for builds and tests on all architectures
GO_BUILD_ARGS := -tags '$(GO_BUILD_TAGS)' COMMON_TAGS := sqlite_foreign_keys,sqlite_json
GO_TEST_ARGS := -tags '$(COMMON_TAGS),$(GO_TEST_TAGS)'
GO_BUILD_ARGS := -tags '$(COMMON_TAGS),$(GO_BUILD_TAGS)'
ifeq ($(OS), Windows_NT) ifeq ($(OS), Windows_NT)
VERSION := $(shell git describe --exact-match --tags 2>nil) VERSION := $(shell git describe --exact-match --tags 2>nil)

View File

@ -2,7 +2,9 @@ package influxdb
import ( import (
"context" "context"
"database/sql/driver"
"encoding/json" "encoding/json"
"fmt"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@ -63,6 +65,27 @@ var (
} }
) )
func invalidStickerError(s string) error {
return &errors.Error{
Code: errors.EInternal,
Msg: fmt.Sprintf("invalid sticker: %q", s),
}
}
func stickerSliceToMap(stickers []string) (map[string]string, error) {
stickerMap := map[string]string{}
for i := range stickers {
sticks := strings.SplitN(stickers[i], "=", 2)
if len(sticks) < 2 {
return nil, invalidStickerError(stickers[i])
}
stickerMap[sticks[0]] = sticks[1]
}
return stickerMap, nil
}
// Service is the service contract for Annotations // Service is the service contract for Annotations
type AnnotationService interface { type AnnotationService interface {
// CreateAnnotations creates annotations. // CreateAnnotations creates annotations.
@ -101,26 +124,107 @@ type AnnotationEvent struct {
// AnnotationCreate contains user providable fields for annotating an event. // AnnotationCreate contains user providable fields for annotating an event.
type AnnotationCreate struct { type AnnotationCreate struct {
StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events. StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events.
Summary string `json:"summary"` // Summary is the only field required to annotate an event. Summary string `json:"summary"` // Summary is the only field required to annotate an event.
Message string `json:"message,omitempty"` // Message provides more details about the event being annotated. Message string `json:"message,omitempty"` // Message provides more details about the event being annotated.
Stickers map[string]string `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event. Stickers AnnotationStickers `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event.
EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set. EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set.
StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set. StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set.
} }
// StoredAnnotation represents annotation data to be stored in the database. // StoredAnnotation represents annotation data to be stored in the database.
type StoredAnnotation struct { type StoredAnnotation struct {
ID platform.ID `db:"id"` // ID is the annotation's id. ID platform.ID `db:"id"` // ID is the annotation's id.
OrgID platform.ID `db:"org_id"` // OrgID is the annotations's owning organization. OrgID platform.ID `db:"org_id"` // OrgID is the annotations's owning organization.
StreamID platform.ID `db:"stream_id"` // StreamID is the id of a stream. StreamID platform.ID `db:"stream_id"` // StreamID is the id of a stream.
StreamTag string `db:"name"` // StreamTag is the name of a stream (when selecting with join of streams). StreamTag string `db:"stream"` // StreamTag is the name of a stream (when selecting with join of streams).
Summary string `db:"summary"` // Summary is the summary of the annotated event. Summary string `db:"summary"` // Summary is the summary of the annotated event.
Message string `db:"message"` // Message is a longer description of the annotated event. Message string `db:"message"` // Message is a longer description of the annotated event.
Stickers []string `db:"stickers"` // Stickers are additional labels to group annotations by. Stickers AnnotationStickers `db:"stickers"` // Stickers are additional labels to group annotations by.
Duration string `db:"duration"` // Duration is the time range (with zone) of an annotated event. Duration string `db:"duration"` // Duration is the time range (with zone) of an annotated event.
Lower string `db:"lower"` // Lower is the time an annotated event begins. Lower string `db:"lower"` // Lower is the time an annotated event begins.
Upper string `db:"upper"` // Upper is the time an annotated event ends. Upper string `db:"upper"` // Upper is the time an annotated event ends.
}
// ToCreate is a utility method for converting a StoredAnnotation to an AnnotationCreate type
func (s StoredAnnotation) ToCreate() (*AnnotationCreate, error) {
et, err := time.Parse(time.RFC3339Nano, s.Upper)
if err != nil {
return nil, err
}
st, err := time.Parse(time.RFC3339Nano, s.Lower)
if err != nil {
return nil, err
}
return &AnnotationCreate{
StreamTag: s.StreamTag,
Summary: s.Summary,
Message: s.Message,
Stickers: s.Stickers,
EndTime: &et,
StartTime: &st,
}, nil
}
// ToEvent is a utility method for converting a StoredAnnotation to an AnnotationEvent type
func (s StoredAnnotation) ToEvent() (*AnnotationEvent, error) {
c, err := s.ToCreate()
if err != nil {
return nil, err
}
return &AnnotationEvent{
ID: s.ID,
AnnotationCreate: *c,
}, nil
}
type AnnotationStickers map[string]string
// Value implements the database/sql Valuer interface for adding AnnotationStickers to the database
// Stickers are stored in the database as a slice of strings like "[key=val]"
// They are encoded into a JSON string for storing into the database, and the JSON sqlite extension is
// able to manipulate them like an object.
func (a AnnotationStickers) Value() (driver.Value, error) {
stickSlice := make([]string, 0, len(a))
for k, v := range a {
stickSlice = append(stickSlice, fmt.Sprintf("%s=%s", k, v))
}
sticks, err := json.Marshal(stickSlice)
if err != nil {
return nil, err
}
return string(sticks), nil
}
// Scan implements the database/sql Scanner interface for retrieving AnnotationStickers from the database
// The string is decoded into a slice of strings, which are then converted back into a map
func (a *AnnotationStickers) Scan(value interface{}) error {
vString, ok := value.(string)
if !ok {
return &errors.Error{
Code: errors.EInternal,
Msg: "could not load stickers from sqlite",
}
}
var stickSlice []string
if err := json.NewDecoder(strings.NewReader(vString)).Decode(&stickSlice); err != nil {
return err
}
stickMap, err := stickerSliceToMap(stickSlice)
if err != nil {
return nil
}
*a = stickMap
return nil
} }
// Validate validates the creation object. // Validate validates the creation object.
@ -254,8 +358,8 @@ type ReadAnnotation struct {
// AnnotationListFilter is a selection filter for listing annotations. // AnnotationListFilter is a selection filter for listing annotations.
type AnnotationListFilter struct { type AnnotationListFilter struct {
StickerIncludes map[string]string `json:"stickerIncludes,omitempty"` // StickerIncludes allows the user to filter annotated events based on it's sticker. StickerIncludes AnnotationStickers `json:"stickerIncludes,omitempty"` // StickerIncludes allows the user to filter annotated events based on it's sticker.
StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter annotated events by stream. StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter annotated events by stream.
BasicFilter BasicFilter
} }

View File

@ -479,7 +479,7 @@ func TestSetStickerIncludes(t *testing.T) {
type tst struct { type tst struct {
name string name string
input map[string][]string input map[string][]string
expected map[string]string expected AnnotationStickers
} }
tests := []tst{ tests := []tst{
@ -554,3 +554,39 @@ func TestSetStickers(t *testing.T) {
}) })
} }
} }
func TestStickerSliceToMap(t *testing.T) {
t.Parallel()
tests := []struct {
name string
stickers []string
want map[string]string
wantErr error
}{
{
"good stickers",
[]string{"good1=val1", "good2=val2"},
map[string]string{"good1": "val1", "good2": "val2"},
nil,
},
{
"bad stickers",
[]string{"this is an invalid sticker", "shouldbe=likethis"},
nil,
invalidStickerError("this is an invalid sticker"),
},
{
"no stickers",
[]string{},
map[string]string{},
nil,
},
}
for _, tt := range tests {
got, err := stickerSliceToMap(tt.stickers)
require.Equal(t, tt.want, got)
require.Equal(t, tt.wantErr, err)
}
}

578
annotations/service.go Normal file
View File

@ -0,0 +1,578 @@
package annotations
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/snowflake"
"github.com/influxdata/influxdb/v2/sqlite"
"go.uber.org/zap"
)
var (
errAnnotationNotFound = &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "annotation not found",
}
errStreamNotFound = &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "stream not found",
}
)
var _ influxdb.AnnotationService = (*Service)(nil)
type Service struct {
store *sqlite.SqlStore
log *zap.Logger
idGenerator platform.IDGenerator
}
func NewService(logger *zap.Logger, store *sqlite.SqlStore) *Service {
return &Service{
store: store,
log: logger,
idGenerator: snowflake.NewIDGenerator(),
}
}
// CreateAnnotations creates annotations in the database for the provided orgID as defined by the provided list
// Streams corresponding to the StreamTag property of each annotation are created if they don't already exist
// as part of a transaction
func (s *Service) CreateAnnotations(ctx context.Context, orgID platform.ID, creates []influxdb.AnnotationCreate) ([]influxdb.AnnotationEvent, error) {
// Guard clause - an empty list was provided for some reason, immediately return an empty result
// set without doing the transaction
if len(creates) == 0 {
return []influxdb.AnnotationEvent{}, nil
}
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
// store a unique list of stream names first. the invalid ID is a placeholder for the real id,
// which will be obtained separately
streamNamesIDs := make(map[string]platform.ID)
for _, c := range creates {
streamNamesIDs[c.StreamTag] = platform.InvalidID()
}
// streamIDsNames is used for re-populating the resulting list of annotations with the stream names
// from the stream IDs returned from the database
streamIDsNames := make(map[platform.ID]string)
tx, err := s.store.DB.BeginTxx(ctx, nil)
if err != nil {
tx.Rollback()
return nil, err
}
// upsert each stream individually. a possible enhancement might be to do this as a single batched query
// it is unlikely that this would offer much benefit since there is currently no mechanism for creating large numbers
// of annotations simultaneously
now := time.Now()
for name := range streamNamesIDs {
query, args, err := newUpsertStreamQuery(orgID, s.idGenerator.ID(), now, influxdb.Stream{Name: name})
if err != nil {
tx.Rollback()
return nil, err
}
var streamID platform.ID
if err = tx.GetContext(ctx, &streamID, query, args...); err != nil {
tx.Rollback()
return nil, err
}
streamNamesIDs[name] = streamID
streamIDsNames[streamID] = name
}
// bulk insert for the creates. this also is unlikely to offer much performance benefit, but since the query
// is only used here it is easy enough to form to bulk query.
q := sq.Insert("annotations").
Columns("id", "org_id", "stream_id", "summary", "message", "stickers", "duration", "lower", "upper").
Suffix("RETURNING *")
for _, create := range creates {
// double check that we have a valid name for this stream tag - error if we don't. this should never be an error.
streamID, ok := streamNamesIDs[create.StreamTag]
if !ok {
tx.Rollback()
return nil, &ierrors.Error{
Code: ierrors.EInternal,
Msg: fmt.Sprintf("unable to find id for stream %q", create.StreamTag),
}
}
// add the row to the query
newID := s.idGenerator.ID()
lower := create.StartTime.Format(time.RFC3339Nano)
upper := create.EndTime.Format(time.RFC3339Nano)
duration := timesToDuration(*create.StartTime, *create.EndTime)
q = q.Values(newID, orgID, streamID, create.Summary, create.Message, create.Stickers, duration, lower, upper)
}
// get the query string and args list for the bulk insert
query, args, err := q.ToSql()
if err != nil {
tx.Rollback()
return nil, err
}
// run the bulk insert and store the result
var res []*influxdb.StoredAnnotation
if err := tx.SelectContext(ctx, &res, query, args...); err != nil {
tx.Rollback()
return nil, err
}
if err = tx.Commit(); err != nil {
return nil, err
}
// add the stream names to the list of results
for _, a := range res {
a.StreamTag = streamIDsNames[a.StreamID]
}
// convert the StoredAnnotation structs to AnnotationEvent structs before returning
return storedAnnotationsToEvents(res)
}
// ListAnnotations returns a list of annotations from the database matching the filter
// For time range matching, sqlite is able to compare times with millisecond accuracy
func (s *Service) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) ([]influxdb.StoredAnnotation, error) {
// we need to explicitly format time strings here and elsewhere to ensure they are
// interpreted by the database consistently
sf := filter.StartTime.Format(time.RFC3339Nano)
ef := filter.EndTime.Format(time.RFC3339Nano)
q := sq.Select("annotations.*", "streams.name AS stream").
Distinct().
From("annotations, json_each(annotations.stickers) AS json").
InnerJoin("streams ON annotations.stream_id = streams.id").
Where(sq.Eq{"annotations.org_id": orgID}).
Where(sq.GtOrEq{"lower": sf}).
Where(sq.LtOrEq{"upper": ef})
// Add stream name filters to the query
if len(filter.StreamIncludes) > 0 {
q = q.Where(sq.Eq{"stream": filter.StreamIncludes})
}
// Add sticker filters to the query
for k, v := range filter.StickerIncludes {
q = q.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
sql, args, err := q.ToSql()
if err != nil {
return nil, err
}
ans := []influxdb.StoredAnnotation{}
if err := s.store.DB.SelectContext(ctx, &ans, sql, args...); err != nil {
return nil, err
}
return ans, nil
}
// GetAnnotation gets a single annotation by ID
func (s *Service) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.StoredAnnotation, error) {
q := sq.Select("annotations.*, streams.name AS stream").
From("annotations").
InnerJoin("streams ON annotations.stream_id = streams.id").
Where(sq.Eq{"annotations.id": id})
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
var a influxdb.StoredAnnotation
if err := s.store.DB.GetContext(ctx, &a, query, args...); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errAnnotationNotFound
}
return nil, err
}
return &a, nil
}
// DeleteAnnotations deletes multiple annotations according to the provided filter
func (s *Service) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) error {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
sf := delete.StartTime.Format(time.RFC3339Nano)
ef := delete.EndTime.Format(time.RFC3339Nano)
// This is a subquery that will be as part of a DELETE FROM ... WHERE id IN (subquery)
// A subquery is used because the json_each virtual table can only be used in a SELECT
subQ := sq.Select("annotations.id").
Distinct().
From("annotations, json_each(annotations.stickers) AS json").
InnerJoin("streams ON annotations.stream_id = streams.id").
Where(sq.Eq{"annotations.org_id": orgID}).
Where(sq.GtOrEq{"lower": sf}).
Where(sq.LtOrEq{"upper": ef})
// Add the stream name filter to the subquery (if present)
if len(delete.StreamTag) > 0 {
subQ = subQ.Where(sq.Eq{"streams.name": delete.StreamTag})
}
// Add the stream ID filter to the subquery (if present)
if delete.StreamID.Valid() {
subQ = subQ.Where(sq.Eq{"stream_id": delete.StreamID})
}
// Add any sticker filters to the subquery
for k, v := range delete.Stickers {
subQ = subQ.Where(sq.And{sq.Eq{"json.value": fmt.Sprintf("%s=%s", k, v)}})
}
// Parse the subquery into a string and list of args
subQuery, subArgs, err := subQ.ToSql()
if err != nil {
return err
}
// Convert the subquery into a sq.Sqlizer so that it can be used in the actual DELETE
// operation. This is a bit of a hack since squirrel doesn't have great support for subqueries
// outside of SELECT statements
subExpr := sq.Expr("("+subQuery+")", subArgs...)
q := sq.
Delete("annotations").
Suffix("WHERE annotations.id IN").
SuffixExpr(subExpr)
query, args, err := q.ToSql()
if err != nil {
return err
}
if _, err := s.store.DB.ExecContext(ctx, query, args...); err != nil {
return err
}
return nil
}
// DeleteAnnoation deletes a single annotation by ID
func (s *Service) DeleteAnnotation(ctx context.Context, id platform.ID) error {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
q := sq.Delete("annotations").
Where(sq.Eq{"id": id}).
Suffix("RETURNING id")
query, args, err := q.ToSql()
if err != nil {
return err
}
var d platform.ID
if err := s.store.DB.GetContext(ctx, &d, query, args...); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errAnnotationNotFound
}
return err
}
return nil
}
// UpdateAnnotation updates a single annotation by ID
// In a similar fashion as CreateAnnotations, if the StreamTag in the update request does not exist,
// a stream will be created as part of a transaction with the update operation
func (s *Service) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) {
// get the full data for this annotation first so we can get its orgID
// this will ensure that the annotation already exists before starting the transaction
ann, err := s.GetAnnotation(ctx, id)
if err != nil {
return nil, err
}
now := time.Now()
// get a write lock on the database before starting the transaction to create/update the stream
// while simultaneously updating the annotation
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
tx, err := s.store.DB.BeginTxx(ctx, nil)
if err != nil {
tx.Rollback()
return nil, err
}
query, args, err := newUpsertStreamQuery(ann.OrgID, s.idGenerator.ID(), now, influxdb.Stream{Name: update.StreamTag})
if err != nil {
tx.Rollback()
return nil, err
}
var streamID platform.ID
if err = tx.GetContext(ctx, &streamID, query, args...); err != nil {
tx.Rollback()
return nil, err
}
q := sq.Update("annotations").
SetMap(sq.Eq{
"stream_id": streamID,
"summary": update.Summary,
"message": update.Message,
"stickers": update.Stickers,
"duration": timesToDuration(*update.StartTime, *update.EndTime),
"lower": update.StartTime.Format(time.RFC3339Nano),
"upper": update.EndTime.Format(time.RFC3339Nano),
}).
Where(sq.Eq{"id": id}).
Suffix("RETURNING *")
query, args, err = q.ToSql()
if err != nil {
return nil, err
}
var st influxdb.StoredAnnotation
err = tx.GetContext(ctx, &st, query, args...)
if err != nil {
tx.Rollback()
return nil, err
}
if err = tx.Commit(); err != nil {
return nil, err
}
// add the stream name to the result. we know that this StreamTag value was updated to the
// stream via the transaction having completed successfully.
st.StreamTag = update.StreamTag
return st.ToEvent()
}
// ListStreams returns a list of streams matching the filter for the provided orgID.
func (s *Service) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) ([]influxdb.StoredStream, error) {
q := sq.Select("id", "org_id", "name", "description", "created_at", "updated_at").
From("streams").
Where(sq.Eq{"org_id": orgID})
// Add stream name filters to the query
if len(filter.StreamIncludes) > 0 {
q = q.Where(sq.Eq{"name": filter.StreamIncludes})
}
sql, args, err := q.ToSql()
if err != nil {
return nil, err
}
sts := []influxdb.StoredStream{}
err = s.store.DB.SelectContext(ctx, &sts, sql, args...)
if err != nil {
return nil, err
}
return sts, nil
}
// GetStream gets a single stream by ID
func (s *Service) GetStream(ctx context.Context, id platform.ID) (*influxdb.StoredStream, error) {
q := sq.Select("id", "org_id", "name", "description", "created_at", "updated_at").
From("streams").
Where(sq.Eq{"id": id})
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
var st influxdb.StoredStream
if err := s.store.DB.GetContext(ctx, &st, query, args...); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errStreamNotFound
}
return nil, err
}
return &st, nil
}
// CreateOrUpdateStream creates a new stream, or updates the description of an existing stream.
// Doesn't support updating a stream desctription to "". For that use the UpdateStream method.
func (s *Service) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
newID := s.idGenerator.ID()
now := time.Now()
query, args, err := newUpsertStreamQuery(orgID, newID, now, stream)
if err != nil {
return nil, err
}
var id platform.ID
if err = s.store.DB.GetContext(ctx, &id, query, args...); err != nil {
return nil, err
}
// do a separate query to read the stream back from the database and return it.
// this is necessary because the sqlite driver does not support scanning time values from
// a RETURNING clause back into time.Time
return s.getReadStream(ctx, id)
}
// UpdateStream updates a stream name and/or a description. It is strictly used for updating an existing stream.
func (s *Service) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
q := sq.Update("streams").
SetMap(sq.Eq{
"name": stream.Name,
"description": stream.Description,
"updated_at": sq.Expr(`datetime('now')`),
}).
Where(sq.Eq{"id": id}).
Suffix(`RETURNING id`)
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
var newID platform.ID
err = s.store.DB.GetContext(ctx, &newID, query, args...)
if err != nil {
if err == sql.ErrNoRows {
return nil, errStreamNotFound
}
return nil, err
}
// do a separate query to read the stream back from the database and return it.
// this is necessary because the sqlite driver does not support scanning time values from
// a RETURNING clause back into time.Time
return s.getReadStream(ctx, newID)
}
// DeleteStreams is used for deleting multiple streams by name
func (s *Service) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) error {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
q := sq.Delete("streams").
Where(sq.Eq{"org_id": orgID}).
Where(sq.Eq{"name": delete.Names})
query, args, err := q.ToSql()
if err != nil {
return err
}
_, err = s.store.DB.ExecContext(ctx, query, args...)
if err != nil {
return err
}
return nil
}
// DeleteStreamByID deletes a single stream by ID. Returns an error if the ID could not be found.
func (s *Service) DeleteStreamByID(ctx context.Context, id platform.ID) error {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()
q := sq.Delete("streams").
Where(sq.Eq{"id": id}).
Suffix("RETURNING id")
query, args, err := q.ToSql()
if err != nil {
return err
}
var d platform.ID
if err := s.store.DB.GetContext(ctx, &d, query, args...); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errStreamNotFound
}
return err
}
return nil
}
func newUpsertStreamQuery(orgID, newID platform.ID, t time.Time, stream influxdb.Stream) (string, []interface{}, error) {
q := sq.Insert("streams").
Columns("id", "org_id", "name", "description", "created_at", "updated_at").
Values(newID, orgID, stream.Name, stream.Description, t, t).
Suffix(`ON CONFLICT(org_id, name) DO UPDATE
SET
updated_at = excluded.updated_at,
description = IIF(length(excluded.description) = 0, description, excluded.description)`).
Suffix("RETURNING id")
return q.ToSql()
}
// getReadStream is a helper which should only be called when the stream has been verified to exist
// via an update or insert.
func (s *Service) getReadStream(ctx context.Context, id platform.ID) (*influxdb.ReadStream, error) {
q := sq.Select("id", "name", "description", "created_at", "updated_at").
From("streams").
Where(sq.Eq{"id": id})
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
r := &influxdb.ReadStream{}
if err := s.store.DB.GetContext(ctx, r, query, args...); err != nil {
return nil, err
}
return r, nil
}
func storedAnnotationsToEvents(stored []*influxdb.StoredAnnotation) ([]influxdb.AnnotationEvent, error) {
events := make([]influxdb.AnnotationEvent, 0, len(stored))
for _, s := range stored {
c, err := s.ToCreate()
if err != nil {
return nil, err
}
events = append(events, influxdb.AnnotationEvent{
ID: s.ID,
AnnotationCreate: *c,
})
}
return events, nil
}
func timesToDuration(l, u time.Time) string {
return fmt.Sprintf("[%s, %s]", l.Format(time.RFC3339Nano), u.Format(time.RFC3339Nano))
}

1002
annotations/service_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,6 @@ package transport
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/go-chi/chi" "github.com/go-chi/chi"
@ -252,16 +251,11 @@ func storedAnnotationsToReadAnnotations(s []influxdb.StoredAnnotation) (influxdb
r := influxdb.ReadAnnotations{} r := influxdb.ReadAnnotations{}
for _, val := range s { for _, val := range s {
stickers, err := stickerSliceToMap(val.Stickers)
if err != nil {
return nil, err
}
r[val.StreamTag] = append(r[val.StreamTag], influxdb.ReadAnnotation{ r[val.StreamTag] = append(r[val.StreamTag], influxdb.ReadAnnotation{
ID: val.ID, ID: val.ID,
Summary: val.Summary, Summary: val.Summary,
Message: val.Message, Message: val.Message,
Stickers: stickers, Stickers: val.Stickers,
StartTime: val.Lower, StartTime: val.Lower,
EndTime: val.Upper, EndTime: val.Upper,
}) })
@ -281,34 +275,15 @@ func storedAnnotationToEvent(s *influxdb.StoredAnnotation) (*influxdb.Annotation
return nil, err return nil, err
} }
stickers, err := stickerSliceToMap(s.Stickers)
if err != nil {
return nil, err
}
return &influxdb.AnnotationEvent{ return &influxdb.AnnotationEvent{
ID: s.ID, ID: s.ID,
AnnotationCreate: influxdb.AnnotationCreate{ AnnotationCreate: influxdb.AnnotationCreate{
StreamTag: s.StreamTag, StreamTag: s.StreamTag,
Summary: s.Summary, Summary: s.Summary,
Message: s.Message, Message: s.Message,
Stickers: stickers, Stickers: s.Stickers,
EndTime: et, EndTime: et,
StartTime: st, StartTime: st,
}, },
}, nil }, nil
} }
func stickerSliceToMap(stickers []string) (map[string]string, error) {
stickerMap := map[string]string{}
for i := range stickers {
sticks := strings.SplitN(stickers[i], "=", 2)
if len(sticks) < 2 {
return nil, invalidStickerError(stickers[i])
}
stickerMap[sticks[0]] = sticks[1]
}
return stickerMap, nil
}

View File

@ -42,7 +42,7 @@ var (
StreamTag: "sometag", StreamTag: "sometag",
Summary: "testing the api", Summary: "testing the api",
Message: "stored annotation message", Message: "stored annotation message",
Stickers: []string{"val1=sticker1", "val2=sticker2"}, Stickers: map[string]string{"val1": "sticker1", "val2": "sticker2"},
Lower: now.Format(time.RFC3339), Lower: now.Format(time.RFC3339),
Upper: now.Format(time.RFC3339), Upper: now.Format(time.RFC3339),
} }
@ -265,39 +265,3 @@ func TestStoredAnnotationToEvent(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, got, &testEvent) require.Equal(t, got, &testEvent)
} }
func TestStickerSliceToMap(t *testing.T) {
t.Parallel()
tests := []struct {
name string
stickers []string
want map[string]string
wantErr error
}{
{
"good stickers",
[]string{"good1=val1", "good2=val2"},
map[string]string{"good1": "val1", "good2": "val2"},
nil,
},
{
"bad stickers",
[]string{"this is an invalid sticker", "shouldbe=likethis"},
nil,
invalidStickerError("this is an invalid sticker"),
},
{
"no stickers",
[]string{},
map[string]string{},
nil,
},
}
for _, tt := range tests {
got, err := stickerSliceToMap(tt.stickers)
require.Equal(t, tt.want, got)
require.Equal(t, tt.wantErr, err)
}
}

View File

@ -1,7 +1,6 @@
package transport package transport
import ( import (
"fmt"
"net/http" "net/http"
"time" "time"
@ -41,13 +40,6 @@ var (
} }
) )
func invalidStickerError(s string) error {
return &errors.Error{
Code: errors.EInternal,
Msg: fmt.Sprintf("invalid sticker: %q", s),
}
}
// AnnotationsHandler is the handler for the annotation service // AnnotationsHandler is the handler for the annotation service
type AnnotationHandler struct { type AnnotationHandler struct {
chi.Router chi.Router

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.16
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/Masterminds/squirrel v1.5.0
github.com/NYTimes/gziphandler v1.0.1 github.com/NYTimes/gziphandler v1.0.1
github.com/RoaringBitmap/roaring v0.4.16 github.com/RoaringBitmap/roaring v0.4.16
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883

6
go.sum
View File

@ -61,6 +61,8 @@ github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITg
github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/Masterminds/sprig v2.16.0+incompatible h1:QZbMUPxRQ50EKAq3LFMnxddMu88/EUUG3qmxwtDmPsY= github.com/Masterminds/sprig v2.16.0+incompatible h1:QZbMUPxRQ50EKAq3LFMnxddMu88/EUUG3qmxwtDmPsY=
github.com/Masterminds/sprig v2.16.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/Masterminds/sprig v2.16.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o=
github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8=
github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q= github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q=
github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0= github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0=
@ -384,6 +386,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=

View File

@ -0,0 +1,34 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=2;
-- Create the initial table to store streams
CREATE TABLE streams (
id VARCHAR(16) PRIMARY KEY,
org_id VARCHAR(16) NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
CONSTRAINT streams_uniq_orgid_name UNIQUE (org_id, name)
);
-- Create the initial table to store annotations
CREATE TABLE annotations (
id VARCHAR(16) PRIMARY KEY,
org_id VARCHAR(16) NOT NULL,
stream_id VARCHAR(16) NOT NULL,
summary TEXT NOT NULL,
message TEXT NOT NULL,
stickers TEXT NOT NULL,
duration TEXT NOT NULL,
lower TIMESTAMP NOT NULL,
upper TIMESTAMP NOT NULL,
FOREIGN KEY (stream_id) REFERENCES streams(id) ON DELETE CASCADE
);
-- Create indexes for stream_id and stickers to support fast queries
CREATE INDEX idx_annotations_stream ON annotations (stream_id);
CREATE INDEX idx_annotations_stickers ON annotations (stickers);