RLock server when checking shard groups
This fixes a race detected by the race detector. "Create shard groups" commands must be broadcast across the cluster without holding the server lock so the commands are created under lock, and then processed after releasing the lock.pull/2475/head
parent
de1e695c25
commit
b93554e39b
|
@ -9,6 +9,7 @@
|
|||
- [#2452](https://github.com/influxdb/influxdb/issues/2452): Fix panic with shard stats on multiple clusters
|
||||
- [#2460](https://github.com/influxdb/influxdb/issues/2460): Collectd input should use "value" for fields values. Fixes 2412. Thanks @josh-padnick
|
||||
- [#2465](https://github.com/influxdb/influxdb/pull/2465): HTTP response logging paniced with chunked requests. Thanks @Jackkoz
|
||||
- [#2475](https://github.com/influxdb/influxdb/pull/2475): RLock server when checking if shards groups are required during write.
|
||||
|
||||
## v0.9.0-rc28 [04-27-2015]
|
||||
|
||||
|
|
24
server.go
24
server.go
|
@ -2061,6 +2061,12 @@ func (s *Server) applyDropMeasurement(m *messaging.Message) error {
|
|||
|
||||
// createShardGroupsIfNotExist walks the "points" and ensures that all required shards exist on the cluster.
|
||||
func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string, points []Point) error {
|
||||
var commands = make([]*createShardGroupIfNotExistsCommand, 0)
|
||||
|
||||
err := func() error {
|
||||
// Local function makes locking fool-proof.
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for _, p := range points {
|
||||
// Check if shard group exists first.
|
||||
g, err := s.shardGroupByTimestamp(database, retentionPolicy, p.Timestamp)
|
||||
|
@ -2069,9 +2075,23 @@ func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string,
|
|||
} else if g != nil {
|
||||
continue
|
||||
}
|
||||
err = s.CreateShardGroupIfNotExists(database, retentionPolicy, p.Timestamp)
|
||||
commands = append(commands, &createShardGroupIfNotExistsCommand{
|
||||
Database: database,
|
||||
Policy: retentionPolicy,
|
||||
Timestamp: p.Timestamp,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, p.Timestamp.Format(time.RFC3339Nano), err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Create any required shard groups across the cluster. Must be done without holding the lock.
|
||||
for _, c := range commands {
|
||||
err = s.CreateShardGroupIfNotExists(c.Database, c.Policy, c.Timestamp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create shard(%s:%s/%s): %s", c.Database, c.Policy, c.Timestamp.Format(time.RFC3339Nano), err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue