diff --git a/influxdb.go b/influxdb.go index ab2b6933f1..f4d4119586 100644 --- a/influxdb.go +++ b/influxdb.go @@ -81,6 +81,9 @@ var ( // ErrShardNotFound is returned writing to a non-existent shard. ErrShardNotFound = errors.New("shard not found") + // ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid + ErrInvalidPointBuffer = errors.New("invalid point buffer") + // ErrReadAccessDenied is returned when a user attempts to read // data that he or she does not have permission to read. ErrReadAccessDenied = errors.New("read access denied") diff --git a/server.go b/server.go index e55c61d566..d959218741 100644 --- a/server.go +++ b/server.go @@ -1577,7 +1577,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( } // Build writeRawSeriesMessageType publish commands. - shardData := make(map[uint64][]byte) + shardData := make(map[uint64][]byte, 0) for _, p := range points { // Local function makes lock management foolproof. measurement, series, err := func() (*Measurement, *Series, error) { @@ -1619,9 +1619,12 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( return 0, err } - // Encode point header, followed by point data, and assign to shard. - data := marshalPointHeader(series.ID, p.Timestamp.UnixNano()) + // Encode point header, followed by point data, and add to shard's batch. + data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano()) data = append(data, encodedFields...) + if shardData[sh.ID] == nil { + shardData[sh.ID] = make([]byte, 0) + } shardData[sh.ID] = append(shardData[sh.ID], data...) } @@ -1655,19 +1658,37 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error { return ErrShardNotFound } - // Extract the series id and timestamp from the header. - // Everything after the header is the marshalled value. - seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize]) - data := m.Data[pointHeaderSize:] - - // Add to lookup. - s.addShardBySeriesID(sh, seriesID) - // TODO: Enable some way to specify if the data should be overwritten overwrite := true - // Write to shard. - return sh.writeSeries(seriesID, timestamp, data, overwrite) + for { + if pointHeaderSize > len(m.Data) { + return ErrInvalidPointBuffer + } + seriesID, payloadLength, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize]) + m.Data = m.Data[pointHeaderSize:] + + if payloadLength > uint32(len(m.Data)) { + return ErrInvalidPointBuffer + } + data := m.Data[:payloadLength] + + // Add to lookup. + s.addShardBySeriesID(sh, seriesID) + + // Write to shard. + if err := sh.writeSeries(seriesID, timestamp, data, overwrite); err != nil { + return err + } + + // Push the buffer forward and check if we're done. + m.Data = m.Data[payloadLength:] + if len(m.Data) == 0 { + break + } + } + + return nil } func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) { diff --git a/shard.go b/shard.go index 8cf6a80ae3..40791f13e3 100644 --- a/shard.go +++ b/shard.go @@ -134,20 +134,22 @@ func (s *Shard) deleteSeries(name string) error { type Shards []*Shard // pointHeaderSize represents the size of a point header, in bytes. -const pointHeaderSize = 4 + 8 // seriesID + timestamp +const pointHeaderSize = 4 + 4 + 8 // seriesID + payload length + timestamp -// marshalPointHeader encodes a series id, timestamp, & flagset into a byte slice. -func marshalPointHeader(seriesID uint32, timestamp int64) []byte { - b := make([]byte, 12) +// marshalPointHeader encodes a series id, payload length, timestamp, & flagset into a byte slice. +func marshalPointHeader(seriesID uint32, payloadLength uint32, timestamp int64) []byte { + b := make([]byte, pointHeaderSize) binary.BigEndian.PutUint32(b[0:4], seriesID) - binary.BigEndian.PutUint64(b[4:12], uint64(timestamp)) + binary.BigEndian.PutUint32(b[4:8], payloadLength) + binary.BigEndian.PutUint64(b[8:16], uint64(timestamp)) return b } // unmarshalPointHeader decodes a byte slice into a series id, timestamp & flagset. -func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) { +func unmarshalPointHeader(b []byte) (seriesID uint32, payloadLength uint32, timestamp int64) { seriesID = binary.BigEndian.Uint32(b[0:4]) - timestamp = int64(binary.BigEndian.Uint64(b[4:12])) + payloadLength = binary.BigEndian.Uint32(b[4:8]) + timestamp = int64(binary.BigEndian.Uint64(b[8:16])) return }