Update timeseries interface to write multiple points

pull/2846/head
Chris Goller 2018-02-27 13:27:17 -06:00
parent a8b52523e9
commit c1043b1938
8 changed files with 49 additions and 34 deletions

View File

@ -115,8 +115,8 @@ type TimeSeries interface {
Connect(context.Context, *Source) error
// Query retrieves time series data from the database.
Query(context.Context, Query) (Response, error)
// Write records a point into a series
Write(context.Context, *Point) error
// Write records points into a series
Write(context.Context, []Point) error
// UsersStore represents the user accounts within the TimeSeries database
Users(context.Context) UsersStore
// Permissions returns all valid names permissions in this database

View File

@ -144,12 +144,12 @@ func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Resp
return c.nextDataNode().Query(ctx, q)
}
// Write records a point into a time series
func (c *Client) Write(ctx context.Context, point *chronograf.Point) error {
// Write records points into a time series
func (c *Client) Write(ctx context.Context, points []chronograf.Point) error {
if !c.opened {
return chronograf.ErrUninitialized
}
return c.nextDataNode().Write(ctx, point)
return c.nextDataNode().Write(ctx, points)
}
// Users is the interface to the users within Influx Enterprise

View File

@ -118,7 +118,7 @@ func (ts *TimeSeries) Connect(ctx context.Context, src *chronograf.Source) error
return nil
}
func (ts *TimeSeries) Write(ctx context.Context, point *chronograf.Point) error {
func (ts *TimeSeries) Write(ctx context.Context, points []chronograf.Point) error {
return nil
}

View File

@ -68,7 +68,9 @@ func (a *AnnotationStore) Add(ctx context.Context, anno *chronograf.Annotation)
if err != nil {
return nil, err
}
return anno, a.client.Write(ctx, toPoint(anno, a.now()))
return anno, a.client.Write(ctx, []chronograf.Point{
toPoint(anno, a.now()),
})
}
// Delete removes the annotation from the store
@ -77,7 +79,9 @@ func (a *AnnotationStore) Delete(ctx context.Context, id string) error {
if err != nil {
return err
}
return a.client.Write(ctx, toDeletedPoint(cur, a.now()))
return a.client.Write(ctx, []chronograf.Point{
toDeletedPoint(cur, a.now()),
})
}
// Update replaces annotation; if the annotation's time is different, it
@ -88,14 +92,16 @@ func (a *AnnotationStore) Update(ctx context.Context, anno *chronograf.Annotatio
return err
}
if err := a.client.Write(ctx, toPoint(anno, a.now())); err != nil {
if err := a.client.Write(ctx, []chronograf.Point{toPoint(anno, a.now())}); err != nil {
return err
}
// If the updated annotation has a different time, then, we must
// delete the previous annotation
if !cur.EndTime.Equal(anno.EndTime) {
return a.client.Write(ctx, toDeletedPoint(cur, a.now()))
return a.client.Write(ctx, []chronograf.Point{
toDeletedPoint(cur, a.now()),
})
}
return nil
}
@ -124,8 +130,8 @@ func (a *AnnotationStore) queryAnnotations(ctx context.Context, query string) ([
return results.Annotations()
}
func toPoint(anno *chronograf.Annotation, now time.Time) *chronograf.Point {
return &chronograf.Point{
func toPoint(anno *chronograf.Annotation, now time.Time) chronograf.Point {
return chronograf.Point{
Database: AnnotationsDB,
RetentionPolicy: DefaultRP,
Measurement: DefaultMeasurement,
@ -143,8 +149,8 @@ func toPoint(anno *chronograf.Annotation, now time.Time) *chronograf.Point {
}
}
func toDeletedPoint(anno *chronograf.Annotation, now time.Time) *chronograf.Point {
return &chronograf.Point{
func toDeletedPoint(anno *chronograf.Annotation, now time.Time) chronograf.Point {
return chronograf.Point{
Database: AnnotationsDB,
RetentionPolicy: DefaultRP,
Measurement: DefaultMeasurement,

View File

@ -18,7 +18,7 @@ func Test_toPoint(t *testing.T) {
name string
anno *chronograf.Annotation
now time.Time
want *chronograf.Point
want chronograf.Point
}{
0: {
name: "convert annotation to point w/o start and end times",
@ -28,7 +28,7 @@ func Test_toPoint(t *testing.T) {
Type: "mytype",
},
now: time.Unix(0, 0),
want: &chronograf.Point{
want: chronograf.Point{
Database: AnnotationsDB,
RetentionPolicy: DefaultRP,
Measurement: DefaultMeasurement,
@ -55,7 +55,7 @@ func Test_toPoint(t *testing.T) {
EndTime: time.Unix(200, 0),
},
now: time.Unix(0, 0),
want: &chronograf.Point{
want: chronograf.Point{
Database: AnnotationsDB,
RetentionPolicy: DefaultRP,
Measurement: DefaultMeasurement,
@ -87,7 +87,7 @@ func Test_toDeletedPoint(t *testing.T) {
name string
anno *chronograf.Annotation
now time.Time
want *chronograf.Point
want chronograf.Point
}{
0: {
name: "convert annotation to point w/o start and end times",
@ -96,7 +96,7 @@ func Test_toDeletedPoint(t *testing.T) {
EndTime: time.Unix(0, 0),
},
now: time.Unix(0, 0),
want: &chronograf.Point{
want: chronograf.Point{
Database: AnnotationsDB,
RetentionPolicy: DefaultRP,
Measurement: DefaultMeasurement,
@ -489,7 +489,7 @@ func TestAnnotationStore_Update(t *testing.T) {
QueryF: func(context.Context, chronograf.Query) (chronograf.Response, error) {
return mocks.NewResponse(`[ { } ]`, nil), nil
},
WriteF: func(context.Context, *chronograf.Point) error {
WriteF: func(context.Context, []chronograf.Point) error {
return nil
},
},
@ -544,7 +544,7 @@ func TestAnnotationStore_Update(t *testing.T) {
}
]`, nil), nil
},
WriteF: func(context.Context, *chronograf.Point) error {
WriteF: func(context.Context, []chronograf.Point) error {
return fmt.Errorf("error")
},
},
@ -591,7 +591,7 @@ func TestAnnotationStore_Update(t *testing.T) {
}
]`, nil), nil
},
WriteF: func(context.Context, *chronograf.Point) error {
WriteF: func(context.Context, []chronograf.Point) error {
return nil
},
},
@ -637,7 +637,7 @@ func TestAnnotationStore_Update(t *testing.T) {
}
]`, nil), nil
},
WriteF: func(context.Context, *chronograf.Point) error {
WriteF: func(context.Context, []chronograf.Point) error {
return nil
},
},

View File

@ -267,7 +267,16 @@ func (c *Client) ping(u *url.URL) (string, string, error) {
}
// Write POSTs line protocol to a database and retention policy
func (c *Client) Write(ctx context.Context, point *chronograf.Point) error {
func (c *Client) Write(ctx context.Context, points []chronograf.Point) error {
for _, point := range points {
if err := c.writePoint(ctx, &point); err != nil {
return err
}
}
return nil
}
func (c *Client) writePoint(ctx context.Context, point *chronograf.Point) error {
lp, err := toLineProtocol(point)
if err != nil {
return err

View File

@ -405,7 +405,7 @@ func TestClient_write(t *testing.T) {
}
type args struct {
ctx context.Context
point *chronograf.Point
point chronograf.Point
}
tests := []struct {
name string
@ -421,7 +421,7 @@ func TestClient_write(t *testing.T) {
},
args: args{
ctx: context.Background(),
point: &chronograf.Point{
point: chronograf.Point{
Database: "mydb",
RetentionPolicy: "myrp",
Measurement: "mymeas",
@ -440,7 +440,7 @@ func TestClient_write(t *testing.T) {
name: "point without fields",
args: args{
ctx: context.Background(),
point: &chronograf.Point{},
point: chronograf.Point{},
},
wantErr: true,
},
@ -451,7 +451,7 @@ func TestClient_write(t *testing.T) {
},
args: args{
ctx: context.Background(),
point: &chronograf.Point{
point: chronograf.Point{
Database: "mydb",
RetentionPolicy: "myrp",
Measurement: "mymeas",
@ -474,7 +474,7 @@ func TestClient_write(t *testing.T) {
},
args: args{
ctx: context.Background(),
point: &chronograf.Point{
point: chronograf.Point{
Database: "mydb",
RetentionPolicy: "myrp",
Measurement: "mymeas",
@ -497,7 +497,7 @@ func TestClient_write(t *testing.T) {
},
args: args{
ctx: context.Background(),
point: &chronograf.Point{
point: chronograf.Point{
Database: "mydb",
RetentionPolicy: "myrp",
Measurement: "mymeas",
@ -540,7 +540,7 @@ func TestClient_write(t *testing.T) {
InsecureSkipVerify: tt.fields.InsecureSkipVerify,
Logger: tt.fields.Logger,
}
if err := c.Write(tt.args.ctx, tt.args.point); (err != nil) != tt.wantErr {
if err := c.Write(tt.args.ctx, []chronograf.Point{tt.args.point}); (err != nil) != tt.wantErr {
t.Errorf("Client.write() error = %v, wantErr %v", err, tt.wantErr)
}
})

View File

@ -15,7 +15,7 @@ type TimeSeries struct {
// Query retrieves time series data from the database.
QueryF func(context.Context, chronograf.Query) (chronograf.Response, error)
// Write records points into the TimeSeries
WriteF func(context.Context, *chronograf.Point) error
WriteF func(context.Context, []chronograf.Point) error
// UsersStore represents the user accounts within the TimeSeries database
UsersF func(context.Context) chronograf.UsersStore
// Permissions returns all valid names permissions in this database
@ -40,8 +40,8 @@ func (t *TimeSeries) Query(ctx context.Context, query chronograf.Query) (chronog
}
// Write records a point into the time series
func (t *TimeSeries) Write(ctx context.Context, point *chronograf.Point) error {
return t.WriteF(ctx, point)
func (t *TimeSeries) Write(ctx context.Context, points []chronograf.Point) error {
return t.WriteF(ctx, points)
}
// Users represents the user accounts within the TimeSeries database