From 6cf78206f05e757d62495d4919f8a1b1a217c78b Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 5 Jun 2015 16:25:21 -0600 Subject: [PATCH] refactor run.Server to use tcp.Mux --- cmd/influxd/run/server.go | 41 +++++++++++++++++++++++++++++++++++++-- meta/store_test.go | 2 +- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 363964f398..eb36cbae33 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -2,6 +2,7 @@ package run import ( "fmt" + "net" "time" "github.com/influxdb/influxdb/cluster" @@ -13,6 +14,7 @@ import ( "github.com/influxdb/influxdb/services/opentsdb" "github.com/influxdb/influxdb/services/retention" "github.com/influxdb/influxdb/services/udp" + "github.com/influxdb/influxdb/tcp" "github.com/influxdb/influxdb/tsdb" ) @@ -23,6 +25,10 @@ type Server struct { err chan error closing chan struct{} + Hostname string + BindAddress string + Listener net.Listener + MetaStore *meta.Store TSDBStore *tsdb.Store QueryExecutor *tsdb.QueryExecutor @@ -36,8 +42,12 @@ type Server struct { func NewServer(c *Config) *Server { // Construct base meta store and data store. s := &Server{ - err: make(chan error), - closing: make(chan struct{}), + err: make(chan error), + closing: make(chan struct{}), + + Hostname: c.Meta.Hostname, + BindAddress: c.Meta.BindAddress, + MetaStore: meta.NewStore(c.Meta), TSDBStore: tsdb.NewStore(c.Data.Dir), } @@ -146,6 +156,30 @@ func (s *Server) Err() <-chan error { return s.err } // Open opens the meta and data store and all services. func (s *Server) Open() error { if err := func() error { + // Resolve host to address. + _, port, err := net.SplitHostPort(s.BindAddress) + if err != nil { + return fmt.Errorf("split bind address: %s", err) + } + hostport := net.JoinHostPort(s.Hostname, port) + addr, err := net.ResolveTCPAddr("tcp", hostport) + if err != nil { + return fmt.Errorf("resolve tcp: addr=%s, err=%s", hostport, err) + } + s.MetaStore.Addr = addr + + // Open shared TCP connection. + ln, err := net.Listen("tcp", s.BindAddress) + if err != nil { + return fmt.Errorf("listen: %s", err) + } + s.Listener = ln + + // Multiplex listener. + mux := tcp.NewMux() + s.MetaStore.RaftListener = mux.Listen(meta.MuxRaftHeader) + s.MetaStore.ExecListener = mux.Listen(meta.MuxExecHeader) + // Open meta store. if err := s.MetaStore.Open(); err != nil { return fmt.Errorf("open meta store: %s", err) @@ -178,6 +212,9 @@ func (s *Server) Open() error { // Close shuts down the meta and data stores and all services. func (s *Server) Close() error { + if s.Listener != nil { + s.Listener.Close() + } if s.MetaStore != nil { s.MetaStore.Close() } diff --git a/meta/store_test.go b/meta/store_test.go index f7b399d73b..abb611a8ac 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -31,7 +31,7 @@ func TestStore_Open_ErrStoreOpen(t *testing.T) { s := MustOpenStore() defer s.Close() - if err := s.Open(); err != meta.ErrStoreOpen { + if err := s.Store.Open(); err != meta.ErrStoreOpen { t.Fatalf("unexpected error: %s", err) } }