diff --git a/services/httpd/handler.go b/services/httpd/handler.go index f2cc5c6a3e..1b7d7aac16 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -440,7 +440,12 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. resp := Response{Results: make([]*influxql.Result, 0)} // Status header is OK once this point is reached. + // Attempt to flush the header immediately so the client gets the header information + // and knows the query was accepted. h.writeHeader(rw, http.StatusOK) + if w, ok := w.(http.Flusher); ok { + w.Flush() + } // pull all results from the channel rows := 0 @@ -460,7 +465,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. n, _ := rw.WriteResponse(Response{ Results: []*influxql.Result{r}, }) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n+1)) + atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) w.(http.Flusher).Flush() continue } @@ -940,6 +945,9 @@ func (w gzipResponseWriter) Write(b []byte) (int, error) { func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() + if w, ok := w.ResponseWriter.(http.Flusher); ok { + w.Flush() + } } func (w gzipResponseWriter) CloseNotify() <-chan bool { diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 630bd2d187..2cdae58fb1 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "regexp" "strings" "testing" @@ -365,6 +366,73 @@ func TestHandler_Query_ErrResult(t *testing.T) { } } +// Ensure that closing the HTTP connection causes the query to be interrupted. +func TestHandler_Query_CloseNotify(t *testing.T) { + // Avoid leaking a goroutine when this fails. + done := make(chan struct{}) + defer close(done) + + interrupted := make(chan struct{}) + h := NewHandler(false) + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + select { + case <-ctx.InterruptCh: + case <-done: + } + close(interrupted) + return nil + } + + s := httptest.NewServer(h) + defer s.Close() + + // Parse the URL and generate a query request. + u, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + u.Path = "/query" + + values := url.Values{} + values.Set("q", "SELECT * FROM cpu") + values.Set("db", "db0") + values.Set("rp", "rp0") + values.Set("chunked", "true") + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + t.Fatal(err) + } + + // Perform the request and retrieve the response. + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + + // Validate that the interrupted channel has NOT been closed yet. + timer := time.NewTimer(100 * time.Millisecond) + select { + case <-interrupted: + timer.Stop() + t.Fatal("query interrupted unexpectedly") + case <-timer.C: + } + + // Close the response body which should abort the query in the handler. + resp.Body.Close() + + // The query should abort within 100 milliseconds. + timer.Reset(100 * time.Millisecond) + select { + case <-interrupted: + timer.Stop() + case <-timer.C: + t.Fatal("timeout while waiting for query to abort") + } +} + // Ensure the handler handles ping requests correctly. // TODO: This should be expanded to verify the MetaClient check in servePing is working correctly func TestHandler_Ping(t *testing.T) { diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index d4d4ab65ba..1d8b375057 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -18,13 +18,15 @@ type ResponseWriter interface { // in the request that wraps the ResponseWriter. func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { pretty := r.URL.Query().Get("pretty") == "true" + rw := &responseWriter{ResponseWriter: w} switch r.Header.Get("Accept") { case "application/json": fallthrough default: w.Header().Add("Content-Type", "application/json") - return &jsonResponseWriter{Pretty: pretty, ResponseWriter: w} + rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w} } + return rw } // WriteError is a convenience function for writing an error response to the ResponseWriter. @@ -32,12 +34,41 @@ func WriteError(w ResponseWriter, err error) (int, error) { return w.WriteResponse(Response{Err: err}) } -type jsonResponseWriter struct { - Pretty bool +// responseWriter is an implementation of ResponseWriter. +type responseWriter struct { + formatter interface { + WriteResponse(resp Response) (int, error) + } http.ResponseWriter } -func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) { +// WriteResponse writes the response using the formatter. +func (w *responseWriter) WriteResponse(resp Response) (int, error) { + return w.formatter.WriteResponse(resp) +} + +// Flush flushes the ResponseWriter if it has a Flush() method. +func (w *responseWriter) Flush() { + if w, ok := w.ResponseWriter.(http.Flusher); ok { + w.Flush() + } +} + +// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it +// exists. Otherwise, it returns a nil channel that will never notify. +func (w *responseWriter) CloseNotify() <-chan bool { + if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok { + return notifier.CloseNotify() + } + return nil +} + +type jsonFormatter struct { + io.Writer + Pretty bool +} + +func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) { var b []byte if w.Pretty { b, err = json.MarshalIndent(resp, "", " ") @@ -55,10 +86,3 @@ func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) { n++ return n, err } - -// Flush flushes the ResponseWriter if it has a Flush() method. -func (w *jsonResponseWriter) Flush() { - if w, ok := w.ResponseWriter.(http.Flusher); ok { - w.Flush() - } -}