3030 lines
89 KiB
Go
3030 lines
89 KiB
Go
package httpd_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"math"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"os"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang-jwt/jwt/v4"
|
|
"github.com/golang/snappy"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/lang"
|
|
"github.com/influxdata/flux/mock"
|
|
"github.com/influxdata/influxdb/flux/client"
|
|
"github.com/influxdata/influxdb/internal"
|
|
"github.com/influxdata/influxdb/logger"
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/monitor"
|
|
"github.com/influxdata/influxdb/monitor/diagnostics"
|
|
"github.com/influxdata/influxdb/pkg/testing/assert"
|
|
"github.com/influxdata/influxdb/query"
|
|
"github.com/influxdata/influxdb/services/httpd"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/storage/reads"
|
|
"github.com/influxdata/influxdb/storage/reads/datatypes"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxql"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
)
|
|
|
|
// Ensure the handler returns results from a query (including nil results).
|
|
func TestHandler_Query(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if stmt.String() != `SELECT * FROM bar` {
|
|
t.Fatalf("unexpected query: %s", stmt.String())
|
|
} else if ctx.Database != `foo` {
|
|
t.Fatalf("unexpected db: %s", ctx.Database)
|
|
}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns results from a query passed as a file.
|
|
func TestHandler_Query_File(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if stmt.String() != `SELECT * FROM bar` {
|
|
t.Fatalf("unexpected query: %s", stmt.String())
|
|
} else if ctx.Database != `foo` {
|
|
t.Fatalf("unexpected db: %s", ctx.Database)
|
|
}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
var body bytes.Buffer
|
|
writer := multipart.NewWriter(&body)
|
|
part, err := writer.CreateFormFile("q", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
io.WriteString(part, "SELECT * FROM bar")
|
|
|
|
if err := writer.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
r := MustNewJSONRequest("POST", "/query?db=foo", &body)
|
|
r.Header.Set("Content-Type", writer.FormDataContentType())
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Test query with user authentication.
|
|
func TestHandler_Query_Auth(t *testing.T) {
|
|
// Create the handler to be tested.
|
|
h := NewHandler(true)
|
|
|
|
// Set mock meta client functions for the handler to use.
|
|
h.MetaClient.AdminUserExistsFn = func() bool { return true }
|
|
|
|
h.MetaClient.UserFn = func(username string) (meta.User, error) {
|
|
if username != "user1" {
|
|
return nil, meta.ErrUserNotFound
|
|
}
|
|
return &meta.UserInfo{
|
|
Name: "user1",
|
|
Hash: "abcd",
|
|
Admin: true,
|
|
}, nil
|
|
}
|
|
|
|
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
|
|
if u != "user1" {
|
|
return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u)
|
|
} else if p != "abcd" {
|
|
return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p)
|
|
}
|
|
return h.MetaClient.User(u)
|
|
}
|
|
|
|
// Set mock query authorizer for handler to use.
|
|
h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error {
|
|
return nil
|
|
}
|
|
|
|
// Set mock statement executor for handler to use.
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if stmt.String() != `SELECT * FROM bar` {
|
|
t.Fatalf("unexpected query: %s", stmt.String())
|
|
} else if ctx.Database != `foo` {
|
|
t.Fatalf("unexpected db: %s", ctx.Database)
|
|
}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
// Test the handler with valid user and password in the URL parameters.
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test the handler with valid user and password using basic auth.
|
|
w = httptest.NewRecorder()
|
|
r := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
|
|
r.SetBasicAuth("user1", "abcd")
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test the handler with valid JWT bearer token.
|
|
req := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
|
|
// Create a signed JWT token string and add it to the request header.
|
|
_, signedToken := MustJWTToken("user1", h.Config.SharedSecret, false)
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test the handler with JWT token signed with invalid key.
|
|
req = MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
|
|
// Create a signed JWT token string and add it to the request header.
|
|
_, signedToken = MustJWTToken("user1", "invalid key", false)
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusUnauthorized {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"signature is invalid"}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test handler with valid JWT token carrying non-existent user.
|
|
_, signedToken = MustJWTToken("bad_user", h.Config.SharedSecret, false)
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusUnauthorized {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"user not found"}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test handler with expired JWT token.
|
|
_, signedToken = MustJWTToken("user1", h.Config.SharedSecret, true)
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusUnauthorized {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if !strings.Contains(w.Body.String(), `{"error":"Token is expired`) {
|
|
t.Fatalf("unexpected body: %s", w.Body.String())
|
|
}
|
|
|
|
// Test handler with JWT token that has no expiration set.
|
|
token, _ := MustJWTToken("user1", h.Config.SharedSecret, false)
|
|
delete(token.Claims.(jwt.MapClaims), "exp")
|
|
signedToken, err := token.SignedString([]byte(h.Config.SharedSecret))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusUnauthorized {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"token expiration required"}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Test that auth fails if shared secret is blank.
|
|
origSecret := h.Config.SharedSecret
|
|
h.Config.SharedSecret = ""
|
|
token, _ = MustJWTToken("user1", h.Config.SharedSecret, false)
|
|
signedToken, err = token.SignedString([]byte(h.Config.SharedSecret))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if w.Code != http.StatusUnauthorized {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"bearer auth disabled"}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
h.Config.SharedSecret = origSecret
|
|
|
|
// Test the handler with valid user and password in the url and invalid in
|
|
// basic auth (prioritize url).
|
|
w = httptest.NewRecorder()
|
|
r = MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil)
|
|
r.SetBasicAuth("user1", "efgh")
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns results from a query (including nil results).
|
|
func TestHandler_QueryRegex(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
|
|
t.Fatalf("unexpected query: %s", stmt.String())
|
|
} else if ctx.Database != `test` {
|
|
t.Fatalf("unexpected db: %s", ctx.Database)
|
|
}
|
|
ctx.Results <- nil
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil))
|
|
}
|
|
|
|
// Ensure the handler merges results from the same statement.
|
|
func TestHandler_Query_MergeResults(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler merges results from the same statement.
|
|
func TestHandler_Query_MergeEmptyResults(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler merges series from the same result.
|
|
func TestHandler_Query_MergeSeries(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{
|
|
{
|
|
Name: "series0",
|
|
Values: [][]interface{}{
|
|
{float64(2.0)},
|
|
},
|
|
Partial: true,
|
|
},
|
|
})}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{
|
|
{
|
|
Name: "series0",
|
|
Values: [][]interface{}{
|
|
{float64(3.0)},
|
|
},
|
|
},
|
|
})}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0","values":[[2],[3]]}]}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler can parse chunked and chunk size query parameters.
|
|
func TestHandler_Query_Chunked(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if ctx.ChunkSize != 2 {
|
|
t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
|
|
}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]}
|
|
{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}
|
|
` {
|
|
t.Fatalf("unexpected body: %s", w.Body.String())
|
|
}
|
|
}
|
|
|
|
// Ensure the handler can accept an async query.
|
|
func TestHandler_Query_Async(t *testing.T) {
|
|
done := make(chan struct{})
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
if stmt.String() != `SELECT * FROM bar` {
|
|
t.Fatalf("unexpected query: %s", stmt.String())
|
|
} else if ctx.Database != `foo` {
|
|
t.Fatalf("unexpected db: %s", ctx.Database)
|
|
}
|
|
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
|
ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
|
close(done)
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&async=true", nil))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
|
|
// Wait to make sure the async query runs and completes.
|
|
timer := time.NewTimer(100 * time.Millisecond)
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
case <-timer.C:
|
|
t.Fatal("timeout while waiting for async query to complete")
|
|
case <-done:
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns a status 400 if the query is not passed in.
|
|
func TestHandler_Query_ErrQueryRequired(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query", nil))
|
|
if w.Code != http.StatusBadRequest {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"missing required parameter \"q\""}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns a status 400 if the query cannot be parsed.
|
|
func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?q=SELECT", nil))
|
|
if w.Code != http.StatusBadRequest {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"error parsing query: found EOF, expected identifier, string, number, bool at line 1, char 8"}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails.
|
|
func TestHandler_Query_ErrAuthorize(t *testing.T) {
|
|
h := NewHandler(true)
|
|
h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error {
|
|
return errors.New("marker")
|
|
}
|
|
h.MetaClient.AdminUserExistsFn = func() bool { return true }
|
|
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
|
|
|
|
users := []meta.UserInfo{
|
|
{
|
|
Name: "admin",
|
|
Hash: "admin",
|
|
Admin: true,
|
|
},
|
|
{
|
|
Name: "user1",
|
|
Hash: "abcd",
|
|
Privileges: map[string]influxql.Privilege{
|
|
"db0": influxql.ReadPrivilege,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, user := range users {
|
|
if u == user.Name {
|
|
if p == user.Hash {
|
|
return &user, nil
|
|
}
|
|
return nil, meta.ErrAuthenticate
|
|
}
|
|
}
|
|
return nil, meta.ErrUserNotFound
|
|
}
|
|
|
|
for i, tt := range []struct {
|
|
user string
|
|
password string
|
|
query string
|
|
code int
|
|
}{
|
|
{
|
|
query: "/query?q=SHOW+DATABASES",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
{
|
|
user: "user1",
|
|
password: "abcd",
|
|
query: "/query?q=SHOW+DATABASES",
|
|
code: http.StatusForbidden,
|
|
},
|
|
{
|
|
user: "user2",
|
|
password: "abcd",
|
|
query: "/query?q=SHOW+DATABASES",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
} {
|
|
w := httptest.NewRecorder()
|
|
r := MustNewJSONRequest("GET", tt.query, nil)
|
|
params := r.URL.Query()
|
|
if tt.user != "" {
|
|
params.Set("u", tt.user)
|
|
}
|
|
if tt.password != "" {
|
|
params.Set("p", tt.password)
|
|
}
|
|
r.URL.RawQuery = params.Encode()
|
|
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != tt.code {
|
|
t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns a status 200 if an error is returned in the result.
|
|
func TestHandler_Query_ErrResult(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
return errors.New("measurement not found")
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SHOW+SERIES+from+bin", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":0,"error":"measurement not found"}]}` {
|
|
t.Fatalf("unexpected body: %s", body)
|
|
}
|
|
}
|
|
|
|
// Ensure that closing the HTTP connection causes the query to be interrupted.
|
|
func TestHandler_Query_CloseNotify(t *testing.T) {
|
|
// Avoid leaking a goroutine when this fails.
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
|
|
interrupted := make(chan struct{})
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-done:
|
|
}
|
|
close(interrupted)
|
|
return nil
|
|
}
|
|
|
|
s := httptest.NewServer(h)
|
|
defer s.Close()
|
|
|
|
// Parse the URL and generate a query request.
|
|
u, err := url.Parse(s.URL)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
u.Path = "/query"
|
|
|
|
values := url.Values{}
|
|
values.Set("q", "SELECT * FROM cpu")
|
|
values.Set("db", "db0")
|
|
values.Set("rp", "rp0")
|
|
values.Set("chunked", "true")
|
|
u.RawQuery = values.Encode()
|
|
|
|
req, err := http.NewRequest("GET", u.String(), nil)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Perform the request and retrieve the response.
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Validate that the interrupted channel has NOT been closed yet.
|
|
timer := time.NewTimer(100 * time.Millisecond)
|
|
select {
|
|
case <-interrupted:
|
|
timer.Stop()
|
|
t.Fatal("query interrupted unexpectedly")
|
|
case <-timer.C:
|
|
}
|
|
|
|
// Close the response body which should abort the query in the handler.
|
|
resp.Body.Close()
|
|
|
|
// The query should abort within 100 milliseconds.
|
|
timer.Reset(100 * time.Millisecond)
|
|
select {
|
|
case <-interrupted:
|
|
timer.Stop()
|
|
case <-timer.C:
|
|
t.Fatal("timeout while waiting for query to abort")
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns an appropriate 401 status when authentication
|
|
// fails on ping endpoints.
|
|
func TestHandler_Ping_ErrAuthorize(t *testing.T) {
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPingAuthEnabled()))
|
|
h.MetaClient.AdminUserExistsFn = func() bool { return true }
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
|
|
users := []meta.UserInfo{
|
|
{
|
|
Name: "admin",
|
|
Hash: "admin",
|
|
Admin: true,
|
|
},
|
|
{
|
|
Name: "user1",
|
|
Hash: "abcd",
|
|
Privileges: map[string]influxql.Privilege{
|
|
"db0": influxql.ReadPrivilege,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, user := range users {
|
|
if u == user.Name {
|
|
if p == user.Hash {
|
|
return &user, nil
|
|
}
|
|
return nil, meta.ErrAuthenticate
|
|
}
|
|
}
|
|
return nil, meta.ErrUserNotFound
|
|
}
|
|
|
|
for i, tt := range []struct {
|
|
user string
|
|
password string
|
|
query string
|
|
code int
|
|
}{
|
|
{
|
|
query: "/ping",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
{
|
|
user: "user1",
|
|
password: "abcd",
|
|
query: "/ping",
|
|
code: http.StatusNoContent,
|
|
},
|
|
{
|
|
user: "user2",
|
|
password: "abcd",
|
|
query: "/ping",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
} {
|
|
w := httptest.NewRecorder()
|
|
r := MustNewJSONRequest("GET", tt.query, nil)
|
|
params := r.URL.Query()
|
|
if tt.user != "" {
|
|
params.Set("u", tt.user)
|
|
}
|
|
if tt.password != "" {
|
|
params.Set("p", tt.password)
|
|
}
|
|
r.URL.RawQuery = params.Encode()
|
|
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != tt.code {
|
|
t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns an appropriate 403 status when authentication or
|
|
// authorization fails on debug endpoints.
|
|
func TestHandler_Debug_ErrAuthorize(t *testing.T) {
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled()))
|
|
h.MetaClient.AdminUserExistsFn = func() bool { return true }
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
|
|
users := []meta.UserInfo{
|
|
{
|
|
Name: "admin",
|
|
Hash: "admin",
|
|
Admin: true,
|
|
},
|
|
{
|
|
Name: "user1",
|
|
Hash: "abcd",
|
|
Privileges: map[string]influxql.Privilege{
|
|
"db0": influxql.ReadPrivilege,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, user := range users {
|
|
if u == user.Name {
|
|
if p == user.Hash {
|
|
return &user, nil
|
|
}
|
|
return nil, meta.ErrAuthenticate
|
|
}
|
|
}
|
|
return nil, meta.ErrUserNotFound
|
|
}
|
|
|
|
for i, tt := range []struct {
|
|
user string
|
|
password string
|
|
query string
|
|
code int
|
|
}{
|
|
{
|
|
query: "/debug/vars",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
{
|
|
user: "user1",
|
|
password: "abcd",
|
|
query: "/debug/vars",
|
|
code: http.StatusForbidden,
|
|
},
|
|
{
|
|
user: "user2",
|
|
password: "abcd",
|
|
query: "/debug/vars",
|
|
code: http.StatusUnauthorized,
|
|
},
|
|
} {
|
|
w := httptest.NewRecorder()
|
|
r := MustNewJSONRequest("GET", tt.query, nil)
|
|
params := r.URL.Query()
|
|
if tt.user != "" {
|
|
params.Set("u", tt.user)
|
|
}
|
|
if tt.password != "" {
|
|
params.Set("p", tt.password)
|
|
}
|
|
r.URL.RawQuery = params.Encode()
|
|
|
|
h.ServeHTTP(w, r)
|
|
if w.Code != tt.code {
|
|
t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensure the prometheus remote write works with valid values.
|
|
func TestHandler_PromWrite(t *testing.T) {
|
|
req := &prompb.WriteRequest{
|
|
Timeseries: []prompb.TimeSeries{
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "host", Value: "a"},
|
|
{Name: "region", Value: "west"},
|
|
},
|
|
Samples: []prompb.Sample{
|
|
{Timestamp: 1, Value: 1.2},
|
|
{Timestamp: 3, Value: 14.5},
|
|
{Timestamp: 6, Value: 222.99},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
|
|
b := bytes.NewReader(compressed)
|
|
h := NewHandler(false)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
|
|
var called bool
|
|
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
|
|
called = true
|
|
|
|
if got, exp := len(points), 3; got != exp {
|
|
t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points)
|
|
}
|
|
|
|
expFields := []models.Fields{
|
|
models.Fields{"value": req.Timeseries[0].Samples[0].Value},
|
|
models.Fields{"value": req.Timeseries[0].Samples[1].Value},
|
|
models.Fields{"value": req.Timeseries[0].Samples[2].Value},
|
|
}
|
|
|
|
expTS := []int64{
|
|
req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond),
|
|
req.Timeseries[0].Samples[1].Timestamp * int64(time.Millisecond),
|
|
req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond),
|
|
}
|
|
|
|
for i, point := range points {
|
|
if got, exp := point.UnixNano(), expTS[i]; got != exp {
|
|
t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point)
|
|
}
|
|
|
|
exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
|
|
if got := point.Tags(); !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point)
|
|
}
|
|
|
|
gotFields, err := point.Fields()
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
|
|
if !called {
|
|
t.Fatal("WritePoints: expected call")
|
|
}
|
|
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// Ensure the prometheus remote write works with invalid values.
|
|
func TestHandler_PromWrite_Dropped(t *testing.T) {
|
|
req := &prompb.WriteRequest{
|
|
Timeseries: []prompb.TimeSeries{
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "host", Value: "a"},
|
|
{Name: "region", Value: "west"},
|
|
},
|
|
Samples: []prompb.Sample{
|
|
{Timestamp: 1, Value: 1.2},
|
|
{Timestamp: 2, Value: math.NaN()},
|
|
{Timestamp: 3, Value: 14.5},
|
|
{Timestamp: 4, Value: math.Inf(-1)},
|
|
{Timestamp: 5, Value: math.Inf(1)},
|
|
{Timestamp: 6, Value: 222.99},
|
|
{Timestamp: 7, Value: math.Inf(-1)},
|
|
{Timestamp: 8, Value: math.Inf(1)},
|
|
{Timestamp: 9, Value: math.Inf(1)},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
|
|
b := bytes.NewReader(compressed)
|
|
h := NewHandler(false)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
|
|
var called bool
|
|
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
|
|
called = true
|
|
|
|
if got, exp := len(points), 3; got != exp {
|
|
t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points)
|
|
}
|
|
|
|
expFields := []models.Fields{
|
|
models.Fields{"value": req.Timeseries[0].Samples[0].Value},
|
|
models.Fields{"value": req.Timeseries[0].Samples[2].Value},
|
|
models.Fields{"value": req.Timeseries[0].Samples[5].Value},
|
|
}
|
|
|
|
expTS := []int64{
|
|
req.Timeseries[0].Samples[0].Timestamp * int64(time.Millisecond),
|
|
req.Timeseries[0].Samples[2].Timestamp * int64(time.Millisecond),
|
|
req.Timeseries[0].Samples[5].Timestamp * int64(time.Millisecond),
|
|
}
|
|
|
|
for i, point := range points {
|
|
if got, exp := point.UnixNano(), expTS[i]; got != exp {
|
|
t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point)
|
|
}
|
|
|
|
exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
|
|
if got := point.Tags(); !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point)
|
|
}
|
|
|
|
gotFields, err := point.Fields()
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
|
|
if !called {
|
|
t.Fatal("WritePoints: expected call")
|
|
}
|
|
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// mustMakeBigString returns a string made of sz copies of the byte 'a'.
// It panics if sz is negative.
func mustMakeBigString(sz int) string {
	return strings.Repeat("a", sz)
}
|
|
|
|
func TestHandler_PromWrite_Error(t *testing.T) {
|
|
req := &prompb.WriteRequest{
|
|
Timeseries: []prompb.TimeSeries{
|
|
{
|
|
// Invalid tag key
|
|
Labels: []prompb.Label{{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}},
|
|
Samples: []prompb.Sample{{Timestamp: 1, Value: 1.2}},
|
|
},
|
|
},
|
|
}
|
|
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
|
|
b := bytes.NewReader(compressed)
|
|
h := NewHandler(false)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
|
|
var called bool
|
|
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
|
|
called = true
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
|
|
if w.Code != http.StatusBadRequest {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
|
|
if got, exp := strings.TrimSpace(w.Body.String()), `{"error":"max key length exceeded: 65572 \u003e 65535"}`; got != exp {
|
|
t.Fatalf("got error %q, expected %q", got, exp)
|
|
}
|
|
|
|
if called {
|
|
t.Fatal("WritePoints called but should not be")
|
|
}
|
|
}
|
|
|
|
// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
|
|
// data is returned
|
|
func TestHandler_PromRead(t *testing.T) {
|
|
req := &prompb.ReadRequest{
|
|
Queries: []*prompb.Query{{
|
|
Matchers: []*prompb.LabelMatcher{
|
|
{
|
|
Type: prompb.LabelMatcher_EQ,
|
|
Name: "__name__",
|
|
Value: "value",
|
|
},
|
|
},
|
|
StartTimestampMs: 1,
|
|
EndTimestampMs: 2,
|
|
}},
|
|
}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
b := bytes.NewReader(compressed)
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
|
|
// Number of results in the result set
|
|
var i int64
|
|
h.Store.ResultSet.NextFn = func() bool {
|
|
i++
|
|
return i <= 2
|
|
}
|
|
|
|
// data for each cursor.
|
|
h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
|
|
cursor := internal.NewFloatArrayCursorMock()
|
|
|
|
var i int64
|
|
cursor.NextFn = func() *tsdb.FloatArray {
|
|
i++
|
|
ts := []int64{22000000 * i, 10000000000 * i}
|
|
vs := []float64{2.3, 2992.33}
|
|
if i > 2 {
|
|
ts, vs = nil, nil
|
|
}
|
|
return &tsdb.FloatArray{Timestamps: ts, Values: vs}
|
|
}
|
|
|
|
return cursor
|
|
}
|
|
|
|
// Tags for each cursor.
|
|
h.Store.ResultSet.TagsFn = func() models.Tags {
|
|
return models.NewTags(map[string]string{
|
|
"host": fmt.Sprintf("server-%d", i),
|
|
"_measurement": "mem",
|
|
})
|
|
}
|
|
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
|
|
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var resp prompb.ReadResponse
|
|
if err := resp.Unmarshal(reqBuf); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
expResults := []*prompb.QueryResult{
|
|
{
|
|
Timeseries: []*prompb.TimeSeries{
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "host", Value: "server-1"},
|
|
},
|
|
Samples: []prompb.Sample{
|
|
{Timestamp: 22, Value: 2.3},
|
|
{Timestamp: 10000, Value: 2992.33},
|
|
{Timestamp: 44, Value: 2.3},
|
|
{Timestamp: 20000, Value: 2992.33},
|
|
},
|
|
},
|
|
{
|
|
Labels: []prompb.Label{
|
|
{Name: "host", Value: "server-2"},
|
|
},
|
|
Samples: []prompb.Sample{
|
|
{Timestamp: 22, Value: 2.3},
|
|
{Timestamp: 10000, Value: 2992.33},
|
|
{Timestamp: 44, Value: 2.3},
|
|
{Timestamp: 20000, Value: 2992.33},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if !reflect.DeepEqual(resp.Results, expResults) {
|
|
t.Fatalf("Results differ:\n%v", cmp.Diff(resp.Results, expResults))
|
|
}
|
|
}
|
|
|
|
func TestHandler_PromRead_NoResults(t *testing.T) {
|
|
req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{
|
|
Matchers: []*prompb.LabelMatcher{
|
|
{
|
|
Type: prompb.LabelMatcher_EQ,
|
|
Name: "__name__",
|
|
Value: "value",
|
|
},
|
|
},
|
|
StartTimestampMs: 0,
|
|
EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond),
|
|
}}}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
|
|
b := bytes.NewReader(compressed)
|
|
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
var resp prompb.ReadResponse
|
|
if err := resp.Unmarshal(reqBuf); err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
}
|
|
|
|
func TestHandler_PromRead_UnsupportedCursors(t *testing.T) {
|
|
req := &prompb.ReadRequest{Queries: []*prompb.Query{&prompb.Query{
|
|
Matchers: []*prompb.LabelMatcher{
|
|
{
|
|
Type: prompb.LabelMatcher_EQ,
|
|
Name: "__name__",
|
|
Value: "value",
|
|
},
|
|
},
|
|
StartTimestampMs: 0,
|
|
EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond),
|
|
}}}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
t.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
|
|
unsupported := []tsdb.Cursor{
|
|
internal.NewIntegerArrayCursorMock(),
|
|
internal.NewBooleanArrayCursorMock(),
|
|
internal.NewUnsignedArrayCursorMock(),
|
|
internal.NewStringArrayCursorMock(),
|
|
}
|
|
|
|
for _, cursor := range unsupported {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
var lb bytes.Buffer
|
|
h.Logger = logger.New(&lb)
|
|
|
|
more := true
|
|
h.Store.ResultSet.NextFn = func() bool { defer func() { more = false }(); return more }
|
|
|
|
// Set the cursor type that will be returned while iterating over
|
|
// the mock store.
|
|
h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
|
|
return cursor
|
|
}
|
|
|
|
b := bytes.NewReader(compressed)
|
|
h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
var resp prompb.ReadResponse
|
|
if err := resp.Unmarshal(reqBuf); err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
if !strings.Contains(lb.String(), "cursor_type=") {
|
|
t.Fatalf("got log message %q, expected to contain \"cursor_type\"", lb.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestHandler_Flux_DisabledByDefault(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
|
|
body := bytes.NewBufferString(`from(bucket:"db/rp") |> range(start:-1h) |> last()`)
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v2/query", body))
|
|
if got := w.Code; !cmp.Equal(got, http.StatusForbidden) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
|
|
exp := `{"error":"Flux query service disabled. Verify flux-enabled=true in the [http] section of the InfluxDB config."}` + "\n"
|
|
if got := w.Body.String(); got != exp {
|
|
t.Fatalf("unexpected body -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
}
|
|
|
|
func TestHandler_PromRead_NilResultSet(t *testing.T) {
|
|
req := &prompb.ReadRequest{
|
|
Queries: []*prompb.Query{{
|
|
Matchers: []*prompb.LabelMatcher{
|
|
{
|
|
Type: prompb.LabelMatcher_EQ,
|
|
Name: "__name__",
|
|
Value: "value",
|
|
},
|
|
},
|
|
StartTimestampMs: 1,
|
|
EndTimestampMs: 2,
|
|
}},
|
|
}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
log.Fatal("couldn't marshal prometheus request")
|
|
}
|
|
compressed := snappy.Encode(nil, data)
|
|
b := bytes.NewReader(compressed)
|
|
|
|
h := NewHandler(false)
|
|
|
|
// Mocks the case when Store.Read() returns nil, nil
|
|
h.Handler.Store.(*internal.StorageStoreMock).ReadFilterFn = func(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
|
|
if w.Header().Get("Content-Type") != "application/x-protobuf" {
|
|
t.Fatalf("Got unexpected \"Content-Type\" header value:\n%v", cmp.Diff("application/x-protobuf", w.Header().Get("Content-Type")))
|
|
}
|
|
if w.Header().Get("Content-Encoding") != "snappy" {
|
|
t.Fatalf("Got unexpected \"Content-Encoding\" header value:\n%v", cmp.Diff("snappy", w.Header().Get("Content-Encoding")))
|
|
}
|
|
|
|
decompressed, err := snappy.Decode(nil, w.Body.Bytes())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
resp := new(prompb.ReadResponse)
|
|
err = resp.Unmarshal(decompressed)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
expected := &prompb.ReadResponse{
|
|
Results: []*prompb.QueryResult{{}},
|
|
}
|
|
if !reflect.DeepEqual(resp, expected) {
|
|
t.Fatalf("Results differ:\n%v", cmp.Diff(expected, resp))
|
|
}
|
|
}
|
|
|
|
func TestHandler_Flux_QueryJSON(t *testing.T) {
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog()))
|
|
called := false
|
|
qry := "foo"
|
|
h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) {
|
|
if exp := flux.CompilerType(lang.FluxCompilerType); compiler.CompilerType() != exp {
|
|
t.Fatalf("unexpected compiler type -got/+exp\n%s", cmp.Diff(compiler.CompilerType(), exp))
|
|
}
|
|
if c, ok := compiler.(lang.FluxCompiler); !ok {
|
|
t.Fatal("expected lang.FluxCompiler")
|
|
} else if exp := qry; c.Query != exp {
|
|
t.Fatalf("unexpected query -got/+exp\n%s", cmp.Diff(c.Query, exp))
|
|
}
|
|
called = true
|
|
|
|
p := &mock.Program{}
|
|
return p.Start(ctx, nil)
|
|
}
|
|
|
|
q := client.QueryRequest{Query: qry}
|
|
var body bytes.Buffer
|
|
if err := json.NewEncoder(&body).Encode(q); err != nil {
|
|
t.Fatalf("unexpected JSON encoding error: %q", err.Error())
|
|
}
|
|
|
|
req := MustNewRequest("POST", "/api/v2/query", &body)
|
|
req.Header.Add("content-type", "application/json")
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if got := w.Code; !cmp.Equal(got, http.StatusOK) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
|
|
if !called {
|
|
t.Fatalf("expected QueryFn to be called")
|
|
}
|
|
}
|
|
|
|
func TestHandler_Flux_QueryText(t *testing.T) {
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog()))
|
|
called := false
|
|
qry := "bar"
|
|
h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) {
|
|
if exp := flux.CompilerType(lang.FluxCompilerType); compiler.CompilerType() != exp {
|
|
t.Fatalf("unexpected compiler type -got/+exp\n%s", cmp.Diff(compiler.CompilerType(), exp))
|
|
}
|
|
if c, ok := compiler.(lang.FluxCompiler); !ok {
|
|
t.Fatal("expected lang.FluxCompiler")
|
|
} else if exp := qry; c.Query != exp {
|
|
t.Fatalf("unexpected query -got/+exp\n%s", cmp.Diff(c.Query, exp))
|
|
}
|
|
called = true
|
|
|
|
p := &mock.Program{}
|
|
return p.Start(ctx, nil)
|
|
}
|
|
|
|
req := MustNewRequest("POST", "/api/v2/query", bytes.NewBufferString(qry))
|
|
req.Header.Add("content-type", "application/vnd.flux")
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if got := w.Code; !cmp.Equal(got, http.StatusOK) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
|
|
if !called {
|
|
t.Fatalf("expected QueryFn to be called")
|
|
}
|
|
}
|
|
|
|
func TestHandler_Flux(t *testing.T) {
|
|
|
|
queryBytes := func(qs string) io.Reader {
|
|
var b bytes.Buffer
|
|
q := &client.QueryRequest{Query: qs}
|
|
if err := json.NewEncoder(&b).Encode(q); err != nil {
|
|
t.Fatalf("unexpected JSON encoding error: %q", err.Error())
|
|
}
|
|
return &b
|
|
}
|
|
|
|
tests := []struct {
|
|
name string
|
|
reqFn func() *http.Request
|
|
expCode int
|
|
expBody string
|
|
}{
|
|
{
|
|
name: "no media type",
|
|
reqFn: func() *http.Request {
|
|
return MustNewRequest("POST", "/api/v2/query", nil)
|
|
},
|
|
expCode: http.StatusBadRequest,
|
|
expBody: "{\"error\":\"mime: no media type\"}\n",
|
|
},
|
|
{
|
|
name: "200 OK",
|
|
reqFn: func() *http.Request {
|
|
req := MustNewRequest("POST", "/api/v2/query", queryBytes("foo"))
|
|
req.Header.Add("content-type", "application/json")
|
|
return req
|
|
},
|
|
expCode: http.StatusOK,
|
|
},
|
|
}
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog()))
|
|
h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) {
|
|
p := &mock.Program{}
|
|
return p.Start(ctx, nil)
|
|
}
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, test.reqFn())
|
|
if got := w.Code; !cmp.Equal(got, test.expCode) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
|
|
if test.expBody != "" {
|
|
if got := w.Body.String(); got != test.expBody {
|
|
t.Fatalf("unexpected body -got/+exp\n%s", cmp.Diff(got, test.expBody))
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestHandler_Flux_Auth(t *testing.T) {
|
|
// Create the handler to be tested.
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithFlux(), WithNoLog(), WithAuthentication()))
|
|
h.MetaClient.AdminUserExistsFn = func() bool { return true }
|
|
h.MetaClient.UserFn = func(username string) (meta.User, error) {
|
|
if username != "user1" {
|
|
return nil, meta.ErrUserNotFound
|
|
}
|
|
return &meta.UserInfo{
|
|
Name: "user1",
|
|
Hash: "abcd",
|
|
Admin: true,
|
|
}, nil
|
|
}
|
|
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
|
|
if u != "user1" {
|
|
return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u)
|
|
} else if p != "abcd" {
|
|
return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p)
|
|
}
|
|
return h.MetaClient.User(u)
|
|
}
|
|
|
|
h.Controller.QueryFn = func(ctx context.Context, compiler flux.Compiler) (i flux.Query, e error) {
|
|
p := &mock.Program{}
|
|
return p.Start(ctx, nil)
|
|
}
|
|
|
|
req := MustNewRequest("POST", "/api/v2/query", bytes.NewBufferString("bar"))
|
|
req.Header.Set("content-type", "application/vnd.flux")
|
|
req.Header.Set("Authorization", "Token user1:abcd")
|
|
// Test the handler with valid user and password in the URL parameters.
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if got := w.Code; !cmp.Equal(got, http.StatusOK) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Token user1:efgh")
|
|
w = httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
if got := w.Code; !cmp.Equal(got, http.StatusUnauthorized) {
|
|
t.Fatalf("unexpected status: %d", got)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler handles ping requests correctly.
|
|
func TestHandler_Ping(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("GET", "/ping", nil))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
h.ServeHTTP(w, MustNewRequest("HEAD", "/ping", nil))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// Ensure the handler handles health requests correctly.
|
|
func TestHandler_Health(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("GET", "/health", nil))
|
|
if w.Code != http.StatusOK {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
|
|
var got map[string]interface{}
|
|
if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
assert.Equal(t, got["name"], "influxdb", "invalid name")
|
|
assert.Equal(t, got["message"], "ready for queries and writes", "invalid message")
|
|
assert.Equal(t, got["status"], "pass", "invalid status")
|
|
assert.Equal(t, got["version"], "0.0.0", "invalid version")
|
|
if _, present := got["checks"]; !present {
|
|
t.Fatal("missing checks")
|
|
}
|
|
}
|
|
|
|
// Ensure the handler returns the version correctly from the different endpoints.
|
|
func TestHandler_Version(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
|
return nil
|
|
}
|
|
tests := []struct {
|
|
method string
|
|
endpoint string
|
|
body io.Reader
|
|
}{
|
|
{
|
|
method: "GET",
|
|
endpoint: "/ping",
|
|
body: nil,
|
|
},
|
|
{
|
|
method: "GET",
|
|
endpoint: "/query?db=foo&q=SELECT+*+FROM+bar",
|
|
body: nil,
|
|
},
|
|
{
|
|
method: "POST",
|
|
endpoint: "/write",
|
|
body: bytes.NewReader(make([]byte, 10)),
|
|
},
|
|
{
|
|
method: "GET",
|
|
endpoint: "/notfound",
|
|
body: nil,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest(test.method, test.endpoint, test.body))
|
|
if v := w.HeaderMap["X-Influxdb-Version"]; len(v) > 0 {
|
|
if v[0] != "0.0.0" {
|
|
t.Fatalf("unexpected version: %s", v)
|
|
}
|
|
} else {
|
|
t.Fatalf("Header entry 'X-Influxdb-Version' not present")
|
|
}
|
|
|
|
if v := w.HeaderMap["X-Influxdb-Build"]; len(v) > 0 {
|
|
if v[0] != "OSS" {
|
|
t.Fatalf("unexpected BuildType: %s", v)
|
|
}
|
|
} else {
|
|
t.Fatalf("Header entry 'X-Influxdb-Build' not present")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensure the handler handles status requests correctly.
|
|
func TestHandler_Status(t *testing.T) {
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("GET", "/status", nil))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
h.ServeHTTP(w, MustNewRequest("HEAD", "/status", nil))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// Ensure write endpoint can handle bad requests
|
|
func TestHandler_HandleBadRequestBody(t *testing.T) {
|
|
b := bytes.NewReader(make([]byte, 10))
|
|
h := NewHandler(false)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/write", b))
|
|
if w.Code != http.StatusBadRequest {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
func TestHandler_Write_EntityTooLarge_ContentLength(t *testing.T) {
|
|
b := bytes.NewReader(make([]byte, 100))
|
|
h := NewHandler(false)
|
|
h.Config.MaxBodySize = 5
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
|
|
if w.Code != http.StatusRequestEntityTooLarge {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
func TestHandler_Write_SuppressLog(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
c := httpd.NewConfig()
|
|
c.SuppressWriteLog = true
|
|
h := NewHandlerWithConfig(c)
|
|
h.CLFLogger = log.New(&buf, "", log.LstdFlags)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
h.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
|
|
return nil
|
|
}
|
|
|
|
b := strings.NewReader("cpu,host=server01 value=2\n")
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
|
|
// If the log has anything in it, this failed.
|
|
if buf.Len() > 0 {
|
|
t.Fatalf("expected no bytes to be written to the log, got %d", buf.Len())
|
|
}
|
|
}
|
|
|
|
// onlyReader wraps a reader and exposes nothing but Read, so that
// Request.ContentLength is not set from the body's concrete type.
type onlyReader struct {
	r io.Reader
}

// Read forwards to the wrapped reader and returns its result unchanged.
func (o onlyReader) Read(p []byte) (n int, err error) {
	n, err = o.r.Read(p)
	return n, err
}
|
|
|
|
func TestHandler_Write_EntityTooLarge_NoContentLength(t *testing.T) {
|
|
b := onlyReader{bytes.NewReader(make([]byte, 100))}
|
|
h := NewHandler(false)
|
|
h.Config.MaxBodySize = 5
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
|
|
if w.Code != http.StatusRequestEntityTooLarge {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// TestHandler_Write_NegativeMaxBodySize verifies no error occurs if MaxBodySize is < 0
|
|
func TestHandler_Write_NegativeMaxBodySize(t *testing.T) {
|
|
b := bytes.NewReader([]byte(`foo n=1`))
|
|
h := NewHandler(false)
|
|
h.Config.MaxBodySize = -1
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
called := false
|
|
h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error {
|
|
called = true
|
|
return nil
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
|
|
if !called {
|
|
t.Fatal("WritePoints: expected call")
|
|
}
|
|
if w.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected status: %d", w.Code)
|
|
}
|
|
}
|
|
|
|
// TestHandler_Write_V1_Precision verifies v1 writes validate precision.
|
|
func TestHandler_Write_V1_Precision(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error {
|
|
return nil
|
|
}
|
|
|
|
tests := []struct {
|
|
url string
|
|
status int
|
|
}{
|
|
// Successful requests.
|
|
{"/write?db=foo", http.StatusNoContent},
|
|
{"/write?db=foo&precision=n", http.StatusNoContent},
|
|
{"/write?db=foo&precision=u", http.StatusNoContent},
|
|
{"/write?db=foo&precision=ms", http.StatusNoContent},
|
|
{"/write?db=foo&precision=s", http.StatusNoContent},
|
|
{"/write?db=foo&precision=m", http.StatusNoContent},
|
|
{"/write?db=foo&precision=h", http.StatusNoContent},
|
|
// Invalid requests.
|
|
{"/write?db=foo&precision=us", http.StatusBadRequest},
|
|
}
|
|
|
|
runTest := func(url string, status int) {
|
|
b := bytes.NewReader([]byte(`foo n=1`))
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", url, b))
|
|
if w.Code != status {
|
|
t.Fatalf("unexpected result for: \"%s\"\n\texp: %d, got: %d\n\t%s", url, status, w.Code, w.Body)
|
|
}
|
|
}
|
|
|
|
for _, t := range tests {
|
|
runTest(t.url, t.status)
|
|
}
|
|
}
|
|
|
|
// TestHandler_Write_V2_Precision verifies v2 writes validate precision.
|
|
func TestHandler_Write_V2_Precision(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
|
|
return &meta.DatabaseInfo{}
|
|
}
|
|
h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error {
|
|
return nil
|
|
}
|
|
|
|
tests := []struct {
|
|
url string
|
|
status int
|
|
}{
|
|
// Successful requests.
|
|
{"/api/v2/write?org=bar&bucket=foo", http.StatusNoContent},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=ns", http.StatusNoContent},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=us", http.StatusNoContent},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=ms", http.StatusNoContent},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=s", http.StatusNoContent},
|
|
// Invalid requests.
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=n", http.StatusBadRequest},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=u", http.StatusBadRequest},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=m", http.StatusBadRequest},
|
|
{"/api/v2/write?org=bar&bucket=foo&precision=h", http.StatusBadRequest},
|
|
}
|
|
|
|
runTest := func(url string, status int) {
|
|
b := bytes.NewReader([]byte(`foo n=1`))
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, MustNewRequest("POST", url, b))
|
|
if w.Code != status {
|
|
t.Fatalf("unexpected result for: \"%s\"\n\texp: %d, got: %d\n\t%s", url, status, w.Code, w.Body)
|
|
}
|
|
}
|
|
|
|
for _, t := range tests {
|
|
runTest(t.url, t.status)
|
|
}
|
|
}
|
|
|
|
func TestHandler_Delete_V2(t *testing.T) {
|
|
var errUnexpectedMeasurement = errors.New("unexpected measurement")
|
|
type test struct {
|
|
url string
|
|
body httpd.DeleteBody
|
|
status int
|
|
errMsg string
|
|
}
|
|
tests := []*test{
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"},
|
|
status: http.StatusOK,
|
|
errMsg: ``,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"},
|
|
status: http.StatusNotFound,
|
|
errMsg: `delete - bucket: bucket name "" is missing a slash; not in "database/retention-policy" format`,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z", Predicate: "?>!!?>?>;;;"},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `delete - cannot parse predicate "?>!!?>?>;;; AND time >= '2022-03-23T22:56:06Z' AND time < '2022-03-23T20:56:06Z'": found ?, expected identifier, string, number, bool at line 1, char 1`,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z", Predicate: "_measurement=\"mymeasure\" AND t1=tagOne"},
|
|
status: http.StatusOK,
|
|
errMsg: ``,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T22:56:06Z"},
|
|
status: http.StatusOK,
|
|
errMsg: ``,
|
|
},
|
|
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Start: "2022-03-23T20:56:06Z"},
|
|
status: http.StatusBadRequest,
|
|
errMsg: "delete - stop field in RFC3339Nano format required",
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z"},
|
|
status: http.StatusBadRequest,
|
|
errMsg: "delete - start field in RFC3339Nano format required",
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Start: "2022-03-23T20:56:06Z", Stop: "NotAValidTime"},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `delete - invalid format for stop field "NotAValidTime", please use RFC3339Nano: parsing time "NotAValidTime" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "NotAValidTime" as "2006"`,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "NotAValidTime"},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `delete - invalid format for start field "NotAValidTime", please use RFC3339Nano: parsing time "NotAValidTime" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "NotAValidTime" as "2006"`,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" = "value1"`},
|
|
status: http.StatusOK,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = mymeasure AND "tag0" = "value1"`},
|
|
status: http.StatusOK,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" = "value1" AND tag1 = value3`},
|
|
status: http.StatusOK,
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "bad_measurement" AND "tag0" = "value1" AND tag1 = value3`},
|
|
status: http.StatusBadRequest,
|
|
errMsg: "delete - database: \"mydb\", retention policy: \"myrp\", start: \"2022-03-23T18:56:06Z\", stop: \"2022-03-23T20:56:06Z\", predicate: \"_measurement = \\\"bad_measurement\\\" AND \\\"tag0\\\" = \\\"value1\\\" AND tag1 = value3\", error: unexpected measurement",
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = bad_measurement AND "tag0" = "value1" AND tag1 = value3`},
|
|
status: http.StatusBadRequest,
|
|
errMsg: "delete - database: \"mydb\", retention policy: \"myrp\", start: \"2022-03-23T18:56:06Z\", stop: \"2022-03-23T20:56:06Z\", predicate: \"_measurement = bad_measurement AND \\\"tag0\\\" = \\\"value1\\\" AND tag1 = value3\", error: unexpected measurement",
|
|
},
|
|
&test{
|
|
url: "/api/v2/delete?org=bar&bucket=mydb/myrp",
|
|
body: httpd.DeleteBody{Stop: "2022-03-23T20:56:06Z", Start: "2022-03-23T18:56:06Z", Predicate: `_measurement = "mymeasure" AND "tag0" != "value1" AND tag1 = value3`},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `delete - predicate only supports equality operators and conjunctions. database: "mydb", retention policy: "myrp", start: "2022-03-23T18:56:06Z", stop: "2022-03-23T20:56:06Z", predicate: "_measurement = \"mymeasure\" AND \"tag0\" != \"value1\" AND tag1 = value3"`,
|
|
},
|
|
}
|
|
|
|
h := NewHandler(false)
|
|
h.Store.DeleteFn = func(database string, sources []influxql.Source, condition influxql.Expr) error {
|
|
if len(sources) > 0 {
|
|
if m, ok := sources[0].(*influxql.Measurement); ok && m.Name != "mymeasure" {
|
|
return errUnexpectedMeasurement
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
h.MetaClient = &internal.MetaClientMock{
|
|
DatabaseFn: func(name string) *meta.DatabaseInfo {
|
|
if name == "mydb" {
|
|
return &meta.DatabaseInfo{
|
|
Name: "mydb",
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{meta.RetentionPolicyInfo{Name: "myrp"}},
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
},
|
|
}
|
|
h.Handler.MetaClient = h.MetaClient
|
|
|
|
var req *http.Request
|
|
fn := func(ct *test) {
|
|
w := httptest.NewRecorder()
|
|
if body, err := json.Marshal(&ct.body); err != nil {
|
|
t.Fatalf("error marshaling body: %s", err)
|
|
} else {
|
|
req = MustNewJSONRequest("POST", ct.url, bytes.NewReader(body))
|
|
}
|
|
h.ServeHTTP(w, req)
|
|
var errMsg string
|
|
if w.Code != ct.status {
|
|
t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg)
|
|
} else if w.Code != http.StatusOK {
|
|
errMsg = w.Header().Get("X-InfluxDB-Error")
|
|
if errMsg != ct.errMsg {
|
|
t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg)
|
|
}
|
|
}
|
|
}
|
|
for _, ct := range tests {
|
|
fn(ct)
|
|
}
|
|
}
|
|
|
|
func TestHandler_CreateDeleteBuckets(t *testing.T) {
|
|
const existingDb = "mydb"
|
|
const newDb = "newDb"
|
|
const goodRp = "myrp"
|
|
const postMethod = "POST"
|
|
const deleteMethod = "DELETE"
|
|
const patchMethod = "PATCH"
|
|
|
|
type test struct {
|
|
url string
|
|
method string
|
|
body httpd.BucketsBody
|
|
status int
|
|
errMsg string
|
|
}
|
|
|
|
tests := []*test{
|
|
{
|
|
url: "/api/v2/buckets",
|
|
method: postMethod,
|
|
body: httpd.BucketsBody{
|
|
BucketUpdate: httpd.BucketUpdate{
|
|
Name: existingDb + "/",
|
|
RetentionRules: []httpd.RetentionRule{
|
|
httpd.RetentionRule{
|
|
EverySeconds: 7200,
|
|
ShardGroupDurationSeconds: 14400,
|
|
},
|
|
},
|
|
},
|
|
Rp: "",
|
|
SchemaType: "implicit",
|
|
},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `buckets - illegal bucket name: "mydb/"`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets",
|
|
method: postMethod,
|
|
body: httpd.BucketsBody{
|
|
BucketUpdate: httpd.BucketUpdate{
|
|
Name: existingDb + "//",
|
|
RetentionRules: []httpd.RetentionRule{
|
|
httpd.RetentionRule{
|
|
EverySeconds: 7200,
|
|
ShardGroupDurationSeconds: 14400,
|
|
},
|
|
},
|
|
},
|
|
Rp: "",
|
|
SchemaType: "implicit",
|
|
},
|
|
status: http.StatusBadRequest,
|
|
errMsg: `buckets - retention policy "/": invalid name`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/" + existingDb + "%2f" + goodRp,
|
|
method: patchMethod,
|
|
body: httpd.BucketsBody{
|
|
BucketUpdate: httpd.BucketUpdate{
|
|
Name: "newNewRp",
|
|
RetentionRules: []httpd.RetentionRule{
|
|
{
|
|
EverySeconds: 6000,
|
|
ShardGroupDurationSeconds: 18000,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
status: http.StatusOK,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/" + existingDb + "/",
|
|
method: deleteMethod,
|
|
status: http.StatusNotFound,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/baddb%2f" + goodRp,
|
|
method: deleteMethod,
|
|
status: http.StatusNotFound,
|
|
errMsg: `delete bucket - not found: "baddb/myrp"`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets",
|
|
method: postMethod,
|
|
body: httpd.BucketsBody{
|
|
BucketUpdate: httpd.BucketUpdate{
|
|
Name: newDb + "/" + goodRp,
|
|
RetentionRules: []httpd.RetentionRule{
|
|
httpd.RetentionRule{
|
|
EverySeconds: 7200,
|
|
ShardGroupDurationSeconds: 14400,
|
|
},
|
|
},
|
|
},
|
|
Rp: goodRp,
|
|
SchemaType: "implicit",
|
|
},
|
|
status: http.StatusCreated,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/" + existingDb + "%2fbadrp",
|
|
method: deleteMethod,
|
|
status: http.StatusNotFound,
|
|
errMsg: `delete bucket - not found: "mydb/badrp"`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets",
|
|
method: postMethod,
|
|
body: httpd.BucketsBody{
|
|
BucketUpdate: httpd.BucketUpdate{
|
|
Name: existingDb + "/" + goodRp,
|
|
RetentionRules: []httpd.RetentionRule{
|
|
httpd.RetentionRule{
|
|
EverySeconds: 7200,
|
|
ShardGroupDurationSeconds: 14400,
|
|
},
|
|
},
|
|
},
|
|
Rp: goodRp,
|
|
SchemaType: "implicit",
|
|
},
|
|
status: http.StatusCreated,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/" + existingDb + "%2f" + goodRp,
|
|
method: deleteMethod,
|
|
status: http.StatusOK,
|
|
},
|
|
}
|
|
|
|
createRp := func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) {
|
|
return &meta.RetentionPolicyInfo{
|
|
Name: spec.Name,
|
|
ReplicaN: *spec.ReplicaN,
|
|
Duration: *spec.Duration,
|
|
ShardGroupDuration: spec.ShardGroupDuration,
|
|
}, nil
|
|
}
|
|
|
|
lookupDb := func(name string) *meta.DatabaseInfo {
|
|
if name == existingDb {
|
|
return &meta.DatabaseInfo{
|
|
Name: name,
|
|
DefaultRetentionPolicy: goodRp,
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{meta.RetentionPolicyInfo{Name: goodRp}},
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
dropDeleteRp := func(database, rp string) error {
|
|
if dbi := lookupDb(database); dbi == nil {
|
|
return fmt.Errorf("database not found: %q", database)
|
|
} else if len(dbi.RetentionPolicies) <= 0 || dbi.RetentionPolicies[0].Name != rp {
|
|
return fmt.Errorf("retention policy in database %q not found: %q", database, rp)
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
updateRp := func(database string, name string, update *meta.RetentionPolicyUpdate, makeDefault bool) error {
|
|
if database == existingDb && name == goodRp {
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf("bucket not found: %q", fmt.Sprintf("%s/%s", database, name))
|
|
}
|
|
}
|
|
|
|
h := NewHandler(false)
|
|
|
|
h.MetaClient = &internal.MetaClientMock{
|
|
DatabaseFn: lookupDb,
|
|
CreateRetentionPolicyFn: createRp,
|
|
CreateDatabaseWithRetentionPolicyFn: func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
|
|
rpi, err := createRp(name, spec, true)
|
|
return &meta.DatabaseInfo{
|
|
Name: name,
|
|
DefaultRetentionPolicy: spec.Name,
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{*rpi},
|
|
ContinuousQueries: nil,
|
|
}, err
|
|
},
|
|
DropRetentionPolicyFn: dropDeleteRp,
|
|
UpdateRetentionPolicyFn: updateRp,
|
|
}
|
|
|
|
h.Store.DeleteRetentionPolicyFn = dropDeleteRp
|
|
h.Handler.Store = h.Store
|
|
h.Handler.MetaClient = h.MetaClient
|
|
|
|
var req *http.Request
|
|
fn := func(ct *test) {
|
|
w := httptest.NewRecorder()
|
|
if body, err := json.Marshal(&ct.body); err != nil {
|
|
t.Fatalf("error marshaling body: %s", err)
|
|
} else {
|
|
req = MustNewJSONRequest(ct.method, ct.url, bytes.NewReader(body))
|
|
}
|
|
h.ServeHTTP(w, req)
|
|
var errMsg string
|
|
if w.Code != ct.status {
|
|
t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg)
|
|
} else if w.Code != http.StatusOK {
|
|
errMsg = w.Header().Get("X-InfluxDB-Error")
|
|
if errMsg != ct.errMsg {
|
|
t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg)
|
|
}
|
|
}
|
|
}
|
|
for _, ct := range tests {
|
|
fn(ct)
|
|
}
|
|
}
|
|
|
|
var testBuckets = []meta.DatabaseInfo{
|
|
{
|
|
Name: "dbOne",
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_1"}, {Name: "rpTwo_1"}, {Name: "rpThree_1"}},
|
|
},
|
|
{
|
|
Name: "dbTwo",
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_2"}, {Name: "rpTwo_2"}, {Name: "rpThree_2"}, {Name: "rpFour_2"}},
|
|
},
|
|
{
|
|
Name: "dbThree",
|
|
RetentionPolicies: []meta.RetentionPolicyInfo{{Name: "rpOne_3"}},
|
|
},
|
|
}
|
|
|
|
func getBuckets(offset, limit int) []string {
|
|
i := 0
|
|
buckets := make([]string, 0, 8)
|
|
|
|
for _, dbi := range testBuckets {
|
|
for _, rpi := range dbi.RetentionPolicies {
|
|
if limit <= (i - offset) {
|
|
return buckets
|
|
}
|
|
if i >= offset {
|
|
buckets = append(buckets, fmt.Sprintf("%s/%s", dbi.Name, rpi.Name))
|
|
}
|
|
i++
|
|
}
|
|
}
|
|
return buckets
|
|
}
|
|
|
|
func TestHandler_ListBuckets(t *testing.T) {
|
|
type test struct {
|
|
url string
|
|
status int
|
|
errMsg string
|
|
skip int
|
|
limit int
|
|
}
|
|
|
|
tests := []*test{
|
|
{
|
|
url: "/api/v2/buckets?after=dbOne/rpTwo_1&limit=-1",
|
|
status: http.StatusOK,
|
|
skip: 2,
|
|
limit: 1000000,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?offset=200",
|
|
status: http.StatusOK,
|
|
skip: 100,
|
|
limit: 20,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?id=dbOne/rpTwo_1&name=NotThere/rpTwo_1",
|
|
status: http.StatusBadRequest,
|
|
skip: 1,
|
|
limit: 1,
|
|
errMsg: "list buckets: name: \"NotThere/rpTwo_1\" and id: \"dbOne/rpTwo_1\" do not match",
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?after=dbOne/rpTwo_1&limit=4",
|
|
status: http.StatusOK,
|
|
skip: 2,
|
|
limit: 4,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?after=dbOne/rpThree_1",
|
|
status: http.StatusOK,
|
|
skip: 3,
|
|
limit: 20,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?after=dbTwo/rpTwo_2",
|
|
status: http.StatusOK,
|
|
skip: 5,
|
|
limit: 20,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?id=dbOne/rpTwo_1&name=dbOne/rpTwo_1",
|
|
status: http.StatusOK,
|
|
skip: 1,
|
|
limit: 1,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?id=dbOne/rpTwo_1",
|
|
status: http.StatusOK,
|
|
skip: 1,
|
|
limit: 1,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?name=dbOne/rpTwo_1",
|
|
status: http.StatusOK,
|
|
skip: 1,
|
|
limit: 1,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?offset=3&after=dbOne/rpOne_1",
|
|
status: http.StatusBadRequest,
|
|
skip: 0,
|
|
limit: 20,
|
|
errMsg: "list buckets cannot have both \"offset\" and \"after\" arguments",
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?offset=3&limit=4",
|
|
status: http.StatusOK,
|
|
skip: 3,
|
|
limit: 4,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets?offset=1&limit=5",
|
|
status: http.StatusOK,
|
|
skip: 1,
|
|
limit: 5,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets",
|
|
status: http.StatusOK,
|
|
skip: 0,
|
|
limit: 20,
|
|
},
|
|
}
|
|
|
|
lookupDbFn := func(name string) *meta.DatabaseInfo {
|
|
for i := 0; i < len(testBuckets); i++ {
|
|
if testBuckets[i].Name == name {
|
|
return &testBuckets[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
dbsFn := func() []meta.DatabaseInfo {
|
|
return testBuckets
|
|
}
|
|
|
|
h := NewHandler(false)
|
|
|
|
h.MetaClient = &internal.MetaClientMock{
|
|
DatabaseFn: lookupDbFn,
|
|
DatabasesFn: dbsFn,
|
|
}
|
|
|
|
h.Handler.MetaClient = h.MetaClient
|
|
|
|
var req *http.Request
|
|
fn := func(ct *test) {
|
|
w := httptest.NewRecorder()
|
|
req = MustNewJSONRequest("GET", ct.url, nil)
|
|
h.ServeHTTP(w, req)
|
|
var errMsg string
|
|
if w.Code != ct.status {
|
|
t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg)
|
|
} else if w.Code != http.StatusOK {
|
|
errMsg = w.Header().Get("X-InfluxDB-Error")
|
|
if errMsg != ct.errMsg {
|
|
t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg)
|
|
}
|
|
} else {
|
|
var got httpd.Buckets
|
|
exp := getBuckets(ct.skip, ct.limit)
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
|
|
t.Fatalf("unmarshaling buckets: %s", err.Error())
|
|
}
|
|
if len(exp) != len(got.Buckets) {
|
|
t.Fatalf("expected %d buckets returned, got %d", len(exp), len(got.Buckets))
|
|
}
|
|
for i := 0; i < len(got.Buckets); i++ {
|
|
if exp[i] != got.Buckets[i].Name {
|
|
t.Fatalf("expected %q, got %q", exp[i], got.Buckets[i].Name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
for _, ct := range tests {
|
|
fn(ct)
|
|
}
|
|
}
|
|
|
|
func TestHandler_RetrieveBucket(t *testing.T) {
|
|
type test struct {
|
|
url string
|
|
status int
|
|
errMsg string
|
|
exp string
|
|
}
|
|
|
|
tests := []*test{
|
|
{
|
|
url: "/api/v2/buckets/dbOne//",
|
|
status: http.StatusNotFound,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/%2frpTwo_1",
|
|
status: http.StatusBadRequest,
|
|
errMsg: `bucket "/rpTwo_1": bucket name "/rpTwo_1" is in db/rp form but has an empty database`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/dbOne%2f",
|
|
status: http.StatusBadRequest,
|
|
errMsg: `bucket "dbOne/": illegal bucket id, empty retention policy`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/dbFive%2frpTwo_1",
|
|
status: http.StatusNotFound,
|
|
errMsg: `bucket not found: "dbFive/rpTwo_1"`,
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/dbOne%2frpTwo_1",
|
|
status: http.StatusOK,
|
|
exp: "dbOne/rpTwo_1",
|
|
},
|
|
{
|
|
url: "/api/v2/buckets/dbOne%2frpOne_2",
|
|
status: http.StatusNotFound,
|
|
errMsg: `bucket not found: "dbOne/rpOne_2"`,
|
|
},
|
|
}
|
|
lookupDbFn := func(name string) *meta.DatabaseInfo {
|
|
for i := 0; i < len(testBuckets); i++ {
|
|
if testBuckets[i].Name == name {
|
|
return &testBuckets[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
h := NewHandler(false)
|
|
|
|
h.MetaClient = &internal.MetaClientMock{
|
|
DatabaseFn: lookupDbFn,
|
|
}
|
|
|
|
h.Handler.MetaClient = h.MetaClient
|
|
|
|
var req *http.Request
|
|
fn := func(ct *test) {
|
|
w := httptest.NewRecorder()
|
|
req = MustNewJSONRequest("GET", ct.url, nil)
|
|
h.ServeHTTP(w, req)
|
|
var errMsg string
|
|
if w.Code != ct.status {
|
|
t.Fatalf("error, test %s: expected %d got %d: %s", ct.url, ct.status, w.Code, errMsg)
|
|
} else if w.Code != http.StatusOK {
|
|
errMsg = w.Header().Get("X-InfluxDB-Error")
|
|
if errMsg != ct.errMsg {
|
|
t.Fatalf("incorrect error message, test %s: expected: %q, got: %q", ct.url, ct.errMsg, errMsg)
|
|
}
|
|
} else {
|
|
var got httpd.Bucket
|
|
if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
|
|
t.Fatalf("unmarshaling buckets: %s", err.Error())
|
|
}
|
|
if ct.exp != got.Name {
|
|
t.Fatalf("expected %q, got %q", ct.exp, got.Name)
|
|
}
|
|
}
|
|
}
|
|
for _, ct := range tests {
|
|
fn(ct)
|
|
}
|
|
}
|
|
|
|
func TestHandler_UnsupportedV2API(t *testing.T) {
|
|
type test struct {
|
|
method string
|
|
url string
|
|
status int
|
|
errMsg string
|
|
}
|
|
tests := []*test{
|
|
{
|
|
method: "GET",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/labels",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket labels not supported in this version"},
|
|
{
|
|
method: "POST",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/labels",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket labels not supported in this version",
|
|
},
|
|
{
|
|
method: "DELETE",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/labels/mylabel",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket labels not supported in this version"},
|
|
{
|
|
method: "GET",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/members",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket members not supported in this version",
|
|
},
|
|
{
|
|
method: "POST",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/members",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket members not supported in this version",
|
|
},
|
|
{
|
|
method: "DELETE",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/members/amember",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket members not supported in this version",
|
|
},
|
|
{
|
|
method: "GET",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/owners",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket owners not supported in this version",
|
|
},
|
|
{
|
|
method: "POST",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/owners",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket owners not supported in this version",
|
|
},
|
|
{
|
|
method: "DELETE",
|
|
url: "/api/v2/buckets/mydb%2fmyrp/owners/anowner",
|
|
status: http.StatusMethodNotAllowed,
|
|
errMsg: "bucket owners not supported in this version",
|
|
},
|
|
}
|
|
h := NewHandler(false)
|
|
|
|
var req *http.Request
|
|
fn := func(ct *test) {
|
|
w := httptest.NewRecorder()
|
|
req = MustNewJSONRequest(ct.method, ct.url, nil)
|
|
h.ServeHTTP(w, req)
|
|
var errMsg string
|
|
if w.Code != ct.status {
|
|
t.Fatalf("error, expected %d got %d: %s", ct.status, w.Code, errMsg)
|
|
} else if w.Code != http.StatusOK {
|
|
errMsg = w.Header().Get("X-InfluxDB-Error")
|
|
if errMsg != ct.errMsg {
|
|
t.Fatalf("incorrect error message, expected: %q, got: %q", ct.errMsg, errMsg)
|
|
}
|
|
}
|
|
}
|
|
for _, ct := range tests {
|
|
fn(ct)
|
|
}
|
|
}
|
|
|
|
// Ensure X-Forwarded-For header writes the correct log message.
|
|
func TestHandler_XForwardedFor(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
h := NewHandler(false)
|
|
h.CLFLogger = log.New(&buf, "", 0)
|
|
|
|
req := MustNewRequest("GET", "/query", nil)
|
|
req.Header.Set("X-Forwarded-For", "192.168.0.1")
|
|
req.RemoteAddr = "127.0.0.1"
|
|
h.ServeHTTP(httptest.NewRecorder(), req)
|
|
|
|
parts := strings.Split(buf.String(), " ")
|
|
if parts[0] != "192.168.0.1,127.0.0.1" {
|
|
t.Errorf("unexpected host ip address: %s", parts[0])
|
|
}
|
|
}
|
|
|
|
func TestHandler_XRequestId(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
h := NewHandler(false)
|
|
h.CLFLogger = log.New(&buf, "", 0)
|
|
|
|
cases := []map[string]string{
|
|
{"X-Request-Id": "abc123", "Request-Id": ""}, // X-Request-Id is used.
|
|
{"X-REQUEST-ID": "cde", "Request-Id": ""}, // X-REQUEST-ID is used.
|
|
{"X-Request-Id": "", "Request-Id": "foobarzoo"}, // Request-Id is used.
|
|
{"X-Request-Id": "abc123", "Request-Id": "foobarzoo"}, // X-Request-Id takes precedence.
|
|
{"X-Request-Id": "", "Request-Id": ""}, // v1 UUID generated.
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(fmt.Sprint(c), func(t *testing.T) {
|
|
buf.Reset()
|
|
req := MustNewRequest("GET", "/ping", nil)
|
|
req.RemoteAddr = "127.0.0.1"
|
|
|
|
// Set the relevant request ID headers
|
|
var allEmpty = true
|
|
for k, v := range c {
|
|
req.Header.Set(k, v)
|
|
if v != "" {
|
|
allEmpty = false
|
|
}
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
|
|
// Split up the HTTP log line. The request ID is currently located in
|
|
// index 12. If the log line gets changed in the future, this test
|
|
// will likely break and the index will need to be updated.
|
|
parts := strings.Split(buf.String(), " ")
|
|
i := 12
|
|
|
|
// If neither header is set then we expect a v1 UUID to be generated.
|
|
if allEmpty {
|
|
if got, exp := len(parts[i]), 36; got != exp {
|
|
t.Fatalf("got ID of length %d, expected one of length %d", got, exp)
|
|
}
|
|
} else if c["X-Request-Id"] != "" {
|
|
if got, exp := parts[i], c["X-Request-Id"]; got != exp {
|
|
t.Fatalf("got ID of %q, expected %q", got, exp)
|
|
}
|
|
} else if c["X-REQUEST-ID"] != "" {
|
|
if got, exp := parts[i], c["X-REQUEST-ID"]; got != exp {
|
|
t.Fatalf("got ID of %q, expected %q", got, exp)
|
|
}
|
|
} else {
|
|
if got, exp := parts[i], c["Request-Id"]; got != exp {
|
|
t.Fatalf("got ID of %q, expected %q", got, exp)
|
|
}
|
|
}
|
|
|
|
// Check response headers
|
|
if got, exp := w.Header().Get("Request-Id"), parts[i]; got != exp {
|
|
t.Fatalf("Request-Id header was %s, expected %s", got, exp)
|
|
} else if got, exp := w.Header().Get("X-Request-Id"), parts[i]; got != exp {
|
|
t.Fatalf("X-Request-Id header was %s, expected %s", got, exp)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestThrottler_Handler(t *testing.T) {
|
|
t.Run("OK", func(t *testing.T) {
|
|
throttler := httpd.NewThrottler(2, 98)
|
|
|
|
// Send the total number of concurrent requests to the channel.
|
|
var concurrentN int32
|
|
concurrentCh := make(chan int)
|
|
|
|
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
atomic.AddInt32(&concurrentN, 1)
|
|
concurrentCh <- int(atomic.LoadInt32(&concurrentN))
|
|
time.Sleep(1 * time.Millisecond)
|
|
atomic.AddInt32(&concurrentN, -1)
|
|
}))
|
|
|
|
// Execute requests concurrently.
|
|
const n = 100
|
|
for i := 0; i < n; i++ {
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
}
|
|
|
|
// Read the number of concurrent requests for every execution.
|
|
for i := 0; i < n; i++ {
|
|
if v := <-concurrentCh; v > 2 {
|
|
t.Fatalf("concurrent requests exceed maximum: %d", v)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("ErrTimeout", func(t *testing.T) {
|
|
throttler := httpd.NewThrottler(2, 1)
|
|
throttler.EnqueueTimeout = 1 * time.Millisecond
|
|
|
|
begin, end := make(chan struct{}), make(chan struct{})
|
|
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
begin <- struct{}{}
|
|
end <- struct{}{}
|
|
}))
|
|
|
|
// First two requests should execute immediately.
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
|
|
<-begin
|
|
<-begin
|
|
|
|
// Third request should be enqueued but timeout.
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, nil)
|
|
if w.Code != http.StatusServiceUnavailable {
|
|
t.Fatalf("unexpected status code: %d", w.Code)
|
|
} else if body := w.Body.String(); body != "request throttled, exceeds timeout\n" {
|
|
t.Fatalf("unexpected response body: %q", body)
|
|
}
|
|
|
|
// Allow 2 existing requests to complete.
|
|
<-end
|
|
<-end
|
|
})
|
|
|
|
t.Run("ErrFull", func(t *testing.T) {
|
|
delay := 100 * time.Millisecond
|
|
if os.Getenv("CI") != "" {
|
|
delay = 2 * time.Second
|
|
}
|
|
|
|
throttler := httpd.NewThrottler(2, 1)
|
|
|
|
resp := make(chan struct{})
|
|
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
resp <- struct{}{}
|
|
}))
|
|
|
|
// First two requests should execute immediately and third should be queued.
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
go func() { h.ServeHTTP(nil, nil) }()
|
|
time.Sleep(delay)
|
|
|
|
// Fourth request should fail when trying to enqueue.
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, nil)
|
|
if w.Code != http.StatusServiceUnavailable {
|
|
t.Fatalf("unexpected status code: %d", w.Code)
|
|
} else if body := w.Body.String(); body != "request throttled, queue full\n" {
|
|
t.Fatalf("unexpected response body: %q", body)
|
|
}
|
|
|
|
// Allow 3 existing requests to complete.
|
|
<-resp
|
|
<-resp
|
|
<-resp
|
|
})
|
|
}
|
|
|
|
func TestHandlerDebugVars(t *testing.T) {
|
|
stats := func(s ...*monitor.Statistic) ([]*monitor.Statistic, error) {
|
|
return s, nil
|
|
}
|
|
stat := func(name string, tags map[string]string, vals map[string]interface{}) *monitor.Statistic {
|
|
return &monitor.Statistic{
|
|
Statistic: models.Statistic{
|
|
Name: name,
|
|
Tags: tags,
|
|
Values: vals,
|
|
},
|
|
}
|
|
}
|
|
tags := func(kv ...string) map[string]string {
|
|
if len(kv)%2 != 0 {
|
|
panic("expect even number of key/values")
|
|
}
|
|
res := make(map[string]string, len(kv)/2)
|
|
for i := 0; i < len(kv); i += 2 {
|
|
res[kv[i]] = kv[i+1]
|
|
}
|
|
return res
|
|
}
|
|
vals := func(kv ...interface{}) map[string]interface{} {
|
|
if len(kv)%2 != 0 {
|
|
panic("expect even number of key/values")
|
|
}
|
|
res := make(map[string]interface{}, len(kv)/2)
|
|
for i := 0; i < len(kv); i += 2 {
|
|
if key, ok := kv[i].(string); !ok {
|
|
panic("key must be string")
|
|
} else {
|
|
res[key] = kv[i+1]
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
newDiagFn := func(d map[string]*diagnostics.Diagnostics) func() (map[string]*diagnostics.Diagnostics, error) {
|
|
return func() (map[string]*diagnostics.Diagnostics, error) {
|
|
return d, nil
|
|
}
|
|
}
|
|
|
|
var Ignored = []string{"memstats", "cmdline"}
|
|
read := func(t *testing.T, b *bytes.Buffer, del ...string) map[string]interface{} {
|
|
t.Helper()
|
|
res := make(map[string]interface{})
|
|
if err := json.Unmarshal(b.Bytes(), &res); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for _, k := range del {
|
|
delete(res, k)
|
|
}
|
|
|
|
return res
|
|
}
|
|
keys := func(m map[string]interface{}) []string {
|
|
keys := make([]string, 0, len(m))
|
|
for k := range m {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
return keys
|
|
}
|
|
|
|
// stats tests the results of serializing Monitor.Statistics
|
|
t.Run("stats", func(t *testing.T) {
|
|
t.Run("generates unique keys using known tags", func(t *testing.T) {
|
|
h := NewHandler(false)
|
|
h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) {
|
|
return stats(
|
|
stat("database", tags("database", "foo"), nil),
|
|
stat("hh", tags("path", "/mnt/foo/bar"), nil),
|
|
stat("httpd", tags("bind", "127.0.0.1:8088", "proto", "https"), nil),
|
|
stat("other", tags("foo", "bar"), nil),
|
|
stat("shard", tags("path", "/mnt/foo", "id", "111"), nil),
|
|
)
|
|
}
|
|
h.Monitor.DiagnosticsFn = newDiagFn(map[string]*diagnostics.Diagnostics{})
|
|
req := MustNewRequest("GET", "/debug/vars", nil)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
got := keys(read(t, w.Body, Ignored...))
|
|
exp := []string{"crypto", "database:foo", "hh:/mnt/foo/bar", "httpd:https:127.0.0.1:8088", "other", "shard:/mnt/foo:111"}
|
|
if !cmp.Equal(got, exp) {
|
|
t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
})
|
|
|
|
t.Run("generates numbered keys for collisions", func(t *testing.T) {
|
|
// This also implicitly tests the case where no `crypto` diagnostics are not set by application.
|
|
h := NewHandler(false)
|
|
h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) {
|
|
return stats(
|
|
stat("hh_processor", tags("db", "foo", "shardID", "10"), vals("queueSize", 100)),
|
|
stat("hh_processor", tags("db", "foo", "shardID", "15"), vals("queueSize", 500)),
|
|
stat("hh_processor", tags("db", "bar", "shardID", "20"), vals("queueSize", 200)),
|
|
stat("hh_processor", tags("db", "bar", "shardID", "25"), vals("queueSize", 700)),
|
|
)
|
|
}
|
|
req := MustNewRequest("GET", "/debug/vars", nil)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
got := read(t, w.Body, Ignored...)
|
|
exp := map[string]interface{}{
|
|
"crypto": map[string]interface{}{
|
|
"FIPS": false,
|
|
"ensureFIPS": false,
|
|
"passwordHash": "bcrypt",
|
|
"implementation": "Go",
|
|
},
|
|
"hh_processor": map[string]interface{}{
|
|
"name": "hh_processor",
|
|
"tags": map[string]interface{}{"db": "foo", "shardID": "10"},
|
|
"values": map[string]interface{}{"queueSize": float64(100)},
|
|
},
|
|
"hh_processor:1": map[string]interface{}{
|
|
"name": "hh_processor",
|
|
"tags": map[string]interface{}{"db": "foo", "shardID": "15"},
|
|
"values": map[string]interface{}{"queueSize": float64(500)},
|
|
},
|
|
"hh_processor:2": map[string]interface{}{
|
|
"name": "hh_processor",
|
|
"tags": map[string]interface{}{"db": "bar", "shardID": "20"},
|
|
"values": map[string]interface{}{"queueSize": float64(200)},
|
|
},
|
|
"hh_processor:3": map[string]interface{}{
|
|
"name": "hh_processor",
|
|
"tags": map[string]interface{}{"db": "bar", "shardID": "25"},
|
|
"values": map[string]interface{}{"queueSize": float64(700)},
|
|
},
|
|
}
|
|
if !cmp.Equal(got, exp) {
|
|
t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("checks crypto diagnostic handling", func(t *testing.T) {
|
|
h := NewHandler(false)
|
|
// intentionally leave out "ensureFIPS" to test that code path
|
|
h.Monitor.DiagnosticsFn = newDiagFn(
|
|
map[string]*diagnostics.Diagnostics{
|
|
"crypto": diagnostics.RowFromMap(map[string]interface{}{
|
|
"FIPS": true,
|
|
"passwordHash": "pbkdf2-sha256",
|
|
"implementation": "BoringCrypto",
|
|
}),
|
|
})
|
|
req := MustNewRequest("GET", "/debug/vars", nil)
|
|
w := httptest.NewRecorder()
|
|
h.ServeHTTP(w, req)
|
|
got := read(t, w.Body, Ignored...)
|
|
exp := map[string]interface{}{
|
|
"crypto": map[string]interface{}{
|
|
"FIPS": true,
|
|
"ensureFIPS": nil,
|
|
"passwordHash": "pbkdf2-sha256",
|
|
"implementation": "BoringCrypto",
|
|
},
|
|
}
|
|
if !cmp.Equal(got, exp) {
|
|
t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
})
|
|
|
|
}
|
|
|
|
// NewHandler represents a test wrapper for httpd.Handler.
|
|
type Handler struct {
|
|
*httpd.Handler
|
|
MetaClient *internal.MetaClientMock
|
|
StatementExecutor HandlerStatementExecutor
|
|
QueryAuthorizer HandlerQueryAuthorizer
|
|
PointsWriter HandlerPointsWriter
|
|
Monitor *HandlerMonitor
|
|
Store *internal.StorageStoreMock
|
|
Controller *internal.FluxControllerMock
|
|
}
|
|
|
|
// configOption mutates an httpd.Config in place; see NewHandlerConfig.
type configOption func(c *httpd.Config)
|
|
|
|
func WithAuthentication() configOption {
|
|
return func(c *httpd.Config) {
|
|
c.AuthEnabled = true
|
|
c.SharedSecret = "super secret key"
|
|
}
|
|
}
|
|
|
|
func WithPprofAuthEnabled() configOption {
|
|
return func(c *httpd.Config) {
|
|
c.PprofEnabled = true
|
|
c.PprofAuthEnabled = true
|
|
}
|
|
}
|
|
|
|
func WithPingAuthEnabled() configOption {
|
|
return func(c *httpd.Config) {
|
|
c.PingAuthEnabled = true
|
|
}
|
|
}
|
|
|
|
func WithFlux() configOption {
|
|
return func(c *httpd.Config) {
|
|
c.FluxEnabled = true
|
|
}
|
|
}
|
|
|
|
func WithNoLog() configOption {
|
|
return func(c *httpd.Config) {
|
|
c.LogEnabled = false
|
|
}
|
|
}
|
|
|
|
func WithHeaders(h map[string]string) configOption {
|
|
return func(c *httpd.Config) {
|
|
c.HTTPHeaders = h
|
|
}
|
|
}
|
|
|
|
// NewHandlerConfig returns a new instance of httpd.Config with
|
|
// authentication configured.
|
|
func NewHandlerConfig(opts ...configOption) httpd.Config {
|
|
config := httpd.NewConfig()
|
|
for _, opt := range opts {
|
|
opt(&config)
|
|
}
|
|
return config
|
|
}
|
|
|
|
// NewHandler returns a new instance of Handler.
|
|
func NewHandler(requireAuthentication bool) *Handler {
|
|
var opts []configOption
|
|
if requireAuthentication {
|
|
opts = append(opts, WithAuthentication())
|
|
}
|
|
|
|
return NewHandlerWithConfig(NewHandlerConfig(opts...))
|
|
}
|
|
|
|
func NewHandlerWithConfig(config httpd.Config) *Handler {
|
|
h := &Handler{
|
|
Handler: httpd.NewHandler(config),
|
|
}
|
|
|
|
h.MetaClient = &internal.MetaClientMock{}
|
|
h.Store = internal.NewStorageStoreMock()
|
|
h.Controller = internal.NewFluxControllerMock()
|
|
h.Monitor = newHandlerMonitor()
|
|
|
|
h.Handler.MetaClient = h.MetaClient
|
|
h.Handler.Store = h.Store
|
|
h.Handler.QueryExecutor = query.NewExecutor()
|
|
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
|
|
h.Handler.QueryAuthorizer = &h.QueryAuthorizer
|
|
h.Handler.PointsWriter = &h.PointsWriter
|
|
h.Handler.Monitor = h.Monitor
|
|
h.Handler.Version = "0.0.0"
|
|
h.Handler.BuildType = "OSS"
|
|
h.Handler.Controller = h.Controller
|
|
|
|
if testing.Verbose() {
|
|
l := logger.New(os.Stdout)
|
|
h.Handler.Logger = l
|
|
}
|
|
|
|
return h
|
|
}
|
|
|
|
// HandlerMonitor is a mock implementation of Handler.Monitor.
|
|
type HandlerMonitor struct {
|
|
StatisticsFn func(tags map[string]string) ([]*monitor.Statistic, error)
|
|
DiagnosticsFn func() (map[string]*diagnostics.Diagnostics, error)
|
|
}
|
|
|
|
// newHandlerMonitor returns a HandlerMonitor with default implementations
|
|
// for each function.
|
|
func newHandlerMonitor() *HandlerMonitor {
|
|
return &HandlerMonitor{
|
|
StatisticsFn: func(_ map[string]string) ([]*monitor.Statistic, error) {
|
|
return nil, nil
|
|
},
|
|
DiagnosticsFn: func() (map[string]*diagnostics.Diagnostics, error) {
|
|
return make(map[string]*diagnostics.Diagnostics), nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func (m *HandlerMonitor) Statistics(tags map[string]string) ([]*monitor.Statistic, error) {
|
|
return m.StatisticsFn(tags)
|
|
}
|
|
|
|
func (m *HandlerMonitor) Diagnostics() (map[string]*diagnostics.Diagnostics, error) {
|
|
return m.DiagnosticsFn()
|
|
}
|
|
|
|
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
|
|
type HandlerStatementExecutor struct {
|
|
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
|
}
|
|
|
|
func (e *HandlerStatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt influxql.Statement) error {
|
|
return e.ExecuteStatementFn(stmt, ctx)
|
|
}
|
|
|
|
// HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer.
|
|
type HandlerQueryAuthorizer struct {
|
|
AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error
|
|
AuthorizeCreateDatabaseFn func(u meta.User) error
|
|
AuthorizeCreateRetentionPolicyFn func(u meta.User, db string)
|
|
AuthorizeDeleteRetentionPolicyFn func(u meta.User, db string) error
|
|
}
|
|
|
|
func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, q *influxql.Query, database string) (query.FineAuthorizer, error) {
|
|
return query.OpenAuthorizer, a.AuthorizeQueryFn(u, q, database)
|
|
}
|
|
|
|
func (a *HandlerQueryAuthorizer) AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error {
|
|
panic("AuthorizeDatabase: not implemented")
|
|
}
|
|
|
|
func (a *HandlerQueryAuthorizer) AuthorizeCreateDatabase(u meta.User) error {
|
|
return a.AuthorizeCreateDatabaseFn(u)
|
|
}
|
|
|
|
func (a *HandlerQueryAuthorizer) AuthorizeCreateRetentionPolicy(u meta.User, db string) error {
|
|
return a.AuthorizeCreateRetentionPolicy(u, db)
|
|
}
|
|
|
|
func (a *HandlerQueryAuthorizer) AuthorizeDeleteRetentionPolicy(u meta.User, db string) error {
|
|
return a.AuthorizeDeleteRetentionPolicyFn(u, db)
|
|
}
|
|
|
|
type HandlerPointsWriter struct {
|
|
WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
|
|
}
|
|
|
|
func (h *HandlerPointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
|
|
return h.WritePointsFn(database, retentionPolicy, consistencyLevel, user, points)
|
|
}
|
|
|
|
// MustNewRequest returns a new HTTP request built by http.NewRequest. If the
// request cannot be created it panics with the error's message string.
func MustNewRequest(method, urlStr string, body io.Reader) *http.Request {
	req, err := http.NewRequest(method, urlStr, body)
	if err == nil {
		return req
	}
	panic(err.Error())
}
|
|
|
|
// MustNewRequest returns a new HTTP request with the content type set. Panic on error.
|
|
func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request {
|
|
r := MustNewRequest(method, urlStr, body)
|
|
r.Header.Set("Accept", "application/json")
|
|
return r
|
|
}
|
|
|
|
// MustJWTToken returns a new JWT token and signed string or panics trying.
|
|
func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) {
|
|
token := jwt.New(jwt.GetSigningMethod("HS512"))
|
|
token.Claims.(jwt.MapClaims)["username"] = username
|
|
if expired {
|
|
token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(-time.Second).Unix()
|
|
} else {
|
|
token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(time.Minute * 10).Unix()
|
|
}
|
|
signed, err := token.SignedString([]byte(secret))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return token, signed
|
|
}
|
|
|
|
// Ensure that user supplied headers are applied to responses.
|
|
func TestHandler_UserSuppliedHeaders(t *testing.T) {
|
|
|
|
endpoints := []struct {
|
|
method string
|
|
path string
|
|
}{
|
|
{method: "GET", path: "/ping"},
|
|
{method: "POST", path: "/api/v2/query"},
|
|
{method: "GET", path: "/query?db=foo&q=SELECT+*+FROM+bar"},
|
|
}
|
|
|
|
for _, endpoint := range endpoints {
|
|
t.Run(endpoint.method+endpoint.path, func(t *testing.T) {
|
|
headers := map[string]string{
|
|
"X-Best-Operating-System": "FreeBSD",
|
|
"X-Nana-Nana-Nana-Nana": "Batheader",
|
|
"X-Powered-By": "hamster in a wheel",
|
|
"X-Trek": "Live long and prosper",
|
|
}
|
|
// build a new handler with our headers as part of its configuration
|
|
h := NewHandlerWithConfig(NewHandlerConfig(WithHeaders(headers)))
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
// generate request request
|
|
req, err := http.NewRequest(endpoint.method, endpoint.path, nil)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// serve the request
|
|
h.ServeHTTP(w, req)
|
|
|
|
response := w.Result()
|
|
// ensure we received the headers we supplied
|
|
for k, v := range headers {
|
|
val, found := response.Header[k]
|
|
if !found {
|
|
t.Fatalf("Could not find header field %q in response", k)
|
|
continue
|
|
}
|
|
if v != val[0] {
|
|
t.Fatalf("value for header %q in http response is %q; expected %q", k, val, v)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|