From 7880bc24522cfb090bfafb1bac0e12afd179a683 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 12 Mar 2015 12:12:26 -0600 Subject: [PATCH] Add zero length data checks. --- cmd/influxd/server_integration_test.go | 1 - messaging/client.go | 5 +++++ messaging/errors.go | 3 +++ messaging/handler.go | 4 ++++ server.go | 2 ++ server_test.go | 8 ++++++-- 6 files changed, 20 insertions(+), 3 deletions(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 632b7a749b..f3ba0edfce 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -841,7 +841,6 @@ func TestSingleServer(t *testing.T) { } func Test3NodeServer(t *testing.T) { - t.Skip() testName := "3-node server integration" if testing.Short() { t.Skip(fmt.Sprintf("skipping '%s'", testName)) diff --git a/messaging/client.go b/messaging/client.go index 2cf9423c4e..f03002cc85 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -548,6 +548,11 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error { return fmt.Errorf("decode: %s", err) } + // Panic if we received no data. + if len(m.Data) == 0 { + panic("messaging conn no data recv") + } + // TODO: Write broker set updates, do not passthrough to channel. // Write message to streaming channel. diff --git a/messaging/errors.go b/messaging/errors.go index 4dc1927679..35462b99e1 100644 --- a/messaging/errors.go +++ b/messaging/errors.go @@ -72,4 +72,7 @@ var ( // ErrReaderClosed is returned when reading from a closed topic reader. ErrReaderClosed = errors.New("reader closed") + + // ErrMessageDataRequired is returned when publishing a message without data. + ErrMessageDataRequired = errors.New("message data required") ) diff --git a/messaging/handler.go b/messaging/handler.go index 61e7113685..51a1492356 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -116,10 +116,14 @@ func (h *Handler) postMessages(w http.ResponseWriter, r *http.Request) { } // Read the request body. + // Exit if there is no message data provided. data, err := ioutil.ReadAll(r.Body) if err != nil { h.error(w, err, http.StatusInternalServerError) return + } else if len(data) == 0 { + h.error(w, ErrMessageDataRequired, http.StatusBadRequest) + return } // Publish message to the broker. diff --git a/server.go b/server.go index 70826e5a56..51bd26d682 100644 --- a/server.go +++ b/server.go @@ -1602,6 +1602,8 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( var err error var maxIndex uint64 for i, d := range shardData { + assert(len(d) > 0, "raw series data required: topic=%d", i) + index, err := s.client.Publish(&messaging.Message{ Type: writeRawSeriesMessageType, TopicID: i, diff --git a/server_test.go b/server_test.go index f97857a035..bb62961f9c 100644 --- a/server_test.go +++ b/server_test.go @@ -596,7 +596,9 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { // Ensure the database can create a new retention policy with infinite duration. func TestServer_CreateRetentionPolicyInfinite(t *testing.T) { - s := OpenServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + s := OpenServer(c) defer s.Close() // Create a database. @@ -629,7 +631,9 @@ func TestServer_CreateRetentionPolicyInfinite(t *testing.T) { // Ensure the database can creates a default retention policy. func TestServer_CreateRetentionPolicyDefault(t *testing.T) { - s := OpenServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + s := OpenServer(c) defer s.Close() s.RetentionAutoCreate = true