Add zero length data checks.
parent
7ab19b9f91
commit
7880bc2452
|
@ -841,7 +841,6 @@ func TestSingleServer(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test3NodeServer(t *testing.T) {
|
||||
t.Skip()
|
||||
testName := "3-node server integration"
|
||||
if testing.Short() {
|
||||
t.Skip(fmt.Sprintf("skipping '%s'", testName))
|
||||
|
|
|
@ -548,6 +548,11 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
|
|||
return fmt.Errorf("decode: %s", err)
|
||||
}
|
||||
|
||||
// Panic if we received no data.
|
||||
if len(m.Data) == 0 {
|
||||
panic("messaging conn no data recv")
|
||||
}
|
||||
|
||||
// TODO: Write broker set updates, do not passthrough to channel.
|
||||
|
||||
// Write message to streaming channel.
|
||||
|
|
|
@ -72,4 +72,7 @@ var (
|
|||
|
||||
// ErrReaderClosed is returned when reading from a closed topic reader.
|
||||
ErrReaderClosed = errors.New("reader closed")
|
||||
|
||||
// ErrMessageDataRequired is returned when publishing a message without data.
|
||||
ErrMessageDataRequired = errors.New("message data required")
|
||||
)
|
||||
|
|
|
@ -116,10 +116,14 @@ func (h *Handler) postMessages(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Read the request body.
|
||||
// Exit if there is no message data provided.
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
h.error(w, err, http.StatusInternalServerError)
|
||||
return
|
||||
} else if len(data) == 0 {
|
||||
h.error(w, ErrMessageDataRequired, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Publish message to the broker.
|
||||
|
|
|
@ -1602,6 +1602,8 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
|
|||
var err error
|
||||
var maxIndex uint64
|
||||
for i, d := range shardData {
|
||||
assert(len(d) > 0, "raw series data required: topic=%d", i)
|
||||
|
||||
index, err := s.client.Publish(&messaging.Message{
|
||||
Type: writeRawSeriesMessageType,
|
||||
TopicID: i,
|
||||
|
|
|
@ -596,7 +596,9 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
|
|||
|
||||
// Ensure the database can create a new retention policy with infinite duration.
|
||||
func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
c := test.NewMessagingClient()
|
||||
defer c.Close()
|
||||
s := OpenServer(c)
|
||||
defer s.Close()
|
||||
|
||||
// Create a database.
|
||||
|
@ -629,7 +631,9 @@ func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
|
|||
|
||||
// Ensure the database can creates a default retention policy.
|
||||
func TestServer_CreateRetentionPolicyDefault(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
c := test.NewMessagingClient()
|
||||
defer c.Close()
|
||||
s := OpenServer(c)
|
||||
defer s.Close()
|
||||
|
||||
s.RetentionAutoCreate = true
|
||||
|
|
Loading…
Reference in New Issue