Convert index write lock to series lock
parent
86385f9930
commit
f1bb87d4f8
22
tsdb/meta.go
22
tsdb/meta.go
|
@ -158,9 +158,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
|
|||
func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
|
||||
ss := d.Series(k)
|
||||
if ss != nil {
|
||||
d.mu.Lock()
|
||||
ss.AssignShard(shardID)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,6 +166,7 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
|
|||
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
ss := d.series[key]
|
||||
if ss == nil {
|
||||
return nil
|
||||
|
@ -1311,9 +1310,9 @@ func (a Measurements) union(other Measurements) Measurements {
|
|||
|
||||
// Series belong to a Measurement and represent unique time series in a database
|
||||
type Series struct {
|
||||
Key string
|
||||
Tags map[string]string
|
||||
|
||||
mu sync.RWMutex
|
||||
Key string
|
||||
Tags map[string]string
|
||||
id uint64
|
||||
measurement *Measurement
|
||||
shardIDs map[uint64]bool // shards that have this series defined
|
||||
|
@ -1329,11 +1328,16 @@ func NewSeries(key string, tags map[string]string) *Series {
|
|||
}
|
||||
|
||||
func (s *Series) AssignShard(shardID uint64) {
|
||||
s.mu.Lock()
|
||||
s.shardIDs[shardID] = true
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the object to a binary format.
|
||||
func (s *Series) MarshalBinary() ([]byte, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
var pb internal.Series
|
||||
pb.Key = &s.Key
|
||||
for k, v := range s.Tags {
|
||||
|
@ -1346,6 +1350,9 @@ func (s *Series) MarshalBinary() ([]byte, error) {
|
|||
|
||||
// UnmarshalBinary decodes the object from a binary format.
|
||||
func (s *Series) UnmarshalBinary(buf []byte) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var pb internal.Series
|
||||
if err := proto.Unmarshal(buf, &pb); err != nil {
|
||||
return err
|
||||
|
@ -1360,11 +1367,16 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
|
|||
|
||||
// InitializeShards initializes the list of shards.
|
||||
func (s *Series) InitializeShards() {
|
||||
s.mu.Lock()
|
||||
s.shardIDs = make(map[uint64]bool)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// match returns true if all tags match the series' tags.
|
||||
func (s *Series) match(tags map[string]string) bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for k, v := range tags {
|
||||
if s.Tags[k] != v {
|
||||
return false
|
||||
|
|
Loading…
Reference in New Issue