Simplify locking in WriteSeries()
In addition, memomize the Field codecs.pull/1644/head
parent
612ef1fa08
commit
9c4174a006
86
server.go
86
server.go
|
@ -1575,51 +1575,57 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
|
|||
|
||||
// Build writeRawSeriesMessageType publish commands.
|
||||
shardData := make(map[uint64][]byte, 0)
|
||||
for _, p := range points {
|
||||
codecs := make(map[string]*FieldCodec, 0)
|
||||
if err := func() error {
|
||||
// Local function makes lock management foolproof.
|
||||
measurement, series, err := func() (*Measurement, *Series, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
db := s.databases[database]
|
||||
if db == nil {
|
||||
return nil, nil, ErrDatabaseNotFound
|
||||
}
|
||||
if measurement, series := db.MeasurementAndSeries(p.Name, p.Tags); series != nil {
|
||||
return measurement, series, nil
|
||||
}
|
||||
return nil, nil, ErrSeriesNotFound
|
||||
}()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Retrieve shard group.
|
||||
g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Find appropriate shard within the shard group.
|
||||
sh := g.ShardBySeriesID(series.ID)
|
||||
|
||||
// Get a field codec.
|
||||
s.mu.RLock()
|
||||
codec := NewFieldCodec(measurement)
|
||||
s.mu.RUnlock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
// Convert string-key/values to encoded fields.
|
||||
encodedFields, err := codec.EncodeFields(p.Values)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
db := s.databases[database]
|
||||
if db == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
for _, p := range points {
|
||||
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
|
||||
if series == nil {
|
||||
return ErrSeriesNotFound
|
||||
}
|
||||
|
||||
// Retrieve shard group.
|
||||
g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find appropriate shard within the shard group.
|
||||
sh := g.ShardBySeriesID(series.ID)
|
||||
|
||||
// Many points are likely to have the same Measurement name. Re-use codecs if possible.
|
||||
var codec *FieldCodec
|
||||
codec, ok := codecs[measurement.Name]
|
||||
if !ok {
|
||||
codec = NewFieldCodec(measurement)
|
||||
codecs[measurement.Name] = codec
|
||||
}
|
||||
|
||||
// Convert string-key/values to encoded fields.
|
||||
encodedFields, err := codec.EncodeFields(p.Values)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encode point header, followed by point data, and add to shard's batch.
|
||||
data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
|
||||
data = append(data, encodedFields...)
|
||||
if shardData[sh.ID] == nil {
|
||||
shardData[sh.ID] = make([]byte, 0)
|
||||
}
|
||||
shardData[sh.ID] = append(shardData[sh.ID], data...)
|
||||
}
|
||||
|
||||
// Encode point header, followed by point data, and add to shard's batch.
|
||||
data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
|
||||
data = append(data, encodedFields...)
|
||||
if shardData[sh.ID] == nil {
|
||||
shardData[sh.ID] = make([]byte, 0)
|
||||
}
|
||||
shardData[sh.ID] = append(shardData[sh.ID], data...)
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Write data for each shard to the Broker.
|
||||
|
|
Loading…
Reference in New Issue