diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e82be1502..f3723ca931 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - [#2452](https://github.com/influxdb/influxdb/issues/2452): Fix panic with shard stats on multiple clusters - [#2460](https://github.com/influxdb/influxdb/issues/2460): Collectd input should use "value" for fields values. Fixes 2412. Thanks @josh-padnick - [#2465](https://github.com/influxdb/influxdb/pull/2465): HTTP response logging paniced with chunked requests. Thanks @Jackkoz +- [#2475](https://github.com/influxdb/influxdb/pull/2475): RLock server when checking if shards groups are required during write. ## v0.9.0-rc28 [04-27-2015] diff --git a/server.go b/server.go index 9ebceee257..697be7e4f8 100644 --- a/server.go +++ b/server.go @@ -2061,17 +2061,37 @@ func (s *Server) applyDropMeasurement(m *messaging.Message) error { // createShardGroupsIfNotExist walks the "points" and ensures that all required shards exist on the cluster. func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string, points []Point) error { - for _, p := range points { - // Check if shard group exists first. - g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp) - if err != nil { - return err - } else if g != nil { - continue + var commands = make([]*createShardGroupIfNotExistsCommand, 0) + + err := func() error { + // Local function makes locking fool-proof. + s.mu.RLock() + defer s.mu.RUnlock() + for _, p := range points { + // Check if shard group exists first. + g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp) + if err != nil { + return err + } else if g != nil { + continue + } + commands = append(commands, &createShardGroupIfNotExistsCommand{ + Database: database, + Policy: retentionPolicy, + Timestamp: p.Timestamp, + }) } - err = s.CreateShardGroupIfNotExists(database, retentionPolicy, p.Timestamp) + return nil + }() + if err != nil { + return err + } + + // Create any required shard groups across the cluster. Must be done without holding the lock. + for _, c := range commands { + err = s.CreateShardGroupIfNotExists(c.Database, c.Policy, c.Timestamp) if err != nil { - return fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, p.Timestamp.Format(time.RFC3339Nano), err) + return fmt.Errorf("create shard(%s:%s/%s): %s", c.Database, c.Policy, c.Timestamp.Format(time.RFC3339Nano), err) } }