diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index e35f661264..90eefa9064 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -700,6 +700,26 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error { request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series} + // break the request if it's too big + if request.Size() >= MAX_REQUEST_SIZE { + if l := len(series); l > 1 { + // create two requests with half the serie + if err := self.write(db, series[:l/2], shard, sync); err != nil { + return err + } + return self.write(db, series[l/2:], shard, sync) + } + + // otherwise, split the points of the only series + s := series[0] + l := len(s.Points) + s1 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[:l/2]} + if err := self.write(db, []*protocol.Series{s1}, shard, sync); err != nil { + return err + } + s2 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[l/2:]} + return self.write(db, []*protocol.Series{s2}, shard, sync) + } if sync { return shard.SyncWrite(request) } diff --git a/src/integration/multiple_servers_test.go b/src/integration/multiple_servers_test.go index 28331da89b..e08dcdf675 100644 --- a/src/integration/multiple_servers_test.go +++ b/src/integration/multiple_servers_test.go @@ -53,6 +53,23 @@ func (self *ServerSuite) TearDownSuite(c *C) { } } +func (self *ServerSuite) TestLargeRequestSize(c *C) { + client := self.serverProcesses[0].GetClient("db1", c) + c.Assert(client.CreateDatabase("db1"), IsNil) + numberOfPoints := 2 * 1024 * 1024 + data := CreatePoints("test_large_requests", 1, numberOfPoints) + self.serverProcesses[0].WriteData(data, c) + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } + for _, s := range self.serverProcesses { + data = s.RunQuery("select count(column0) from test_large_requests", "m", c) + c.Assert(data, HasLen, 1) + c.Assert(data[0].Points, HasLen, 1) + c.Assert(data[0].Points[0][1], Equals, float64(numberOfPoints)) + } +} + func (self *ServerSuite) TestChangingRootPassword(c *C) { rootClient := self.serverProcesses[0].GetClient("", c) c.Assert(rootClient.CreateClusterAdmin("newroot", "root"), IsNil) diff --git a/src/protocol/protocol_extensions.go b/src/protocol/protocol_extensions.go index cafc70b4df..27f601e20b 100644 --- a/src/protocol/protocol_extensions.go +++ b/src/protocol/protocol_extensions.go @@ -89,6 +89,10 @@ func (self *Point) GetFieldValueAsString(idx int) string { } } +func (self *Request) Size() int { + return proto.Size(self) +} + func DecodeRequest(buff *bytes.Buffer) (request *Request, err error) { request = &Request{} err = proto.Unmarshal(buff.Bytes(), request) @@ -112,6 +116,10 @@ func (self *Request) Decode(data []byte) error { return proto.Unmarshal(data, self) } +func (self *Response) Size() int { + return proto.Size(self) +} + func DecodeResponse(buff *bytes.Buffer) (response *Response, err error) { response = &Response{} err = proto.Unmarshal(buff.Bytes(), response)