diff --git a/models/statistic.go b/models/statistic.go index 20bbcf97e7..33f8089948 100644 --- a/models/statistic.go +++ b/models/statistic.go @@ -13,3 +13,28 @@ func NewStatistic(name string) Statistic { Values: make(map[string]interface{}), } } + +// StatisticTags is a map that can be merged with others without causing +// mutations to either map. +type StatisticTags map[string]string + +// Merge creates a new map containing the merged contents of tags and t. +// If both tags and the receiver map contain the same key, the value in tags +// is used in the resulting map. +// +// Merge always returns a usable map. +func (t StatisticTags) Merge(tags map[string]string) map[string]string { + // Add everything in tags to the result. + out := make(map[string]string, len(tags)) + for k, v := range tags { + out[k] = v + } + + // Only add values from t that don't appear in tags. + for k, v := range t { + if _, ok := tags[k]; !ok { + out[k] = v + } + } + return out +} diff --git a/models/statistic_test.go b/models/statistic_test.go new file mode 100644 index 0000000000..918c991342 --- /dev/null +++ b/models/statistic_test.go @@ -0,0 +1,55 @@ +package models_test + +import ( + "reflect" + "testing" + + "github.com/influxdata/influxdb/models" +) + +func TestTags_Merge(t *testing.T) { + examples := []struct { + Base map[string]string + Arg map[string]string + Result map[string]string + }{ + { + Base: nil, + Arg: nil, + Result: map[string]string{}, + }, + { + Base: nil, + Arg: map[string]string{"foo": "foo"}, + Result: map[string]string{"foo": "foo"}, + }, + { + Base: map[string]string{"foo": "foo"}, + Arg: nil, + Result: map[string]string{"foo": "foo"}, + }, + { + Base: map[string]string{"foo": "foo"}, + Arg: map[string]string{"bar": "bar"}, + Result: map[string]string{"foo": "foo", "bar": "bar"}, + }, + { + Base: map[string]string{"foo": "foo", "bar": "bar"}, + Arg: map[string]string{"zoo": "zoo"}, + Result: map[string]string{"foo": "foo", "bar": "bar", "zoo": "zoo"}, + }, + { + Base: map[string]string{"foo": "foo", "bar": "bar"}, + Arg: map[string]string{"bar": "newbar"}, + Result: map[string]string{"foo": "foo", "bar": "newbar"}, + }, + } + + for i, example := range examples { + i++ + result := models.StatisticTags(example.Base).Merge(example.Arg) + if got, exp := result, example.Result; !reflect.DeepEqual(got, exp) { + t.Errorf("[Example %d] got %#v, expected %#v", i, got, exp) + } + } +} diff --git a/services/collectd/service.go b/services/collectd/service.go index df06bb648a..f29e41c8da 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -57,8 +57,8 @@ type Service struct { addr net.Addr // expvar-based stats. - stats *Statistics - statTags map[string]string + stats *Statistics + defaultTags models.StatisticTags } // NewService returns a new instance of the collectd service. @@ -67,10 +67,10 @@ func NewService(c Config) *Service { // Use defaults where necessary. Config: c.WithDefaults(), - Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags), - err: make(chan error), - stats: &Statistics{}, - statTags: map[string]string{"bind": c.BindAddress}, + Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags), + err: make(chan error), + stats: &Statistics{}, + defaultTags: models.StatisticTags{"bind": c.BindAddress}, } return &s @@ -224,15 +224,9 @@ type Statistics struct { // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { - // Insert any missing deault tag values. - for k, v := range s.statTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } return []models.Statistic{{ Name: "collectd", - Tags: tags, + Tags: s.defaultTags.Merge(tags), Values: map[string]interface{}{ statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived), statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived), diff --git a/services/graphite/service.go b/services/graphite/service.go index 932e2cf772..9b440dbd71 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -59,9 +59,9 @@ type Service struct { batcher *tsdb.PointBatcher parser *Parser - logger *log.Logger - stats *Statistics - statTags map[string]string + logger *log.Logger + stats *Statistics + defaultTags models.StatisticTags tcpConnectionsMu sync.Mutex tcpConnections map[string]*tcpConnection @@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) { batchTimeout: time.Duration(d.BatchTimeout), logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags), stats: &Statistics{}, - statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress}, + defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress}, tcpConnections: make(map[string]*tcpConnection), done: make(chan struct{}), diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), @@ -232,15 +232,9 @@ type Statistics struct { // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { - // Insert any missing deault tag values. - for k, v := range s.statTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } return []models.Statistic{{ Name: "graphite", - Tags: tags, + Tags: s.defaultTags.Merge(tags), Values: map[string]interface{}{ statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived), statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived), diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 82ea93c03b..d17944dcef 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -73,8 +73,8 @@ type Service struct { LogPointErrors bool Logger *log.Logger - stats *Statistics - statTags map[string]string + stats *Statistics + defaultTags models.StatisticTags } // NewService returns a new instance of Service. @@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) { Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), LogPointErrors: d.LogPointErrors, stats: &Statistics{}, - statTags: map[string]string{"bind": d.BindAddress}, + defaultTags: models.StatisticTags{"bind": d.BindAddress}, } return s, nil } @@ -200,16 +200,9 @@ type Statistics struct { // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { - // Insert any missing deault tag values. - for k, v := range s.statTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - return []models.Statistic{{ Name: "opentsdb", - Tags: tags, + Tags: s.defaultTags.Merge(tags), Values: map[string]interface{}{ statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled), statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections), diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 0c44955432..cb8b72a276 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -200,7 +200,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st bm: bm, writers: writers, stats: stats, - tags: map[string]string{ + defaultTags: models.StatisticTags{ "database": se.db, "retention_policy": se.rp, "name": se.name, @@ -383,11 +383,11 @@ type writerStats struct { // balances writes across PointsWriters according to BalanceMode type balancewriter struct { - bm BalanceMode - writers []PointsWriter - stats []writerStats - tags map[string]string - i int + bm BalanceMode + writers []PointsWriter + stats []writerStats + defaultTags models.StatisticTags + i int } func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error { @@ -415,19 +415,12 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error { // Statistics returns statistics for periodic monitoring. func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic { - // Insert any missing default tag values. - for k, v := range b.tags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - statistics := make([]models.Statistic, len(b.stats)) for i := range b.stats { tags["destination"] = b.stats[i].dest statistics[i] = models.Statistic{ Name: "subscriber", - Tags: tags, + Tags: b.defaultTags.Merge(tags), Values: map[string]interface{}{ statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten), statWriteFailures: atomic.LoadInt64(&b.stats[i].failures), diff --git a/services/udp/service.go b/services/udp/service.go index 298a62b71f..5bdcf69cc3 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -56,22 +56,22 @@ type Service struct { CreateDatabase(name string) (*meta.DatabaseInfo, error) } - Logger *log.Logger - stats *Statistics - statTags map[string]string + Logger *log.Logger + stats *Statistics + defaultTags models.StatisticTags } // NewService returns a new instance of Service. func NewService(c Config) *Service { d := *c.WithDefaults() return &Service{ - config: d, - done: make(chan struct{}), - parserChan: make(chan []byte, parserChanLen), - batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)), - Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), - stats: &Statistics{}, - statTags: map[string]string{"bind": d.BindAddress}, + config: d, + done: make(chan struct{}), + parserChan: make(chan []byte, parserChanLen), + batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)), + Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), + stats: &Statistics{}, + defaultTags: models.StatisticTags{"bind": d.BindAddress}, } } @@ -132,16 +132,9 @@ type Statistics struct { // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { - // Insert any missing deault tag values. - for k, v := range s.statTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - return []models.Statistic{{ Name: "udp", - Tags: tags, + Tags: s.defaultTags.Merge(tags), Values: map[string]interface{}{ statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived), statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived), diff --git a/tsdb/meta.go b/tsdb/meta.go index 2b2bb73200..dcca1537cb 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -34,7 +34,8 @@ type DatabaseIndex struct { name string // name of the database represented by this index - stats *IndexStatistics + stats *IndexStatistics + defaultTags models.StatisticTags } // NewDatabaseIndex returns a new initialized DatabaseIndex. @@ -44,6 +45,7 @@ func NewDatabaseIndex(name string) *DatabaseIndex { series: make(map[string]*Series), name: name, stats: &IndexStatistics{}, + defaultTags: models.StatisticTags{"database": name}, } } @@ -55,12 +57,9 @@ type IndexStatistics struct { // Statistics returns statistics for periodic monitoring. func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic { - if _, ok := tags["database"]; !ok { - tags["database"] = d.name - } return []models.Statistic{{ Name: "database", - Tags: tags, + Tags: d.defaultTags.Merge(tags), Values: map[string]interface{}{ statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries), statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements), diff --git a/tsdb/shard.go b/tsdb/shard.go index 7c16a2e690..87d7e631e0 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -96,8 +96,8 @@ type Shard struct { enabled bool // expvar-based stats. - stats *ShardStatistics - statTags map[string]string + stats *ShardStatistics + defaultTags models.StatisticTags logger *log.Logger // used by logger. Referenced so it can be passed down to new caches. @@ -118,7 +118,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti closing: make(chan struct{}), stats: &ShardStatistics{}, - statTags: map[string]string{ + defaultTags: models.StatisticTags{ "path": path, "id": fmt.Sprintf("%d", id), "database": db, @@ -179,16 +179,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } - // Insert any missing default tag values. - for k, v := range s.statTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - statistics := []models.Statistic{{ Name: "shard", - Tags: tags, + Tags: s.defaultTags.Merge(tags), Values: map[string]interface{}{ statWriteReq: atomic.LoadInt64(&s.stats.WriteReq), statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),