package httpd_test import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "log" "math" "mime/multipart" "net/http" "net/http/httptest" "net/url" "os" "reflect" "sort" "strings" "sync/atomic" "testing" "time" "github.com/golang-jwt/jwt/v4" "github.com/golang/snappy" "github.com/google/go-cmp/cmp" "github.com/influxdata/flux" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/mock" "github.com/influxdata/influxdb/flux/client" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/pkg/testing/assert" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/storage/reads" "github.com/influxdata/influxdb/storage/reads/datatypes" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" "github.com/prometheus/prometheus/prompb" ) // Ensure the handler returns results from a query (including nil results). func TestHandler_Query(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler returns results from a query passed as a file. func TestHandler_Query_File(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } var body bytes.Buffer writer := multipart.NewWriter(&body) part, err := writer.CreateFormFile("q", "") if err != nil { t.Fatal(err) } io.WriteString(part, "SELECT * FROM bar") if err := writer.Close(); err != nil { t.Fatal(err) } r := MustNewJSONRequest("POST", "/query?db=foo", &body) r.Header.Set("Content-Type", writer.FormDataContentType()) w := httptest.NewRecorder() h.ServeHTTP(w, r) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Test query with user authentication. func TestHandler_Query_Auth(t *testing.T) { // Create the handler to be tested. h := NewHandler(true) // Set mock meta client functions for the handler to use. h.MetaClient.AdminUserExistsFn = func() bool { return true } h.MetaClient.UserFn = func(username string) (meta.User, error) { if username != "user1" { return nil, meta.ErrUserNotFound } return &meta.UserInfo{ Name: "user1", Hash: "abcd", Admin: true, }, nil } h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { if u != "user1" { return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u) } else if p != "abcd" { return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p) } return h.MetaClient.User(u) } // Set mock query authorizer for handler to use. h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { return nil } // Set mock statement executor for handler to use. h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } // Test the handler with valid user and password in the URL parameters. w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } // Test the handler with valid user and password using basic auth. w = httptest.NewRecorder() r := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) r.SetBasicAuth("user1", "abcd") h.ServeHTTP(w, r) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } // Test the handler with valid JWT bearer token. req := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) // Create a signed JWT token string and add it to the request header. _, signedToken := MustJWTToken("user1", h.Config.SharedSecret, false) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } // Test the handler with JWT token signed with invalid key. req = MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) // Create a signed JWT token string and add it to the request header. _, signedToken = MustJWTToken("user1", "invalid key", false) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusUnauthorized { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"signature is invalid"}` { t.Fatalf("unexpected body: %s", body) } // Test handler with valid JWT token carrying non-existent user. _, signedToken = MustJWTToken("bad_user", h.Config.SharedSecret, false) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusUnauthorized { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"user not found"}` { t.Fatalf("unexpected body: %s", body) } // Test handler with expired JWT token. _, signedToken = MustJWTToken("user1", h.Config.SharedSecret, true) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusUnauthorized { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if !strings.Contains(w.Body.String(), `{"error":"Token is expired`) { t.Fatalf("unexpected body: %s", w.Body.String()) } // Test handler with JWT token that has no expiration set. token, _ := MustJWTToken("user1", h.Config.SharedSecret, false) delete(token.Claims.(jwt.MapClaims), "exp") signedToken, err := token.SignedString([]byte(h.Config.SharedSecret)) if err != nil { t.Fatal(err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusUnauthorized { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"token expiration required"}` { t.Fatalf("unexpected body: %s", body) } // Test that auth fails if shared secret is blank. origSecret := h.Config.SharedSecret h.Config.SharedSecret = "" token, _ = MustJWTToken("user1", h.Config.SharedSecret, false) signedToken, err = token.SignedString([]byte(h.Config.SharedSecret)) if err != nil { t.Fatal(err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) w = httptest.NewRecorder() h.ServeHTTP(w, req) if w.Code != http.StatusUnauthorized { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"bearer auth disabled"}` { t.Fatalf("unexpected body: %s", body) } h.Config.SharedSecret = origSecret // Test the handler with valid user and password in the url and invalid in // basic auth (prioritize url). w = httptest.NewRecorder() r = MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil) r.SetBasicAuth("user1", "efgh") h.ServeHTTP(w, r) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler returns results from a query (including nil results). func TestHandler_QueryRegex(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `test` { t.Fatalf("unexpected db: %s", ctx.Database) } ctx.Results <- nil return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil)) } // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeResults(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeEmptyResults(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler merges series from the same result. func TestHandler_Query_MergeSeries(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{ { Name: "series0", Values: [][]interface{}{ {float64(2.0)}, }, Partial: true, }, })} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{ { Name: "series0", Values: [][]interface{}{ {float64(3.0)}, }, }, })} return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0","values":[[2],[3]]}]}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler can parse chunked and chunk size query parameters. func TestHandler_Query_Chunked(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if ctx.ChunkSize != 2 { t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) } ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]} {"results":[{"statement_id":1,"series":[{"name":"series1"}]}]} ` { t.Fatalf("unexpected body: %s", w.Body.String()) } } // Ensure the handler can accept an async query. func TestHandler_Query_Async(t *testing.T) { done := make(chan struct{}) h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} close(done) return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&async=true", nil)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `` { t.Fatalf("unexpected body: %s", body) } // Wait to make sure the async query runs and completes. timer := time.NewTimer(100 * time.Millisecond) defer timer.Stop() select { case <-timer.C: t.Fatal("timeout while waiting for async query to complete") case <-done: } } // Ensure the handler returns a status 400 if the query is not passed in. func TestHandler_Query_ErrQueryRequired(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query", nil)) if w.Code != http.StatusBadRequest { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"missing required parameter \"q\""}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler returns a status 400 if the query cannot be parsed. func TestHandler_Query_ErrInvalidQuery(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?q=SELECT", nil)) if w.Code != http.StatusBadRequest { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"error parsing query: found EOF, expected identifier, string, number, bool at line 1, char 8"}` { t.Fatalf("unexpected body: %s", body) } } // Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails. func TestHandler_Query_ErrAuthorize(t *testing.T) { h := NewHandler(true) h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error { return errors.New("marker") } h.MetaClient.AdminUserExistsFn = func() bool { return true } h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { users := []meta.UserInfo{ { Name: "admin", Hash: "admin", Admin: true, }, { Name: "user1", Hash: "abcd", Privileges: map[string]influxql.Privilege{ "db0": influxql.ReadPrivilege, }, }, } for _, user := range users { if u == user.Name { if p == user.Hash { return &user, nil } return nil, meta.ErrAuthenticate } } return nil, meta.ErrUserNotFound } for i, tt := range []struct { user string password string query string code int }{ { query: "/query?q=SHOW+DATABASES", code: http.StatusUnauthorized, }, { user: "user1", password: "abcd", query: "/query?q=SHOW+DATABASES", code: http.StatusForbidden, }, { user: "user2", password: "abcd", query: "/query?q=SHOW+DATABASES", code: http.StatusUnauthorized, }, } { w := httptest.NewRecorder() r := MustNewJSONRequest("GET", tt.query, nil) params := r.URL.Query() if tt.user != "" { params.Set("u", tt.user) } if tt.password != "" { params.Set("p", tt.password) } r.URL.RawQuery = params.Encode() h.ServeHTTP(w, r) if w.Code != tt.code { t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String()) } } } // Ensure the handler returns a status 200 if an error is returned in the result. func TestHandler_Query_ErrResult(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errors.New("measurement not found") } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SHOW+SERIES+from+bin", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":0,"error":"measurement not found"}]}` { t.Fatalf("unexpected body: %s", body) } } // Ensure that closing the HTTP connection causes the query to be interrupted. func TestHandler_Query_CloseNotify(t *testing.T) { // Avoid leaking a goroutine when this fails. done := make(chan struct{}) defer close(done) interrupted := make(chan struct{}) h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { case <-ctx.Done(): case <-done: } close(interrupted) return nil } s := httptest.NewServer(h) defer s.Close() // Parse the URL and generate a query request. u, err := url.Parse(s.URL) if err != nil { t.Fatal(err) } u.Path = "/query" values := url.Values{} values.Set("q", "SELECT * FROM cpu") values.Set("db", "db0") values.Set("rp", "rp0") values.Set("chunked", "true") u.RawQuery = values.Encode() req, err := http.NewRequest("GET", u.String(), nil) if err != nil { t.Fatal(err) } // Perform the request and retrieve the response. resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatal(err) } // Validate that the interrupted channel has NOT been closed yet. timer := time.NewTimer(100 * time.Millisecond) select { case <-interrupted: timer.Stop() t.Fatal("query interrupted unexpectedly") case <-timer.C: } // Close the response body which should abort the query in the handler. resp.Body.Close() // The query should abort within 100 milliseconds. timer.Reset(100 * time.Millisecond) select { case <-interrupted: timer.Stop() case <-timer.C: t.Fatal("timeout while waiting for query to abort") } } // Ensure the handler returns an appropriate 401 status when authentication // fails on ping endpoints. func TestHandler_Ping_ErrAuthorize(t *testing.T) { h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPingAuthEnabled())) h.MetaClient.AdminUserExistsFn = func() bool { return true } h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { users := []meta.UserInfo{ { Name: "admin", Hash: "admin", Admin: true, }, { Name: "user1", Hash: "abcd", Privileges: map[string]influxql.Privilege{ "db0": influxql.ReadPrivilege, }, }, } for _, user := range users { if u == user.Name { if p == user.Hash { return &user, nil } return nil, meta.ErrAuthenticate } } return nil, meta.ErrUserNotFound } for i, tt := range []struct { user string password string query string code int }{ { query: "/ping", code: http.StatusUnauthorized, }, { user: "user1", password: "abcd", query: "/ping", code: http.StatusNoContent, }, { user: "user2", password: "abcd", query: "/ping", code: http.StatusUnauthorized, }, } { w := httptest.NewRecorder() r := MustNewJSONRequest("GET", tt.query, nil) params := r.URL.Query() if tt.user != "" { params.Set("u", tt.user) } if tt.password != "" { params.Set("p", tt.password) } r.URL.RawQuery = params.Encode() h.ServeHTTP(w, r) if w.Code != tt.code { t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String()) } } } // Ensure the handler returns an appropriate 403 status when authentication or // authorization fails on debug endpoints. func TestHandler_Debug_ErrAuthorize(t *testing.T) { h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled())) h.MetaClient.AdminUserExistsFn = func() bool { return true } h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { users := []meta.UserInfo{ { Name: "admin", Hash: "admin", Admin: true, }, { Name: "user1", Hash: "abcd", Privileges: map[string]influxql.Privilege{ "db0": influxql.ReadPrivilege, }, }, } for _, user := range users { if u == user.Name { if p == user.Hash { return &user, nil } return nil, meta.ErrAuthenticate } } return nil, meta.ErrUserNotFound } for i, tt := range []struct { user string password string query string code int }{ { query: "/debug/vars", code: http.StatusUnauthorized, }, { user: "user1", password: "abcd", query: "/debug/vars", code: http.StatusForbidden, }, { user: "user2", password: "abcd", query: "/debug/vars", code: http.StatusUnauthorized, }, } { w := httptest.NewRecorder() r := MustNewJSONRequest("GET", tt.query, nil) params := r.URL.Query() if tt.user != "" { params.Set("u", tt.user) } if tt.password != "" { params.Set("p", tt.password) } r.URL.RawQuery = params.Encode() h.ServeHTTP(w, r) if w.Code != tt.code { t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String()) } } } // Ensure the prometheus remote write works with valid values. func TestHandler_PromWrite(t *testing.T) { req := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { Labels: []prompb.Label{ {Name: "host", Value: "a"}, {Name: "region", Value: "west"}, }, Samples: []prompb.Sample{ {Timestamp: 1, Value: 1.2}, {Timestamp: 3, Value: 14.5}, {Timestamp: 6, Value: 222.99}, }, }, }, } data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) b := bytes.NewReader(compressed) h := NewHandler(false) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } var called bool h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { called = true if got, exp := len(points), 3; got != exp { t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points) } expFields := []models.Fields{ models.Fields{"value": req.Timeseries[0].Samples[0].Value}, models.Fields{"value": req.Timeseries[0].Samples[1].Value}, models.Fields{"value": req.Timeseries[0].Samples[2].Value}, } expTS := []int64{ req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond), req.Timeseries[0].Samples[1].Timestamp * int64(time.Millisecond), req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond), } for i, point := range points { if got, exp := point.UnixNano(), expTS[i]; got != exp { t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point) } exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}} if got := point.Tags(); !reflect.DeepEqual(got, exp) { t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point) } gotFields, err := point.Fields() if err != nil { t.Fatal(err.Error()) } if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) { t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point) } } return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b)) if !called { t.Fatal("WritePoints: expected call") } if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } } // Ensure the prometheus remote write works with invalid values. func TestHandler_PromWrite_Dropped(t *testing.T) { req := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { Labels: []prompb.Label{ {Name: "host", Value: "a"}, {Name: "region", Value: "west"}, }, Samples: []prompb.Sample{ {Timestamp: 1, Value: 1.2}, {Timestamp: 2, Value: math.NaN()}, {Timestamp: 3, Value: 14.5}, {Timestamp: 4, Value: math.Inf(-1)}, {Timestamp: 5, Value: math.Inf(1)}, {Timestamp: 6, Value: 222.99}, {Timestamp: 7, Value: math.Inf(-1)}, {Timestamp: 8, Value: math.Inf(1)}, {Timestamp: 9, Value: math.Inf(1)}, }, }, }, } data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) b := bytes.NewReader(compressed) h := NewHandler(false) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } var called bool h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { called = true if got, exp := len(points), 3; got != exp { t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points) } expFields := []models.Fields{ models.Fields{"value": req.Timeseries[0].Samples[0].Value}, models.Fields{"value": req.Timeseries[0].Samples[2].Value}, models.Fields{"value": req.Timeseries[0].Samples[5].Value}, } expTS := []int64{ req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond), req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond), req.Timeseries[0].Samples[5].Timestamp * int64(time.Millisecond), } for i, point := range points { if got, exp := point.UnixNano(), expTS[i]; got != exp { t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point) } exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}} if got := point.Tags(); !reflect.DeepEqual(got, exp) { t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point) } gotFields, err := point.Fields() if err != nil { t.Fatal(err.Error()) } if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) { t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point) } } return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b)) if !called { t.Fatal("WritePoints: expected call") } if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } } func mustMakeBigString(sz int) string { a := make([]byte, 0, sz) for i := 0; i < cap(a); i++ { a = append(a, 'a') } return string(a) } func TestHandler_PromWrite_Error(t *testing.T) { req := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { // Invalid tag key Labels: []prompb.Label{{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}}, Samples: []prompb.Sample{{Timestamp: 1, Value: 1.2}}, }, }, } data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) b := bytes.NewReader(compressed) h := NewHandler(false) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } var called bool h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { called = true return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b)) if w.Code != http.StatusBadRequest { t.Fatalf("unexpected status: %d", w.Code) } if got, exp := strings.TrimSpace(w.Body.String()), `{"error":"max key length exceeded: 65572 \u003e 65535"}`; got != exp { t.Fatalf("got error %q, expected %q", got, exp) } if called { t.Fatal("WritePoints called but should not be") } } // Ensure Prometheus remote read requests are converted to the correct InfluxQL query and // data is returned func TestHandler_PromRead(t *testing.T) { req := &prompb.ReadRequest{ Queries: []*prompb.Query{{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "value", }, }, StartTimestampMs: 1, EndTimestampMs: 2, }}, } data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) b := bytes.NewReader(compressed) h := NewHandler(false) w := httptest.NewRecorder() // Number of results in the result set var i int64 h.Store.ResultSet.NextFn = func() bool { i++ return i <= 2 } // data for each cursor. h.Store.ResultSet.CursorFn = func() tsdb.Cursor { cursor := internal.NewFloatArrayCursorMock() var i int64 cursor.NextFn = func() *tsdb.FloatArray { i++ ts := []int64{22000000 * i, 10000000000 * i} vs := []float64{2.3, 2992.33} if i > 2 { ts, vs = nil, nil } return &tsdb.FloatArray{Timestamps: ts, Values: vs} } return cursor } // Tags for each cursor. h.Store.ResultSet.TagsFn = func() models.Tags { return models.NewTags(map[string]string{ "host": fmt.Sprintf("server-%d", i), "_measurement": "mem", }) } h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) if err != nil { t.Fatal(err) } var resp prompb.ReadResponse if err := resp.Unmarshal(reqBuf); err != nil { t.Fatal(err) } expResults := []*prompb.QueryResult{ { Timeseries: []*prompb.TimeSeries{ { Labels: []prompb.Label{ {Name: "host", Value: "server-1"}, }, Samples: []prompb.Sample{ {Timestamp: 22, Value: 2.3}, {Timestamp: 10000, Value: 2992.33}, {Timestamp: 44, Value: 2.3}, {Timestamp: 20000, Value: 2992.33}, }, }, { Labels: []prompb.Label{ {Name: "host", Value: "server-2"}, }, Samples: []prompb.Sample{ {Timestamp: 22, Value: 2.3}, {Timestamp: 10000, Value: 2992.33}, {Timestamp: 44, Value: 2.3}, {Timestamp: 20000, Value: 2992.33}, }, }, }, }, } if !reflect.DeepEqual(resp.Results, expResults) { t.Fatalf("Results differ:\n%v", cmp.Diff(resp.Results, expResults)) } } func TestHandler_PromRead_NoResults(t *testing.T) { req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "value", }, }, StartTimestampMs: 0, EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond), }}} data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) h := NewHandler(false) w := httptest.NewRecorder() b := bytes.NewReader(compressed) h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) if err != nil { t.Fatal(err.Error()) } var resp prompb.ReadResponse if err := resp.Unmarshal(reqBuf); err != nil { t.Fatal(err.Error()) } } func TestHandler_PromRead_UnsupportedCursors(t *testing.T) { req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "value", }, }, StartTimestampMs: 0, EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond), }}} data, err := req.Marshal() if err != nil { t.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) unsupported := []tsdb.Cursor{ internal.NewIntegerArrayCursorMock(), internal.NewBooleanArrayCursorMock(), internal.NewUnsignedArrayCursorMock(), internal.NewStringArrayCursorMock(), } for _, cursor := range unsupported { h := NewHandler(false) w := httptest.NewRecorder() var lb bytes.Buffer h.Logger = logger.New(&lb) more := true h.Store.ResultSet.NextFn = func() bool { defer func() { more = false }(); return more } // Set the cursor type that will be returned while iterating over // the mock store. h.Store.ResultSet.CursorFn = func() tsdb.Cursor { return cursor } b := bytes.NewReader(compressed) h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) if err != nil { t.Fatal(err.Error()) } var resp prompb.ReadResponse if err := resp.Unmarshal(reqBuf); err != nil { t.Fatal(err.Error()) } if !strings.Contains(lb.String(), "cursor_type=") { t.Fatalf("got log message %q, expected to contain \"cursor_type\"", lb.String()) } } } func TestHandler_Flux_DisabledByDefault(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() body := bytes.NewBufferString(`from(bucket:"db/rp") |> range(start:-1h) |> last()`) h.ServeHTTP(w, MustNewRequest("POST", "/api/v2/query", body)) if got := w.Code; !cmp.Equal(got, http.StatusForbidden) { t.Fatalf("unexpected status: %d", got) } exp := `{"error":"Flux query service disabled. Verify flux-enabled=true in the [http] section of the InfluxDB config."}` + "\n" if got := w.Body.String(); got != exp { t.Fatalf("unexpected body -got/+exp\n%s", cmp.Diff(got, exp)) } } func TestHandler_PromRead_NilResultSet(t *testing.T) { req := &prompb.ReadRequest{ Queries: []*prompb.Query{{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "value", }, }, StartTimestampMs: 1, EndTimestampMs: 2, }}, } data, err := req.Marshal() if err != nil { log.Fatal("couldn't marshal prometheus request") } compressed := snappy.Encode(nil, data) b := bytes.NewReader(compressed) h := NewHandler(false) // Mocks the case when Store.Read() returns nil, nil h.Handler.Store.(*internal.StorageStoreMock).ReadFilterFn = func(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) { return nil, nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } if w.Header().Get("Content-Type") != "application/x-protobuf" { t.Fatalf("Got unexpected \"Content-Type\" header value:\n%v", cmp.Diff("application/x-protobuf", w.Header().Get("Content-Type"))) } if w.Header().Get("Content-Encoding") != "snappy" { t.Fatalf("Got unexpected \"Content-Encoding\" header value:\n%v", cmp.Diff("snappy", w.Header().Get("Content-Encoding"))) } decompressed, err := snappy.Decode(nil, w.Body.Bytes()) if err != nil { t.Fatal(err) } resp := new(prompb.ReadResponse) err = resp.Unmarshal(decompressed) if err != nil { t.Fatal(err) } expected := &prompb.ReadResponse{ Results: []*prompb.QueryResult{{}}, } if !reflect.DeepEqual(resp, expected) { t.Fatalf("Results differ:\n%v", cmp.Diff(expected, resp)) } } func TestHandler_Flux_QueryJSON(t *testing.T) { h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog())) called := false qry := "foo" h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) { if exp := flux.CompilerType(lang.FluxCompilerType); compiler.CompilerType() != exp { t.Fatalf("unexpected compiler type -got/+exp\n%s", cmp.Diff(compiler.CompilerType(), exp)) } if c, ok := compiler.(lang.FluxCompiler); !ok { t.Fatal("expected lang.FluxCompiler") } else if exp := qry; c.Query != exp { t.Fatalf("unexpected query -got/+exp\n%s", cmp.Diff(c.Query, exp)) } called = true p := &mock.Program{} return p.Start(ctx, nil) } q := client.QueryRequest{Query: qry} var body bytes.Buffer if err := json.NewEncoder(&body).Encode(q); err != nil { t.Fatalf("unexpected JSON encoding error: %q", err.Error()) } req := MustNewRequest("POST", "/api/v2/query", &body) req.Header.Add("content-type", "application/json") w := httptest.NewRecorder() h.ServeHTTP(w, req) if got := w.Code; !cmp.Equal(got, http.StatusOK) { t.Fatalf("unexpected status: %d", got) } if !called { t.Fatalf("expected QueryFn to be called") } } func TestHandler_Flux_QueryText(t *testing.T) { h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog())) called := false qry := "bar" h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) { if exp := flux.CompilerType(lang.FluxCompilerType); compiler.CompilerType() != exp { t.Fatalf("unexpected compiler type -got/+exp\n%s", cmp.Diff(compiler.CompilerType(), exp)) } if c, ok := compiler.(lang.FluxCompiler); !ok { t.Fatal("expected lang.FluxCompiler") } else if exp := qry; c.Query != exp { t.Fatalf("unexpected query -got/+exp\n%s", cmp.Diff(c.Query, exp)) } called = true p := &mock.Program{} return p.Start(ctx, nil) } req := MustNewRequest("POST", "/api/v2/query", bytes.NewBufferString(qry)) req.Header.Add("content-type", "application/vnd.flux") w := httptest.NewRecorder() h.ServeHTTP(w, req) if got := w.Code; !cmp.Equal(got, http.StatusOK) { t.Fatalf("unexpected status: %d", got) } if !called { t.Fatalf("expected QueryFn to be called") } } func TestHandler_Flux(t *testing.T) { queryBytes := func(qs string) io.Reader { var b bytes.Buffer q := &client.QueryRequest{Query: qs} if err := json.NewEncoder(&b).Encode(q); err != nil { t.Fatalf("unexpected JSON encoding error: %q", err.Error()) } return &b } tests := []struct { name string reqFn func() *http.Request expCode int expBody string }{ { name: "no media type", reqFn: func() *http.Request { return MustNewRequest("POST", "/api/v2/query", nil) }, expCode: http.StatusBadRequest, expBody: "{\"error\":\"mime: no media type\"}\n", }, { name: "200 OK", reqFn: func() *http.Request { req := MustNewRequest("POST", "/api/v2/query", queryBytes("foo")) req.Header.Add("content-type", "application/json") return req }, expCode: http.StatusOK, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog())) h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) { p := &mock.Program{} return p.Start(ctx, nil) } w := httptest.NewRecorder() h.ServeHTTP(w, test.reqFn()) if got := w.Code; !cmp.Equal(got, test.expCode) { t.Fatalf("unexpected status: %d", got) } if test.expBody != "" { if got := w.Body.String(); got != test.expBody { t.Fatalf("unexpected body -got/+exp\n%s", cmp.Diff(got, test.expBody)) } } }) } } func TestHandler_Flux_Auth(t *testing.T) { // Create the handler to be tested. h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog(), WithAuthentication())) h.MetaClient.AdminUserExistsFn = func() bool { return true } h.MetaClient.UserFn = func(username string) (meta.User, error) { if username != "user1" { return nil, meta.ErrUserNotFound } return &meta.UserInfo{ Name: "user1", Hash: "abcd", Admin: true, }, nil } h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { if u != "user1" { return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u) } else if p != "abcd" { return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p) } return h.MetaClient.User(u) } h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) { p := &mock.Program{} return p.Start(ctx, nil) } req := MustNewRequest("POST", "/api/v2/query", bytes.NewBufferString("bar")) req.Header.Set("content-type", "application/vnd.flux") req.Header.Set("Authorization", "Token user1:abcd") // Test the handler with valid user and password in the URL parameters. w := httptest.NewRecorder() h.ServeHTTP(w, req) if got := w.Code; !cmp.Equal(got, http.StatusOK) { t.Fatalf("unexpected status: %d", got) } req.Header.Set("Authorization", "Token user1:efgh") w = httptest.NewRecorder() h.ServeHTTP(w, req) if got := w.Code; !cmp.Equal(got, http.StatusUnauthorized) { t.Fatalf("unexpected status: %d", got) } } // Ensure the handler handles ping requests correctly. func TestHandler_Ping(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("GET", "/ping", nil)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } h.ServeHTTP(w, MustNewRequest("HEAD", "/ping", nil)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } } // Ensure the handler handles health requests correctly. func TestHandler_Health(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("GET", "/health", nil)) if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } var got map[string]interface{} if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil { t.Fatal(err) } assert.Equal(t, got["name"], "influxdb", "invalid name") assert.Equal(t, got["message"], "ready for queries and writes", "invalid message") assert.Equal(t, got["status"], "pass", "invalid status") assert.Equal(t, got["version"], "0.0.0", "invalid version") if _, present := got["checks"]; !present { t.Fatal("missing checks") } } // Ensure the handler returns the version correctly from the different endpoints. func TestHandler_Version(t *testing.T) { h := NewHandler(false) h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return nil } tests := []struct { method string endpoint string body io.Reader }{ { method: "GET", endpoint: "/ping", body: nil, }, { method: "GET", endpoint: "/query?db=foo&q=SELECT+*+FROM+bar", body: nil, }, { method: "POST", endpoint: "/write", body: bytes.NewReader(make([]byte, 10)), }, { method: "GET", endpoint: "/notfound", body: nil, }, } for _, test := range tests { w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest(test.method, test.endpoint, test.body)) if v := w.HeaderMap["X-Influxdb-Version"]; len(v) > 0 { if v[0] != "0.0.0" { t.Fatalf("unexpected version: %s", v) } } else { t.Fatalf("Header entry 'X-Influxdb-Version' not present") } if v := w.HeaderMap["X-Influxdb-Build"]; len(v) > 0 { if v[0] != "OSS" { t.Fatalf("unexpected BuildType: %s", v) } } else { t.Fatalf("Header entry 'X-Influxdb-Build' not present") } } } // Ensure the handler handles status requests correctly. func TestHandler_Status(t *testing.T) { h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("GET", "/status", nil)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } h.ServeHTTP(w, MustNewRequest("HEAD", "/status", nil)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } } // Ensure write endpoint can handle bad requests func TestHandler_HandleBadRequestBody(t *testing.T) { b := bytes.NewReader(make([]byte, 10)) h := NewHandler(false) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/write", b)) if w.Code != http.StatusBadRequest { t.Fatalf("unexpected status: %d", w.Code) } } func TestHandler_Write_EntityTooLarge_ContentLength(t *testing.T) { b := bytes.NewReader(make([]byte, 100)) h := NewHandler(false) h.Config.MaxBodySize = 5 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) if w.Code != http.StatusRequestEntityTooLarge { t.Fatalf("unexpected status: %d", w.Code) } } func TestHandler_Write_SuppressLog(t *testing.T) { var buf bytes.Buffer c := httpd.NewConfig() c.SuppressWriteLog = true h := NewHandlerWithConfig(c) h.CLFLogger = log.New(&buf, "", log.LstdFlags) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } h.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error { return nil } b := strings.NewReader("cpu,host=server01 value=2\n") w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } // If the log has anything in it, this failed. if buf.Len() > 0 { t.Fatalf("expected no bytes to be written to the log, got %d", buf.Len()) } } // onlyReader implements io.Reader only to ensure Request.ContentLength is not set type onlyReader struct { r io.Reader } func (o onlyReader) Read(p []byte) (n int, err error) { return o.r.Read(p) } func TestHandler_Write_EntityTooLarge_NoContentLength(t *testing.T) { b := onlyReader{bytes.NewReader(make([]byte, 100))} h := NewHandler(false) h.Config.MaxBodySize = 5 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) if w.Code != http.StatusRequestEntityTooLarge { t.Fatalf("unexpected status: %d", w.Code) } } // TestHandler_Write_NegativeMaxBodySize verifies no error occurs if MaxBodySize is < 0 func TestHandler_Write_NegativeMaxBodySize(t *testing.T) { b := bytes.NewReader([]byte(`foo n=1`)) h := NewHandler(false) h.Config.MaxBodySize = -1 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } called := false h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error { called = true return nil } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) if !called { t.Fatal("WritePoints: expected call") } if w.Code != http.StatusNoContent { t.Fatalf("unexpected status: %d", w.Code) } } // TestHandler_Write_V1_Precision verifies v1 writes validate precision. func TestHandler_Write_V1_Precision(t *testing.T) { h := NewHandler(false) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error { return nil } tests := []struct { url string status int }{ // Successful requests. {"/write?db=foo", http.StatusNoContent}, {"/write?db=foo&precision=n", http.StatusNoContent}, {"/write?db=foo&precision=u", http.StatusNoContent}, {"/write?db=foo&precision=ms", http.StatusNoContent}, {"/write?db=foo&precision=s", http.StatusNoContent}, {"/write?db=foo&precision=m", http.StatusNoContent}, {"/write?db=foo&precision=h", http.StatusNoContent}, // Invalid requests. {"/write?db=foo&precision=us", http.StatusBadRequest}, } runTest := func(url string, status int) { b := bytes.NewReader([]byte(`foo n=1`)) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", url, b)) if w.Code != status { t.Fatalf("unexpected result for: \"%s\"\n\texp: %d, got: %d\n\t%s", url, status, w.Code, w.Body) } } for _, t := range tests { runTest(t.url, t.status) } } // TestHandler_Write_V2_Precision verifies v2 writes validate precision. func TestHandler_Write_V2_Precision(t *testing.T) { h := NewHandler(false) h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { return &meta.DatabaseInfo{} } h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error { return nil } tests := []struct { url string status int }{ // Successful requests. {"/api/v2/write?org=bar&bucket=foo", http.StatusNoContent}, {"/api/v2/write?org=bar&bucket=foo&precision=ns", http.StatusNoContent}, {"/api/v2/write?org=bar&bucket=foo&precision=us", http.StatusNoContent}, {"/api/v2/write?org=bar&bucket=foo&precision=ms", http.StatusNoContent}, {"/api/v2/write?org=bar&bucket=foo&precision=s", http.StatusNoContent}, // Invalid requests. {"/api/v2/write?org=bar&bucket=foo&precision=n", http.StatusBadRequest}, {"/api/v2/write?org=bar&bucket=foo&precision=u", http.StatusBadRequest}, {"/api/v2/write?org=bar&bucket=foo&precision=m", http.StatusBadRequest}, {"/api/v2/write?org=bar&bucket=foo&precision=h", http.StatusBadRequest}, } runTest := func(url string, status int) { b := bytes.NewReader([]byte(`foo n=1`)) w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("POST", url, b)) if w.Code != status { t.Fatalf("unexpected result for: \"%s\"\n\texp: %d, got: %d\n\t%s", url, status, w.Code, w.Body) } } for _, t := range tests { runTest(t.url, t.status) } } func TestHandler_Delete_V2(t *testing.T) { var errUnexpectedMeasurement = errors.New("unexpected measurement") type test struct { url string body httpd.DeleteBody status int errMsg string } tests := []*test{ &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"}, status: http.StatusOK, errMsg: ``, }, &test{ url: "/api/v2/delete?/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"}, status: http.StatusNotFound, errMsg: `delete - bucket: bucket name "" is missing a slash; not in "database/retention-policy" format`, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z", Predicate: "?>!!?>?>;;;"}, status: http.StatusBadRequest, errMsg: `delete - cannot parse predicate "?>!!?>?>;;; AND time >= '2022-03-23T22:56:06Z' AND time < '2022-03-23T20:56:06Z'": found ?, expected identifier, string, number, bool at line 1, char 1`, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z", Predicate: "_measurement=\"mymeasure\" AND t1=tagOne"}, status: http.StatusOK, errMsg: ``, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"}, status: http.StatusOK, errMsg: ``, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Start: "2022-03-23T20:56:06Z"}, status: http.StatusBadRequest, errMsg: "delete - stop field in RFC3339Nano format required", }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z"}, status: http.StatusBadRequest, errMsg: "delete - start field in RFC3339Nano format required", }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Start: "2022-03-23T20:56:06Z", Stop: "NotAValidTime"}, status: http.StatusBadRequest, errMsg: `delete - invalid format for stop field "NotAValidTime", please use RFC3339Nano: parsing time "NotAValidTime" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "NotAValidTime" as "2006"`, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "NotAValidTime"}, status: http.StatusBadRequest, errMsg: `delete - invalid format for start field "NotAValidTime", please use RFC3339Nano: parsing time "NotAValidTime" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "NotAValidTime" as "2006"`, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" = "value1"`}, status: http.StatusOK, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = mymeasure AND "tag0" = "value1"`}, status: http.StatusOK, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" = "value1" AND tag1 = value3`}, status: http.StatusOK, }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "bad_measurement" AND "tag0" = "value1" AND tag1 = value3`}, status: http.StatusBadRequest, errMsg: "delete - database: \"mydb\", retention policy: \"myrp\", start: \"2022-03-23T18:56:06Z\", stop: \"2022-03-23T20:56:06Z\", predicate: \"_measurement = \\\"bad_measurement\\\" AND \\\"tag0\\\" = \\\"value1\\\" AND tag1 = value3\", error: unexpected measurement", }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = bad_measurement AND "tag0" = "value1" AND tag1 = value3`}, status: http.StatusBadRequest, errMsg: "delete - database: \"mydb\", retention policy: \"myrp\", start: \"2022-03-23T18:56:06Z\", stop: \"2022-03-23T20:56:06Z\", predicate: \"_measurement = bad_measurement AND \\\"tag0\\\" = \\\"value1\\\" AND tag1 = value3\", error: unexpected measurement", }, &test{ url: "/api/v2/delete?org=bar&bucket=mydb/myrp", body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" != "value1" AND tag1 = value3`}, status: http.StatusBadRequest, errMsg: `delete - predicate only supports equality operators and conjunctions. database: "mydb", retention policy: "myrp", start: "2022-03-23T18:56:06Z", stop: "2022-03-23T20:56:06Z", predicate: "_measurement = \"mymeasure\" AND \"tag0\" != \"value1\" AND tag1 = value3"`, }, } h := NewHandler(false) h.Store.DeleteFn = func(database string, sources []influxql.Source, condition influxql.Expr) error { if len(sources) > 0 { if m, ok := sources[0].(*influxql.Measurement); ok && m.Name != "mymeasure" { return errUnexpectedMeasurement } } return nil } h.MetaClient = &internal.MetaClientMock{ DatabaseFn: func(name string) *meta.DatabaseInfo { if name == "mydb" { return &meta.DatabaseInfo{ Name: "mydb", RetentionPolicies: []meta.RetentionPolicyInfo{meta.RetentionPolicyInfo{Name: "myrp"}}, } } else { return nil } }, } h.Handler.MetaClient = h.MetaClient var req *http.Request fn := func(ct *test) { w := httptest.NewRecorder() if body, err := json.Marshal(&ct.body); err != nil { t.Fatalf("error marshaling body: %s", err) } else { req = MustNewJSONRequest("POST", ct.url, bytes.NewReader(body)) } h.ServeHTTP(w, req) var errMsg string if w.Code != ct.status { t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg) } else if w.Code != http.StatusOK { errMsg = w.Header().Get("X-InfluxDB-Error") if errMsg != ct.errMsg { t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg) } } } for _, ct := range tests { fn(ct) } } func TestHandler_CreateDeleteBuckets(t *testing.T) { const existingDb = "mydb" const newDb = "newDb" const goodRp = "myrp" const postMethod = "POST" const deleteMethod = "DELETE" const patchMethod = "PATCH" type test struct { url string method string body httpd.BucketsBody status int errMsg string } tests := []*test{ { url: "/api/v2/buckets", method: postMethod, body: httpd.BucketsBody{ BucketUpdate: httpd.BucketUpdate{ Name: existingDb + "/", RetentionRules: []httpd.RetentionRule{ httpd.RetentionRule{ EverySeconds: 7200, ShardGroupDurationSeconds: 14400, }, }, }, Rp: "", SchemaType: "implicit", }, status: http.StatusBadRequest, errMsg: `buckets - illegal bucket name: "mydb/"`, }, { url: "/api/v2/buckets", method: postMethod, body: httpd.BucketsBody{ BucketUpdate: httpd.BucketUpdate{ Name: existingDb + "//", RetentionRules: []httpd.RetentionRule{ httpd.RetentionRule{ EverySeconds: 7200, ShardGroupDurationSeconds: 14400, }, }, }, Rp: "", SchemaType: "implicit", }, status: http.StatusBadRequest, errMsg: `buckets - retention policy "/": invalid name`, }, { url: "/api/v2/buckets/" + existingDb + "%2f" + goodRp, method: patchMethod, body: httpd.BucketsBody{ BucketUpdate: httpd.BucketUpdate{ Name: "newNewRp", RetentionRules: []httpd.RetentionRule{ { EverySeconds: 6000, ShardGroupDurationSeconds: 18000, }, }, }, }, status: http.StatusOK, }, { url: "/api/v2/buckets/" + existingDb + "/", method: deleteMethod, status: http.StatusNotFound, }, { url: "/api/v2/buckets/baddb%2f" + goodRp, method: deleteMethod, status: http.StatusNotFound, errMsg: `delete bucket - not found: "baddb/myrp"`, }, { url: "/api/v2/buckets", method: postMethod, body: httpd.BucketsBody{ BucketUpdate: httpd.BucketUpdate{ Name: newDb + "/" + goodRp, RetentionRules: []httpd.RetentionRule{ httpd.RetentionRule{ EverySeconds: 7200, ShardGroupDurationSeconds: 14400, }, }, }, Rp: goodRp, SchemaType: "implicit", }, status: http.StatusCreated, }, { url: "/api/v2/buckets/" + existingDb + "%2fbadrp", method: deleteMethod, status: http.StatusNotFound, errMsg: `delete bucket - not found: "mydb/badrp"`, }, { url: "/api/v2/buckets", method: postMethod, body: httpd.BucketsBody{ BucketUpdate: httpd.BucketUpdate{ Name: existingDb + "/" + goodRp, RetentionRules: []httpd.RetentionRule{ httpd.RetentionRule{ EverySeconds: 7200, ShardGroupDurationSeconds: 14400, }, }, }, Rp: goodRp, SchemaType: "implicit", }, status: http.StatusCreated, }, { url: "/api/v2/buckets/" + existingDb + "%2f" + goodRp, method: deleteMethod, status: http.StatusOK, }, } createRp := func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) { return &meta.RetentionPolicyInfo{ Name: spec.Name, ReplicaN: *spec.ReplicaN, Duration: *spec.Duration, ShardGroupDuration: spec.ShardGroupDuration, }, nil } lookupDb := func(name string) *meta.DatabaseInfo { if name == existingDb { return &meta.DatabaseInfo{ Name: name, DefaultRetentionPolicy: goodRp, RetentionPolicies: []meta.RetentionPolicyInfo{meta.RetentionPolicyInfo{Name: goodRp}}, } } else { return nil } } dropDeleteRp := func(database, rp string) error { if dbi := lookupDb(database); dbi == nil { return fmt.Errorf("database not found: %q", database) } else if len(dbi.RetentionPolicies) <= 0 || dbi.RetentionPolicies[0].Name != rp { return fmt.Errorf("retention policy in database %q not found: %q", database, rp) } else { return nil } } updateRp := func(database string, name string, update *meta.RetentionPolicyUpdate, makeDefault bool) error { if database == existingDb && name == goodRp { return nil } else { return fmt.Errorf("bucket not found: %q", fmt.Sprintf("%s/%s", database, name)) } } h := NewHandler(false) h.MetaClient = &internal.MetaClientMock{ DatabaseFn: lookupDb, CreateRetentionPolicyFn: createRp, CreateDatabaseWithRetentionPolicyFn: func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { rpi, err := createRp(name, spec, true) return &meta.DatabaseInfo{ Name: name, DefaultRetentionPolicy: spec.Name, RetentionPolicies: []meta.RetentionPolicyInfo{*rpi}, ContinuousQueries: nil, }, err }, DropRetentionPolicyFn: dropDeleteRp, UpdateRetentionPolicyFn: updateRp, } h.Store.DeleteRetentionPolicyFn = dropDeleteRp h.Handler.Store = h.Store h.Handler.MetaClient = h.MetaClient var req *http.Request fn := func(ct *test) { w := httptest.NewRecorder() if body, err := json.Marshal(&ct.body); err != nil { t.Fatalf("error marshaling body: %s", err) } else { req = MustNewJSONRequest(ct.method, ct.url, bytes.NewReader(body)) } h.ServeHTTP(w, req) var errMsg string if w.Code != ct.status { t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg) } else if w.Code != http.StatusOK { errMsg = w.Header().Get("X-InfluxDB-Error") if errMsg != ct.errMsg { t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg) } } } for _, ct := range tests { fn(ct) } } var testBuckets = []meta.DatabaseInfo{ { Name: "dbOne", RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_1"}, {Name: "rpTwo_1"}, {Name: "rpThree_1"}}, }, { Name: "dbTwo", RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_2"}, {Name: "rpTwo_2"}, {Name: "rpThree_2"}, {Name: "rpFour_2"}}, }, { Name: "dbThree", RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_3"}}, }, } func getBuckets(offset, limit int) []string { i := 0 buckets := make([]string, 0, 8) for _, dbi := range testBuckets { for _, rpi := range dbi.RetentionPolicies { if limit <= (i - offset) { return buckets } if i >= offset { buckets = append(buckets, fmt.Sprintf("%s/%s", dbi.Name, rpi.Name)) } i++ } } return buckets } func TestHandler_ListBuckets(t *testing.T) { type test struct { url string status int errMsg string skip int limit int } tests := []*test{ { url: "/api/v2/buckets?after=dbOne/rpTwo_1&limit=-1", status: http.StatusOK, skip: 2, limit: 1000000, }, { url: "/api/v2/buckets?offset=200", status: http.StatusOK, skip: 100, limit: 20, }, { url: "/api/v2/buckets?id=dbOne/rpTwo_1&name=NotThere/rpTwo_1", status: http.StatusBadRequest, skip: 1, limit: 1, errMsg: "list buckets: name: \"NotThere/rpTwo_1\" and id: \"dbOne/rpTwo_1\" do not match", }, { url: "/api/v2/buckets?after=dbOne/rpTwo_1&limit=4", status: http.StatusOK, skip: 2, limit: 4, }, { url: "/api/v2/buckets?after=dbOne/rpThree_1", status: http.StatusOK, skip: 3, limit: 20, }, { url: "/api/v2/buckets?after=dbTwo/rpTwo_2", status: http.StatusOK, skip: 5, limit: 20, }, { url: "/api/v2/buckets?id=dbOne/rpTwo_1&name=dbOne/rpTwo_1", status: http.StatusOK, skip: 1, limit: 1, }, { url: "/api/v2/buckets?id=dbOne/rpTwo_1", status: http.StatusOK, skip: 1, limit: 1, }, { url: "/api/v2/buckets?name=dbOne/rpTwo_1", status: http.StatusOK, skip: 1, limit: 1, }, { url: "/api/v2/buckets?offset=3&after=dbOne/rpOne_1", status: http.StatusBadRequest, skip: 0, limit: 20, errMsg: "list buckets cannot have both \"offset\" and \"after\" arguments", }, { url: "/api/v2/buckets?offset=3&limit=4", status: http.StatusOK, skip: 3, limit: 4, }, { url: "/api/v2/buckets?offset=1&limit=5", status: http.StatusOK, skip: 1, limit: 5, }, { url: "/api/v2/buckets", status: http.StatusOK, skip: 0, limit: 20, }, } lookupDbFn := func(name string) *meta.DatabaseInfo { for i := 0; i < len(testBuckets); i++ { if testBuckets[i].Name == name { return &testBuckets[i] } } return nil } dbsFn := func() []meta.DatabaseInfo { return testBuckets } h := NewHandler(false) h.MetaClient = &internal.MetaClientMock{ DatabaseFn: lookupDbFn, DatabasesFn: dbsFn, } h.Handler.MetaClient = h.MetaClient var req *http.Request fn := func(ct *test) { w := httptest.NewRecorder() req = MustNewJSONRequest("GET", ct.url, nil) h.ServeHTTP(w, req) var errMsg string if w.Code != ct.status { t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg) } else if w.Code != http.StatusOK { errMsg = w.Header().Get("X-InfluxDB-Error") if errMsg != ct.errMsg { t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg) } } else { var got httpd.Buckets exp := getBuckets(ct.skip, ct.limit) if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil { t.Fatalf("unmarshaling buckets: %s", err.Error()) } if len(exp) != len(got.Buckets) { t.Fatalf("expected %d buckets returned, got %d", len(exp), len(got.Buckets)) } for i := 0; i < len(got.Buckets); i++ { if exp[i] != got.Buckets[i].Name { t.Fatalf("expected %q, got %q", exp[i], got.Buckets[i].Name) } } } } for _, ct := range tests { fn(ct) } } func TestHandler_RetrieveBucket(t *testing.T) { type test struct { url string status int errMsg string exp string } tests := []*test{ { url: "/api/v2/buckets/dbOne//", status: http.StatusNotFound, }, { url: "/api/v2/buckets/%2frpTwo_1", status: http.StatusBadRequest, errMsg: `bucket "/rpTwo_1": bucket name "/rpTwo_1" is in db/rp form but has an empty database`, }, { url: "/api/v2/buckets/dbOne%2f", status: http.StatusBadRequest, errMsg: `bucket "dbOne/": illegal bucket id, empty retention policy`, }, { url: "/api/v2/buckets/dbFive%2frpTwo_1", status: http.StatusNotFound, errMsg: `bucket not found: "dbFive/rpTwo_1"`, }, { url: "/api/v2/buckets/dbOne%2frpTwo_1", status: http.StatusOK, exp: "dbOne/rpTwo_1", }, { url: "/api/v2/buckets/dbOne%2frpOne_2", status: http.StatusNotFound, errMsg: `bucket not found: "dbOne/rpOne_2"`, }, } lookupDbFn := func(name string) *meta.DatabaseInfo { for i := 0; i < len(testBuckets); i++ { if testBuckets[i].Name == name { return &testBuckets[i] } } return nil } h := NewHandler(false) h.MetaClient = &internal.MetaClientMock{ DatabaseFn: lookupDbFn, } h.Handler.MetaClient = h.MetaClient var req *http.Request fn := func(ct *test) { w := httptest.NewRecorder() req = MustNewJSONRequest("GET", ct.url, nil) h.ServeHTTP(w, req) var errMsg string if w.Code != ct.status { t.Fatalf("error, test %s: expected %d got %d: %s", ct.url, ct.status, w.Code, errMsg) } else if w.Code != http.StatusOK { errMsg = w.Header().Get("X-InfluxDB-Error") if errMsg != ct.errMsg { t.Fatalf("incorrect error message, test %s: expected: %q, got: %q", ct.url, ct.errMsg, errMsg) } } else { var got httpd.Bucket if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil { t.Fatalf("unmarshaling buckets: %s", err.Error()) } if ct.exp != got.Name { t.Fatalf("expected %q, got %q", ct.exp, got.Name) } } } for _, ct := range tests { fn(ct) } } func TestHandler_UnsupportedV2API(t *testing.T) { type test struct { method string url string status int errMsg string } tests := []*test{ { method: "GET", url: "/api/v2/buckets/mydb%2fmyrp/labels", status: http.StatusMethodNotAllowed, errMsg: "bucket labels not supported in this version"}, { method: "POST", url: "/api/v2/buckets/mydb%2fmyrp/labels", status: http.StatusMethodNotAllowed, errMsg: "bucket labels not supported in this version", }, { method: "DELETE", url: "/api/v2/buckets/mydb%2fmyrp/labels/mylabel", status: http.StatusMethodNotAllowed, errMsg: "bucket labels not supported in this version"}, { method: "GET", url: "/api/v2/buckets/mydb%2fmyrp/members", status: http.StatusMethodNotAllowed, errMsg: "bucket members not supported in this version", }, { method: "POST", url: "/api/v2/buckets/mydb%2fmyrp/members", status: http.StatusMethodNotAllowed, errMsg: "bucket members not supported in this version", }, { method: "DELETE", url: "/api/v2/buckets/mydb%2fmyrp/members/amember", status: http.StatusMethodNotAllowed, errMsg: "bucket members not supported in this version", }, { method: "GET", url: "/api/v2/buckets/mydb%2fmyrp/owners", status: http.StatusMethodNotAllowed, errMsg: "bucket owners not supported in this version", }, { method: "POST", url: "/api/v2/buckets/mydb%2fmyrp/owners", status: http.StatusMethodNotAllowed, errMsg: "bucket owners not supported in this version", }, { method: "DELETE", url: "/api/v2/buckets/mydb%2fmyrp/owners/anowner", status: http.StatusMethodNotAllowed, errMsg: "bucket owners not supported in this version", }, } h := NewHandler(false) var req *http.Request fn := func(ct *test) { w := httptest.NewRecorder() req = MustNewJSONRequest(ct.method, ct.url, nil) h.ServeHTTP(w, req) var errMsg string if w.Code != ct.status { t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg) } else if w.Code != http.StatusOK { errMsg = w.Header().Get("X-InfluxDB-Error") if errMsg != ct.errMsg { t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg) } } } for _, ct := range tests { fn(ct) } } // Ensure X-Forwarded-For header writes the correct log message. func TestHandler_XForwardedFor(t *testing.T) { var buf bytes.Buffer h := NewHandler(false) h.CLFLogger = log.New(&buf, "", 0) req := MustNewRequest("GET", "/query", nil) req.Header.Set("X-Forwarded-For", "192.168.0.1") req.RemoteAddr = "127.0.0.1" h.ServeHTTP(httptest.NewRecorder(), req) parts := strings.Split(buf.String(), " ") if parts[0] != "192.168.0.1,127.0.0.1" { t.Errorf("unexpected host ip address: %s", parts[0]) } } func TestHandler_XRequestId(t *testing.T) { var buf bytes.Buffer h := NewHandler(false) h.CLFLogger = log.New(&buf, "", 0) cases := []map[string]string{ {"X-Request-Id": "abc123", "Request-Id": ""}, // X-Request-Id is used. {"X-REQUEST-ID": "cde", "Request-Id": ""}, // X-REQUEST-ID is used. {"X-Request-Id": "", "Request-Id": "foobarzoo"}, // Request-Id is used. {"X-Request-Id": "abc123", "Request-Id": "foobarzoo"}, // X-Request-Id takes precedence. {"X-Request-Id": "", "Request-Id": ""}, // v1 UUID generated. } for _, c := range cases { t.Run(fmt.Sprint(c), func(t *testing.T) { buf.Reset() req := MustNewRequest("GET", "/ping", nil) req.RemoteAddr = "127.0.0.1" // Set the relevant request ID headers var allEmpty = true for k, v := range c { req.Header.Set(k, v) if v != "" { allEmpty = false } } w := httptest.NewRecorder() h.ServeHTTP(w, req) // Split up the HTTP log line. The request ID is currently located in // index 12. If the log line gets changed in the future, this test // will likely break and the index will need to be updated. parts := strings.Split(buf.String(), " ") i := 12 // If neither header is set then we expect a v1 UUID to be generated. if allEmpty { if got, exp := len(parts[i]), 36; got != exp { t.Fatalf("got ID of length %d, expected one of length %d", got, exp) } } else if c["X-Request-Id"] != "" { if got, exp := parts[i], c["X-Request-Id"]; got != exp { t.Fatalf("got ID of %q, expected %q", got, exp) } } else if c["X-REQUEST-ID"] != "" { if got, exp := parts[i], c["X-REQUEST-ID"]; got != exp { t.Fatalf("got ID of %q, expected %q", got, exp) } } else { if got, exp := parts[i], c["Request-Id"]; got != exp { t.Fatalf("got ID of %q, expected %q", got, exp) } } // Check response headers if got, exp := w.Header().Get("Request-Id"), parts[i]; got != exp { t.Fatalf("Request-Id header was %s, expected %s", got, exp) } else if got, exp := w.Header().Get("X-Request-Id"), parts[i]; got != exp { t.Fatalf("X-Request-Id header was %s, expected %s", got, exp) } }) } } func TestThrottler_Handler(t *testing.T) { t.Run("OK", func(t *testing.T) { throttler := httpd.NewThrottler(2, 98) // Send the total number of concurrent requests to the channel. var concurrentN int32 concurrentCh := make(chan int) h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&concurrentN, 1) concurrentCh <- int(atomic.LoadInt32(&concurrentN)) time.Sleep(1 * time.Millisecond) atomic.AddInt32(&concurrentN, -1) })) // Execute requests concurrently. const n = 100 for i := 0; i < n; i++ { go func() { h.ServeHTTP(nil, nil) }() } // Read the number of concurrent requests for every execution. for i := 0; i < n; i++ { if v := <-concurrentCh; v > 2 { t.Fatalf("concurrent requests exceed maximum: %d", v) } } }) t.Run("ErrTimeout", func(t *testing.T) { throttler := httpd.NewThrottler(2, 1) throttler.EnqueueTimeout = 1 * time.Millisecond begin, end := make(chan struct{}), make(chan struct{}) h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { begin <- struct{}{} end <- struct{}{} })) // First two requests should execute immediately. go func() { h.ServeHTTP(nil, nil) }() go func() { h.ServeHTTP(nil, nil) }() <-begin <-begin // Third request should be enqueued but timeout. w := httptest.NewRecorder() h.ServeHTTP(w, nil) if w.Code != http.StatusServiceUnavailable { t.Fatalf("unexpected status code: %d", w.Code) } else if body := w.Body.String(); body != "request throttled, exceeds timeout\n" { t.Fatalf("unexpected response body: %q", body) } // Allow 2 existing requests to complete. <-end <-end }) t.Run("ErrFull", func(t *testing.T) { delay := 100 * time.Millisecond if os.Getenv("CI") != "" { delay = 2 * time.Second } throttler := httpd.NewThrottler(2, 1) resp := make(chan struct{}) h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { resp <- struct{}{} })) // First two requests should execute immediately and third should be queued. go func() { h.ServeHTTP(nil, nil) }() go func() { h.ServeHTTP(nil, nil) }() go func() { h.ServeHTTP(nil, nil) }() time.Sleep(delay) // Fourth request should fail when trying to enqueue. w := httptest.NewRecorder() h.ServeHTTP(w, nil) if w.Code != http.StatusServiceUnavailable { t.Fatalf("unexpected status code: %d", w.Code) } else if body := w.Body.String(); body != "request throttled, queue full\n" { t.Fatalf("unexpected response body: %q", body) } // Allow 3 existing requests to complete. <-resp <-resp <-resp }) } func TestHandlerDebugVars(t *testing.T) { stats := func(s ...*monitor.Statistic) ([]*monitor.Statistic, error) { return s, nil } stat := func(name string, tags map[string]string, vals map[string]interface{}) *monitor.Statistic { return &monitor.Statistic{ Statistic: models.Statistic{ Name: name, Tags: tags, Values: vals, }, } } tags := func(kv ...string) map[string]string { if len(kv)%2 != 0 { panic("expect even number of key/values") } res := make(map[string]string, len(kv)/2) for i := 0; i < len(kv); i += 2 { res[kv[i]] = kv[i+1] } return res } vals := func(kv ...interface{}) map[string]interface{} { if len(kv)%2 != 0 { panic("expect even number of key/values") } res := make(map[string]interface{}, len(kv)/2) for i := 0; i < len(kv); i += 2 { if key, ok := kv[i].(string); !ok { panic("key must be string") } else { res[key] = kv[i+1] } } return res } newDiagFn := func(d map[string]*diagnostics.Diagnostics) func() (map[string]*diagnostics.Diagnostics, error) { return func() (map[string]*diagnostics.Diagnostics, error) { return d, nil } } var Ignored = []string{"memstats", "cmdline"} read := func(t *testing.T, b *bytes.Buffer, del ...string) map[string]interface{} { t.Helper() res := make(map[string]interface{}) if err := json.Unmarshal(b.Bytes(), &res); err != nil { t.Fatal(err) } for _, k := range del { delete(res, k) } return res } keys := func(m map[string]interface{}) []string { keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) return keys } // stats tests the results of serializing Monitor.Statistics t.Run("stats", func(t *testing.T) { t.Run("generates unique keys using known tags", func(t *testing.T) { h := NewHandler(false) h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) { return stats( stat("database", tags("database", "foo"), nil), stat("hh", tags("path", "/mnt/foo/bar"), nil), stat("httpd", tags("bind", "127.0.0.1:8088", "proto", "https"), nil), stat("other", tags("foo", "bar"), nil), stat("shard", tags("path", "/mnt/foo", "id", "111"), nil), ) } h.Monitor.DiagnosticsFn = newDiagFn(map[string]*diagnostics.Diagnostics{}) req := MustNewRequest("GET", "/debug/vars", nil) w := httptest.NewRecorder() h.ServeHTTP(w, req) got := keys(read(t, w.Body, Ignored...)) exp := []string{"crypto", "database:foo", "hh:/mnt/foo/bar", "httpd:https:127.0.0.1:8088", "other", "shard:/mnt/foo:111"} if !cmp.Equal(got, exp) { t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp)) } }) t.Run("generates numbered keys for collisions", func(t *testing.T) { // This also implicitly tests the case where no `crypto` diagnostics are not set by application. h := NewHandler(false) h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) { return stats( stat("hh_processor", tags("db", "foo", "shardID", "10"), vals("queueSize", 100)), stat("hh_processor", tags("db", "foo", "shardID", "15"), vals("queueSize", 500)), stat("hh_processor", tags("db", "bar", "shardID", "20"), vals("queueSize", 200)), stat("hh_processor", tags("db", "bar", "shardID", "25"), vals("queueSize", 700)), ) } req := MustNewRequest("GET", "/debug/vars", nil) w := httptest.NewRecorder() h.ServeHTTP(w, req) got := read(t, w.Body, Ignored...) exp := map[string]interface{}{ "crypto": map[string]interface{}{ "FIPS": false, "ensureFIPS": false, "passwordHash": "bcrypt", "implementation": "Go", }, "hh_processor": map[string]interface{}{ "name": "hh_processor", "tags": map[string]interface{}{"db": "foo", "shardID": "10"}, "values": map[string]interface{}{"queueSize": float64(100)}, }, "hh_processor:1": map[string]interface{}{ "name": "hh_processor", "tags": map[string]interface{}{"db": "foo", "shardID": "15"}, "values": map[string]interface{}{"queueSize": float64(500)}, }, "hh_processor:2": map[string]interface{}{ "name": "hh_processor", "tags": map[string]interface{}{"db": "bar", "shardID": "20"}, "values": map[string]interface{}{"queueSize": float64(200)}, }, "hh_processor:3": map[string]interface{}{ "name": "hh_processor", "tags": map[string]interface{}{"db": "bar", "shardID": "25"}, "values": map[string]interface{}{"queueSize": float64(700)}, }, } if !cmp.Equal(got, exp) { t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp)) } }) }) t.Run("checks crypto diagnostic handling", func(t *testing.T) { h := NewHandler(false) // intentionally leave out "ensureFIPS" to test that code path h.Monitor.DiagnosticsFn = newDiagFn( map[string]*diagnostics.Diagnostics{ "crypto": diagnostics.RowFromMap(map[string]interface{}{ "FIPS": true, "passwordHash": "pbkdf2-sha256", "implementation": "BoringCrypto", }), }) req := MustNewRequest("GET", "/debug/vars", nil) w := httptest.NewRecorder() h.ServeHTTP(w, req) got := read(t, w.Body, Ignored...) exp := map[string]interface{}{ "crypto": map[string]interface{}{ "FIPS": true, "ensureFIPS": nil, "passwordHash": "pbkdf2-sha256", "implementation": "BoringCrypto", }, } if !cmp.Equal(got, exp) { t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp)) } }) } // NewHandler represents a test wrapper for httpd.Handler. type Handler struct { *httpd.Handler MetaClient *internal.MetaClientMock StatementExecutor HandlerStatementExecutor QueryAuthorizer HandlerQueryAuthorizer PointsWriter HandlerPointsWriter Monitor *HandlerMonitor Store *internal.StorageStoreMock Controller *internal.FluxControllerMock } type configOption func(c *httpd.Config) func WithAuthentication() configOption { return func(c *httpd.Config) { c.AuthEnabled = true c.SharedSecret = "super secret key" } } func WithPprofAuthEnabled() configOption { return func(c *httpd.Config) { c.PprofEnabled = true c.PprofAuthEnabled = true } } func WithPingAuthEnabled() configOption { return func(c *httpd.Config) { c.PingAuthEnabled = true } } func WithFlux() configOption { return func(c *httpd.Config) { c.FluxEnabled = true } } func WithNoLog() configOption { return func(c *httpd.Config) { c.LogEnabled = false } } func WithHeaders(h map[string]string) configOption { return func(c *httpd.Config) { c.HTTPHeaders = h } } // NewHandlerConfig returns a new instance of httpd.Config with // authentication configured. func NewHandlerConfig(opts ...configOption) httpd.Config { config := httpd.NewConfig() for _, opt := range opts { opt(&config) } return config } // NewHandler returns a new instance of Handler. func NewHandler(requireAuthentication bool) *Handler { var opts []configOption if requireAuthentication { opts = append(opts, WithAuthentication()) } return NewHandlerWithConfig(NewHandlerConfig(opts...)) } func NewHandlerWithConfig(config httpd.Config) *Handler { h := &Handler{ Handler: httpd.NewHandler(config), } h.MetaClient = &internal.MetaClientMock{} h.Store = internal.NewStorageStoreMock() h.Controller = internal.NewFluxControllerMock() h.Monitor = newHandlerMonitor() h.Handler.MetaClient = h.MetaClient h.Handler.Store = h.Store h.Handler.QueryExecutor = query.NewExecutor() h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor h.Handler.QueryAuthorizer = &h.QueryAuthorizer h.Handler.PointsWriter = &h.PointsWriter h.Handler.Monitor = h.Monitor h.Handler.Version = "0.0.0" h.Handler.BuildType = "OSS" h.Handler.Controller = h.Controller if testing.Verbose() { l := logger.New(os.Stdout) h.Handler.Logger = l } return h } // HandlerMonitor is a mock implementation of Handler.Monitor. type HandlerMonitor struct { StatisticsFn func(tags map[string]string) ([]*monitor.Statistic, error) DiagnosticsFn func() (map[string]*diagnostics.Diagnostics, error) } // newHandlerMonitor returns a HandlerMonitor with default implementations // for each function. func newHandlerMonitor() *HandlerMonitor { return &HandlerMonitor{ StatisticsFn: func(_ map[string]string) ([]*monitor.Statistic, error) { return nil, nil }, DiagnosticsFn: func() (map[string]*diagnostics.Diagnostics, error) { return make(map[string]*diagnostics.Diagnostics), nil }, } } func (m *HandlerMonitor) Statistics(tags map[string]string) ([]*monitor.Statistic, error) { return m.StatisticsFn(tags) } func (m *HandlerMonitor) Diagnostics() (map[string]*diagnostics.Diagnostics, error) { return m.DiagnosticsFn() } // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. type HandlerStatementExecutor struct { ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } func (e *HandlerStatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt influxql.Statement) error { return e.ExecuteStatementFn(stmt, ctx) } // HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer. type HandlerQueryAuthorizer struct { AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error AuthorizeCreateDatabaseFn func(u meta.User) error AuthorizeCreateRetentionPolicyFn func(u meta.User, db string) AuthorizeDeleteRetentionPolicyFn func(u meta.User, db string) error } func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, q *influxql.Query, database string) (query.FineAuthorizer, error) { return query.OpenAuthorizer, a.AuthorizeQueryFn(u, q, database) } func (a *HandlerQueryAuthorizer) AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error { panic("AuthorizeDatabase: not implemented") } func (a *HandlerQueryAuthorizer) AuthorizeCreateDatabase(u meta.User) error { return a.AuthorizeCreateDatabaseFn(u) } func (a *HandlerQueryAuthorizer) AuthorizeCreateRetentionPolicy(u meta.User, db string) error { return a.AuthorizeCreateRetentionPolicy(u, db) } func (a *HandlerQueryAuthorizer) AuthorizeDeleteRetentionPolicy(u meta.User, db string) error { return a.AuthorizeDeleteRetentionPolicyFn(u, db) } type HandlerPointsWriter struct { WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error } func (h *HandlerPointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error { return h.WritePointsFn(database, retentionPolicy, consistencyLevel, user, points) } // MustNewRequest returns a new HTTP request. Panic on error. func MustNewRequest(method, urlStr string, body io.Reader) *http.Request { r, err := http.NewRequest(method, urlStr, body) if err != nil { panic(err.Error()) } return r } // MustNewRequest returns a new HTTP request with the content type set. Panic on error. func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request { r := MustNewRequest(method, urlStr, body) r.Header.Set("Accept", "application/json") return r } // MustJWTToken returns a new JWT token and signed string or panics trying. func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) { token := jwt.New(jwt.GetSigningMethod("HS512")) token.Claims.(jwt.MapClaims)["username"] = username if expired { token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(-time.Second).Unix() } else { token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(time.Minute * 10).Unix() } signed, err := token.SignedString([]byte(secret)) if err != nil { panic(err) } return token, signed } // Ensure that user supplied headers are applied to responses. func TestHandler_UserSuppliedHeaders(t *testing.T) { endpoints := []struct { method string path string }{ {method: "GET", path: "/ping"}, {method: "POST", path: "/api/v2/query"}, {method: "GET", path: "/query?db=foo&q=SELECT+*+FROM+bar"}, } for _, endpoint := range endpoints { t.Run(endpoint.method+endpoint.path, func(t *testing.T) { headers := map[string]string{ "X-Best-Operating-System": "FreeBSD", "X-Nana-Nana-Nana-Nana": "Batheader", "X-Powered-By": "hamster in a wheel", "X-Trek": "Live long and prosper", } // build a new handler with our headers as part of its configuration h := NewHandlerWithConfig(NewHandlerConfig(WithHeaders(headers))) w := httptest.NewRecorder() // generate request request req, err := http.NewRequest(endpoint.method, endpoint.path, nil) if err != nil { t.Fatal(err) } // serve the request h.ServeHTTP(w, req) response := w.Result() // ensure we received the headers we supplied for k, v := range headers { val, found := response.Header[k] if !found { t.Fatalf("Could not find header field %q in response", k) continue } if v != val[0] { t.Fatalf("value for header %q in http response is %q; expected %q", k, val, v) } } }) } }