Implement "delete shard group" command

pull/1562/head
Philip O'Toole 2015-02-10 15:43:03 -08:00 committed by Philip O'Toole
parent 37647cc6a6
commit 6a20100d2f
2 changed files with 73 additions and 0 deletions

View File

@ -806,6 +806,15 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
return nil
}
func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
for i, g := range rp.shardGroups {
if g.ID == shardID {
rp.shardGroups[i] = nil
rp.shardGroups = append(rp.shardGroups[:i], rp.shardGroups[i+1:]...)
}
}
}
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
var o retentionPolicyJSON

View File

@ -66,6 +66,7 @@ const (
// Shard messages
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)
// Series messages
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
@ -919,6 +920,67 @@ type createShardGroupIfNotExistsCommand struct {
Timestamp time.Time `json:"timestamp"`
}
// DeleteShardGroup deletes the shard group identified by shardID.
func (s *Server) DeleteShardGroup(database, policy string, shardID uint64) error {
c := &deleteShardGroupCommand{Database: database, Policy: policy, ID: shardID}
_, err := s.broadcast(deleteShardGroupMessageType, c)
return err
}
// applyDeleteShardGroup deletes shard data from disk and updates the metastore.
func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
var c deleteShardGroupCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
// If shard group no longer exists, then ignore request. This can occur if multiple
// data nodes triggered the deletion.
g := rp.shardGroupByID(c.ID)
if g == nil {
return nil
}
for _, shard := range g.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(s.id) {
continue
}
path := shard.store.Path()
if err := os.Remove(path); err != nil {
// Log, but keep going.
log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error())
}
}
// Remove from metastore.
rp.removeShardGroupByID(c.ID)
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}
type deleteShardGroupCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}
// User returns a user by username
// Returns nil if the user does not exist.
func (s *Server) User(name string) *User {
@ -2561,6 +2623,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
err = s.applyDeleteRetentionPolicy(m)
case createShardGroupIfNotExistsMessageType:
err = s.applyCreateShardGroupIfNotExists(m)
case deleteShardGroupMessageType:
err = s.applyDeleteShardGroup(m)
case setDefaultRetentionPolicyMessageType:
err = s.applySetDefaultRetentionPolicy(m)
case createSeriesIfNotExistsMessageType: