diff --git a/shard.go b/shard.go index e3e2434175..60629b4ba5 100644 --- a/shard.go +++ b/shard.go @@ -4,9 +4,11 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "time" "github.com/boltdb/bolt" + "github.com/influxdb/influxdb/messaging" ) // ShardGroup represents a group of shards created for a single time range. @@ -63,6 +65,9 @@ type Shard struct { index uint64 // highest replicated index store *bolt.DB // underlying data store conn MessagingConn // streaming connection to broker + + wg sync.WaitGroup // pending goroutines + closing chan struct{} // close notification } // newShard returns a new initialized Shard instance. @@ -93,26 +98,35 @@ func (s *Shard) open(path string, conn MessagingConn) error { s.index = btou64(buf) } + // Open connection. + if err := conn.Open(s.index, true); err != nil { + return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err) + } + return nil }); err != nil { _ = s.close() return fmt.Errorf("init: %s", err) } - // Open connection. - if err := conn.Open(s.index, true); err != nil { - _ = s.close() - return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err) - } - // Start importing from connection. - go s.processor(conn) + s.closing = make(chan struct{}) + s.wg.Add(1) + go s.processor(conn, s.closing) return nil } // close shuts down the shard's store. func (s *Shard) close() error { + // Wait for goroutines to stop. + if s.closing != nil { + close(s.closing) + s.closing = nil + } + + s.wg.Wait() + if s.store != nil { _ = s.store.Close() } @@ -201,12 +215,20 @@ func (s *Shard) dropSeries(seriesID uint32) error { } // processor runs in a separate goroutine and processes all incoming broker messages. -func (s *Shard) processor(conn MessagingConn) { +func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) { + defer s.wg.Done() + for { // Read incoming message. - // Exit if the connection has been closed. - m, ok := <-conn.C() - if !ok { + // Exit if the connection has been closed or if shard is closing. + var ok bool + var m *messaging.Message + select { + case m, ok = <-conn.C(): + if !ok { + return + } + case <-closing: return }