Merge pull request #10150 from influxdata/jmw-tls-config
Allow specifying some TLS configuration.pull/10152/head
commit
881f7126a4
|
@ -118,6 +118,7 @@ type Config struct {
|
||||||
WriteConsistency string
|
WriteConsistency string
|
||||||
UnsafeSsl bool
|
UnsafeSsl bool
|
||||||
Proxy func(req *http.Request) (*url.URL, error)
|
Proxy func(req *http.Request) (*url.URL, error)
|
||||||
|
TLS *tls.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig will create a config to be used in connecting to the client
|
// NewConfig will create a config to be used in connecting to the client
|
||||||
|
@ -154,9 +155,11 @@ const (
|
||||||
|
|
||||||
// NewClient will instantiate and return a connected client to issue commands to the server.
|
// NewClient will instantiate and return a connected client to issue commands to the server.
|
||||||
func NewClient(c Config) (*Client, error) {
|
func NewClient(c Config) (*Client, error) {
|
||||||
tlsConfig := &tls.Config{
|
tlsConfig := new(tls.Config)
|
||||||
InsecureSkipVerify: c.UnsafeSsl,
|
if c.TLS != nil {
|
||||||
|
tlsConfig = c.TLS.Clone()
|
||||||
}
|
}
|
||||||
|
tlsConfig.InsecureSkipVerify = c.UnsafeSsl
|
||||||
|
|
||||||
tr := &http.Transport{
|
tr := &http.Transport{
|
||||||
Proxy: c.Proxy,
|
Proxy: c.Proxy,
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/influxdata/influxdb/logger"
|
"github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/monitor"
|
"github.com/influxdata/influxdb/monitor"
|
||||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||||
|
"github.com/influxdata/influxdb/pkg/tlsconfig"
|
||||||
"github.com/influxdata/influxdb/services/collectd"
|
"github.com/influxdata/influxdb/services/collectd"
|
||||||
"github.com/influxdata/influxdb/services/continuous_querier"
|
"github.com/influxdata/influxdb/services/continuous_querier"
|
||||||
"github.com/influxdata/influxdb/services/graphite"
|
"github.com/influxdata/influxdb/services/graphite"
|
||||||
|
@ -62,6 +63,9 @@ type Config struct {
|
||||||
|
|
||||||
// BindAddress is the address that all TCP services use (Raft, Snapshot, Cluster, etc.)
|
// BindAddress is the address that all TCP services use (Raft, Snapshot, Cluster, etc.)
|
||||||
BindAddress string `toml:"bind-address"`
|
BindAddress string `toml:"bind-address"`
|
||||||
|
|
||||||
|
// TLS provides configuration options for all https endpoints.
|
||||||
|
TLS tlsconfig.Config `toml:"tls"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig returns an instance of Config with reasonable defaults.
|
// NewConfig returns an instance of Config with reasonable defaults.
|
||||||
|
@ -188,6 +192,10 @@ func (c *Config) Validate() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.TLS.Validate(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,9 @@ enabled = true
|
||||||
|
|
||||||
[continuous_queries]
|
[continuous_queries]
|
||||||
enabled = true
|
enabled = true
|
||||||
|
|
||||||
|
[tls]
|
||||||
|
ciphers = ["cipher"]
|
||||||
`); err != nil {
|
`); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -97,6 +100,8 @@ enabled = true
|
||||||
t.Fatalf("unexpected subscriber enabled: %v", c.Subscriber.Enabled)
|
t.Fatalf("unexpected subscriber enabled: %v", c.Subscriber.Enabled)
|
||||||
} else if !c.ContinuousQuery.Enabled {
|
} else if !c.ContinuousQuery.Enabled {
|
||||||
t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled)
|
t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled)
|
||||||
|
} else if c.TLS.Ciphers[0] != "cipher" {
|
||||||
|
t.Fatalf("unexpected tls: %q", c.TLS.Ciphers)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +155,9 @@ enabled = true
|
||||||
|
|
||||||
[continuous_queries]
|
[continuous_queries]
|
||||||
enabled = true
|
enabled = true
|
||||||
|
|
||||||
|
[tls]
|
||||||
|
min-version = "tls1.0"
|
||||||
`, &c); err != nil {
|
`, &c); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -179,6 +187,8 @@ enabled = true
|
||||||
case "INFLUXDB_COORDINATOR_QUERY_TIMEOUT":
|
case "INFLUXDB_COORDINATOR_QUERY_TIMEOUT":
|
||||||
// duration type
|
// duration type
|
||||||
return "1m"
|
return "1m"
|
||||||
|
case "INFLUXDB_TLS_MIN_VERSION":
|
||||||
|
return "tls1.2"
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
@ -226,6 +236,10 @@ enabled = true
|
||||||
if c.Coordinator.QueryTimeout != influxtoml.Duration(time.Minute) {
|
if c.Coordinator.QueryTimeout != influxtoml.Duration(time.Minute) {
|
||||||
t.Fatalf("unexpected query timeout: %v", c.Coordinator.QueryTimeout)
|
t.Fatalf("unexpected query timeout: %v", c.Coordinator.QueryTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.TLS.MinVersion != "tls1.2" {
|
||||||
|
t.Fatalf("unexpected tls min version: %q", c.TLS.MinVersion)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfig_ValidateNoServiceConfigured(t *testing.T) {
|
func TestConfig_ValidateNoServiceConfigured(t *testing.T) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package run
|
package run
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -101,8 +102,30 @@ type Server struct {
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateTLSConfig stores with into the tls config pointed at by into but only if with is not nil
|
||||||
|
// and into is nil. Think of it as setting the default value.
|
||||||
|
func updateTLSConfig(into **tls.Config, with *tls.Config) {
|
||||||
|
if with != nil && into != nil && *into == nil {
|
||||||
|
*into = with
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewServer returns a new instance of Server built from a config.
|
// NewServer returns a new instance of Server built from a config.
|
||||||
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
||||||
|
// First grab the base tls config we will use for all clients and servers
|
||||||
|
tlsConfig, err := c.TLS.Parse()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("tls configuration: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the TLS values on each of the configs to be the parsed one if
|
||||||
|
// not already specified (set the default).
|
||||||
|
updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
|
||||||
|
updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
|
||||||
|
for i := range c.OpenTSDBInputs {
|
||||||
|
updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
|
||||||
|
}
|
||||||
|
|
||||||
// We need to ensure that a meta directory always exists even if
|
// We need to ensure that a meta directory always exists even if
|
||||||
// we don't start the meta store. node.json is always stored under
|
// we don't start the meta store. node.json is always stored under
|
||||||
// the meta directory.
|
// the meta directory.
|
||||||
|
@ -122,7 +145,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := influxdb.LoadNode(c.Meta.Dir)
|
_, err = influxdb.LoadNode(c.Meta.Dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -487,7 +487,7 @@
|
||||||
# bind-address = ":8089"
|
# bind-address = ":8089"
|
||||||
# database = "udp"
|
# database = "udp"
|
||||||
# retention-policy = ""
|
# retention-policy = ""
|
||||||
|
|
||||||
# InfluxDB precision for timestamps on received points ("" or "n", "u", "ms", "s", "m", "h")
|
# InfluxDB precision for timestamps on received points ("" or "n", "u", "ms", "s", "m", "h")
|
||||||
# precision = ""
|
# precision = ""
|
||||||
|
|
||||||
|
@ -525,3 +525,27 @@
|
||||||
|
|
||||||
# interval for how often continuous queries will be checked if they need to run
|
# interval for how often continuous queries will be checked if they need to run
|
||||||
# run-interval = "1s"
|
# run-interval = "1s"
|
||||||
|
|
||||||
|
###
|
||||||
|
### [tls]
|
||||||
|
###
|
||||||
|
### Global configuration settings for TLS in InfluxDB.
|
||||||
|
###
|
||||||
|
|
||||||
|
[tls]
|
||||||
|
# Determines the available set of cipher suites. See https://golang.org/pkg/crypto/tls/#pkg-constants
|
||||||
|
# for a list of available ciphers, which depends on the version of Go (use the query
|
||||||
|
# SHOW DIAGNOSTICS to see the version of Go used to build InfluxDB). If not specified, uses
|
||||||
|
# the default settings from Go's crypto/tls package.
|
||||||
|
# ciphers = [
|
||||||
|
# "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
|
||||||
|
# "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
|
||||||
|
# ]
|
||||||
|
|
||||||
|
# Minimum version of the tls protocol that will be negotiated. If not specified, uses the
|
||||||
|
# default settings from Go's crypto/tls package.
|
||||||
|
# min-version = "tls1.2"
|
||||||
|
|
||||||
|
# Maximum version of the tls protocol that will be negotiated. If not specified, uses the
|
||||||
|
# default settings from Go's crypto/tls package.
|
||||||
|
# max-version = "tls1.2"
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
package tlsconfig
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Ciphers []string `toml:"ciphers"`
|
||||||
|
MinVersion string `toml:"min-version"`
|
||||||
|
MaxVersion string `toml:"max-version"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig() Config {
|
||||||
|
return Config{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) Validate() error {
|
||||||
|
_, err := c.Parse()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) Parse() (out *tls.Config, err error) {
|
||||||
|
if len(c.Ciphers) > 0 {
|
||||||
|
if out == nil {
|
||||||
|
out = new(tls.Config)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, name := range c.Ciphers {
|
||||||
|
cipher, ok := ciphersMap[strings.ToUpper(name)]
|
||||||
|
if !ok {
|
||||||
|
return nil, unknownCipher(name)
|
||||||
|
}
|
||||||
|
out.CipherSuites = append(out.CipherSuites, cipher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.MinVersion != "" {
|
||||||
|
if out == nil {
|
||||||
|
out = new(tls.Config)
|
||||||
|
}
|
||||||
|
|
||||||
|
version, ok := versionsMap[strings.ToUpper(c.MinVersion)]
|
||||||
|
if !ok {
|
||||||
|
return nil, unknownVersion(c.MinVersion)
|
||||||
|
}
|
||||||
|
out.MinVersion = version
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.MaxVersion != "" {
|
||||||
|
if out == nil {
|
||||||
|
out = new(tls.Config)
|
||||||
|
}
|
||||||
|
|
||||||
|
version, ok := versionsMap[strings.ToUpper(c.MaxVersion)]
|
||||||
|
if !ok {
|
||||||
|
return nil, unknownVersion(c.MaxVersion)
|
||||||
|
}
|
||||||
|
out.MaxVersion = version
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var ciphersMap = map[string]uint16{
|
||||||
|
"TLS_RSA_WITH_RC4_128_SHA": tls.TLS_RSA_WITH_RC4_128_SHA,
|
||||||
|
"TLS_RSA_WITH_3DES_EDE_CBC_SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
|
||||||
|
"TLS_RSA_WITH_AES_128_CBC_SHA": tls.TLS_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
"TLS_RSA_WITH_AES_256_CBC_SHA": tls.TLS_RSA_WITH_AES_256_CBC_SHA,
|
||||||
|
"TLS_RSA_WITH_AES_128_CBC_SHA256": tls.TLS_RSA_WITH_AES_128_CBC_SHA256,
|
||||||
|
"TLS_RSA_WITH_AES_128_GCM_SHA256": tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
|
||||||
|
"TLS_RSA_WITH_AES_256_GCM_SHA384": tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA": tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA": tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA": tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
|
||||||
|
"TLS_ECDHE_RSA_WITH_RC4_128_SHA": tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
|
||||||
|
"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
|
||||||
|
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256": tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
|
||||||
|
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256": tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
|
||||||
|
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256": tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256": tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||||
|
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384": tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384": tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||||
|
"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305": tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
|
||||||
|
"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305": tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
|
||||||
|
}
|
||||||
|
|
||||||
|
func unknownCipher(name string) error {
|
||||||
|
available := make([]string, 0, len(ciphersMap))
|
||||||
|
for name := range ciphersMap {
|
||||||
|
available = append(available, name)
|
||||||
|
}
|
||||||
|
sort.Strings(available)
|
||||||
|
|
||||||
|
return fmt.Errorf("unknown cipher suite: %q. available ciphers: %s",
|
||||||
|
name, strings.Join(available, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
|
var versionsMap = map[string]uint16{
|
||||||
|
"SSL3.0": tls.VersionSSL30,
|
||||||
|
"TLS1.0": tls.VersionTLS10,
|
||||||
|
"1.0": tls.VersionTLS11,
|
||||||
|
"TLS1.1": tls.VersionTLS11,
|
||||||
|
"1.1": tls.VersionTLS11,
|
||||||
|
"TLS1.2": tls.VersionTLS12,
|
||||||
|
"1.2": tls.VersionTLS12,
|
||||||
|
}
|
||||||
|
|
||||||
|
func unknownVersion(name string) error {
|
||||||
|
available := make([]string, 0, len(versionsMap))
|
||||||
|
for name := range versionsMap {
|
||||||
|
// skip the ones that just begin with a number. they may be confusing
|
||||||
|
// due to the duplication, and just help if the user specifies without
|
||||||
|
// the TLS part.
|
||||||
|
if name[0] == '1' {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
available = append(available, name)
|
||||||
|
}
|
||||||
|
sort.Strings(available)
|
||||||
|
|
||||||
|
return fmt.Errorf("unknown tls version: %q. available versions: %s",
|
||||||
|
name, strings.Join(available, ", "))
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package httpd
|
package httpd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||||
|
@ -50,6 +51,7 @@ type Config struct {
|
||||||
MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"`
|
MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"`
|
||||||
MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"`
|
MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"`
|
||||||
EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"`
|
EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"`
|
||||||
|
TLS *tls.Config `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig returns a new Config with default settings.
|
// NewConfig returns a new Config with default settings.
|
||||||
|
|
|
@ -46,13 +46,14 @@ const (
|
||||||
|
|
||||||
// Service manages the listener and handler for an HTTP endpoint.
|
// Service manages the listener and handler for an HTTP endpoint.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
ln net.Listener
|
ln net.Listener
|
||||||
addr string
|
addr string
|
||||||
https bool
|
https bool
|
||||||
cert string
|
cert string
|
||||||
key string
|
key string
|
||||||
limit int
|
limit int
|
||||||
err chan error
|
tlsConfig *tls.Config
|
||||||
|
err chan error
|
||||||
|
|
||||||
unixSocket bool
|
unixSocket bool
|
||||||
unixSocketPerm uint32
|
unixSocketPerm uint32
|
||||||
|
@ -73,6 +74,7 @@ func NewService(c Config) *Service {
|
||||||
cert: c.HTTPSCertificate,
|
cert: c.HTTPSCertificate,
|
||||||
key: c.HTTPSPrivateKey,
|
key: c.HTTPSPrivateKey,
|
||||||
limit: c.MaxConnectionLimit,
|
limit: c.MaxConnectionLimit,
|
||||||
|
tlsConfig: c.TLS,
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
unixSocket: c.UnixSocketEnabled,
|
unixSocket: c.UnixSocketEnabled,
|
||||||
unixSocketPerm: uint32(c.UnixSocketPermissions),
|
unixSocketPerm: uint32(c.UnixSocketPermissions),
|
||||||
|
@ -80,6 +82,9 @@ func NewService(c Config) *Service {
|
||||||
Handler: NewHandler(c),
|
Handler: NewHandler(c),
|
||||||
Logger: zap.NewNop(),
|
Logger: zap.NewNop(),
|
||||||
}
|
}
|
||||||
|
if s.tlsConfig == nil {
|
||||||
|
s.tlsConfig = new(tls.Config)
|
||||||
|
}
|
||||||
if s.key == "" {
|
if s.key == "" {
|
||||||
s.key = s.cert
|
s.key = s.cert
|
||||||
}
|
}
|
||||||
|
@ -103,9 +108,10 @@ func (s *Service) Open() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
listener, err := tls.Listen("tcp", s.addr, &tls.Config{
|
tlsConfig := s.tlsConfig.Clone()
|
||||||
Certificates: []tls.Certificate{cert},
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||||
})
|
|
||||||
|
listener, err := tls.Listen("tcp", s.addr, tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package opentsdb
|
package opentsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||||
|
@ -46,6 +47,7 @@ type Config struct {
|
||||||
BatchPending int `toml:"batch-pending"`
|
BatchPending int `toml:"batch-pending"`
|
||||||
BatchTimeout toml.Duration `toml:"batch-timeout"`
|
BatchTimeout toml.Duration `toml:"batch-timeout"`
|
||||||
LogPointErrors bool `toml:"log-point-errors"`
|
LogPointErrors bool `toml:"log-point-errors"`
|
||||||
|
TLS *tls.Config `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig returns a new config for the service.
|
// NewConfig returns a new config for the service.
|
||||||
|
|
|
@ -47,9 +47,10 @@ type Service struct {
|
||||||
ln net.Listener // main listener
|
ln net.Listener // main listener
|
||||||
httpln *chanListener // http channel-based listener
|
httpln *chanListener // http channel-based listener
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
tls bool
|
tls bool
|
||||||
cert string
|
tlsConfig *tls.Config
|
||||||
|
cert string
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
ready bool // Has the required database been created?
|
ready bool // Has the required database been created?
|
||||||
|
@ -86,6 +87,7 @@ func NewService(c Config) (*Service, error) {
|
||||||
|
|
||||||
s := &Service{
|
s := &Service{
|
||||||
tls: d.TLSEnabled,
|
tls: d.TLSEnabled,
|
||||||
|
tlsConfig: d.TLS,
|
||||||
cert: d.Certificate,
|
cert: d.Certificate,
|
||||||
BindAddress: d.BindAddress,
|
BindAddress: d.BindAddress,
|
||||||
Database: d.Database,
|
Database: d.Database,
|
||||||
|
@ -98,6 +100,10 @@ func NewService(c Config) (*Service, error) {
|
||||||
stats: &Statistics{},
|
stats: &Statistics{},
|
||||||
defaultTags: models.StatisticTags{"bind": d.BindAddress},
|
defaultTags: models.StatisticTags{"bind": d.BindAddress},
|
||||||
}
|
}
|
||||||
|
if s.tlsConfig == nil {
|
||||||
|
s.tlsConfig = new(tls.Config)
|
||||||
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,9 +133,10 @@ func (s *Service) Open() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
listener, err := tls.Listen("tcp", s.BindAddress, &tls.Config{
|
tlsConfig := s.tlsConfig.Clone()
|
||||||
Certificates: []tls.Certificate{cert},
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||||
})
|
|
||||||
|
listener, err := tls.Listen("tcp", s.BindAddress, tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package subscriber
|
package subscriber
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
@ -42,6 +43,9 @@ type Config struct {
|
||||||
|
|
||||||
// The number of in-flight writes buffered in the write channel.
|
// The number of in-flight writes buffered in the write channel.
|
||||||
WriteBufferSize int `toml:"write-buffer-size"`
|
WriteBufferSize int `toml:"write-buffer-size"`
|
||||||
|
|
||||||
|
// TLS is a base tls config to use for https clients.
|
||||||
|
TLS *tls.Config `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig returns a new instance of a subscriber config.
|
// NewConfig returns a new instance of a subscriber config.
|
||||||
|
|
|
@ -17,12 +17,12 @@ type HTTP struct {
|
||||||
|
|
||||||
// NewHTTP returns a new HTTP points writer with default options.
|
// NewHTTP returns a new HTTP points writer with default options.
|
||||||
func NewHTTP(addr string, timeout time.Duration) (*HTTP, error) {
|
func NewHTTP(addr string, timeout time.Duration) (*HTTP, error) {
|
||||||
return NewHTTPS(addr, timeout, false, "")
|
return NewHTTPS(addr, timeout, false, "", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.
|
// NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.
|
||||||
func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string) (*HTTP, error) {
|
func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string, tlsConfig *tls.Config) (*HTTP, error) {
|
||||||
tlsConfig, err := createTLSConfig(caCerts)
|
tlsConfig, err := createTLSConfig(caCerts, tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -54,22 +54,28 @@ func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTLSConfig(caCerts string) (*tls.Config, error) {
|
func createTLSConfig(caCerts string, tlsConfig *tls.Config) (*tls.Config, error) {
|
||||||
if caCerts == "" {
|
if caCerts == "" {
|
||||||
|
if tlsConfig != nil {
|
||||||
|
return tlsConfig.Clone(), nil
|
||||||
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return loadCaCerts(caCerts)
|
return loadCaCerts(caCerts, tlsConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadCaCerts(caCerts string) (*tls.Config, error) {
|
func loadCaCerts(caCerts string, tlsConfig *tls.Config) (*tls.Config, error) {
|
||||||
caCert, err := ioutil.ReadFile(caCerts)
|
caCert, err := ioutil.ReadFile(caCerts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
caCertPool := x509.NewCertPool()
|
|
||||||
caCertPool.AppendCertsFromPEM(caCert)
|
|
||||||
|
|
||||||
return &tls.Config{
|
out := new(tls.Config)
|
||||||
RootCAs: caCertPool,
|
if tlsConfig != nil {
|
||||||
}, nil
|
out = tlsConfig.Clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
out.RootCAs = x509.NewCertPool()
|
||||||
|
out.RootCAs.AppendCertsFromPEM(caCert)
|
||||||
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -348,7 +348,7 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
|
||||||
if s.conf.InsecureSkipVerify {
|
if s.conf.InsecureSkipVerify {
|
||||||
s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.")
|
s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.")
|
||||||
}
|
}
|
||||||
return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts)
|
return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts, s.conf.TLS)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme)
|
return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue