run all create shards groups through same methods

pull/1897/head
Cory LaNou 2015-03-09 18:54:37 -06:00
parent 7495e6eb1c
commit c150a8c8bc
2 changed files with 33 additions and 81 deletions

View File

@ -30,7 +30,6 @@ const (
// Shard messages
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)
preCreateShardGroupIfNotExistsMessageType = messaging.MessageType(0x42)
// Series messages
dropSeriesMessageType = messaging.MessageType(0x50)
@ -77,12 +76,6 @@ type deleteShardGroupCommand struct {
ID uint64 `json:"id"`
}
type preCreateShardGroupIfNotExistsCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}
type createUserCommand struct {
Username string `json:"username"`
Password string `json:"password"`

103
server.go
View File

@ -375,6 +375,7 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) {
Database string
Retention string
ID uint64
Timestamp time.Time
}
groups := make([]group, 0)
@ -393,7 +394,7 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) {
// Check to see if it is going to end before our interval
if g.EndTime.Before(cutoff) {
log.Printf("pre-creating shard group for %d, retention policy %s, database %s", g.ID, rp.Name, db.name)
groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID})
groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID, Timestamp: g.EndTime.Add(1 * time.Nanosecond)})
}
}
}
@ -401,8 +402,8 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) {
}()
for _, g := range groups {
if err := s.PreCreateShardGroupIfNotExists(g.Database, g.Retention, g.ID); err != nil {
log.Printf("failed to request pre-creation of shard group %d: %s", g.ID, err.Error())
if err := s.CreateShardGroupIfNotExists(g.Database, g.Retention, g.Timestamp); err != nil {
log.Printf("failed to request pre-creation of shard group %d for time %s: %s", g.ID, g.Timestamp, err.Error())
}
}
}
@ -841,15 +842,37 @@ func (s *Server) ShardGroups(database string) ([]*ShardGroup, error) {
return a, nil
}
func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *RetentionPolicy, timestamp time.Time) (err error) {
// CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into.
func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error {
c := &createShardGroupIfNotExistsCommand{Database: database, Policy: policy, Timestamp: timestamp}
_, err := s.broadcast(createShardGroupIfNotExistsMessageType, c)
return err
}
func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
var c createShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
// If we can match to an existing shard group date range then just ignore request.
if g := rp.shardGroupByTimestamp(timestamp); g != nil {
if g := rp.shardGroupByTimestamp(c.Timestamp); g != nil {
return nil
}
// If no shards match then create a new one.
g := newShardGroup()
g.StartTime = timestamp.Truncate(rp.ShardGroupDuration).UTC()
g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC()
g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC()
// Sort nodes so they're consistently assigned to the shards.
@ -879,7 +902,7 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret
}
// Persist to metastore if a shard was created.
if err = s.meta.mustUpdate(index, func(tx *metatx) error {
if err = s.meta.mustUpdate(m.Index, func(tx *metatx) error {
// Generate an ID for the group.
g.ID = tx.nextShardGroupID()
@ -890,7 +913,7 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret
// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(index % uint64(len(nodes)))
nodeIndex := int(m.Index % uint64(len(nodes)))
for _, sh := range g.Shards {
for i := 0; i < replicaN; i++ {
node := nodes[nodeIndex%len(nodes)]
@ -942,68 +965,6 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret
}
return
}
// CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into.
func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error {
c := &createShardGroupIfNotExistsCommand{Database: database, Policy: policy, Timestamp: timestamp}
_, err := s.broadcast(createShardGroupIfNotExistsMessageType, c)
return err
}
func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
var c createShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
return s.createShardGroupIfNotExists(m.Index, db, rp, c.Timestamp)
}
// PreCreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into.
func (s *Server) PreCreateShardGroupIfNotExists(database, policy string, shardID uint64) error {
c := &preCreateShardGroupIfNotExistsCommand{Database: database, Policy: policy, ID: shardID}
_, err := s.broadcast(preCreateShardGroupIfNotExistsMessageType, c)
return err
}
// applyPreCreateShardGroupIfNotExists creates the shard groups and writes them to the metastore.
func (s *Server) applyPreCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
var c preCreateShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
// If shard group no longer exists, then ignore request. This can occur if data
// was deleted/dropped
g := rp.shardGroupByID(c.ID)
if g == nil {
return nil
}
// Create the next shard group
return s.createShardGroupIfNotExists(m.Index, db, rp, g.EndTime.Add(time.Nanosecond*1))
}
// DeleteShardGroup deletes the shard group identified by shardID.
@ -2890,8 +2851,6 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
err = s.applyDeleteRetentionPolicy(m)
case createShardGroupIfNotExistsMessageType:
err = s.applyCreateShardGroupIfNotExists(m)
case preCreateShardGroupIfNotExistsMessageType:
err = s.applyPreCreateShardGroupIfNotExists(m)
case deleteShardGroupMessageType:
err = s.applyDeleteShardGroup(m)
case setDefaultRetentionPolicyMessageType: