Merge pull request #352 from influxdb/sort_points

Sort points
pull/357/head
Paul Dix 2014-03-20 14:27:22 -04:00
commit 05c86f15e0
2 changed files with 13 additions and 1 deletions

View File

@ -468,10 +468,16 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series
lastPointIndex := 0
now := common.CurrentTime()
var shardToWrite cluster.Shard
for i, point := range series.Points {
for _, point := range series.Points {
if point.Timestamp == nil {
point.Timestamp = &now
}
}
// sort the points by timestamp
series.SortPointsTimeDescending()
for i, point := range series.Points {
if *point.Timestamp != lastTime {
shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *point.Timestamp)
if err != nil {

View File

@ -136,23 +136,29 @@ func (self *ProtobufClient) readResponses() {
buff.Reset()
conn := self.getConnection()
if conn == nil {
time.Sleep(200 * time.Millisecond)
continue
}
var messageSizeU uint32
var err error
err = binary.Read(conn, binary.LittleEndian, &messageSizeU)
if err != nil {
log.Error("Error while reading messsage size: %d", err)
time.Sleep(200 * time.Millisecond)
continue
}
messageSize := int64(messageSizeU)
messageReader := io.LimitReader(conn, messageSize)
_, err = io.Copy(buff, messageReader)
if err != nil {
log.Error("Error while reading message: %d", err)
time.Sleep(200 * time.Millisecond)
continue
}
response, err := protocol.DecodeResponse(buff)
if err != nil {
log.Error("error unmarshaling response: %s", err)
time.Sleep(200 * time.Millisecond)
} else {
self.sendResponse(response)
}