From eec8cc94c794732933e3f8f7ecc0c9df3600eb80 Mon Sep 17 00:00:00 2001 From: gunnaraasen Date: Tue, 5 Apr 2016 14:36:07 -0700 Subject: [PATCH] Support multiple OpenTSDB inputs --- cmd/influxd/run/config.go | 4 ++-- cmd/influxd/run/config_test.go | 29 ++++++++++++++++++++++---- cmd/influxd/run/server.go | 12 ++++++----- etc/config.sample.toml | 4 ++-- services/opentsdb/README.md | 8 ++++---- services/opentsdb/config.go | 37 +++++++++++++++++++++++++++++++++- services/opentsdb/service.go | 21 ++++++++++--------- 7 files changed, 88 insertions(+), 27 deletions(-) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 8dd8abc951..7f42e65833 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -49,7 +49,7 @@ type Config struct { HTTPD httpd.Config `toml:"http"` GraphiteInputs []graphite.Config `toml:"graphite"` CollectdInputs []collectd.Config `toml:"collectd"` - OpenTSDB opentsdb.Config `toml:"opentsdb"` + OpenTSDBInputs []opentsdb.Config `toml:"opentsdb"` UDPInputs []udp.Config `toml:"udp"` ContinuousQuery continuous_querier.Config `toml:"continuous_queries"` @@ -79,10 +79,10 @@ func NewConfig() *Config { c.Monitor = monitor.NewConfig() c.Subscriber = subscriber.NewConfig() c.HTTPD = httpd.NewConfig() - c.OpenTSDB = opentsdb.NewConfig() c.GraphiteInputs = []graphite.Config{graphite.NewConfig()} c.CollectdInputs = []collectd.Config{collectd.NewConfig()} + c.OpenTSDBInputs = []opentsdb.Config{opentsdb.NewConfig()} c.UDPInputs = []udp.Config{udp.NewConfig()} c.ContinuousQuery = continuous_querier.NewConfig() diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index 9f430b16c9..18d3b8c6a8 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -41,9 +41,15 @@ bind-address = ":1000" [[collectd]] bind-address = ":1010" -[opentsdb] +[[opentsdb]] bind-address = ":2000" +[[opentsdb]] +bind-address = ":2010" + +[[opentsdb]] +bind-address = ":2020" + [[udp]] bind-address = ":4444" @@ -78,8 +84,12 @@ enabled = true t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress) } else if c.CollectdInputs[1].BindAddress != ":1010" { t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress) - } else if c.OpenTSDB.BindAddress != ":2000" { - t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress) + } else if c.OpenTSDBInputs[0].BindAddress != ":2000" { + t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress) + } else if c.OpenTSDBInputs[1].BindAddress != ":2010" { + t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[1].BindAddress) + } else if c.OpenTSDBInputs[2].BindAddress != ":2020" { + t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[2].BindAddress) } else if c.UDPInputs[0].BindAddress != ":4444" { t.Fatalf("unexpected udp bind address: %s", c.UDPInputs[0].BindAddress) } else if c.Subscriber.Enabled != true { @@ -122,9 +132,12 @@ bind-address = ":1000" [[collectd]] bind-address = ":1010" -[opentsdb] +[[opentsdb]] bind-address = ":2000" +[[opentsdb]] +bind-address = ":2010" + [[udp]] bind-address = ":4444" @@ -149,6 +162,10 @@ enabled = true t.Fatalf("failed to set env var: %v", err) } + if err := os.Setenv("INFLUXDB_OPENTSDB_0_BIND_ADDRESS", ":2020"); err != nil { + t.Fatalf("failed to set env var: %v", err) + } + if err := c.ApplyEnvOverrides(); err != nil { t.Fatalf("failed to apply env overrides: %v", err) } @@ -164,6 +181,10 @@ enabled = true if c.CollectdInputs[1].BindAddress != ":1020" { t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress) } + + if c.OpenTSDBInputs[0].BindAddress != ":2020" { + t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress) + } } func TestConfig_ValidateNoServiceConfigured(t *testing.T) { diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 896f306886..cc1591219c 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -241,18 +241,20 @@ func (s *Server) Open() error { s.appendAdminService(s.config.Admin) s.appendContinuousQueryService(s.config.ContinuousQuery) s.appendHTTPDService(s.config.HTTPD) - if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil { - return err - } s.appendRetentionPolicyService(s.config.Retention) - for _, g := range s.config.GraphiteInputs { - if err := s.appendGraphiteService(g); err != nil { + for _, i := range s.config.GraphiteInputs { + if err := s.appendGraphiteService(i); err != nil { return err } } for _, i := range s.config.CollectdInputs { s.appendCollectdService(i) } + for _, i := range s.config.OpenTSDBInputs { + if err := s.appendOpenTSDBService(i); err != nil { + return err + } + } for _, i := range s.config.UDPInputs { s.appendUDPService(i) } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 7db199c9fd..729670733f 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -233,10 +233,10 @@ reporting-disabled = false ### ### [opentsdb] ### -### Controls the listener for OpenTSDB data. +### Controls one or many listeners for OpenTSDB data. ### -[opentsdb] +[[opentsdb]] enabled = false # bind-address = ":4242" # database = "opentsdb" diff --git a/services/opentsdb/README.md b/services/opentsdb/README.md index fe84a3065e..46856b7bc1 100644 --- a/services/opentsdb/README.md +++ b/services/opentsdb/README.md @@ -1,10 +1,10 @@ -openTSDB Input +OpenTSDB Input ============ -InfluxDB supports both the telnet and HTTP openTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your openTSDB system. +InfluxDB supports both the telnet and HTTP OpenTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your OpenTSDB system. ## Configuration -The openTSDB input allows the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist. +The OpenTSDB inputs allow the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist. The write-consistency-level can also be set. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is `ONE`. -The openTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches. +The OpenTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches. diff --git a/services/opentsdb/config.go b/services/opentsdb/config.go index 821d8bdb79..71eba9f353 100644 --- a/services/opentsdb/config.go +++ b/services/opentsdb/config.go @@ -27,6 +27,9 @@ const ( // DefaultBatchPending is the default number of batches that can be in the queue. DefaultBatchPending = 5 + + // DefaultCertificate is the default location of the certificate used when TLS is enabled. + DefaultCertificate = "/etc/ssl/influxdb.pem" ) // Config represents the configuration of the OpenTSDB service. @@ -52,10 +55,42 @@ func NewConfig() Config { RetentionPolicy: DefaultRetentionPolicy, ConsistencyLevel: DefaultConsistencyLevel, TLSEnabled: false, - Certificate: "/etc/ssl/influxdb.pem", + Certificate: DefaultCertificate, BatchSize: DefaultBatchSize, BatchPending: DefaultBatchPending, BatchTimeout: toml.Duration(DefaultBatchTimeout), LogPointErrors: true, } } + +// WithDefaults takes the given config and returns a new config with any required +// default values set. +func (c *Config) WithDefaults() *Config { + d := *c + if d.BindAddress == "" { + d.BindAddress = DefaultBindAddress + } + if d.Database == "" { + d.Database = DefaultDatabase + } + if d.RetentionPolicy == "" { + d.RetentionPolicy = DefaultRetentionPolicy + } + if d.ConsistencyLevel == "" { + d.ConsistencyLevel = DefaultConsistencyLevel + } + if d.Certificate == "" { + d.Certificate = DefaultCertificate + } + if d.BatchSize == 0 { + d.BatchSize = DefaultBatchSize + } + if d.BatchPending == 0 { + d.BatchPending = DefaultBatchPending + } + if d.BatchTimeout == 0 { + d.BatchTimeout = toml.Duration(DefaultBatchTimeout) + } + + return &d +} diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 1db82b47d9..32b205f1e1 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -80,19 +80,22 @@ type Service struct { // NewService returns a new instance of Service. func NewService(c Config) (*Service, error) { + // Use defaults where necessary. + d := c.WithDefaults() + s := &Service{ done: make(chan struct{}), - tls: c.TLSEnabled, - cert: c.Certificate, + tls: d.TLSEnabled, + cert: d.Certificate, err: make(chan error), - BindAddress: c.BindAddress, - Database: c.Database, - RetentionPolicy: c.RetentionPolicy, - batchSize: c.BatchSize, - batchPending: c.BatchPending, - batchTimeout: time.Duration(c.BatchTimeout), + BindAddress: d.BindAddress, + Database: d.Database, + RetentionPolicy: d.RetentionPolicy, + batchSize: d.BatchSize, + batchPending: d.BatchPending, + batchTimeout: time.Duration(d.BatchTimeout), Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), - LogPointErrors: c.LogPointErrors, + LogPointErrors: d.LogPointErrors, } return s, nil }