From 4d809a8920e3dd74951783b308dff4029fd77e13 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 16 Apr 2015 14:28:55 -0600 Subject: [PATCH] no more port collisions --- cmd/influxd/config.go | 16 ++---------- cmd/influxd/config_test.go | 4 +-- cmd/influxd/run.go | 27 ++++++++++++++++--- cmd/influxd/server_integration_test.go | 22 +++++++--------- opentsdb/opentsdb.go | 36 ++++++++++++++++++-------- 5 files changed, 62 insertions(+), 43 deletions(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 50f37e1857..9c2c565be8 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -16,7 +16,6 @@ import ( "github.com/BurntSushi/toml" "github.com/influxdb/influxdb/collectd" "github.com/influxdb/influxdb/graphite" - "github.com/influxdb/influxdb/opentsdb" ) const ( @@ -525,17 +524,6 @@ func (o OpenTSDB) DatabaseString() string { return o.Database } -func (o OpenTSDB) ListenAddress(defaultBindAddr string) string { - addr := o.Addr - // If no address specified, use default. - if addr == "" { - addr = defaultBindAddr - } - - port := o.Port - // If no port specified, use default. - if port == 0 { - port = opentsdb.DefaultPort - } - return net.JoinHostPort(addr, strconv.Itoa(port)) +func (o OpenTSDB) ListenAddress() string { + return net.JoinHostPort(o.Addr, strconv.Itoa(o.Port)) } diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 19282df745..25f982e1bd 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -236,8 +236,8 @@ func TestParseConfig(t *testing.T) { switch { case c.OpenTSDB.Enabled != true: t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled) - case c.OpenTSDB.ListenAddress(c.BindAddress) != "192.168.0.3:4242": - t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress(c.BindAddress)) + case c.OpenTSDB.ListenAddress() != "192.168.0.3:4242": + t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress()) case c.OpenTSDB.DatabaseString() != "opentsdb_database": t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString()) case c.OpenTSDB.RetentionPolicy != "raw": diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 6c4fc824fb..bb138f8975 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -48,6 +48,7 @@ type Node struct { clusterListener net.Listener // The cluster TCP listener apiListener net.Listener // The API TCP listener GraphiteServers []graphite.Server // The Graphite Servers + OpenTSDBServer *opentsdb.Server // The OpenTSDB Server } func (s *Node) ClusterAddr() net.Addr { @@ -97,6 +98,12 @@ func (s *Node) Close() error { } } + if s.OpenTSDBServer != nil { + if err := s.OpenTSDBServer.Close(); err != nil { + return err + } + } + if s.DataNode != nil { if err := s.DataNode.Close(); err != nil { return err @@ -273,13 +280,24 @@ func (cmd *RunCommand) Run(args ...string) error { func (cmd *RunCommand) CheckConfig() { // Set any defaults that aren't set // TODO: bring more defaults in here instead of letting helpers do it + + // Normalize Graphite configs for i, _ := range cmd.config.Graphites { - if cmd.config.Graphites[i].Port == 0 { - cmd.config.Graphites[i].Port = graphite.DefaultGraphitePort - } if cmd.config.Graphites[i].BindAddress == "" { cmd.config.Graphites[i].BindAddress = cmd.config.BindAddress } + if cmd.config.Graphites[i].Port == 0 { + cmd.config.Graphites[i].Port = graphite.DefaultGraphitePort + } + } + + // Normalize openTSDB config + if cmd.config.OpenTSDB.Addr == "" { + cmd.config.OpenTSDB.Addr = cmd.config.BindAddress + } + + if cmd.config.OpenTSDB.Port == 0 { + cmd.config.OpenTSDB.Port = opentsdb.DefaultPort } // Validate that we have a sane config @@ -418,7 +436,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { if config.OpenTSDB.Enabled { o := config.OpenTSDB db := o.DatabaseString() - laddr := o.ListenAddress(config.BindAddress) + laddr := o.ListenAddress() policy := o.RetentionPolicy if err := s.CreateDatabaseIfNotExists(db); err != nil { @@ -437,6 +455,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { log.Println("Starting OpenTSDB service on", laddr) go os.ListenAndServe(laddr) + cmd.node.OpenTSDBServer = os } // Start up self-monitoring if enabled. diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index a4d8b9d3d3..eb2ac90b4b 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1823,7 +1823,6 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) { } func Test_ServerOpenTSDBIntegration(t *testing.T) { - t.Skip() t.Parallel() if testing.Short() { t.Skip() @@ -1834,14 +1833,14 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) { now := time.Now().UTC().Round(time.Second) c, _ := main.NewTestConfig() o := main.OpenTSDB{ - Port: 4242, + Port: 0, Enabled: true, Database: "opentsdb", RetentionPolicy: "raw", } c.OpenTSDB = o - t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress()) nodes := createCombinedNodeCluster(t, testName, dir, nNodes, c) defer nodes.Close() @@ -1849,7 +1848,8 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) { createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up - conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + host := nodes[0].node.OpenTSDBServer.Addr().String() + conn, err := net.Dial("tcp", host) if err != nil { t.Fatal(err) return @@ -1876,7 +1876,6 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) { } func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { - t.Skip() t.Parallel() if testing.Short() { t.Skip() @@ -1896,7 +1895,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { } c.OpenTSDB = o - t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress()) nodes := createCombinedNodeCluster(t, testName, dir, nNodes, c) defer nodes.Close() @@ -1904,7 +1903,8 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up - conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + host := nodes[0].node.OpenTSDBServer.Addr().String() + conn, err := net.Dial("tcp", host) if err != nil { t.Fatal(err) return @@ -1934,7 +1934,6 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { } func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { - t.Skip() t.Parallel() if testing.Short() { t.Skip() @@ -1954,7 +1953,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { } c.OpenTSDB = o - t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress()) nodes := createCombinedNodeCluster(t, testName, dir, nNodes, c) defer nodes.Close() @@ -1962,7 +1961,8 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up - conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + host := nodes[0].node.OpenTSDBServer.Addr().String() + conn, err := net.Dial("tcp", host) if err != nil { t.Fatal(err) return @@ -1990,7 +1990,6 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { } func TestSeparateBrokerDataNode(t *testing.T) { - t.Skip() t.Parallel() testName := "TestSeparateBrokerDataNode" if testing.Short() { @@ -2042,7 +2041,6 @@ func TestSeparateBrokerDataNode(t *testing.T) { } func TestSeparateBrokerTwoDataNodes(t *testing.T) { - t.Skip() t.Parallel() testName := "TestSeparateBrokerTwoDataNodes" if testing.Short() { diff --git a/opentsdb/opentsdb.go b/opentsdb/opentsdb.go index 512d3d6f2f..eba7bf742f 100644 --- a/opentsdb/opentsdb.go +++ b/opentsdb/opentsdb.go @@ -47,8 +47,13 @@ func NewServer(w SeriesWriter, retpol string, db string) *Server { return s } +func (s *Server) Addr() net.Addr { + return s.listener.Addr() +} + func (s *Server) ListenAndServe(listenAddress string) { var err error + log.Print(listenAddress) addr, err := net.ResolveTCPAddr("tcp4", listenAddress) if err != nil { @@ -62,19 +67,28 @@ func (s *Server) ListenAndServe(listenAddress string) { return } - defer s.listener.Close() - s.HandleListener(s.listener) + go func() { + for { + if s.listener == nil { + return + } + conn, err := s.listener.Accept() + if err != nil { + log.Println("Error accepting: ", err.Error()) + continue + } + go s.HandleConnection(conn) + } + }() } -func (s *Server) HandleListener(socket *net.TCPListener) { - for { - // Listen for an incoming connection. - conn, err := socket.Accept() - if err != nil { - log.Println("Error accepting: ", err.Error()) - } - // Handle connections in a new goroutine. - go s.HandleConnection(conn) +func (s *Server) Close() error { + if s.listener != nil { + err := s.listener.Close() + s.listener = nil + return err + } else { + return nil } }