influxdb/http/query_test.go

666 lines
16 KiB
Go

package http
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb/v2"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
)
var cmpOptions = cmp.Options{
cmpopts.IgnoreTypes(ast.BaseNode{}),
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
cmpopts.EquateEmpty(),
}
func TestQueryRequest_WithDefaults(t *testing.T) {
type fields struct {
AST json.RawMessage
Query string
Type string
Dialect QueryDialect
org *platform.Organization
}
tests := []struct {
name string
fields fields
want QueryRequest
}{
{
name: "empty query has defaults set",
want: QueryRequest{
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
Header: func(x bool) *bool { return &x }(true),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := QueryRequest{
AST: tt.fields.AST,
Query: tt.fields.Query,
Type: tt.fields.Type,
Dialect: tt.fields.Dialect,
Org: tt.fields.org,
}
if got := r.WithDefaults(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("QueryRequest.WithDefaults() = %v, want %v", got, tt.want)
}
})
}
}
func TestQueryRequest_Validate(t *testing.T) {
type fields struct {
Extern json.RawMessage
AST json.RawMessage
Query string
Type string
Dialect QueryDialect
org *platform.Organization
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "requires query, spec, or ast",
fields: fields{
Type: "flux",
},
wantErr: true,
},
{
name: "requires flux type",
fields: fields{
Query: "howdy",
Type: "doody",
},
wantErr: true,
},
{
name: "comment must be a single character",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
CommentPrefix: "error!",
},
},
wantErr: true,
},
{
name: "delimiter must be a single character",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: "",
},
},
wantErr: true,
},
{
name: "characters must be unicode runes",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: string([]byte{0x80}),
},
},
wantErr: true,
},
{
name: "unknown annotations",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
Annotations: []string{"error"},
},
},
wantErr: true,
},
{
name: "unknown date time format",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "error",
},
},
wantErr: true,
},
{
name: "valid query",
fields: fields{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := QueryRequest{
Extern: tt.fields.Extern,
AST: tt.fields.AST,
Query: tt.fields.Query,
Type: tt.fields.Type,
Dialect: tt.fields.Dialect,
Org: tt.fields.org,
}
if err := r.Validate(); (err != nil) != tt.wantErr {
t.Errorf("QueryRequest.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestQueryRequest_proxyRequest(t *testing.T) {
type fields struct {
Extern json.RawMessage
AST json.RawMessage
Query string
Type string
Dialect QueryDialect
Now time.Time
org *platform.Organization
}
tests := []struct {
name string
fields fields
now func() time.Time
want *query.ProxyRequest
wantErr bool
}{
{
name: "requires query, spec, or ast",
fields: fields{
Type: "flux",
},
wantErr: true,
},
{
name: "valid query",
fields: fields{
Query: "howdy",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(1, 1) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.FluxCompiler{
Now: time.Unix(1, 1),
Query: `howdy`,
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid AST",
fields: fields{
AST: mustMarshal(&ast.Package{}),
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
Now: time.Unix(1, 1),
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(2, 2) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.ASTCompiler{
AST: mustMarshal(&ast.Package{}),
Now: time.Unix(1, 1),
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid AST with calculated now",
fields: fields{
AST: mustMarshal(&ast.Package{}),
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(2, 2) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.ASTCompiler{
AST: mustMarshal(&ast.Package{}),
Now: time.Unix(2, 2),
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid AST with extern",
fields: fields{
Extern: mustMarshal(&ast.File{
Body: []ast.Statement{
&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "x"},
Init: &ast.IntegerLiteral{Value: 0},
},
},
},
}),
AST: mustMarshal(&ast.Package{}),
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(1, 1) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.ASTCompiler{
Extern: mustMarshal(&ast.File{
Body: []ast.Statement{
&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "x"},
Init: &ast.IntegerLiteral{Value: 0},
},
},
},
}),
AST: mustMarshal(&ast.Package{}),
Now: time.Unix(1, 1),
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := QueryRequest{
Extern: tt.fields.Extern,
AST: tt.fields.AST,
Query: tt.fields.Query,
Type: tt.fields.Type,
Dialect: tt.fields.Dialect,
Now: tt.fields.Now,
Org: tt.fields.org,
}
got, err := r.proxyRequest(tt.now)
if (err != nil) != tt.wantErr {
t.Errorf("QueryRequest.ProxyRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want, cmpOptions...) {
t.Errorf("QueryRequest.ProxyRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...))
}
})
}
}
func mustMarshal(p ast.Node) []byte {
bs, err := json.Marshal(p)
if err != nil {
panic(err)
}
return bs
}
func Test_decodeQueryRequest(t *testing.T) {
type args struct {
ctx context.Context
r *http.Request
svc platform.OrganizationService
}
tests := []struct {
name string
args args
want *QueryRequest
wantErr bool
}{
{
name: "valid query request",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"query": "from()"}`)),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &QueryRequest{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
Header: func(x bool) *bool { return &x }(true),
},
Org: &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
},
},
},
{
name: "valid query request with explicit content-type",
args: args{
r: func() *http.Request {
r := httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"query": "from()"}`))
r.Header.Set("Content-Type", "application/json")
return r
}(),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &QueryRequest{
Query: "from()",
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
Header: func(x bool) *bool { return &x }(true),
},
Org: &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
},
},
},
{
name: "error decoding json",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`error`)),
},
wantErr: true,
},
{
name: "error validating query",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{}`)),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _, err := decodeQueryRequest(tt.args.ctx, tt.args.r, tt.args.svc)
if (err != nil) != tt.wantErr {
t.Errorf("decodeQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("decodeQueryRequest() = %v, want %v", got, tt.want)
}
})
}
}
func Test_decodeProxyQueryRequest(t *testing.T) {
externJSON := `{
"type": "File",
"body": [
{
"type": "OptionStatement",
"assignment": {
"type": "VariableAssignment",
"id": {
"type": "Identifier",
"name": "x"
},
"init": {
"type": "IntegerLiteral",
"value": "0"
}
}
}
]
}`
type args struct {
ctx context.Context
r *http.Request
auth *platform.Authorization
svc platform.OrganizationService
}
tests := []struct {
name string
args args
want *query.ProxyRequest
wantErr bool
}{
{
name: "valid post query request",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"query": "from()"}`)),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.FluxCompiler{
Query: "from()",
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid query including extern definition",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`
{
"extern": `+externJSON+`,
"query": "from(bucket: \"mybucket\")"
}
`)),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.FluxCompiler{
Extern: []byte(externJSON),
Query: `from(bucket: "mybucket")`,
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid post vnd.flux query request",
args: args{
r: func() *http.Request {
r := httptest.NewRequest("POST", "/api/v2/query?org=myorg", strings.NewReader(`from(bucket: "mybucket")`))
r.Header.Set("Content-Type", "application/vnd.flux")
return r
}(),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform2.ID { s, _ := platform2.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.FluxCompiler{
Query: `from(bucket: "mybucket")`,
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
}
cmpOptions := append(cmpOptions,
cmpopts.IgnoreFields(lang.ASTCompiler{}, "Now"),
cmpopts.IgnoreFields(lang.FluxCompiler{}, "Now"),
)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
if (err != nil) != tt.wantErr {
t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(tt.want, got, cmpOptions...) {
t.Errorf("decodeProxyQueryRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...))
}
})
}
}
func TestProxyRequestToQueryRequest_Compilers(t *testing.T) {
tests := []struct {
name string
pr query.ProxyRequest
want QueryRequest
}{
{
name: "flux compiler copied",
pr: query.ProxyRequest{
Dialect: &query.NoContentDialect{},
Request: query.Request{
Compiler: lang.FluxCompiler{
Query: `howdy`,
Now: time.Unix(45, 45),
},
},
},
want: QueryRequest{
Type: "flux",
Query: `howdy`,
PreferNoContent: true,
Now: time.Unix(45, 45),
},
},
{
name: "AST compiler copied",
pr: query.ProxyRequest{
Dialect: &query.NoContentDialect{},
Request: query.Request{
Compiler: lang.ASTCompiler{
Now: time.Unix(45, 45),
AST: mustMarshal(&ast.Package{}),
},
},
},
want: QueryRequest{
Type: "flux",
PreferNoContent: true,
AST: mustMarshal(&ast.Package{}),
Now: time.Unix(45, 45),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := QueryRequestFromProxyRequest(&tt.pr)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(*got, tt.want) {
t.Errorf("QueryRequestFromProxyRequest = %v, want %v", got, tt.want)
}
})
}
}