Fix #418. Make sure none of the writes exceed 2M
parent
b626345070
commit
6e46865095
|
@ -700,6 +700,26 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se
|
|||
|
||||
func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error {
|
||||
request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series}
|
||||
// break the request if it's too big
|
||||
if request.Size() >= MAX_REQUEST_SIZE {
|
||||
if l := len(series); l > 1 {
|
||||
// create two requests with half the serie
|
||||
if err := self.write(db, series[:l/2], shard, sync); err != nil {
|
||||
return err
|
||||
}
|
||||
return self.write(db, series[l/2:], shard, sync)
|
||||
}
|
||||
|
||||
// otherwise, split the points of the only series
|
||||
s := series[0]
|
||||
l := len(s.Points)
|
||||
s1 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[:l/2]}
|
||||
if err := self.write(db, []*protocol.Series{s1}, shard, sync); err != nil {
|
||||
return err
|
||||
}
|
||||
s2 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[l/2:]}
|
||||
return self.write(db, []*protocol.Series{s2}, shard, sync)
|
||||
}
|
||||
if sync {
|
||||
return shard.SyncWrite(request)
|
||||
}
|
||||
|
|
|
@ -53,6 +53,23 @@ func (self *ServerSuite) TearDownSuite(c *C) {
|
|||
}
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestLargeRequestSize(c *C) {
|
||||
client := self.serverProcesses[0].GetClient("db1", c)
|
||||
c.Assert(client.CreateDatabase("db1"), IsNil)
|
||||
numberOfPoints := 2 * 1024 * 1024
|
||||
data := CreatePoints("test_large_requests", 1, numberOfPoints)
|
||||
self.serverProcesses[0].WriteData(data, c)
|
||||
for _, s := range self.serverProcesses {
|
||||
s.WaitForServerToSync()
|
||||
}
|
||||
for _, s := range self.serverProcesses {
|
||||
data = s.RunQuery("select count(column0) from test_large_requests", "m", c)
|
||||
c.Assert(data, HasLen, 1)
|
||||
c.Assert(data[0].Points, HasLen, 1)
|
||||
c.Assert(data[0].Points[0][1], Equals, float64(numberOfPoints))
|
||||
}
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestChangingRootPassword(c *C) {
|
||||
rootClient := self.serverProcesses[0].GetClient("", c)
|
||||
c.Assert(rootClient.CreateClusterAdmin("newroot", "root"), IsNil)
|
||||
|
|
|
@ -89,6 +89,10 @@ func (self *Point) GetFieldValueAsString(idx int) string {
|
|||
}
|
||||
}
|
||||
|
||||
func (self *Request) Size() int {
|
||||
return proto.Size(self)
|
||||
}
|
||||
|
||||
func DecodeRequest(buff *bytes.Buffer) (request *Request, err error) {
|
||||
request = &Request{}
|
||||
err = proto.Unmarshal(buff.Bytes(), request)
|
||||
|
@ -112,6 +116,10 @@ func (self *Request) Decode(data []byte) error {
|
|||
return proto.Unmarshal(data, self)
|
||||
}
|
||||
|
||||
func (self *Response) Size() int {
|
||||
return proto.Size(self)
|
||||
}
|
||||
|
||||
func DecodeResponse(buff *bytes.Buffer) (response *Response, err error) {
|
||||
response = &Response{}
|
||||
err = proto.Unmarshal(buff.Bytes(), response)
|
||||
|
|
Loading…
Reference in New Issue