diff --git a/tsdb/store.go b/tsdb/store.go index bee68c7fdd..be7076d00e 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -28,6 +28,7 @@ func NewStore(path string) *Store { var ( ErrShardNotFound = fmt.Errorf("shard not found") + ErrStoreClosed = fmt.Errorf("store is closed") ) const ( @@ -46,6 +47,7 @@ type Store struct { closing chan struct{} wg sync.WaitGroup + opened bool } // Path returns the store's root path. @@ -78,7 +80,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er select { case <-s.closing: - return fmt.Errorf("closing") + return ErrStoreClosed default: } @@ -369,6 +371,7 @@ func (s *Store) Open() error { } go s.periodicMaintenance() + s.opened = true return nil } @@ -376,6 +379,13 @@ func (s *Store) Open() error { func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { s.mu.RLock() defer s.mu.RUnlock() + + select { + case <-s.closing: + return ErrStoreClosed + default: + } + sh, ok := s.shards[shardID] if !ok { return ErrShardNotFound @@ -410,9 +420,9 @@ func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() - if s.closing != nil { + + if s.opened { close(s.closing) - s.closing = nil } s.wg.Wait() @@ -421,10 +431,7 @@ func (s *Store) Close() error { return err } } - if s.closing != nil { - close(s.closing) - } - s.closing = nil + s.opened = false s.shards = nil s.databaseIndexes = nil