Fix shard close race condition.

pull/1935/head
Ben Johnson 2015-03-14 14:47:20 -06:00
parent 06d839223e
commit 7dc465b9db
1 changed files with 33 additions and 11 deletions

View File

@ -4,9 +4,11 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/messaging"
)
// ShardGroup represents a group of shards created for a single time range.
@ -63,6 +65,9 @@ type Shard struct {
index uint64 // highest replicated index
store *bolt.DB // underlying data store
conn MessagingConn // streaming connection to broker
wg sync.WaitGroup // pending goroutines
closing chan struct{} // close notification
}
// newShard returns a new initialized Shard instance.
@ -93,26 +98,35 @@ func (s *Shard) open(path string, conn MessagingConn) error {
s.index = btou64(buf)
}
// Open connection.
if err := conn.Open(s.index, true); err != nil {
return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err)
}
return nil
}); err != nil {
_ = s.close()
return fmt.Errorf("init: %s", err)
}
// Open connection.
if err := conn.Open(s.index, true); err != nil {
_ = s.close()
return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err)
}
// Start importing from connection.
go s.processor(conn)
s.closing = make(chan struct{})
s.wg.Add(1)
go s.processor(conn, s.closing)
return nil
}
// close shuts down the shard's store.
func (s *Shard) close() error {
// Wait for goroutines to stop.
if s.closing != nil {
close(s.closing)
s.closing = nil
}
s.wg.Wait()
if s.store != nil {
_ = s.store.Close()
}
@ -201,12 +215,20 @@ func (s *Shard) dropSeries(seriesID uint32) error {
}
// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Shard) processor(conn MessagingConn) {
func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) {
defer s.wg.Done()
for {
// Read incoming message.
// Exit if the connection has been closed.
m, ok := <-conn.C()
if !ok {
// Exit if the connection has been closed or if shard is closing.
var ok bool
var m *messaging.Message
select {
case m, ok = <-conn.C():
if !ok {
return
}
case <-closing:
return
}