diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index be224e0e4d..d7dc6f7c21 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -468,10 +468,16 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series lastPointIndex := 0 now := common.CurrentTime() var shardToWrite cluster.Shard - for i, point := range series.Points { + for _, point := range series.Points { if point.Timestamp == nil { point.Timestamp = &now } + } + + // sort the points by timestamp + series.SortPointsTimeDescending() + + for i, point := range series.Points { if *point.Timestamp != lastTime { shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *point.Timestamp) if err != nil { diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go index 083b233c76..94ba7cec4f 100644 --- a/src/coordinator/protobuf_client.go +++ b/src/coordinator/protobuf_client.go @@ -136,23 +136,29 @@ func (self *ProtobufClient) readResponses() { buff.Reset() conn := self.getConnection() if conn == nil { + time.Sleep(200 * time.Millisecond) continue } var messageSizeU uint32 var err error err = binary.Read(conn, binary.LittleEndian, &messageSizeU) if err != nil { + log.Error("Error while reading messsage size: %d", err) + time.Sleep(200 * time.Millisecond) continue } messageSize := int64(messageSizeU) messageReader := io.LimitReader(conn, messageSize) _, err = io.Copy(buff, messageReader) if err != nil { + log.Error("Error while reading message: %d", err) + time.Sleep(200 * time.Millisecond) continue } response, err := protocol.DecodeResponse(buff) if err != nil { log.Error("error unmarshaling response: %s", err) + time.Sleep(200 * time.Millisecond) } else { self.sendResponse(response) }