Fix panic: assignment to entry in nil map
Closing the store did not properly return an error for in-flight writes because the closing channel was set to nil when closed. A nil channel is not selectable so writes continue on past the guard checks and trigger panics.pull/4308/head
parent
be477b2aab
commit
41e3294d4a
|
@ -28,6 +28,7 @@ func NewStore(path string) *Store {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrShardNotFound = fmt.Errorf("shard not found")
|
ErrShardNotFound = fmt.Errorf("shard not found")
|
||||||
|
ErrStoreClosed = fmt.Errorf("store is closed")
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -46,6 +47,7 @@ type Store struct {
|
||||||
|
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
opened bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Path returns the store's root path.
|
// Path returns the store's root path.
|
||||||
|
@ -78,7 +80,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closing:
|
case <-s.closing:
|
||||||
return fmt.Errorf("closing")
|
return ErrStoreClosed
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,6 +371,7 @@ func (s *Store) Open() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.periodicMaintenance()
|
go s.periodicMaintenance()
|
||||||
|
s.opened = true
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -376,6 +379,13 @@ func (s *Store) Open() error {
|
||||||
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
|
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-s.closing:
|
||||||
|
return ErrStoreClosed
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
sh, ok := s.shards[shardID]
|
sh, ok := s.shards[shardID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrShardNotFound
|
return ErrShardNotFound
|
||||||
|
@ -410,9 +420,9 @@ func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize
|
||||||
func (s *Store) Close() error {
|
func (s *Store) Close() error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
if s.closing != nil {
|
|
||||||
|
if s.opened {
|
||||||
close(s.closing)
|
close(s.closing)
|
||||||
s.closing = nil
|
|
||||||
}
|
}
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
|
||||||
|
@ -421,10 +431,7 @@ func (s *Store) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if s.closing != nil {
|
s.opened = false
|
||||||
close(s.closing)
|
|
||||||
}
|
|
||||||
s.closing = nil
|
|
||||||
s.shards = nil
|
s.shards = nil
|
||||||
s.databaseIndexes = nil
|
s.databaseIndexes = nil
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue