diff --git a/commands.go b/commands.go index 8e584fb23a..cae0523c19 100644 --- a/commands.go +++ b/commands.go @@ -28,9 +28,8 @@ const ( deleteUserMessageType = messaging.MessageType(0x32) // Shard messages - createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40) - deleteShardGroupMessageType = messaging.MessageType(0x41) - preCreateShardGroupIfNotExistsMessageType = messaging.MessageType(0x42) + createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40) + deleteShardGroupMessageType = messaging.MessageType(0x41) // Series messages dropSeriesMessageType = messaging.MessageType(0x50) @@ -77,12 +76,6 @@ type deleteShardGroupCommand struct { ID uint64 `json:"id"` } -type preCreateShardGroupIfNotExistsCommand struct { - Database string `json:"database"` - Policy string `json:"policy"` - ID uint64 `json:"id"` -} - type createUserCommand struct { Username string `json:"username"` Password string `json:"password"` diff --git a/server.go b/server.go index f07e1a47f5..1da49e8447 100644 --- a/server.go +++ b/server.go @@ -375,6 +375,7 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) { Database string Retention string ID uint64 + Timestamp time.Time } groups := make([]group, 0) @@ -393,7 +394,7 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) { // Check to see if it is going to end before our interval if g.EndTime.Before(cutoff) { log.Printf("pre-creating shard group for %d, retention policy %s, database %s", g.ID, rp.Name, db.name) - groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID}) + groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID, Timestamp: g.EndTime.Add(1 * time.Nanosecond)}) } } } @@ -401,8 +402,8 @@ func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) { }() for _, g := range groups { - if err := s.PreCreateShardGroupIfNotExists(g.Database, g.Retention, g.ID); err != nil { - log.Printf("failed to request pre-creation of shard group %d: %s", g.ID, err.Error()) + if err := s.CreateShardGroupIfNotExists(g.Database, g.Retention, g.Timestamp); err != nil { + log.Printf("failed to request pre-creation of shard group %d for time %s: %s", g.ID, g.Timestamp, err.Error()) } } } @@ -841,15 +842,37 @@ func (s *Server) ShardGroups(database string) ([]*ShardGroup, error) { return a, nil } -func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *RetentionPolicy, timestamp time.Time) (err error) { +// CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into. +func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error { + c := &createShardGroupIfNotExistsCommand{Database: database, Policy: policy, Timestamp: timestamp} + _, err := s.broadcast(createShardGroupIfNotExistsMessageType, c) + return err +} + +func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) { + var c createShardGroupIfNotExistsCommand + mustUnmarshalJSON(m.Data, &c) + + // Retrieve database. + db := s.databases[c.Database] + if s.databases[c.Database] == nil { + return ErrDatabaseNotFound + } + + // Validate retention policy. + rp := db.policies[c.Policy] + if rp == nil { + return ErrRetentionPolicyNotFound + } + // If we can match to an existing shard group date range then just ignore request. - if g := rp.shardGroupByTimestamp(timestamp); g != nil { + if g := rp.shardGroupByTimestamp(c.Timestamp); g != nil { return nil } // If no shards match then create a new one. g := newShardGroup() - g.StartTime = timestamp.Truncate(rp.ShardGroupDuration).UTC() + g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC() g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC() // Sort nodes so they're consistently assigned to the shards. @@ -879,7 +902,7 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret } // Persist to metastore if a shard was created. - if err = s.meta.mustUpdate(index, func(tx *metatx) error { + if err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { // Generate an ID for the group. g.ID = tx.nextShardGroupID() @@ -890,7 +913,7 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret // Assign data nodes to shards via round robin. // Start from a repeatably "random" place in the node list. - nodeIndex := int(index % uint64(len(nodes))) + nodeIndex := int(m.Index % uint64(len(nodes))) for _, sh := range g.Shards { for i := 0; i < replicaN; i++ { node := nodes[nodeIndex%len(nodes)] @@ -942,68 +965,6 @@ func (s *Server) createShardGroupIfNotExists(index uint64, db *database, rp *Ret } return - -} - -// CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into. -func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error { - c := &createShardGroupIfNotExistsCommand{Database: database, Policy: policy, Timestamp: timestamp} - _, err := s.broadcast(createShardGroupIfNotExistsMessageType, c) - return err -} - -func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) { - var c createShardGroupIfNotExistsCommand - mustUnmarshalJSON(m.Data, &c) - - // Retrieve database. - db := s.databases[c.Database] - if s.databases[c.Database] == nil { - return ErrDatabaseNotFound - } - - // Validate retention policy. - rp := db.policies[c.Policy] - if rp == nil { - return ErrRetentionPolicyNotFound - } - - return s.createShardGroupIfNotExists(m.Index, db, rp, c.Timestamp) -} - -// PreCreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into. -func (s *Server) PreCreateShardGroupIfNotExists(database, policy string, shardID uint64) error { - c := &preCreateShardGroupIfNotExistsCommand{Database: database, Policy: policy, ID: shardID} - _, err := s.broadcast(preCreateShardGroupIfNotExistsMessageType, c) - return err -} - -// applyPreCreateShardGroupIfNotExists creates the shard groups and writes them to the metastore. -func (s *Server) applyPreCreateShardGroupIfNotExists(m *messaging.Message) (err error) { - var c preCreateShardGroupIfNotExistsCommand - mustUnmarshalJSON(m.Data, &c) - - // Retrieve database. - db := s.databases[c.Database] - if s.databases[c.Database] == nil { - return ErrDatabaseNotFound - } - - // Validate retention policy. - rp := db.policies[c.Policy] - if rp == nil { - return ErrRetentionPolicyNotFound - } - - // If shard group no longer exists, then ignore request. This can occur if data - // was deleted/dropped - g := rp.shardGroupByID(c.ID) - if g == nil { - return nil - } - - // Create the next shard group - return s.createShardGroupIfNotExists(m.Index, db, rp, g.EndTime.Add(time.Nanosecond*1)) } // DeleteShardGroup deletes the shard group identified by shardID. @@ -2890,8 +2851,6 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) { err = s.applyDeleteRetentionPolicy(m) case createShardGroupIfNotExistsMessageType: err = s.applyCreateShardGroupIfNotExists(m) - case preCreateShardGroupIfNotExistsMessageType: - err = s.applyPreCreateShardGroupIfNotExists(m) case deleteShardGroupMessageType: err = s.applyDeleteShardGroup(m) case setDefaultRetentionPolicyMessageType: