Ensure we don't mutate provided statistics tags
parent
3ecc913c88
commit
6cafdbc604
|
@ -13,3 +13,28 @@ func NewStatistic(name string) Statistic {
|
||||||
Values: make(map[string]interface{}),
|
Values: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatisticTags is a map that can be merged with others without causing
|
||||||
|
// mutations to either map.
|
||||||
|
type StatisticTags map[string]string
|
||||||
|
|
||||||
|
// Merge creates a new map containing the merged contents of tags and t.
|
||||||
|
// If both tags and the receiver map contain the same key, the value in tags
|
||||||
|
// is used in the resulting map.
|
||||||
|
//
|
||||||
|
// Merge always returns a usable map.
|
||||||
|
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
|
||||||
|
// Add everything in tags to the result.
|
||||||
|
out := make(map[string]string, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only add values from t that don't appear in tags.
|
||||||
|
for k, v := range t {
|
||||||
|
if _, ok := tags[k]; !ok {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
package models_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTags_Merge(t *testing.T) {
|
||||||
|
examples := []struct {
|
||||||
|
Base map[string]string
|
||||||
|
Arg map[string]string
|
||||||
|
Result map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Base: nil,
|
||||||
|
Arg: nil,
|
||||||
|
Result: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Base: nil,
|
||||||
|
Arg: map[string]string{"foo": "foo"},
|
||||||
|
Result: map[string]string{"foo": "foo"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Base: map[string]string{"foo": "foo"},
|
||||||
|
Arg: nil,
|
||||||
|
Result: map[string]string{"foo": "foo"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Base: map[string]string{"foo": "foo"},
|
||||||
|
Arg: map[string]string{"bar": "bar"},
|
||||||
|
Result: map[string]string{"foo": "foo", "bar": "bar"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Base: map[string]string{"foo": "foo", "bar": "bar"},
|
||||||
|
Arg: map[string]string{"zoo": "zoo"},
|
||||||
|
Result: map[string]string{"foo": "foo", "bar": "bar", "zoo": "zoo"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Base: map[string]string{"foo": "foo", "bar": "bar"},
|
||||||
|
Arg: map[string]string{"bar": "newbar"},
|
||||||
|
Result: map[string]string{"foo": "foo", "bar": "newbar"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, example := range examples {
|
||||||
|
i++
|
||||||
|
result := models.StatisticTags(example.Base).Merge(example.Arg)
|
||||||
|
if got, exp := result, example.Result; !reflect.DeepEqual(got, exp) {
|
||||||
|
t.Errorf("[Example %d] got %#v, expected %#v", i, got, exp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -57,8 +57,8 @@ type Service struct {
|
||||||
addr net.Addr
|
addr net.Addr
|
||||||
|
|
||||||
// expvar-based stats.
|
// expvar-based stats.
|
||||||
stats *Statistics
|
stats *Statistics
|
||||||
statTags map[string]string
|
defaultTags models.StatisticTags
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService returns a new instance of the collectd service.
|
// NewService returns a new instance of the collectd service.
|
||||||
|
@ -67,10 +67,10 @@ func NewService(c Config) *Service {
|
||||||
// Use defaults where necessary.
|
// Use defaults where necessary.
|
||||||
Config: c.WithDefaults(),
|
Config: c.WithDefaults(),
|
||||||
|
|
||||||
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
stats: &Statistics{},
|
stats: &Statistics{},
|
||||||
statTags: map[string]string{"bind": c.BindAddress},
|
defaultTags: models.StatisticTags{"bind": c.BindAddress},
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s
|
return &s
|
||||||
|
@ -224,15 +224,9 @@ type Statistics struct {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||||
// Insert any missing deault tag values.
|
|
||||||
for k, v := range s.statTags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return []models.Statistic{{
|
return []models.Statistic{{
|
||||||
Name: "collectd",
|
Name: "collectd",
|
||||||
Tags: tags,
|
Tags: s.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||||
|
|
|
@ -59,9 +59,9 @@ type Service struct {
|
||||||
batcher *tsdb.PointBatcher
|
batcher *tsdb.PointBatcher
|
||||||
parser *Parser
|
parser *Parser
|
||||||
|
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
stats *Statistics
|
stats *Statistics
|
||||||
statTags map[string]string
|
defaultTags models.StatisticTags
|
||||||
|
|
||||||
tcpConnectionsMu sync.Mutex
|
tcpConnectionsMu sync.Mutex
|
||||||
tcpConnections map[string]*tcpConnection
|
tcpConnections map[string]*tcpConnection
|
||||||
|
@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) {
|
||||||
batchTimeout: time.Duration(d.BatchTimeout),
|
batchTimeout: time.Duration(d.BatchTimeout),
|
||||||
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
|
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
|
||||||
stats: &Statistics{},
|
stats: &Statistics{},
|
||||||
statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
|
defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress},
|
||||||
tcpConnections: make(map[string]*tcpConnection),
|
tcpConnections: make(map[string]*tcpConnection),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
|
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
|
||||||
|
@ -232,15 +232,9 @@ type Statistics struct {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||||
// Insert any missing deault tag values.
|
|
||||||
for k, v := range s.statTags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return []models.Statistic{{
|
return []models.Statistic{{
|
||||||
Name: "graphite",
|
Name: "graphite",
|
||||||
Tags: tags,
|
Tags: s.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||||
|
|
|
@ -73,8 +73,8 @@ type Service struct {
|
||||||
LogPointErrors bool
|
LogPointErrors bool
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
|
|
||||||
stats *Statistics
|
stats *Statistics
|
||||||
statTags map[string]string
|
defaultTags models.StatisticTags
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService returns a new instance of Service.
|
// NewService returns a new instance of Service.
|
||||||
|
@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) {
|
||||||
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
|
||||||
LogPointErrors: d.LogPointErrors,
|
LogPointErrors: d.LogPointErrors,
|
||||||
stats: &Statistics{},
|
stats: &Statistics{},
|
||||||
statTags: map[string]string{"bind": d.BindAddress},
|
defaultTags: models.StatisticTags{"bind": d.BindAddress},
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -200,16 +200,9 @@ type Statistics struct {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||||
// Insert any missing deault tag values.
|
|
||||||
for k, v := range s.statTags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return []models.Statistic{{
|
return []models.Statistic{{
|
||||||
Name: "opentsdb",
|
Name: "opentsdb",
|
||||||
Tags: tags,
|
Tags: s.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled),
|
statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled),
|
||||||
statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections),
|
statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections),
|
||||||
|
|
|
@ -200,7 +200,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
|
||||||
bm: bm,
|
bm: bm,
|
||||||
writers: writers,
|
writers: writers,
|
||||||
stats: stats,
|
stats: stats,
|
||||||
tags: map[string]string{
|
defaultTags: models.StatisticTags{
|
||||||
"database": se.db,
|
"database": se.db,
|
||||||
"retention_policy": se.rp,
|
"retention_policy": se.rp,
|
||||||
"name": se.name,
|
"name": se.name,
|
||||||
|
@ -383,11 +383,11 @@ type writerStats struct {
|
||||||
|
|
||||||
// balances writes across PointsWriters according to BalanceMode
|
// balances writes across PointsWriters according to BalanceMode
|
||||||
type balancewriter struct {
|
type balancewriter struct {
|
||||||
bm BalanceMode
|
bm BalanceMode
|
||||||
writers []PointsWriter
|
writers []PointsWriter
|
||||||
stats []writerStats
|
stats []writerStats
|
||||||
tags map[string]string
|
defaultTags models.StatisticTags
|
||||||
i int
|
i int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
||||||
|
@ -415,19 +415,12 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
|
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
|
||||||
// Insert any missing default tag values.
|
|
||||||
for k, v := range b.tags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
statistics := make([]models.Statistic, len(b.stats))
|
statistics := make([]models.Statistic, len(b.stats))
|
||||||
for i := range b.stats {
|
for i := range b.stats {
|
||||||
tags["destination"] = b.stats[i].dest
|
tags["destination"] = b.stats[i].dest
|
||||||
statistics[i] = models.Statistic{
|
statistics[i] = models.Statistic{
|
||||||
Name: "subscriber",
|
Name: "subscriber",
|
||||||
Tags: tags,
|
Tags: b.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
|
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
|
||||||
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
|
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
|
||||||
|
|
|
@ -56,22 +56,22 @@ type Service struct {
|
||||||
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
stats *Statistics
|
stats *Statistics
|
||||||
statTags map[string]string
|
defaultTags models.StatisticTags
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService returns a new instance of Service.
|
// NewService returns a new instance of Service.
|
||||||
func NewService(c Config) *Service {
|
func NewService(c Config) *Service {
|
||||||
d := *c.WithDefaults()
|
d := *c.WithDefaults()
|
||||||
return &Service{
|
return &Service{
|
||||||
config: d,
|
config: d,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
parserChan: make(chan []byte, parserChanLen),
|
parserChan: make(chan []byte, parserChanLen),
|
||||||
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
|
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
|
||||||
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
|
||||||
stats: &Statistics{},
|
stats: &Statistics{},
|
||||||
statTags: map[string]string{"bind": d.BindAddress},
|
defaultTags: models.StatisticTags{"bind": d.BindAddress},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,16 +132,9 @@ type Statistics struct {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||||
// Insert any missing deault tag values.
|
|
||||||
for k, v := range s.statTags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return []models.Statistic{{
|
return []models.Statistic{{
|
||||||
Name: "udp",
|
Name: "udp",
|
||||||
Tags: tags,
|
Tags: s.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||||
|
|
|
@ -34,7 +34,8 @@ type DatabaseIndex struct {
|
||||||
|
|
||||||
name string // name of the database represented by this index
|
name string // name of the database represented by this index
|
||||||
|
|
||||||
stats *IndexStatistics
|
stats *IndexStatistics
|
||||||
|
defaultTags models.StatisticTags
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabaseIndex returns a new initialized DatabaseIndex.
|
// NewDatabaseIndex returns a new initialized DatabaseIndex.
|
||||||
|
@ -44,6 +45,7 @@ func NewDatabaseIndex(name string) *DatabaseIndex {
|
||||||
series: make(map[string]*Series),
|
series: make(map[string]*Series),
|
||||||
name: name,
|
name: name,
|
||||||
stats: &IndexStatistics{},
|
stats: &IndexStatistics{},
|
||||||
|
defaultTags: models.StatisticTags{"database": name},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,12 +57,9 @@ type IndexStatistics struct {
|
||||||
|
|
||||||
// Statistics returns statistics for periodic monitoring.
|
// Statistics returns statistics for periodic monitoring.
|
||||||
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
|
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
|
||||||
if _, ok := tags["database"]; !ok {
|
|
||||||
tags["database"] = d.name
|
|
||||||
}
|
|
||||||
return []models.Statistic{{
|
return []models.Statistic{{
|
||||||
Name: "database",
|
Name: "database",
|
||||||
Tags: tags,
|
Tags: d.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
|
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
|
||||||
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
|
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
|
||||||
|
|
|
@ -96,8 +96,8 @@ type Shard struct {
|
||||||
enabled bool
|
enabled bool
|
||||||
|
|
||||||
// expvar-based stats.
|
// expvar-based stats.
|
||||||
stats *ShardStatistics
|
stats *ShardStatistics
|
||||||
statTags map[string]string
|
defaultTags models.StatisticTags
|
||||||
|
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
// used by logger. Referenced so it can be passed down to new caches.
|
// used by logger. Referenced so it can be passed down to new caches.
|
||||||
|
@ -118,7 +118,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
|
|
||||||
stats: &ShardStatistics{},
|
stats: &ShardStatistics{},
|
||||||
statTags: map[string]string{
|
defaultTags: models.StatisticTags{
|
||||||
"path": path,
|
"path": path,
|
||||||
"id": fmt.Sprintf("%d", id),
|
"id": fmt.Sprintf("%d", id),
|
||||||
"database": db,
|
"database": db,
|
||||||
|
@ -179,16 +179,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert any missing default tag values.
|
|
||||||
for k, v := range s.statTags {
|
|
||||||
if _, ok := tags[k]; !ok {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
statistics := []models.Statistic{{
|
statistics := []models.Statistic{{
|
||||||
Name: "shard",
|
Name: "shard",
|
||||||
Tags: tags,
|
Tags: s.defaultTags.Merge(tags),
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
|
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
|
||||||
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
|
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
|
||||||
|
|
Loading…
Reference in New Issue