influxdb/annotation.go

484 lines
17 KiB
Go

package influxdb
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"
"unicode/utf8"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
)
var (
errEmptySummary = &errors.Error{
Code: errors.EInvalid,
Msg: "summary cannot be empty",
}
errSummaryTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "summary must be less than 255 characters",
}
errStreamTagTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "stream tag must be less than 255 characters",
}
errStreamNameTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "stream name must be less than 255 characters",
}
errStreamDescTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "stream description must be less than 1024 characters",
}
errStickerTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "stickers must be less than 255 characters",
}
errMsgTooLong = &errors.Error{
Code: errors.EInvalid,
Msg: "message must be less than 4096 characters",
}
errReversedTimes = &errors.Error{
Code: errors.EInvalid,
Msg: "start time must come before end time",
}
errMissingStreamName = &errors.Error{
Code: errors.EInvalid,
Msg: "stream name must be set",
}
errMissingStreamTagOrId = &errors.Error{
Code: errors.EInvalid,
Msg: "stream tag or id must be set",
}
errMissingEndTime = &errors.Error{
Code: errors.EInvalid,
Msg: "end time must be set",
}
errMissingStartTime = &errors.Error{
Code: errors.EInvalid,
Msg: "start time must be set",
}
)
func invalidStickerError(s string) error {
return &errors.Error{
Code: errors.EInternal,
Msg: fmt.Sprintf("invalid sticker: %q", s),
}
}
func stickerSliceToMap(stickers []string) (map[string]string, error) {
stickerMap := map[string]string{}
for i := range stickers {
if stick0, stick1, found := strings.Cut(stickers[i], "="); found {
stickerMap[stick0] = stick1
} else {
return nil, invalidStickerError(stickers[i])
}
}
return stickerMap, nil
}
// AnnotationService is the service contract for Annotations
type AnnotationService interface {
// CreateAnnotations creates annotations.
CreateAnnotations(ctx context.Context, orgID platform.ID, create []AnnotationCreate) ([]AnnotationEvent, error)
// ListAnnotations lists all annotations matching the filter.
ListAnnotations(ctx context.Context, orgID platform.ID, filter AnnotationListFilter) ([]StoredAnnotation, error)
// GetAnnotation gets an annotation by id.
GetAnnotation(ctx context.Context, id platform.ID) (*StoredAnnotation, error)
// DeleteAnnotations deletes annotations matching the filter.
DeleteAnnotations(ctx context.Context, orgID platform.ID, delete AnnotationDeleteFilter) error
// DeleteAnnotation deletes an annotation by id.
DeleteAnnotation(ctx context.Context, id platform.ID) error
// UpdateAnnotation updates an annotation.
UpdateAnnotation(ctx context.Context, id platform.ID, update AnnotationCreate) (*AnnotationEvent, error)
// ListStreams lists all streams matching the filter.
ListStreams(ctx context.Context, orgID platform.ID, filter StreamListFilter) ([]StoredStream, error)
// CreateOrUpdateStream creates or updates the matching stream by name.
CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream Stream) (*ReadStream, error)
// GetStream gets a stream by id. Currently this is only used for authorization, and there are no
// API routes for getting a single stream by ID.
GetStream(ctx context.Context, id platform.ID) (*StoredStream, error)
// UpdateStream updates the stream by the ID.
UpdateStream(ctx context.Context, id platform.ID, stream Stream) (*ReadStream, error)
// DeleteStreams deletes one or more streams by name.
DeleteStreams(ctx context.Context, orgID platform.ID, delete BasicStream) error
// DeleteStreamByID deletes the stream metadata by id.
DeleteStreamByID(ctx context.Context, id platform.ID) error
}
// AnnotationEvent contains fields for annotating an event.
type AnnotationEvent struct {
ID platform.ID `json:"id,omitempty"` // ID is the annotation ID.
AnnotationCreate // AnnotationCreate defines the common input/output bits of an annotation.
}
// AnnotationCreate contains user providable fields for annotating an event.
type AnnotationCreate struct {
StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events.
Summary string `json:"summary"` // Summary is the only field required to annotate an event.
Message string `json:"message,omitempty"` // Message provides more details about the event being annotated.
Stickers AnnotationStickers `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event.
EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set.
StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set.
}
// StoredAnnotation represents annotation data to be stored in the database.
type StoredAnnotation struct {
ID platform.ID `db:"id"` // ID is the annotation's id.
OrgID platform.ID `db:"org_id"` // OrgID is the annotations's owning organization.
StreamID platform.ID `db:"stream_id"` // StreamID is the id of a stream.
StreamTag string `db:"stream"` // StreamTag is the name of a stream (when selecting with join of streams).
Summary string `db:"summary"` // Summary is the summary of the annotated event.
Message string `db:"message"` // Message is a longer description of the annotated event.
Stickers AnnotationStickers `db:"stickers"` // Stickers are additional labels to group annotations by.
Duration string `db:"duration"` // Duration is the time range (with zone) of an annotated event.
Lower string `db:"lower"` // Lower is the time an annotated event begins.
Upper string `db:"upper"` // Upper is the time an annotated event ends.
}
// ToCreate is a utility method for converting a StoredAnnotation to an AnnotationCreate type
func (s StoredAnnotation) ToCreate() (*AnnotationCreate, error) {
et, err := time.Parse(time.RFC3339Nano, s.Upper)
if err != nil {
return nil, err
}
st, err := time.Parse(time.RFC3339Nano, s.Lower)
if err != nil {
return nil, err
}
return &AnnotationCreate{
StreamTag: s.StreamTag,
Summary: s.Summary,
Message: s.Message,
Stickers: s.Stickers,
EndTime: &et,
StartTime: &st,
}, nil
}
// ToEvent is a utility method for converting a StoredAnnotation to an AnnotationEvent type
func (s StoredAnnotation) ToEvent() (*AnnotationEvent, error) {
c, err := s.ToCreate()
if err != nil {
return nil, err
}
return &AnnotationEvent{
ID: s.ID,
AnnotationCreate: *c,
}, nil
}
type AnnotationStickers map[string]string
// Value implements the database/sql Valuer interface for adding AnnotationStickers to the database
// Stickers are stored in the database as a slice of strings like "[key=val]"
// They are encoded into a JSON string for storing into the database, and the JSON sqlite extension is
// able to manipulate them like an object.
func (a AnnotationStickers) Value() (driver.Value, error) {
stickSlice := make([]string, 0, len(a))
for k, v := range a {
stickSlice = append(stickSlice, fmt.Sprintf("%s=%s", k, v))
}
sticks, err := json.Marshal(stickSlice)
if err != nil {
return nil, err
}
return string(sticks), nil
}
// Scan implements the database/sql Scanner interface for retrieving AnnotationStickers from the database
// The string is decoded into a slice of strings, which are then converted back into a map
func (a *AnnotationStickers) Scan(value interface{}) error {
vString, ok := value.(string)
if !ok {
return &errors.Error{
Code: errors.EInternal,
Msg: "could not load stickers from sqlite",
}
}
var stickSlice []string
if err := json.NewDecoder(strings.NewReader(vString)).Decode(&stickSlice); err != nil {
return err
}
stickMap, err := stickerSliceToMap(stickSlice)
if err != nil {
return nil
}
*a = stickMap
return nil
}
// Validate validates the creation object.
func (a *AnnotationCreate) Validate(nowFunc func() time.Time) error {
switch s := utf8.RuneCountInString(a.Summary); {
case s <= 0:
return errEmptySummary
case s > 255:
return errSummaryTooLong
}
switch t := utf8.RuneCountInString(a.StreamTag); {
case t == 0:
a.StreamTag = "default"
case t > 255:
return errStreamTagTooLong
}
if utf8.RuneCountInString(a.Message) > 4096 {
return errMsgTooLong
}
for k, v := range a.Stickers {
if utf8.RuneCountInString(k) > 255 || utf8.RuneCountInString(v) > 255 {
return errStickerTooLong
}
}
now := nowFunc()
if a.EndTime == nil {
a.EndTime = &now
}
if a.StartTime == nil {
a.StartTime = a.EndTime
}
if a.EndTime.Before(*(a.StartTime)) {
return errReversedTimes
}
return nil
}
// AnnotationDeleteFilter contains fields for deleting an annotated event.
type AnnotationDeleteFilter struct {
StreamTag string `json:"stream,omitempty"` // StreamTag provides a means to logically group a set of annotated events.
StreamID platform.ID `json:"streamID,omitempty"` // StreamID provides a means to logically group a set of annotated events.
Stickers map[string]string `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event.
EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time of the event being annotated. Defaults to now if not set.
StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated. Defaults to EndTime if not set.
}
// Validate validates the deletion object.
func (a *AnnotationDeleteFilter) Validate() error {
var errs []string
if len(a.StreamTag) == 0 && !a.StreamID.Valid() {
errs = append(errs, errMissingStreamTagOrId.Error())
}
if a.EndTime == nil {
errs = append(errs, errMissingEndTime.Error())
}
if a.StartTime == nil {
errs = append(errs, errMissingStartTime.Error())
}
if len(errs) > 0 {
return &errors.Error{
Code: errors.EInvalid,
Msg: strings.Join(errs, "; "),
}
}
if a.EndTime.Before(*(a.StartTime)) {
return errReversedTimes
}
return nil
}
var dre = regexp.MustCompile(`stickers\[(.*)\]`)
// SetStickers sets the stickers from the query parameters.
func (a *AnnotationDeleteFilter) SetStickers(vals map[string][]string) {
if a.Stickers == nil {
a.Stickers = map[string]string{}
}
for k, v := range vals {
if ss := dre.FindStringSubmatch(k); len(ss) == 2 && len(v) > 0 {
a.Stickers[ss[1]] = v[0]
}
}
}
// AnnotationList defines the structure of the response when listing annotations.
type AnnotationList struct {
StreamTag string `json:"stream"`
Annotations []ReadAnnotation `json:"annotations"`
}
// ReadAnnotations allows annotations to be assigned to a stream.
type ReadAnnotations map[string][]ReadAnnotation
// MarshalJSON allows us to marshal the annotations belonging to a stream properly.
func (s ReadAnnotations) MarshalJSON() ([]byte, error) {
annotationList := []AnnotationList{}
for k, v := range s {
annotationList = append(annotationList, AnnotationList{
StreamTag: k,
Annotations: v,
})
}
return json.Marshal(annotationList)
}
// ReadAnnotation defines the simplest form of an annotation to be returned. Essentially, it's AnnotationEvent without stream info.
type ReadAnnotation struct {
ID platform.ID `json:"id"` // ID is the annotation's generated id.
Summary string `json:"summary"` // Summary is the only field required to annotate an event.
Message string `json:"message,omitempty"` // Message provides more details about the event being annotated.
Stickers map[string]string `json:"stickers,omitempty"` // Stickers are like tags, but named something obscure to differentiate them from influx tags. They are there to differentiate an annotated event.
EndTime string `json:"endTime"` // EndTime is the time of the event being annotated.
StartTime string `json:"startTime,omitempty"` // StartTime is the start time of the event being annotated.
}
// AnnotationListFilter is a selection filter for listing annotations.
type AnnotationListFilter struct {
StickerIncludes AnnotationStickers `json:"stickerIncludes,omitempty"` // StickerIncludes allows the user to filter annotated events based on it's sticker.
StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter annotated events by stream.
BasicFilter
}
// Validate validates the filter.
func (f *AnnotationListFilter) Validate(nowFunc func() time.Time) error {
return f.BasicFilter.Validate(nowFunc)
}
var re = regexp.MustCompile(`stickerIncludes\[(.*)\]`)
// SetStickerIncludes sets the stickerIncludes from the query parameters.
func (f *AnnotationListFilter) SetStickerIncludes(vals map[string][]string) {
if f.StickerIncludes == nil {
f.StickerIncludes = map[string]string{}
}
for k, v := range vals {
if ss := re.FindStringSubmatch(k); len(ss) == 2 && len(v) > 0 {
f.StickerIncludes[ss[1]] = v[0]
}
}
}
// StreamListFilter is a selection filter for listing streams. Streams are not considered first class resources, but depend on an annotation using them.
type StreamListFilter struct {
StreamIncludes []string `json:"streamIncludes,omitempty"` // StreamIncludes allows the user to filter streams returned.
BasicFilter
}
// Validate validates the filter.
func (f *StreamListFilter) Validate(nowFunc func() time.Time) error {
return f.BasicFilter.Validate(nowFunc)
}
// Stream defines the stream metadata. Used in create and update requests/responses. Delete requests will only require stream name.
type Stream struct {
Name string `json:"stream"` // Name is the name of a stream.
Description string `json:"description,omitempty"` // Description is more information about a stream.
}
// ReadStream defines the returned stream.
type ReadStream struct {
ID platform.ID `json:"id" db:"id"` // ID is the id of a stream.
Name string `json:"stream" db:"name"` // Name is the name of a stream.
Description string `json:"description,omitempty" db:"description"` // Description is more information about a stream.
CreatedAt time.Time `json:"createdAt" db:"created_at"` // CreatedAt is a timestamp.
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"` // UpdatedAt is a timestamp.
}
// IsValid validates the stream.
func (s *Stream) Validate(strict bool) error {
switch nameChars := utf8.RuneCountInString(s.Name); {
case nameChars <= 0:
if strict {
return errMissingStreamName
}
s.Name = "default"
case nameChars > 255:
return errStreamNameTooLong
}
if utf8.RuneCountInString(s.Description) > 1024 {
return errStreamDescTooLong
}
return nil
}
// StoredStream represents stream data to be stored in the metadata database.
type StoredStream struct {
ID platform.ID `db:"id"` // ID is the stream's id.
OrgID platform.ID `db:"org_id"` // OrgID is the stream's owning organization.
Name string `db:"name"` // Name is the name of a stream.
Description string `db:"description"` // Description is more information about a stream.
CreatedAt time.Time `db:"created_at"` // CreatedAt is a timestamp.
UpdatedAt time.Time `db:"updated_at"` // UpdatedAt is a timestamp.
}
// BasicStream defines a stream by name. Used for stream deletes.
type BasicStream struct {
Names []string `json:"stream"`
}
// IsValid validates the stream is not empty.
func (s BasicStream) IsValid() bool {
if len(s.Names) <= 0 {
return false
}
for i := range s.Names {
if len(s.Names[i]) <= 0 {
return false
}
}
return true
}
// BasicFilter defines common filter options.
type BasicFilter struct {
StartTime *time.Time `json:"startTime,omitempty"` // StartTime is the time the event being annotated started.
EndTime *time.Time `json:"endTime,omitempty"` // EndTime is the time the event being annotated ended.
}
// Validate validates the basic filter options, setting sane defaults where appropriate.
func (f *BasicFilter) Validate(nowFunc func() time.Time) error {
now := nowFunc().UTC().Truncate(time.Second)
if f.EndTime == nil || f.EndTime.IsZero() {
f.EndTime = &now
}
if f.StartTime == nil {
f.StartTime = &time.Time{}
}
if f.EndTime.Before(*(f.StartTime)) {
return errReversedTimes
}
return nil
}