2018-01-09 23:15:12 +00:00
package influx
import (
2018-01-19 03:50:46 +00:00
"bytes"
2018-01-09 23:15:12 +00:00
"context"
"encoding/json"
"fmt"
2018-01-19 03:50:46 +00:00
"time"
"github.com/influxdata/chronograf/id"
2018-01-09 23:15:12 +00:00
"github.com/influxdata/chronograf"
)
const (
	// AllAnnotations is the query template that lists non-deleted annotations.
	// It takes two integer verbs: the first is the lower bound (nanoseconds)
	// on the point time, the second is the upper bound on the "start_time"
	// field. Rows are ordered by point time, newest first.
	AllAnnotations = ` SELECT "start_time", "modified_time_ns", "text", "type", "id" FROM "chronograf"."autogen"."annotations" WHERE "deleted"=false AND time >= %dns and "start_time" <= %d ORDER BY time DESC `
	// GetAnnotationID is the query template that selects the non-deleted
	// annotation rows whose "id" equals the single %s verb. Rows are ordered
	// by point time, newest first.
	GetAnnotationID = ` SELECT "start_time", "modified_time_ns", "text", "type", "id" FROM "chronograf"."autogen"."annotations" WHERE "id"='%s' AND "deleted"=false ORDER BY time DESC `
	// DefaultDB is chronograf. Perhaps later we allow this to be changed.
	DefaultDB = "chronograf"
	// DefaultRP is autogen. Perhaps later we allow this to be changed.
	DefaultRP = "autogen"
	// DefaultMeasurement is annotations.
	DefaultMeasurement = "annotations"
)
2018-01-12 23:17:14 +00:00
var _ chronograf . AnnotationStore = & AnnotationStore { }
2018-01-09 23:15:12 +00:00
// AnnotationStore stores annotations within InfluxDB
type AnnotationStore struct {
2018-01-12 23:17:14 +00:00
client chronograf . TimeSeries
2018-01-19 03:50:46 +00:00
id chronograf . ID
2018-02-17 18:30:55 +00:00
now Now
2018-01-12 23:17:14 +00:00
}
// NewAnnotationStore constructs an annoation store with a client
func NewAnnotationStore ( client chronograf . TimeSeries ) * AnnotationStore {
return & AnnotationStore {
client : client ,
2018-01-19 03:50:46 +00:00
id : & id . UUID { } ,
2018-02-17 18:30:55 +00:00
now : time . Now ,
2018-01-12 23:17:14 +00:00
}
2018-01-09 23:15:12 +00:00
}
// All lists all Annotations
2018-01-19 03:50:46 +00:00
func ( a * AnnotationStore ) All ( ctx context . Context , start , stop time . Time ) ( [ ] chronograf . Annotation , error ) {
return a . queryAnnotations ( ctx , fmt . Sprintf ( AllAnnotations , start . UnixNano ( ) , stop . UnixNano ( ) ) )
2018-01-09 23:15:12 +00:00
}
// Get retrieves an annotation
2018-01-19 03:50:46 +00:00
func ( a * AnnotationStore ) Get ( ctx context . Context , id string ) ( * chronograf . Annotation , error ) {
annos , err := a . queryAnnotations ( ctx , fmt . Sprintf ( GetAnnotationID , id ) )
2018-01-09 23:15:12 +00:00
if err != nil {
2018-01-19 03:50:46 +00:00
return nil , err
2018-01-09 23:15:12 +00:00
}
if len ( annos ) == 0 {
2018-01-19 03:50:46 +00:00
return nil , chronograf . ErrAnnotationNotFound
2018-01-09 23:15:12 +00:00
}
2018-01-19 03:50:46 +00:00
return & annos [ 0 ] , nil
2018-01-09 23:15:12 +00:00
}
// Add creates a new annotation in the store
2018-01-19 03:50:46 +00:00
func ( a * AnnotationStore ) Add ( ctx context . Context , anno * chronograf . Annotation ) ( * chronograf . Annotation , error ) {
var err error
anno . ID , err = a . id . Generate ( )
if err != nil {
return nil , err
}
2018-02-17 18:30:55 +00:00
return anno , a . client . Write ( ctx , toPoint ( anno , a . now ( ) ) )
2018-01-09 23:15:12 +00:00
}
// Delete removes the annotation from the store
2018-01-19 03:50:46 +00:00
func ( a * AnnotationStore ) Delete ( ctx context . Context , id string ) error {
cur , err := a . Get ( ctx , id )
if err != nil {
return err
}
2018-02-17 18:30:55 +00:00
return a . client . Write ( ctx , toDeletedPoint ( cur , a . now ( ) ) )
2018-01-19 03:50:46 +00:00
}
// Update replaces annotation; if the annotation's time is different, it
// also removes the previous annotation
func ( a * AnnotationStore ) Update ( ctx context . Context , anno * chronograf . Annotation ) error {
cur , err := a . Get ( ctx , anno . ID )
if err != nil {
return err
}
2018-02-17 18:30:55 +00:00
if err := a . client . Write ( ctx , toPoint ( anno , a . now ( ) ) ) ; err != nil {
2018-01-19 03:50:46 +00:00
return err
}
// If the updated annotation has a different time, then, we must
// delete the previous annotation
2018-01-25 23:09:47 +00:00
if ! cur . EndTime . Equal ( anno . EndTime ) {
2018-02-17 18:30:55 +00:00
return a . client . Write ( ctx , toDeletedPoint ( cur , a . now ( ) ) )
2018-01-19 03:50:46 +00:00
}
return nil
}
// queryAnnotations runs query against DefaultDB with nanosecond epoch
// precision and converts the response into annotations.
func (a *AnnotationStore) queryAnnotations(ctx context.Context, query string) ([]chronograf.Annotation, error) {
	request := chronograf.Query{
		Command: query,
		DB:      DefaultDB,
		Epoch:   "ns",
	}
	response, err := a.client.Query(ctx, request)
	if err != nil {
		return nil, err
	}

	raw, err := response.MarshalJSON()
	if err != nil {
		return nil, err
	}

	// Decode numbers as json.Number so values can later be read as int64.
	decoder := json.NewDecoder(bytes.NewReader(raw))
	decoder.UseNumber()

	var results influxResults
	if err = decoder.Decode(&results); err != nil {
		return nil, err
	}
	return results.Annotations()
}
2018-02-17 18:30:55 +00:00
func toPoint ( anno * chronograf . Annotation , now time . Time ) * chronograf . Point {
2018-01-19 03:50:46 +00:00
return & chronograf . Point {
2018-01-12 23:17:14 +00:00
Database : DefaultDB ,
RetentionPolicy : DefaultRP ,
2018-01-19 03:50:46 +00:00
Measurement : DefaultMeasurement ,
2018-01-25 22:36:28 +00:00
Time : anno . EndTime . UnixNano ( ) ,
2018-01-19 03:50:46 +00:00
Tags : map [ string ] string {
"id" : anno . ID ,
} ,
Fields : map [ string ] interface { } {
"deleted" : false ,
2018-01-25 22:36:28 +00:00
"start_time" : anno . StartTime . UnixNano ( ) ,
2018-02-17 18:30:55 +00:00
"modified_time_ns" : int64 ( now . UnixNano ( ) ) ,
2018-01-19 03:50:46 +00:00
"text" : anno . Text ,
"type" : anno . Type ,
} ,
}
2018-01-09 23:15:12 +00:00
}
2018-02-17 18:30:55 +00:00
func toDeletedPoint ( anno * chronograf . Annotation , now time . Time ) * chronograf . Point {
2018-01-19 03:50:46 +00:00
return & chronograf . Point {
Database : DefaultDB ,
RetentionPolicy : DefaultRP ,
Measurement : DefaultMeasurement ,
2018-01-25 22:36:28 +00:00
Time : anno . EndTime . UnixNano ( ) ,
2018-01-19 03:50:46 +00:00
Tags : map [ string ] string {
"id" : anno . ID ,
} ,
Fields : map [ string ] interface { } {
"deleted" : true ,
2018-01-25 22:36:28 +00:00
"start_time" : int64 ( 0 ) ,
2018-02-17 18:30:55 +00:00
"modified_time_ns" : int64 ( now . UnixNano ( ) ) ,
2018-01-19 03:50:46 +00:00
"text" : "" ,
"type" : "" ,
} ,
}
2018-01-09 23:15:12 +00:00
}
// value is one row of decoded query output; each element is a column value.
type value []interface{}

// Int64 returns the column at idx as an int64. The column must have been
// decoded as a json.Number. An error is returned when idx is outside the row
// (including negative indexes), when the column is not a json.Number, or when
// the number does not fit an int64.
func (v value) Int64(idx int) (int64, error) {
	if idx < 0 || idx >= len(v) {
		return 0, fmt.Errorf("index %d does not exist in values", idx)
	}
	n, ok := v[idx].(json.Number)
	if !ok {
		return 0, fmt.Errorf("value at index %d is not int64, but, %T", idx, v[idx])
	}
	return n.Int64()
}

// Time returns the column at idx interpreted as nanoseconds since the Unix
// epoch. It fails under the same conditions as Int64.
func (v value) Time(idx int) (time.Time, error) {
	tm, err := v.Int64(idx)
	if err != nil {
		return time.Time{}, err
	}
	return time.Unix(0, tm), nil
}

// String returns the column at idx as a string. An error is returned when
// idx is outside the row (including negative indexes) or when the column is
// not a string.
func (v value) String(idx int) (string, error) {
	if idx < 0 || idx >= len(v) {
		return "", fmt.Errorf("index %d does not exist in values", idx)
	}
	str, ok := v[idx].(string)
	if !ok {
		return "", fmt.Errorf("value at index %d is not string, but, %T", idx, v[idx])
	}
	return str, nil
}
2018-01-19 03:50:46 +00:00
type influxResults [ ] struct {
2018-01-09 23:15:12 +00:00
Series [ ] struct {
Values [ ] value ` json:"values" `
} ` json:"series" `
}
2018-01-19 03:50:46 +00:00
// annotationResult couples a decoded annotation with the modification time
// read from its row, so the most recently modified row per ID can be kept.
type annotationResult struct {
	chronograf.Annotation

	// modTime is bookkeeping for the case where an update fails part-way:
	// among rows sharing an ID, the one with the latest modTime is returned.
	modTime int64
}
// Annotations converts AllAnnotations query to annotations
func ( r * influxResults ) Annotations ( ) ( res [ ] chronograf . Annotation , err error ) {
annos := map [ string ] annotationResult { }
2018-01-09 23:15:12 +00:00
for _ , u := range * r {
for _ , s := range u . Series {
for _ , v := range s . Values {
2018-01-19 03:50:46 +00:00
anno := annotationResult { }
2018-01-25 22:36:28 +00:00
if anno . EndTime , err = v . Time ( 0 ) ; err != nil {
2018-01-19 03:50:46 +00:00
return
}
2018-01-25 22:36:28 +00:00
if anno . StartTime , err = v . Time ( 1 ) ; err != nil {
2018-01-09 23:15:12 +00:00
return
}
2018-01-19 03:50:46 +00:00
if anno . modTime , err = v . Int64 ( 2 ) ; err != nil {
2018-01-09 23:15:12 +00:00
return
}
2018-01-19 03:50:46 +00:00
if anno . Text , err = v . String ( 3 ) ; err != nil {
2018-01-09 23:15:12 +00:00
return
}
2018-01-19 03:50:46 +00:00
if anno . Type , err = v . String ( 4 ) ; err != nil {
2018-01-09 23:15:12 +00:00
return
}
2018-01-19 03:50:46 +00:00
if anno . ID , err = v . String ( 5 ) ; err != nil {
2018-01-09 23:15:12 +00:00
return
}
2018-01-19 03:50:46 +00:00
// If there are two annotations with the same id, take
// the annotation with the latest modification time
// This is to prevent issues when an update or delete fails.
2018-02-17 18:30:55 +00:00
// Updates and deletes are multiple step queries.
2018-01-19 03:50:46 +00:00
prev , ok := annos [ anno . ID ]
if ! ok || anno . modTime > prev . modTime {
annos [ anno . ID ] = anno
}
2018-01-09 23:15:12 +00:00
}
}
}
2018-01-19 03:50:46 +00:00
res = [ ] chronograf . Annotation { }
for _ , a := range annos {
res = append ( res , a . Annotation )
2018-01-09 23:15:12 +00:00
}
2018-01-19 03:50:46 +00:00
return res , err
2018-01-09 23:15:12 +00:00
}