Add POST /query endpoint and warning messages for using GET with write operations

In order to follow REST a bit more carefully, all write operations
should go through a POST in the future. We still allow read operations
through either GET or POST (similar to the Graphite /render endpoint),
but write operations will trigger a returned warning as part of the JSON
response and will eventually return an error.

Also updates the Golang client libraries to always use POST instead of
GET.

Fixes #6290.
pull/6390/head
Jonathan A. Sternberg 2016-04-29 09:00:21 -04:00
parent 87e531e8f6
commit 6f61c0ea4a
11 changed files with 125 additions and 35 deletions

View File

@ -20,7 +20,7 @@
- [#4675](https://github.com/influxdata/influxdb/issues/4675): Allow derivative() function to be used with ORDER BY desc.
- [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM
- [#6484](https://github.com/influxdata/influxdb/pull/6484): Query language support for DELETE
- [#6290](https://github.com/influxdata/influxdb/issues/6290): Add POST /query endpoint and warning messages for using GET with write operations.
### Bugfixes

View File

@ -181,7 +181,7 @@ func (c *Client) Query(q Query) (*Response, error) {
}
u.RawQuery = values.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}

View File

@ -541,7 +541,7 @@ func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = "query"
req, err := http.NewRequest("GET", u.String(), nil)
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}

View File

@ -48,10 +48,19 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ
var err error
switch stmt := stmt.(type) {
case *influxql.AlterRetentionPolicyStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeAlterRetentionPolicyStatement(stmt)
case *influxql.CreateContinuousQueryStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeCreateContinuousQueryStatement(stmt)
case *influxql.CreateDatabaseStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
if stmt.IfNotExists {
ctx.Log.Println("WARNING: IF NOT EXISTS is deprecated as of v0.13.0 and will be removed in v1.0")
messages = append(messages, &influxql.Message{
@ -61,36 +70,81 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ
}
err = e.executeCreateDatabaseStatement(stmt)
case *influxql.CreateRetentionPolicyStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeCreateRetentionPolicyStatement(stmt)
case *influxql.CreateSubscriptionStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeCreateSubscriptionStatement(stmt)
case *influxql.CreateUserStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeCreateUserStatement(stmt)
case *influxql.DeleteSeriesStatement:
err = e.executeDeleteSeriesStatement(stmt, ctx.Database)
case *influxql.DropContinuousQueryStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropContinuousQueryStatement(stmt)
case *influxql.DropDatabaseStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropDatabaseStatement(stmt)
case *influxql.DropMeasurementStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropMeasurementStatement(stmt, ctx.Database)
case *influxql.DropSeriesStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropSeriesStatement(stmt, ctx.Database)
case *influxql.DropRetentionPolicyStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropRetentionPolicyStatement(stmt)
case *influxql.DropShardStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropShardStatement(stmt)
case *influxql.DropSubscriptionStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropSubscriptionStatement(stmt)
case *influxql.DropUserStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeDropUserStatement(stmt)
case *influxql.GrantStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeGrantAdminStatement(stmt)
case *influxql.RevokeStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeRevokeAdminStatement(stmt)
case *influxql.ShowContinuousQueriesStatement:
rows, err = e.executeShowContinuousQueriesStatement(stmt)
@ -113,6 +167,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ
case *influxql.ShowUsersStatement:
rows, err = e.executeShowUsersStatement(stmt)
case *influxql.SetPasswordUserStatement:
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
err = e.executeSetPasswordUserStatement(stmt)
default:
return influxql.ErrInvalidQuery
@ -423,11 +480,6 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
break
}
result := &influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}
// Write points back into system for INTO statements.
if stmt.Target != nil {
if err := e.writeInto(stmt, row); err != nil {
@ -437,6 +489,11 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
continue
}
result := &influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}
// Send results or exit if closing.
select {
case <-ctx.InterruptCh:
@ -449,8 +506,14 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
// Emit write count if an INTO statement.
if stmt.Target != nil {
var messages []*influxql.Message
if ctx.ReadOnly {
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
ctx.Results <- &influxql.Result{
StatementID: ctx.StatementID,
Messages: messages,
Series: []*models.Row{{
Name: "result",
Columns: []string{"time", "written"},

View File

@ -199,7 +199,7 @@ func DefaultQueryExecutor() *QueryExecutor {
// ExecuteQuery parses query and executes against the database.
func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-chan *influxql.Result {
return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, make(chan struct{}))
return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, false, make(chan struct{}))
}
// TSDBStore is a mockable implementation of cluster.TSDBStore.

View File

@ -143,7 +143,7 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin
v, _ = url.ParseQuery(values.Encode())
}
v.Set("q", query)
return s.HTTPGet(s.URL() + "/query?" + v.Encode())
return s.HTTPPost(s.URL()+"/query?"+v.Encode(), nil)
}
// MustQueryWithParams executes a query against the server and returns the results.

View File

@ -79,6 +79,9 @@ type ExecutionContext struct {
// The requested maximum number of points to return in each result.
ChunkSize int
// If this query is being executed in a read-only context.
ReadOnly bool
// Hold the query executor's logger.
Log *log.Logger
@ -158,13 +161,13 @@ func (e *QueryExecutor) SetLogOutput(w io.Writer) {
}
// ExecuteQuery executes each statement within a query.
func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result {
func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, readonly bool, closing chan struct{}) <-chan *Result {
results := make(chan *Result)
go e.executeQuery(query, database, chunkSize, closing, results)
go e.executeQuery(query, database, chunkSize, readonly, closing, results)
return results
}
func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, closing <-chan struct{}, results chan *Result) {
func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, readonly bool, closing <-chan struct{}, results chan *Result) {
defer close(results)
defer e.recover(query, results)
@ -188,6 +191,7 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in
Results: results,
Database: database,
ChunkSize: chunkSize,
ReadOnly: readonly,
Log: e.Logger,
InterruptCh: task.closing,
}
@ -240,9 +244,15 @@ loop:
}
continue loop
case *KillQueryStatement:
var messages []*Message
if ctx.ReadOnly {
messages = append(messages, ReadOnlyWarning(stmt.String()))
}
err := e.executeKillQueryStatement(stmt)
results <- &Result{
StatementID: i,
Messages: messages,
Err: err,
}

View File

@ -39,7 +39,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) {
},
}
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil))
}
func TestQueryExecutor_KillQuery(t *testing.T) {
@ -64,12 +64,12 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
},
}
results := e.ExecuteQuery(q, "mydb", 100, nil)
results := e.ExecuteQuery(q, "mydb", 100, false, nil)
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil))
result := <-results
if result.Err != influxql.ErrQueryInterrupted {
@ -97,7 +97,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
}
closing := make(chan struct{})
results := e.ExecuteQuery(q, "mydb", 100, closing)
results := e.ExecuteQuery(q, "mydb", 100, false, closing)
close(closing)
result := <-results
if result.Err != influxql.ErrQueryInterrupted {
@ -124,7 +124,7 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {
t.Fatal(err)
}
results := e.ExecuteQuery(q, "", 100, nil)
results := e.ExecuteQuery(q, "", 100, false, nil)
result := <-results
if len(result.Series) != 1 {
t.Errorf("expected %d rows, got %d", 1, len(result.Series))
@ -154,7 +154,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) {
}
e.QueryTimeout = time.Nanosecond
results := e.ExecuteQuery(q, "mydb", 100, nil)
results := e.ExecuteQuery(q, "mydb", 100, false, nil)
result := <-results
if result.Err != influxql.ErrQueryTimeoutReached {
t.Errorf("unexpected error: %s", result.Err)
@ -181,11 +181,11 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
defer e.Close()
// Start first query and wait for it to be executing.
go discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
go discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil))
<-qid
// Start second query and expect for it to fail.
results := e.ExecuteQuery(q, "mydb", 100, nil)
results := e.ExecuteQuery(q, "mydb", 100, false, nil)
select {
case result := <-results:
@ -219,7 +219,7 @@ func TestQueryExecutor_Close(t *testing.T) {
},
}
results := e.ExecuteQuery(q, "mydb", 100, nil)
results := e.ExecuteQuery(q, "mydb", 100, false, nil)
go func(results <-chan *influxql.Result) {
result := <-results
if result.Err != influxql.ErrQueryEngineShutdown {
@ -240,7 +240,7 @@ func TestQueryExecutor_Close(t *testing.T) {
t.Error("closing the query manager did not kill the query after 100 milliseconds")
}
results = e.ExecuteQuery(q, "mydb", 100, nil)
results = e.ExecuteQuery(q, "mydb", 100, false, nil)
result := <-results
if len(result.Series) != 0 {
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
@ -263,7 +263,7 @@ func TestQueryExecutor_Panic(t *testing.T) {
},
}
results := e.ExecuteQuery(q, "mydb", 100, nil)
results := e.ExecuteQuery(q, "mydb", 100, false, nil)
result := <-results
if len(result.Series) != 0 {
t.Errorf("expected %d rows, got %d", 0, len(result.Series))

View File

@ -3,6 +3,7 @@ package influxql
import (
"encoding/json"
"errors"
"fmt"
"github.com/influxdata/influxdb/models"
)
@ -40,6 +41,18 @@ type Message struct {
Text string `json:"text"`
}
// ReadOnlyWarning generates a warning message that tells the user the command
// they are using is being used for writing in a read only context.
//
// This is a temporary method while to be used while transitioning to read only
// operations for issue #6290.
func ReadOnlyWarning(stmt string) *Message {
return &Message{
Level: WarningLevel,
Text: fmt.Sprintf("deprecated use of '%s' in a read only context, please use a POST request instead", stmt),
}
}
// Result represents a resultset returned from a single statement.
// Rows represents a list of rows that can be sorted consistently by name/tag.
type Result struct {

View File

@ -348,7 +348,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
defer close(closing)
// Execute the SELECT.
ch := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize, closing)
ch := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize, false, closing)
// There is only one statement, so we will only ever receive one result
res, ok := <-ch

View File

@ -103,6 +103,10 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, rowLimit
"query", // Query serving route.
"GET", "/query", true, true, h.serveQuery,
},
Route{
"query", // Query serving route.
"POST", "/query", true, true, h.serveQuery,
},
Route{
"write-options", // Satisfy CORS checks.
"OPTIONS", "/write", true, true, h.serveOptions,
@ -243,19 +247,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
h.statMap.Add(statQueryRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())
q := r.URL.Query()
pretty := q.Get("pretty") == "true"
pretty := r.FormValue("pretty") == "true"
qp := strings.TrimSpace(q.Get("q"))
qp := strings.TrimSpace(r.FormValue("q"))
if qp == "" {
httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest)
return
}
epoch := strings.TrimSpace(q.Get("epoch"))
epoch := strings.TrimSpace(r.FormValue("epoch"))
p := influxql.NewParser(strings.NewReader(qp))
db := q.Get("db")
db := r.FormValue("db")
// Sanitize the request query params so it doesn't show up in the response logger.
// Do this before anything else so a parsing error doesn't leak passwords.
@ -280,10 +283,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
}
// Parse chunk size. Use default if not provided or unparsable.
chunked := (q.Get("chunked") == "true")
chunked := (r.FormValue("chunked") == "true")
chunkSize := DefaultChunkSize
if chunked {
if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil && int(n) > 0 {
if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 {
chunkSize = int(n)
}
}
@ -314,7 +317,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Execute query.
w.Header().Add("Connection", "close")
w.Header().Add("content-type", "application/json")
results := h.QueryExecutor.ExecuteQuery(query, db, chunkSize, closing)
readonly := r.Method == "GET" || r.Method == "HEAD"
results := h.QueryExecutor.ExecuteQuery(query, db, chunkSize, readonly, closing)
// if we're not chunking, this will be the in memory buffer for all results before sending to client
resp := Response{Results: make([]*influxql.Result, 0)}