Don't ever try to query or write against a shard that is closed or deleted.
Uses a RWlock so there's no global lock on writes or queries.pull/866/head
parent
fb9d03e28d
commit
78509d918a
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.google.com/p/goprotobuf/proto"
|
"code.google.com/p/goprotobuf/proto"
|
||||||
|
@ -25,6 +26,7 @@ type Shard struct {
|
||||||
pointBatchSize int
|
pointBatchSize int
|
||||||
writeBatchSize int
|
writeBatchSize int
|
||||||
metaStore *metastore.Store
|
metaStore *metastore.Store
|
||||||
|
closeLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore *metastore.Store) (*Shard, error) {
|
func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore *metastore.Store) (*Shard, error) {
|
||||||
|
@ -37,6 +39,12 @@ func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore *
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Shard) Write(database string, series []*protocol.Series) error {
|
func (self *Shard) Write(database string, series []*protocol.Series) error {
|
||||||
|
self.closeLock.RLock()
|
||||||
|
defer self.closeLock.RUnlock()
|
||||||
|
if self.closed {
|
||||||
|
return fmt.Errorf("Shard is closed")
|
||||||
|
}
|
||||||
|
|
||||||
wb := make([]storage.Write, 0)
|
wb := make([]storage.Write, 0)
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
|
@ -93,6 +101,12 @@ func (self *Shard) Write(database string, series []*protocol.Series) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
|
func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
|
||||||
|
self.closeLock.RLock()
|
||||||
|
defer self.closeLock.RUnlock()
|
||||||
|
if self.closed {
|
||||||
|
return fmt.Errorf("Shard is closed")
|
||||||
|
}
|
||||||
|
|
||||||
if querySpec.IsListSeriesQuery() {
|
if querySpec.IsListSeriesQuery() {
|
||||||
return fmt.Errorf("List series queries should never come to the shard")
|
return fmt.Errorf("List series queries should never come to the shard")
|
||||||
} else if querySpec.IsDeleteFromSeriesQuery() {
|
} else if querySpec.IsDeleteFromSeriesQuery() {
|
||||||
|
@ -328,6 +342,11 @@ func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor clu
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Shard) DropFields(fields []*metastore.Field) error {
|
func (self *Shard) DropFields(fields []*metastore.Field) error {
|
||||||
|
self.closeLock.RLock()
|
||||||
|
defer self.closeLock.RUnlock()
|
||||||
|
if self.closed {
|
||||||
|
return fmt.Errorf("Shard is closed")
|
||||||
|
}
|
||||||
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||||
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
|
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
|
||||||
return self.deleteRangeOfFields(fields, startTimeBytes, endTimeBytes)
|
return self.deleteRangeOfFields(fields, startTimeBytes, endTimeBytes)
|
||||||
|
@ -412,6 +431,8 @@ func (self *Shard) byteArrayForTime(t time.Time) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Shard) close() {
|
func (self *Shard) close() {
|
||||||
|
self.closeLock.Lock()
|
||||||
|
defer self.closeLock.Unlock()
|
||||||
self.closed = true
|
self.closed = true
|
||||||
self.db.Close()
|
self.db.Close()
|
||||||
self.db = nil
|
self.db = nil
|
||||||
|
|
Loading…
Reference in New Issue