Update CreateShardIfNotExists to check for shard first

pull/1165/head
Paul Dix 2014-12-03 10:12:20 -05:00
parent 20098bbd5a
commit bbc2d2e477
1 changed files with 7 additions and 7 deletions

View File

@ -274,6 +274,10 @@ func (db *Database) applySetDefaultRetentionPolicy(name string) error {
// CreateShardIfNotExists creates a shard for a retention policy for a given timestamp and returns the shard for the series
func (db *Database) CreateShardIfNotExists(policy *RetentionPolicy, id uint32, timestamp time.Time) (*Shard, error) {
if s := policy.ShardBySeriesTimestamp(id, timestamp); s != nil {
return s, nil
}
c := &createShardIfNotExistsCommand{Database: db.name, Policy: policy.Name, Timestamp: timestamp}
if _, err := db.server.broadcast(createShardIfNotExistsMessageType, c); err != nil {
return nil, err
@ -345,13 +349,9 @@ func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]st
}
// now write it into the shard
s := rp.ShardBySeriesTimestamp(id, timestamp)
if s == nil {
var err error
s, err = db.CreateShardIfNotExists(rp, id, timestamp)
if err != nil {
return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}
s, err := db.CreateShardIfNotExists(rp, id, timestamp)
if err != nil {
return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}
data, err := marshalPoint(id, timestamp, values)