Add tests for writing to influxdb
parent
d7b7914241
commit
cd88b5cecd
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/chronograf/id"
|
"github.com/influxdata/chronograf/id"
|
||||||
|
@ -256,5 +257,9 @@ func (r *influxResults) Annotations() (res []chronograf.Annotation, err error) {
|
||||||
res = append(res, a.Annotation)
|
res = append(res, a.Annotation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sort.Slice(res, func(i int, j int) bool {
|
||||||
|
return res[i].StartTime.Before(res[j].StartTime) || res[i].ID < res[j].ID
|
||||||
|
})
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -376,16 +376,16 @@ func TestAnnotationStore_queryAnnotations(t *testing.T) {
|
||||||
{
|
{
|
||||||
EndTime: time.Unix(0, 1516920177345000000),
|
EndTime: time.Unix(0, 1516920177345000000),
|
||||||
StartTime: time.Unix(0, 0),
|
StartTime: time.Unix(0, 0),
|
||||||
Text: "mytext",
|
Text: "mytext2",
|
||||||
Type: "mytype",
|
Type: "mytype2",
|
||||||
ID: "ecf3a75d-f1c0-40e8-9790-902701467e92",
|
ID: "ea0aa94b-969a-4cd5-912a-5db61d502268",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EndTime: time.Unix(0, 1516920177345000000),
|
EndTime: time.Unix(0, 1516920177345000000),
|
||||||
StartTime: time.Unix(0, 0),
|
StartTime: time.Unix(0, 0),
|
||||||
Text: "mytext2",
|
Text: "mytext",
|
||||||
Type: "mytype2",
|
Type: "mytype",
|
||||||
ID: "ea0aa94b-969a-4cd5-912a-5db61d502268",
|
ID: "ecf3a75d-f1c0-40e8-9790-902701467e92",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -466,3 +466,200 @@ func TestAnnotationStore_queryAnnotations(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAnnotationStore_Update(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
client chronograf.TimeSeries
|
||||||
|
now Now
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
anno *chronograf.Annotation
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no responses returns error",
|
||||||
|
fields: fields{
|
||||||
|
client: &mocks.TimeSeries{
|
||||||
|
QueryF: func(context.Context, chronograf.Query) (chronograf.Response, error) {
|
||||||
|
return mocks.NewResponse(`[ { } ]`, nil), nil
|
||||||
|
},
|
||||||
|
WriteF: func(context.Context, *chronograf.Point) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
anno: &chronograf.Annotation{
|
||||||
|
ID: "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error writing returns error",
|
||||||
|
fields: fields{
|
||||||
|
now: func() time.Time { return time.Time{} },
|
||||||
|
client: &mocks.TimeSeries{
|
||||||
|
QueryF: func(context.Context, chronograf.Query) (chronograf.Response, error) {
|
||||||
|
return mocks.NewResponse(`[
|
||||||
|
{
|
||||||
|
"series": [
|
||||||
|
{
|
||||||
|
"name": "annotations",
|
||||||
|
"columns": [
|
||||||
|
"time",
|
||||||
|
"start_time",
|
||||||
|
"modified_time_ns",
|
||||||
|
"text",
|
||||||
|
"type",
|
||||||
|
"id"
|
||||||
|
],
|
||||||
|
"values": [
|
||||||
|
[
|
||||||
|
1516920177345000000,
|
||||||
|
0,
|
||||||
|
1516989242129417403,
|
||||||
|
"mytext",
|
||||||
|
"mytype",
|
||||||
|
"ecf3a75d-f1c0-40e8-9790-902701467e92"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
1516920177345000000,
|
||||||
|
0,
|
||||||
|
1517425914433539296,
|
||||||
|
"mytext2",
|
||||||
|
"mytype2",
|
||||||
|
"ea0aa94b-969a-4cd5-912a-5db61d502268"
|
||||||
|
]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]`, nil), nil
|
||||||
|
},
|
||||||
|
WriteF: func(context.Context, *chronograf.Point) error {
|
||||||
|
return fmt.Errorf("error")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
anno: &chronograf.Annotation{
|
||||||
|
ID: "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Update with delete",
|
||||||
|
fields: fields{
|
||||||
|
now: func() time.Time { return time.Time{} },
|
||||||
|
client: &mocks.TimeSeries{
|
||||||
|
QueryF: func(context.Context, chronograf.Query) (chronograf.Response, error) {
|
||||||
|
return mocks.NewResponse(`[
|
||||||
|
{
|
||||||
|
"series": [
|
||||||
|
{
|
||||||
|
"name": "annotations",
|
||||||
|
"columns": [
|
||||||
|
"time",
|
||||||
|
"start_time",
|
||||||
|
"modified_time_ns",
|
||||||
|
"text",
|
||||||
|
"type",
|
||||||
|
"id"
|
||||||
|
],
|
||||||
|
"values": [
|
||||||
|
[
|
||||||
|
1516920177345000000,
|
||||||
|
0,
|
||||||
|
1516989242129417403,
|
||||||
|
"mytext",
|
||||||
|
"mytype",
|
||||||
|
"ecf3a75d-f1c0-40e8-9790-902701467e92"
|
||||||
|
]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]`, nil), nil
|
||||||
|
},
|
||||||
|
WriteF: func(context.Context, *chronograf.Point) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
anno: &chronograf.Annotation{
|
||||||
|
ID: "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Update with delete no delete",
|
||||||
|
fields: fields{
|
||||||
|
now: func() time.Time { return time.Time{} },
|
||||||
|
client: &mocks.TimeSeries{
|
||||||
|
QueryF: func(context.Context, chronograf.Query) (chronograf.Response, error) {
|
||||||
|
return mocks.NewResponse(`[
|
||||||
|
{
|
||||||
|
"series": [
|
||||||
|
{
|
||||||
|
"name": "annotations",
|
||||||
|
"columns": [
|
||||||
|
"time",
|
||||||
|
"start_time",
|
||||||
|
"modified_time_ns",
|
||||||
|
"text",
|
||||||
|
"type",
|
||||||
|
"id"
|
||||||
|
],
|
||||||
|
"values": [
|
||||||
|
[
|
||||||
|
1516920177345000000,
|
||||||
|
0,
|
||||||
|
1516989242129417403,
|
||||||
|
"mytext",
|
||||||
|
"mytype",
|
||||||
|
"ecf3a75d-f1c0-40e8-9790-902701467e92"
|
||||||
|
]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]`, nil), nil
|
||||||
|
},
|
||||||
|
WriteF: func(context.Context, *chronograf.Point) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
anno: &chronograf.Annotation{
|
||||||
|
ID: "ecf3a75d-f1c0-40e8-9790-902701467e92",
|
||||||
|
EndTime: time.Unix(0, 1516920177345000000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
a := &AnnotationStore{
|
||||||
|
client: tt.fields.client,
|
||||||
|
now: tt.fields.now,
|
||||||
|
}
|
||||||
|
if err := a.Update(tt.args.ctx, tt.args.anno); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("AnnotationStore.Update() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/influxdata/chronograf"
|
"github.com/influxdata/chronograf"
|
||||||
"github.com/influxdata/chronograf/influx"
|
"github.com/influxdata/chronograf/influx"
|
||||||
"github.com/influxdata/chronograf/log"
|
"github.com/influxdata/chronograf/log"
|
||||||
|
"github.com/influxdata/chronograf/mocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewClient initializes an HTTP Client for InfluxDB.
|
// NewClient initializes an HTTP Client for InfluxDB.
|
||||||
|
@ -395,3 +396,153 @@ func TestClient_Roles(t *testing.T) {
|
||||||
t.Errorf("Client.Roles() want error")
|
t.Errorf("Client.Roles() want error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClient_write(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
Authorizer influx.Authorizer
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
Logger chronograf.Logger
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
point *chronograf.Point
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
body string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "write point to influxdb",
|
||||||
|
fields: fields{
|
||||||
|
Logger: mocks.NewLogger(),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
point: &chronograf.Point{
|
||||||
|
Database: "mydb",
|
||||||
|
RetentionPolicy: "myrp",
|
||||||
|
Measurement: "mymeas",
|
||||||
|
Time: 10,
|
||||||
|
Tags: map[string]string{
|
||||||
|
"tag1": "value1",
|
||||||
|
"tag2": "value2",
|
||||||
|
},
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"field1": "value1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "point without fields",
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
point: &chronograf.Point{},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "hinted handoff errors are not errors really.",
|
||||||
|
fields: fields{
|
||||||
|
Logger: mocks.NewLogger(),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
point: &chronograf.Point{
|
||||||
|
Database: "mydb",
|
||||||
|
RetentionPolicy: "myrp",
|
||||||
|
Measurement: "mymeas",
|
||||||
|
Time: 10,
|
||||||
|
Tags: map[string]string{
|
||||||
|
"tag1": "value1",
|
||||||
|
"tag2": "value2",
|
||||||
|
},
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"field1": "value1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
body: `{"error":"hinted handoff queue not empty"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "database not found creates a new db",
|
||||||
|
fields: fields{
|
||||||
|
Logger: mocks.NewLogger(),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
point: &chronograf.Point{
|
||||||
|
Database: "mydb",
|
||||||
|
RetentionPolicy: "myrp",
|
||||||
|
Measurement: "mymeas",
|
||||||
|
Time: 10,
|
||||||
|
Tags: map[string]string{
|
||||||
|
"tag1": "value1",
|
||||||
|
"tag2": "value2",
|
||||||
|
},
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"field1": "value1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
body: `{"error":"database not found"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error from database reported",
|
||||||
|
fields: fields{
|
||||||
|
Logger: mocks.NewLogger(),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
ctx: context.Background(),
|
||||||
|
point: &chronograf.Point{
|
||||||
|
Database: "mydb",
|
||||||
|
RetentionPolicy: "myrp",
|
||||||
|
Measurement: "mymeas",
|
||||||
|
Time: 10,
|
||||||
|
Tags: map[string]string{
|
||||||
|
"tag1": "value1",
|
||||||
|
"tag2": "value2",
|
||||||
|
},
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"field1": "value1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
body: `{"error":"oh no!"}`,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
retry := 0 // if the retry is > 0 then we don't error
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
if strings.HasPrefix(r.RequestURI, "/write") {
|
||||||
|
if tt.body == "" || retry > 0 {
|
||||||
|
rw.WriteHeader(http.StatusNoContent)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
retry++
|
||||||
|
rw.WriteHeader(http.StatusBadRequest)
|
||||||
|
rw.Write([]byte(tt.body))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
rw.Write([]byte(`{"results":[{}]}`))
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
u, _ := url.Parse(ts.URL)
|
||||||
|
c := &influx.Client{
|
||||||
|
URL: u,
|
||||||
|
Authorizer: tt.fields.Authorizer,
|
||||||
|
InsecureSkipVerify: tt.fields.InsecureSkipVerify,
|
||||||
|
Logger: tt.fields.Logger,
|
||||||
|
}
|
||||||
|
if err := c.Write(tt.args.ctx, tt.args.point); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("Client.write() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue