Merge pull request #7163 from influxdata/js-close-notify-fix

Remove redundant code the from response formatter and fix CloseNotify
pull/7171/head^2
Jonathan A. Sternberg 2016-08-17 12:28:44 -05:00 committed by GitHub
commit 58afeb0b71
3 changed files with 112 additions and 12 deletions

View File

@ -440,7 +440,12 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
resp := Response{Results: make([]*influxql.Result, 0)}
// Status header is OK once this point is reached.
// Attempt to flush the header immediately so the client gets the header information
// and knows the query was accepted.
h.writeHeader(rw, http.StatusOK)
if w, ok := w.(http.Flusher); ok {
w.Flush()
}
// pull all results from the channel
rows := 0
@ -460,7 +465,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
n, _ := rw.WriteResponse(Response{
Results: []*influxql.Result{r},
})
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n+1))
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
@ -940,6 +945,9 @@ func (w gzipResponseWriter) Write(b []byte) (int, error) {
func (w gzipResponseWriter) Flush() {
w.Writer.(*gzip.Writer).Flush()
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}
func (w gzipResponseWriter) CloseNotify() <-chan bool {

View File

@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strings"
"testing"
@ -365,6 +366,73 @@ func TestHandler_Query_ErrResult(t *testing.T) {
}
}
// Ensure that closing the HTTP connection causes the query to be interrupted.
func TestHandler_Query_CloseNotify(t *testing.T) {
// Avoid leaking a goroutine when this fails.
done := make(chan struct{})
defer close(done)
interrupted := make(chan struct{})
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
case <-done:
}
close(interrupted)
return nil
}
s := httptest.NewServer(h)
defer s.Close()
// Parse the URL and generate a query request.
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}
u.Path = "/query"
values := url.Values{}
values.Set("q", "SELECT * FROM cpu")
values.Set("db", "db0")
values.Set("rp", "rp0")
values.Set("chunked", "true")
u.RawQuery = values.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
}
// Perform the request and retrieve the response.
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
// Validate that the interrupted channel has NOT been closed yet.
timer := time.NewTimer(100 * time.Millisecond)
select {
case <-interrupted:
timer.Stop()
t.Fatal("query interrupted unexpectedly")
case <-timer.C:
}
// Close the response body which should abort the query in the handler.
resp.Body.Close()
// The query should abort within 100 milliseconds.
timer.Reset(100 * time.Millisecond)
select {
case <-interrupted:
timer.Stop()
case <-timer.C:
t.Fatal("timeout while waiting for query to abort")
}
}
// Ensure the handler handles ping requests correctly.
// TODO: This should be expanded to verify the MetaClient check in servePing is working correctly
func TestHandler_Ping(t *testing.T) {

View File

@ -18,13 +18,15 @@ type ResponseWriter interface {
// in the request that wraps the ResponseWriter.
func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
pretty := r.URL.Query().Get("pretty") == "true"
rw := &responseWriter{ResponseWriter: w}
switch r.Header.Get("Accept") {
case "application/json":
fallthrough
default:
w.Header().Add("Content-Type", "application/json")
return &jsonResponseWriter{Pretty: pretty, ResponseWriter: w}
rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w}
}
return rw
}
// WriteError is a convenience function for writing an error response to the ResponseWriter.
@ -32,12 +34,41 @@ func WriteError(w ResponseWriter, err error) (int, error) {
return w.WriteResponse(Response{Err: err})
}
type jsonResponseWriter struct {
Pretty bool
// responseWriter is an implementation of ResponseWriter.
type responseWriter struct {
formatter interface {
WriteResponse(resp Response) (int, error)
}
http.ResponseWriter
}
func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) {
// WriteResponse writes the response using the formatter.
func (w *responseWriter) WriteResponse(resp Response) (int, error) {
return w.formatter.WriteResponse(resp)
}
// Flush flushes the ResponseWriter if it has a Flush() method.
func (w *responseWriter) Flush() {
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}
// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it
// exists. Otherwise, it returns a nil channel that will never notify.
func (w *responseWriter) CloseNotify() <-chan bool {
if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok {
return notifier.CloseNotify()
}
return nil
}
type jsonFormatter struct {
io.Writer
Pretty bool
}
func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) {
var b []byte
if w.Pretty {
b, err = json.MarshalIndent(resp, "", " ")
@ -55,10 +86,3 @@ func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) {
n++
return n, err
}
// Flush flushes the ResponseWriter if it has a Flush() method.
func (w *jsonResponseWriter) Flush() {
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}