fix: eliminate race condition on Monitor.globalTags (#23471)
close #23470
(cherry picked from commit dd356e0bcc
)
pull/23490/head
parent
75ed51b9c3
commit
ee881ec558
|
@ -32,6 +32,39 @@ const (
|
|||
MonitorRetentionPolicyReplicaN = 1
|
||||
)
|
||||
|
||||
// tags provides thread-safe tag handling
|
||||
type tags struct {
|
||||
mu sync.RWMutex
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
// NewTags creates a new tags struct to use
|
||||
func newTags() *tags {
|
||||
return &tags{
|
||||
tags: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a new tag to the tag collection
|
||||
func (t *tags) Add(key string, value interface{}) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.tags[key] = fmt.Sprintf("%v", value)
|
||||
}
|
||||
|
||||
// Tags safely returns a copy of the current tag mapping
|
||||
func (t *tags) Tags() map[string]string {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
r := make(map[string]string)
|
||||
for k, v := range t.tags {
|
||||
r[k] = v
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// Monitor represents an instance of the monitor system.
|
||||
type Monitor struct {
|
||||
// Build information for diagnostics.
|
||||
|
@ -42,8 +75,9 @@ type Monitor struct {
|
|||
|
||||
wg sync.WaitGroup
|
||||
|
||||
globalTags *tags
|
||||
|
||||
mu sync.RWMutex
|
||||
globalTags map[string]string
|
||||
diagRegistrations map[string]diagnostics.Client
|
||||
reporter Reporter
|
||||
done chan struct{}
|
||||
|
@ -73,7 +107,7 @@ type PointsWriter interface {
|
|||
// New returns a new instance of the monitor system.
|
||||
func New(r Reporter, c Config) *Monitor {
|
||||
return &Monitor{
|
||||
globalTags: make(map[string]string),
|
||||
globalTags: newTags(),
|
||||
diagRegistrations: make(map[string]diagnostics.Client),
|
||||
reporter: r,
|
||||
storeEnabled: c.StoreEnabled,
|
||||
|
@ -138,9 +172,10 @@ func (m *Monitor) WritePoints(p models.Points) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if len(m.globalTags) > 0 {
|
||||
gt := m.globalTags.Tags()
|
||||
if len(gt) > 0 {
|
||||
for _, pp := range p {
|
||||
pp.SetTags(pp.Tags().Merge(m.globalTags))
|
||||
pp.SetTags(pp.Tags().Merge(gt))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,9 +220,7 @@ func (m *Monitor) Close() error {
|
|||
// SetGlobalTag can be used to set tags that will appear on all points
|
||||
// written by the Monitor.
|
||||
func (m *Monitor) SetGlobalTag(key string, value interface{}) {
|
||||
m.mu.Lock()
|
||||
m.globalTags[key] = fmt.Sprintf("%v", value)
|
||||
m.mu.Unlock()
|
||||
m.globalTags.Add(key, value)
|
||||
}
|
||||
|
||||
// RemoteWriterConfig represents the configuration of a remote writer.
|
||||
|
@ -440,7 +473,7 @@ func (m *Monitor) storeStatistics() {
|
|||
m.createInternalStorage()
|
||||
}()
|
||||
|
||||
stats, err := m.Statistics(m.globalTags)
|
||||
stats, err := m.Statistics(m.globalTags.Tags())
|
||||
if err != nil {
|
||||
m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue