Add ref counting to which leveldb shards are open so they can be closed after references released
parent
0e98711613
commit
0d6e2d66ff
|
@ -851,7 +851,7 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
|
|||
log.Info("%s: %d - start: %s. end: %s. isLocal: %d. servers: %s",
|
||||
message, shard.Id(),
|
||||
shard.StartTime().Format("Mon Jan 2 15:04:05 -0700 MST 2006"), shard.EndTime().Format("Mon Jan 2 15:04:05 -0700 MST 2006"),
|
||||
shard.IsLocal(), shard.ServerIds())
|
||||
shard.IsLocal, shard.ServerIds())
|
||||
}
|
||||
return createdShards, nil
|
||||
}
|
||||
|
|
|
@ -60,12 +60,12 @@ type ShardData struct {
|
|||
servers []wal.Server
|
||||
clusterServers []*ClusterServer
|
||||
store LocalShardStore
|
||||
localShard LocalShardDb
|
||||
serverIds []uint32
|
||||
shardType ShardType
|
||||
durationIsSplit bool
|
||||
shardDuration time.Duration
|
||||
localServerId uint32
|
||||
IsLocal bool
|
||||
}
|
||||
|
||||
func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData {
|
||||
|
@ -106,6 +106,7 @@ type LocalShardStore interface {
|
|||
SetWriteBuffer(writeBuffer *WriteBuffer)
|
||||
BufferWrite(request *p.Request)
|
||||
GetOrCreateShard(id uint32) (LocalShardDb, error)
|
||||
ReturnShard(id uint32)
|
||||
DeleteShard(shardId uint32) error
|
||||
}
|
||||
|
||||
|
@ -141,19 +142,17 @@ func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32
|
|||
self.sortServerIds()
|
||||
|
||||
self.store = store
|
||||
shard, err := self.store.GetOrCreateShard(self.id)
|
||||
// make sure we can open up the shard
|
||||
_, err := self.store.GetOrCreateShard(self.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
self.localShard = shard
|
||||
self.store.ReturnShard(self.id)
|
||||
self.IsLocal = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ShardData) IsLocal() bool {
|
||||
return self.store != nil
|
||||
}
|
||||
|
||||
func (self *ShardData) ServerIds() []uint32 {
|
||||
return self.serverIds
|
||||
}
|
||||
|
@ -190,7 +189,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
|
|||
}
|
||||
}
|
||||
|
||||
if self.localShard != nil {
|
||||
if self.IsLocal {
|
||||
var processor QueryProcessor
|
||||
if querySpec.IsListSeriesQuery() {
|
||||
processor = engine.NewListSeriesEngine(response)
|
||||
|
@ -205,7 +204,12 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
|
|||
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
|
||||
}
|
||||
}
|
||||
err := self.localShard.Query(querySpec, processor)
|
||||
shard, err := self.store.GetOrCreateShard(self.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer self.store.ReturnShard(self.id)
|
||||
shard.Query(querySpec, processor)
|
||||
processor.Close()
|
||||
return err
|
||||
}
|
||||
|
@ -231,8 +235,11 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
|
|||
}
|
||||
|
||||
func (self *ShardData) DropDatabase(database string, sendToServers bool) {
|
||||
if self.localShard != nil {
|
||||
self.localShard.DropDatabase(database)
|
||||
if self.IsLocal {
|
||||
if shard, err := self.store.GetOrCreateShard(self.id); err == nil {
|
||||
defer self.store.ReturnShard(self.id)
|
||||
shard.DropDatabase(database)
|
||||
}
|
||||
}
|
||||
|
||||
if !sendToServers {
|
||||
|
@ -258,7 +265,7 @@ func (self *ShardData) String() string {
|
|||
serversString = append(serversString, fmt.Sprintf("%d", s.GetId()))
|
||||
}
|
||||
local := "false"
|
||||
if self.localShard != nil {
|
||||
if self.IsLocal {
|
||||
local = "true"
|
||||
}
|
||||
|
||||
|
@ -311,7 +318,12 @@ func (self *ShardData) deleteDataLocally(querySpec *parser.QuerySpec) (<-chan *p
|
|||
// this doesn't really apply at this point since destructive queries don't output anything, but it may later
|
||||
maxPointsFromDestructiveQuery := 1000
|
||||
processor := engine.NewPassthroughEngine(localResponses, maxPointsFromDestructiveQuery)
|
||||
err := self.localShard.Query(querySpec, processor)
|
||||
shard, err := self.store.GetOrCreateShard(self.id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer self.store.ReturnShard(self.id)
|
||||
err = shard.Query(querySpec, processor)
|
||||
processor.Close()
|
||||
return localResponses, err
|
||||
}
|
||||
|
@ -332,14 +344,14 @@ func (self *ShardData) forwardRequest(request *p.Request) ([]<-chan *p.Response,
|
|||
}
|
||||
|
||||
func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error {
|
||||
if self.localShard == nil && runLocalOnly {
|
||||
if self.IsLocal && runLocalOnly {
|
||||
return nil
|
||||
}
|
||||
|
||||
responseCahnnels := []<-chan *p.Response{}
|
||||
serverIds := []uint32{}
|
||||
|
||||
if self.localShard != nil {
|
||||
if self.IsLocal {
|
||||
channel, err := self.deleteDataLocally(querySpec)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -20,6 +20,8 @@ type LevelDbShardDatastore struct {
|
|||
config *configuration.Configuration
|
||||
shards map[uint32]*LevelDbShard
|
||||
lastAccess map[uint32]int64
|
||||
shardRefCounts map[uint32]int
|
||||
shardsToClose map[uint32]bool
|
||||
shardsLock sync.RWMutex
|
||||
levelDbOptions *levigo.Options
|
||||
writeBuffer *cluster.WriteBuffer
|
||||
|
@ -87,6 +89,8 @@ func NewLevelDbShardDatastore(config *configuration.Configuration) (*LevelDbShar
|
|||
levelDbOptions: opts,
|
||||
maxOpenShards: config.LevelDbMaxOpenShards,
|
||||
lastAccess: make(map[uint32]int64),
|
||||
shardRefCounts: make(map[uint32]int),
|
||||
shardsToClose: make(map[uint32]bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -106,6 +110,7 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
|
|||
self.lastAccess[id] = now
|
||||
|
||||
if db != nil {
|
||||
self.shardRefCounts[id] += 1
|
||||
return db, nil
|
||||
}
|
||||
|
||||
|
@ -113,12 +118,6 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
|
|||
self.closeOldestShard()
|
||||
}
|
||||
|
||||
// check to make sure it hasn't been put there between the RUnlock and the Lock
|
||||
db = self.shards[id]
|
||||
if db != nil {
|
||||
return db, nil
|
||||
}
|
||||
|
||||
dbDir := self.shardDir(id)
|
||||
|
||||
log.Info("DATASTORE: opening or creating shard %s", dbDir)
|
||||
|
@ -132,14 +131,25 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
|
|||
return nil, err
|
||||
}
|
||||
self.shards[id] = db
|
||||
self.shardRefCounts[id] += 1
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (self *LevelDbShardDatastore) ReturnShard(id uint32) {
|
||||
self.shardsLock.Lock()
|
||||
defer self.shardsLock.Unlock()
|
||||
self.shardRefCounts[id] -= 1
|
||||
if self.shardsToClose[id] && self.shardRefCounts[id] == 0 {
|
||||
self.closeShard(id)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *LevelDbShardDatastore) Write(request *protocol.Request) error {
|
||||
shardDb, err := self.GetOrCreateShard(*request.ShardId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer self.ReturnShard(*request.ShardId)
|
||||
return shardDb.Write(*request.Database, request.Series)
|
||||
}
|
||||
|
||||
|
@ -180,12 +190,22 @@ func (self *LevelDbShardDatastore) closeOldestShard() {
|
|||
oldestAccess = lastAccess
|
||||
}
|
||||
}
|
||||
shard := self.shards[oldestId]
|
||||
if self.shardRefCounts[oldestId] == 0 {
|
||||
self.closeShard(oldestId)
|
||||
} else {
|
||||
self.shardsToClose[oldestId] = true
|
||||
}
|
||||
}
|
||||
|
||||
func (self *LevelDbShardDatastore) closeShard(id uint32) {
|
||||
delete(self.shardRefCounts, id)
|
||||
delete(self.shards, id)
|
||||
delete(self.lastAccess, id)
|
||||
delete(self.shardsToClose, id)
|
||||
shard := self.shards[id]
|
||||
if shard != nil {
|
||||
shard.close()
|
||||
}
|
||||
delete(self.shards, oldestId)
|
||||
delete(self.lastAccess, oldestId)
|
||||
}
|
||||
|
||||
// // returns true if the point has the correct field id and is
|
||||
|
|
Loading…
Reference in New Issue