From f1bb87d4f888dcd015cba30c3164736e42c5850b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 30 Mar 2016 21:11:16 -0600 Subject: [PATCH] Convert index write lock to series lock --- tsdb/meta.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tsdb/meta.go b/tsdb/meta.go index feb10f8ffb..29ee56aa34 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -158,9 +158,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem func (d *DatabaseIndex) AssignShard(k string, shardID uint64) { ss := d.Series(k) if ss != nil { - d.mu.Lock() ss.AssignShard(shardID) - d.mu.Unlock() } } @@ -168,6 +166,7 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) { func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { d.mu.RLock() defer d.mu.RUnlock() + ss := d.series[key] if ss == nil { return nil @@ -1311,9 +1310,9 @@ func (a Measurements) union(other Measurements) Measurements { // Series belong to a Measurement and represent unique time series in a database type Series struct { - Key string - Tags map[string]string - + mu sync.RWMutex + Key string + Tags map[string]string id uint64 measurement *Measurement shardIDs map[uint64]bool // shards that have this series defined @@ -1329,11 +1328,16 @@ func NewSeries(key string, tags map[string]string) *Series { } func (s *Series) AssignShard(shardID uint64) { + s.mu.Lock() s.shardIDs[shardID] = true + s.mu.Unlock() } // MarshalBinary encodes the object to a binary format. func (s *Series) MarshalBinary() ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + var pb internal.Series pb.Key = &s.Key for k, v := range s.Tags { @@ -1346,6 +1350,9 @@ func (s *Series) MarshalBinary() ([]byte, error) { // UnmarshalBinary decodes the object from a binary format. func (s *Series) UnmarshalBinary(buf []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + var pb internal.Series if err := proto.Unmarshal(buf, &pb); err != nil { return err @@ -1360,11 +1367,16 @@ func (s *Series) UnmarshalBinary(buf []byte) error { // InitializeShards initializes the list of shards. func (s *Series) InitializeShards() { + s.mu.Lock() s.shardIDs = make(map[uint64]bool) + s.mu.Unlock() } // match returns true if all tags match the series' tags. func (s *Series) match(tags map[string]string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + for k, v := range tags { if s.Tags[k] != v { return false