From 6b5a452f8a54005803b780c2637ff89102757137 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 30 Mar 2015 12:13:53 -0700 Subject: [PATCH] Use a channel to sync UDP shutdown --- CHANGELOG.md | 5 +++++ collectd/collectd.go | 39 ++++++++++++++++++++++----------------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58c3393554..389cc3ec4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v0.9.0-rc18 [Unreleased] + +### Bugfixes +- [#2100](https://github.com/influxdb/influxdb/pull/2100): Use channel to synchronize collectd shutdown. + ## v0.9.0-rc17 [2015-03-29] ### Features diff --git a/collectd/collectd.go b/collectd/collectd.go index f780ad1499..47cd612112 100644 --- a/collectd/collectd.go +++ b/collectd/collectd.go @@ -21,8 +21,8 @@ type SeriesWriter interface { } type Server struct { - mu sync.Mutex - wg sync.WaitGroup + wg sync.WaitGroup + done chan struct{} conn *net.UDPConn @@ -34,6 +34,7 @@ type Server struct { func NewServer(w SeriesWriter, typesDBPath string) *Server { s := Server{ + done: make(chan struct{}), writer: w, typesdbpath: typesDBPath, typesdb: make(gollectd.Types), @@ -66,12 +67,12 @@ func ListenAndServe(s *Server, iface string) error { s.conn = conn s.wg.Add(1) - go s.serve(conn) + go s.serve(s.done) return nil } -func (s *Server) serve(conn *net.UDPConn) { +func (s *Server) serve(done chan struct{}) { defer s.wg.Done() // From https://collectd.org/wiki/index.php/Binary_protocol @@ -85,18 +86,22 @@ func (s *Server) serve(conn *net.UDPConn) { buffer := make([]byte, 1452) for { - n, _, err := conn.ReadFromUDP(buffer) - if err != nil && s.conn != nil { + select { + case <-s.done: + // We closed the connection, time to go. + return + default: + // Keep processing. + } + + n, _, err := s.conn.ReadFromUDP(buffer) + if err != nil { log.Printf("Collectd ReadFromUDP error: %s", err) continue } if n > 0 { s.handleMessage(buffer[:n]) } - if s.conn == nil { - // we closed the connection, time to go - return - } } } @@ -121,19 +126,19 @@ func (s *Server) handleMessage(buffer []byte) { // Close shuts down the server's listeners. func (s *Server) Close() error { - // Notify other goroutines of shutdown. - s.mu.Lock() - defer s.mu.Unlock() if s.conn == nil { return errors.New("server already closed") } + + // Close the connection, and wait for the goroutine to exit. s.conn.Close() - s.conn = nil - - // Wait for all goroutines to shutdown. + close(s.done) s.wg.Wait() - log.Printf("all waitgroups finished") + // Release all remaining resources. + s.done = nil + s.conn = nil + log.Println("collectd UDP closed") return nil }