From 78509d918a38babdea59408f54457f52e813a779 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 22 Aug 2014 13:41:34 -0400 Subject: [PATCH] Don't ever try to query or write against a shard that is closed or deleted. Uses a RWlock so there's no global lock on writes or queries. --- datastore/shard.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datastore/shard.go b/datastore/shard.go index 61ed101593..75b7b4d7df 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "regexp" + "sync" "time" "code.google.com/p/goprotobuf/proto" @@ -25,6 +26,7 @@ type Shard struct { pointBatchSize int writeBatchSize int metaStore *metastore.Store + closeLock sync.RWMutex } func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore *metastore.Store) (*Shard, error) { @@ -37,6 +39,12 @@ func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore * } func (self *Shard) Write(database string, series []*protocol.Series) error { + self.closeLock.RLock() + defer self.closeLock.RUnlock() + if self.closed { + return fmt.Errorf("Shard is closed") + } + wb := make([]storage.Write, 0) for _, s := range series { @@ -93,6 +101,12 @@ func (self *Shard) Write(database string, series []*protocol.Series) error { } func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + self.closeLock.RLock() + defer self.closeLock.RUnlock() + if self.closed { + return fmt.Errorf("Shard is closed") + } + if querySpec.IsListSeriesQuery() { return fmt.Errorf("List series queries should never come to the shard") } else if querySpec.IsDeleteFromSeriesQuery() { @@ -328,6 +342,11 @@ func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor clu } func (self *Shard) DropFields(fields []*metastore.Field) error { + self.closeLock.RLock() + defer self.closeLock.RUnlock() + if self.closed { + return fmt.Errorf("Shard is closed") + } startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} return self.deleteRangeOfFields(fields, startTimeBytes, endTimeBytes) @@ -412,6 +431,8 @@ func (self *Shard) byteArrayForTime(t time.Time) []byte { } func (self *Shard) close() { + self.closeLock.Lock() + defer self.closeLock.Unlock() self.closed = true self.db.Close() self.db = nil