Support multiple OpenTSDB inputs

pull/6228/head
gunnaraasen 2016-04-05 14:36:07 -07:00
parent fccd0943a8
commit eec8cc94c7
7 changed files with 88 additions and 27 deletions

View File

@ -49,7 +49,7 @@ type Config struct {
HTTPD httpd.Config `toml:"http"`
GraphiteInputs []graphite.Config `toml:"graphite"`
CollectdInputs []collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
OpenTSDBInputs []opentsdb.Config `toml:"opentsdb"`
UDPInputs []udp.Config `toml:"udp"`
ContinuousQuery continuous_querier.Config `toml:"continuous_queries"`
@ -79,10 +79,10 @@ func NewConfig() *Config {
c.Monitor = monitor.NewConfig()
c.Subscriber = subscriber.NewConfig()
c.HTTPD = httpd.NewConfig()
c.OpenTSDB = opentsdb.NewConfig()
c.GraphiteInputs = []graphite.Config{graphite.NewConfig()}
c.CollectdInputs = []collectd.Config{collectd.NewConfig()}
c.OpenTSDBInputs = []opentsdb.Config{opentsdb.NewConfig()}
c.UDPInputs = []udp.Config{udp.NewConfig()}
c.ContinuousQuery = continuous_querier.NewConfig()

View File

@ -41,9 +41,15 @@ bind-address = ":1000"
[[collectd]]
bind-address = ":1010"
[opentsdb]
[[opentsdb]]
bind-address = ":2000"
[[opentsdb]]
bind-address = ":2010"
[[opentsdb]]
bind-address = ":2020"
[[udp]]
bind-address = ":4444"
@ -78,8 +84,12 @@ enabled = true
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress)
} else if c.CollectdInputs[1].BindAddress != ":1010" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
} else if c.OpenTSDB.BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.OpenTSDBInputs[0].BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress)
} else if c.OpenTSDBInputs[1].BindAddress != ":2010" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[1].BindAddress)
} else if c.OpenTSDBInputs[2].BindAddress != ":2020" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[2].BindAddress)
} else if c.UDPInputs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPInputs[0].BindAddress)
} else if c.Subscriber.Enabled != true {
@ -122,9 +132,12 @@ bind-address = ":1000"
[[collectd]]
bind-address = ":1010"
[opentsdb]
[[opentsdb]]
bind-address = ":2000"
[[opentsdb]]
bind-address = ":2010"
[[udp]]
bind-address = ":4444"
@ -149,6 +162,10 @@ enabled = true
t.Fatalf("failed to set env var: %v", err)
}
if err := os.Setenv("INFLUXDB_OPENTSDB_0_BIND_ADDRESS", ":2020"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}
if err := c.ApplyEnvOverrides(); err != nil {
t.Fatalf("failed to apply env overrides: %v", err)
}
@ -164,6 +181,10 @@ enabled = true
if c.CollectdInputs[1].BindAddress != ":1020" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
}
if c.OpenTSDBInputs[0].BindAddress != ":2020" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress)
}
}
func TestConfig_ValidateNoServiceConfigured(t *testing.T) {

View File

@ -241,18 +241,20 @@ func (s *Server) Open() error {
s.appendAdminService(s.config.Admin)
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
return err
}
s.appendRetentionPolicyService(s.config.Retention)
for _, g := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(g); err != nil {
for _, i := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(i); err != nil {
return err
}
}
for _, i := range s.config.CollectdInputs {
s.appendCollectdService(i)
}
for _, i := range s.config.OpenTSDBInputs {
if err := s.appendOpenTSDBService(i); err != nil {
return err
}
}
for _, i := range s.config.UDPInputs {
s.appendUDPService(i)
}

View File

@ -233,10 +233,10 @@ reporting-disabled = false
###
### [opentsdb]
###
### Controls the listener for OpenTSDB data.
### Controls one or many listeners for OpenTSDB data.
###
[opentsdb]
[[opentsdb]]
enabled = false
# bind-address = ":4242"
# database = "opentsdb"

View File

@ -1,10 +1,10 @@
openTSDB Input
OpenTSDB Input
============
InfluxDB supports both the telnet and HTTP openTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your openTSDB system.
InfluxDB supports both the telnet and HTTP OpenTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your OpenTSDB system.
## Configuration
The openTSDB input allows the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist.
The OpenTSDB inputs allow the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist.
The write-consistency-level can also be set. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is `ONE`.
The openTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.
The OpenTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.

View File

@ -27,6 +27,9 @@ const (
// DefaultBatchPending is the default number of batches that can be in the queue.
DefaultBatchPending = 5
// DefaultCertificate is the default location of the certificate used when TLS is enabled.
DefaultCertificate = "/etc/ssl/influxdb.pem"
)
// Config represents the configuration of the OpenTSDB service.
@ -52,10 +55,42 @@ func NewConfig() Config {
RetentionPolicy: DefaultRetentionPolicy,
ConsistencyLevel: DefaultConsistencyLevel,
TLSEnabled: false,
Certificate: "/etc/ssl/influxdb.pem",
Certificate: DefaultCertificate,
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchTimeout: toml.Duration(DefaultBatchTimeout),
LogPointErrors: true,
}
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.BindAddress == "" {
d.BindAddress = DefaultBindAddress
}
if d.Database == "" {
d.Database = DefaultDatabase
}
if d.RetentionPolicy == "" {
d.RetentionPolicy = DefaultRetentionPolicy
}
if d.ConsistencyLevel == "" {
d.ConsistencyLevel = DefaultConsistencyLevel
}
if d.Certificate == "" {
d.Certificate = DefaultCertificate
}
if d.BatchSize == 0 {
d.BatchSize = DefaultBatchSize
}
if d.BatchPending == 0 {
d.BatchPending = DefaultBatchPending
}
if d.BatchTimeout == 0 {
d.BatchTimeout = toml.Duration(DefaultBatchTimeout)
}
return &d
}

View File

@ -80,19 +80,22 @@ type Service struct {
// NewService returns a new instance of Service.
func NewService(c Config) (*Service, error) {
// Use defaults where necessary.
d := c.WithDefaults()
s := &Service{
done: make(chan struct{}),
tls: c.TLSEnabled,
cert: c.Certificate,
tls: d.TLSEnabled,
cert: d.Certificate,
err: make(chan error),
BindAddress: c.BindAddress,
Database: c.Database,
RetentionPolicy: c.RetentionPolicy,
batchSize: c.BatchSize,
batchPending: c.BatchPending,
batchTimeout: time.Duration(c.BatchTimeout),
BindAddress: d.BindAddress,
Database: d.Database,
RetentionPolicy: d.RetentionPolicy,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
LogPointErrors: c.LogPointErrors,
LogPointErrors: d.LogPointErrors,
}
return s, nil
}