Support multiple collectd inputs

pull/6228/head
gunnaraasen 2016-04-05 11:05:06 -07:00
parent 78086fa1f3
commit fccd0943a8
7 changed files with 69 additions and 17 deletions

View File

@ -48,7 +48,7 @@ type Config struct {
Subscriber subscriber.Config `toml:"subscriber"`
HTTPD httpd.Config `toml:"http"`
GraphiteInputs []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
CollectdInputs []collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPInputs []udp.Config `toml:"udp"`
@ -79,10 +79,10 @@ func NewConfig() *Config {
c.Monitor = monitor.NewConfig()
c.Subscriber = subscriber.NewConfig()
c.HTTPD = httpd.NewConfig()
c.Collectd = collectd.NewConfig()
c.OpenTSDB = opentsdb.NewConfig()
c.GraphiteInputs = []graphite.Config{graphite.NewConfig()}
c.CollectdInputs = []collectd.Config{collectd.NewConfig()}
c.UDPInputs = []udp.Config{udp.NewConfig()}
c.ContinuousQuery = continuous_querier.NewConfig()

View File

@ -35,9 +35,12 @@ protocol = "udp"
[[graphite]]
protocol = "tcp"
[collectd]
[[collectd]]
bind-address = ":1000"
[[collectd]]
bind-address = ":1010"
[opentsdb]
bind-address = ":2000"
@ -71,8 +74,10 @@ enabled = true
t.Fatalf("unexpected graphite protocol(0): %s", c.GraphiteInputs[0].Protocol)
} else if c.GraphiteInputs[1].Protocol != "tcp" {
t.Fatalf("unexpected graphite protocol(1): %s", c.GraphiteInputs[1].Protocol)
} else if c.Collectd.BindAddress != ":1000" {
t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress)
} else if c.CollectdInputs[0].BindAddress != ":1000" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress)
} else if c.CollectdInputs[1].BindAddress != ":1010" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
} else if c.OpenTSDB.BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.UDPInputs[0].BindAddress != ":4444" {
@ -111,9 +116,12 @@ protocol = "udp"
[[graphite]]
protocol = "tcp"
[collectd]
[[collectd]]
bind-address = ":1000"
[[collectd]]
bind-address = ":1010"
[opentsdb]
bind-address = ":2000"
@ -137,6 +145,10 @@ enabled = true
t.Fatalf("failed to set env var: %v", err)
}
if err := os.Setenv("INFLUXDB_COLLECTD_1_BIND_ADDRESS", ":1020"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}
if err := c.ApplyEnvOverrides(); err != nil {
t.Fatalf("failed to apply env overrides: %v", err)
}
@ -146,7 +158,11 @@ enabled = true
}
if c.GraphiteInputs[1].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol(0): %s", c.GraphiteInputs[0].Protocol)
t.Fatalf("unexpected graphite protocol: %s", c.GraphiteInputs[1].Protocol)
}
if c.CollectdInputs[1].BindAddress != ":1020" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
}
}

View File

@ -241,19 +241,21 @@ func (s *Server) Open() error {
s.appendAdminService(s.config.Admin)
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
s.appendCollectdService(s.config.Collectd)
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
return err
}
for _, g := range s.config.UDPInputs {
s.appendUDPService(g)
}
s.appendRetentionPolicyService(s.config.Retention)
for _, g := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(g); err != nil {
return err
}
}
for _, i := range s.config.CollectdInputs {
s.appendCollectdService(i)
}
for _, i := range s.config.UDPInputs {
s.appendUDPService(i)
}
s.Subscriber.MetaClient = s.MetaClient
s.Subscriber.MetaClient = s.MetaClient

View File

@ -212,10 +212,10 @@ reporting-disabled = false
###
### [collectd]
###
### Controls the listener for collectd data.
### Controls one or many listeners for collectd data.
###
[collectd]
[[collectd]]
enabled = false
# bind-address = ""
# database = ""

View File

@ -22,7 +22,7 @@ Please note that UDP packets larger than the standard size of 1452 are dropped a
## Config Example
```
[collectd]
[[collectd]]
enabled = true
bind-address = ":25826" # the bind address
database = "collectd" # Name of the database that will be written to

View File

@ -68,3 +68,35 @@ func NewConfig() Config {
TypesDB: DefaultTypesDB,
}
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.BindAddress == "" {
d.BindAddress = DefaultBindAddress
}
if d.Database == "" {
d.Database = DefaultDatabase
}
if d.RetentionPolicy == "" {
d.RetentionPolicy = DefaultRetentionPolicy
}
if d.BatchSize == 0 {
d.BatchSize = DefaultBatchSize
}
if d.BatchPending == 0 {
d.BatchPending = DefaultBatchPending
}
if d.BatchDuration == 0 {
d.BatchDuration = DefaultBatchDuration
}
if d.ReadBuffer == 0 {
d.ReadBuffer = DefaultReadBuffer
}
if d.TypesDB == "" {
d.TypesDB = DefaultTypesDB
}
return &d
}

View File

@ -63,13 +63,15 @@ type Service struct {
// NewService returns a new instance of the collectd service.
func NewService(c Config) *Service {
s := &Service{
Config: &c,
s := Service{
// Use defaults where necessary.
Config: c.WithDefaults(),
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
}
return s
return &s
}
// Open starts the service.