Use a channel to sync UDP shutdown
parent
c326a9c9c9
commit
6b5a452f8a
|
@ -1,3 +1,8 @@
|
|||
## v0.9.0-rc18 [Unreleased]
|
||||
|
||||
### Bugfixes
|
||||
- [#2100](https://github.com/influxdb/influxdb/pull/2100): Use channel to synchronize collectd shutdown.
|
||||
|
||||
## v0.9.0-rc17 [2015-03-29]
|
||||
|
||||
### Features
|
||||
|
|
|
@ -21,8 +21,8 @@ type SeriesWriter interface {
|
|||
}
|
||||
|
||||
type Server struct {
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
||||
conn *net.UDPConn
|
||||
|
||||
|
@ -34,6 +34,7 @@ type Server struct {
|
|||
|
||||
func NewServer(w SeriesWriter, typesDBPath string) *Server {
|
||||
s := Server{
|
||||
done: make(chan struct{}),
|
||||
writer: w,
|
||||
typesdbpath: typesDBPath,
|
||||
typesdb: make(gollectd.Types),
|
||||
|
@ -66,12 +67,12 @@ func ListenAndServe(s *Server, iface string) error {
|
|||
s.conn = conn
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.serve(conn)
|
||||
go s.serve(s.done)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) serve(conn *net.UDPConn) {
|
||||
func (s *Server) serve(done chan struct{}) {
|
||||
defer s.wg.Done()
|
||||
|
||||
// From https://collectd.org/wiki/index.php/Binary_protocol
|
||||
|
@ -85,18 +86,22 @@ func (s *Server) serve(conn *net.UDPConn) {
|
|||
buffer := make([]byte, 1452)
|
||||
|
||||
for {
|
||||
n, _, err := conn.ReadFromUDP(buffer)
|
||||
if err != nil && s.conn != nil {
|
||||
select {
|
||||
case <-s.done:
|
||||
// We closed the connection, time to go.
|
||||
return
|
||||
default:
|
||||
// Keep processing.
|
||||
}
|
||||
|
||||
n, _, err := s.conn.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
log.Printf("Collectd ReadFromUDP error: %s", err)
|
||||
continue
|
||||
}
|
||||
if n > 0 {
|
||||
s.handleMessage(buffer[:n])
|
||||
}
|
||||
if s.conn == nil {
|
||||
// we closed the connection, time to go
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,19 +126,19 @@ func (s *Server) handleMessage(buffer []byte) {
|
|||
|
||||
// Close shuts down the server's listeners.
|
||||
func (s *Server) Close() error {
|
||||
// Notify other goroutines of shutdown.
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.conn == nil {
|
||||
return errors.New("server already closed")
|
||||
}
|
||||
|
||||
// Close the connection, and wait for the goroutine to exit.
|
||||
s.conn.Close()
|
||||
s.conn = nil
|
||||
|
||||
// Wait for all goroutines to shutdown.
|
||||
close(s.done)
|
||||
s.wg.Wait()
|
||||
log.Printf("all waitgroups finished")
|
||||
|
||||
// Release all remaining resources.
|
||||
s.done = nil
|
||||
s.conn = nil
|
||||
log.Println("collectd UDP closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue