From 71b54768d0f329c4d2777dc34bbf45baadb621a3 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 16 Aug 2016 15:52:43 -0500 Subject: [PATCH 1/2] Remove redundant code the from response formatter and fix CloseNotify The query killing functionality depends on the ResponseWriter exposing a CloseNotify method. Since we wrap the http.ResponseWriter, the new struct does not have that method and the HTTP handler would skip past calling that method. Instead of duplicating `Flush()` and `CloseNotify()` for every response formatter, we will unify all of that under a single struct and create formatters instead. Also, fixes a bug where the header information from a query would not be returned until some other data was returned with it because of buffering and another bug in the gzipResponseWriter that wouldn't flush the actual underlying ResponseWriter. --- services/httpd/handler.go | 8 ++++ services/httpd/handler_test.go | 68 +++++++++++++++++++++++++++++++ services/httpd/response_writer.go | 46 ++++++++++++++++----- 3 files changed, 111 insertions(+), 11 deletions(-) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index f2cc5c6a3e..c8c7ff3c54 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -440,7 +440,12 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. resp := Response{Results: make([]*influxql.Result, 0)} // Status header is OK once this point is reached. + // Attempt to flush the header immediately so the client gets the header information + // and knows the query was accepted. h.writeHeader(rw, http.StatusOK) + if w, ok := w.(http.Flusher); ok { + w.Flush() + } // pull all results from the channel rows := 0 @@ -940,6 +945,9 @@ func (w gzipResponseWriter) Write(b []byte) (int, error) { func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() + if w, ok := w.ResponseWriter.(http.Flusher); ok { + w.Flush() + } } func (w gzipResponseWriter) CloseNotify() <-chan bool { diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 630bd2d187..2cdae58fb1 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "regexp" "strings" "testing" @@ -365,6 +366,73 @@ func TestHandler_Query_ErrResult(t *testing.T) { } } +// Ensure that closing the HTTP connection causes the query to be interrupted. +func TestHandler_Query_CloseNotify(t *testing.T) { + // Avoid leaking a goroutine when this fails. + done := make(chan struct{}) + defer close(done) + + interrupted := make(chan struct{}) + h := NewHandler(false) + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + select { + case <-ctx.InterruptCh: + case <-done: + } + close(interrupted) + return nil + } + + s := httptest.NewServer(h) + defer s.Close() + + // Parse the URL and generate a query request. + u, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + u.Path = "/query" + + values := url.Values{} + values.Set("q", "SELECT * FROM cpu") + values.Set("db", "db0") + values.Set("rp", "rp0") + values.Set("chunked", "true") + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + t.Fatal(err) + } + + // Perform the request and retrieve the response. + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + + // Validate that the interrupted channel has NOT been closed yet. + timer := time.NewTimer(100 * time.Millisecond) + select { + case <-interrupted: + timer.Stop() + t.Fatal("query interrupted unexpectedly") + case <-timer.C: + } + + // Close the response body which should abort the query in the handler. + resp.Body.Close() + + // The query should abort within 100 milliseconds. + timer.Reset(100 * time.Millisecond) + select { + case <-interrupted: + timer.Stop() + case <-timer.C: + t.Fatal("timeout while waiting for query to abort") + } +} + // Ensure the handler handles ping requests correctly. // TODO: This should be expanded to verify the MetaClient check in servePing is working correctly func TestHandler_Ping(t *testing.T) { diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index d4d4ab65ba..1d8b375057 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -18,13 +18,15 @@ type ResponseWriter interface { // in the request that wraps the ResponseWriter. func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { pretty := r.URL.Query().Get("pretty") == "true" + rw := &responseWriter{ResponseWriter: w} switch r.Header.Get("Accept") { case "application/json": fallthrough default: w.Header().Add("Content-Type", "application/json") - return &jsonResponseWriter{Pretty: pretty, ResponseWriter: w} + rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w} } + return rw } // WriteError is a convenience function for writing an error response to the ResponseWriter. @@ -32,12 +34,41 @@ func WriteError(w ResponseWriter, err error) (int, error) { return w.WriteResponse(Response{Err: err}) } -type jsonResponseWriter struct { - Pretty bool +// responseWriter is an implementation of ResponseWriter. +type responseWriter struct { + formatter interface { + WriteResponse(resp Response) (int, error) + } http.ResponseWriter } -func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) { +// WriteResponse writes the response using the formatter. +func (w *responseWriter) WriteResponse(resp Response) (int, error) { + return w.formatter.WriteResponse(resp) +} + +// Flush flushes the ResponseWriter if it has a Flush() method. +func (w *responseWriter) Flush() { + if w, ok := w.ResponseWriter.(http.Flusher); ok { + w.Flush() + } +} + +// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it +// exists. Otherwise, it returns a nil channel that will never notify. +func (w *responseWriter) CloseNotify() <-chan bool { + if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok { + return notifier.CloseNotify() + } + return nil +} + +type jsonFormatter struct { + io.Writer + Pretty bool +} + +func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) { var b []byte if w.Pretty { b, err = json.MarshalIndent(resp, "", " ") @@ -55,10 +86,3 @@ func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) { n++ return n, err } - -// Flush flushes the ResponseWriter if it has a Flush() method. -func (w *jsonResponseWriter) Flush() { - if w, ok := w.ResponseWriter.(http.Flusher); ok { - w.Flush() - } -} From d2746ee8f2a74a5dbec167c739c1290f95852503 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 17 Aug 2016 11:54:19 -0500 Subject: [PATCH 2/2] The number of bytes recorded when using chunking was off by one Previously, we implicitly added a newline and had to add one to the number of bytes transmitted because we added that byte. That was removed at some point and the metric was not updated to record the correct value. --- services/httpd/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index c8c7ff3c54..1b7d7aac16 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -465,7 +465,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. n, _ := rw.WriteResponse(Response{ Results: []*influxql.Result{r}, }) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n+1)) + atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) w.(http.Flusher).Flush() continue }