Merge pull request #2445 from influxdb/jw-locks

Read locks and data race fixes
pull/2443/head
Jason Wilder 2015-04-28 07:31:56 -07:00
commit b086598445
5 changed files with 41 additions and 33 deletions

View File

@ -15,6 +15,7 @@
- [#2429](https://github.com/influxdb/influxdb/pull/2429): Ensure no field value is null.
- [#2431](https://github.com/influxdb/influxdb/pull/2431): Always append shard path in diags. Thanks @marcosnils
- [#2441](https://github.com/influxdb/influxdb/pull/2441): Correctly release server RLock during "drop series".
- [#2445](https://github.com/influxdb/influxdb/pull/2445): Read locks and data race fixes
## v0.9.0-rc27 [04-23-2015]

View File

@ -693,7 +693,7 @@ func (fsm *RaftFSM) Apply(e *raft.LogEntry) error {
// Topics write their entries to segmented log files which contain a
// contiguous range of entries.
type Topic struct {
mu sync.Mutex
mu sync.RWMutex
id uint64 // unique identifier
index uint64 // current index
path string // on-disk path
@ -738,15 +738,15 @@ func (t *Topic) Truncated() bool {
// Index returns the highest replicated index for the topic.
func (t *Topic) Index() uint64 {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.RLock()
defer t.mu.RUnlock()
return t.index
}
// DataURLs returns the data node URLs subscribed to this topic
func (t *Topic) DataURLs() []url.URL {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.RLock()
defer t.mu.RUnlock()
var urls []url.URL
for u, _ := range t.indexByURL {
urls = append(urls, u)
@ -756,8 +756,8 @@ func (t *Topic) DataURLs() []url.URL {
// IndexForURL returns the highest index replicated for a given data URL.
func (t *Topic) IndexForURL(u url.URL) uint64 {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.RLock()
defer t.mu.RUnlock()
return t.indexByURL[u]
}
@ -770,8 +770,8 @@ func (t *Topic) SetIndexForURL(index uint64, u url.URL) {
// SegmentPath returns the path to a segment starting with a given log index.
func (t *Topic) SegmentPath(index uint64) string {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.RLock()
defer t.mu.RUnlock()
return t.segmentPath(index)
}

View File

@ -33,7 +33,7 @@ const (
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
mu sync.RWMutex
path string // config file path
conns map[uint64]*Conn // all connections opened by client
url url.URL // current known leader URL
@ -68,8 +68,8 @@ func NewClient(dataURL url.URL) *Client {
// URL returns the current broker leader's URL.
func (c *Client) URL() url.URL {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.url
}
@ -92,8 +92,8 @@ func (c *Client) setURL(u url.URL) {
// URLs returns a list of possible broker URLs to connect to.
func (c *Client) URLs() []url.URL {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.urls
}
@ -223,6 +223,9 @@ func (c *Client) loadConfig() error {
// setConfig writes a new config to disk and updates urls on the client.
func (c *Client) setConfig(config ClientConfig) error {
c.mu.Lock()
defer c.mu.Unlock()
// Only write to disk if we have a path.
if c.path != "" {
// Open config file for writing.
@ -434,7 +437,7 @@ type clientConfigJSON struct {
// Conn represents a stream over the client for a single topic.
type Conn struct {
mu sync.Mutex
mu sync.RWMutex
topicID uint64 // topic identifier
index uint64 // highest index sent over the channel
streaming bool // use streaming reader, if true
@ -476,8 +479,8 @@ func (c *Conn) C() <-chan *Message { return c.c }
// Index returns the highest index replicated to the caller.
func (c *Conn) Index() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.index
}
@ -490,15 +493,15 @@ func (c *Conn) SetIndex(index uint64) {
// Streaming returns true if the connection streams messages continuously.
func (c *Conn) Streaming() bool {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.streaming
}
// URL returns the current URL of the connection.
func (c *Conn) URL() url.URL {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.url
}
@ -591,9 +594,9 @@ func (c *Conn) Heartbeat() error {
var err error
// Retrieve the parameters under lock.
c.mu.Lock()
c.mu.RLock()
topicID, index, u := c.topicID, c.index, c.url
c.mu.Unlock()
c.mu.RUnlock()
// Send the message to the messages endpoint.
u.Path = "/messaging/heartbeat"

View File

@ -1255,8 +1255,8 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
// User returns a user by username
// Returns nil if the user does not exist.
func (s *Server) User(name string) *User {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
return s.users[name]
}
@ -1280,6 +1280,8 @@ func (s *Server) UserCount() int {
// AdminUserExists returns whether at least 1 admin-level user exists.
func (s *Server) AdminUserExists() bool {
s.mu.RLock()
defer s.mu.RUnlock()
for _, u := range s.users {
if u.Admin {
return true
@ -1291,8 +1293,8 @@ func (s *Server) AdminUserExists() bool {
// Authenticate returns an authenticated user by username. If any error occurs,
// or the authentication credentials are invalid, an error is returned.
func (s *Server) Authenticate(username, password string) (*User, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
u := s.users[username]
@ -1451,8 +1453,8 @@ func (s *Server) applySetPrivilege(m *messaging.Message) error {
// RetentionPolicy returns a retention policy by name.
// Returns an error if the database doesn't exist.
func (s *Server) RetentionPolicy(database, name string) (*RetentionPolicy, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
// Lookup database.
db := s.databases[database]
@ -3866,8 +3868,8 @@ func (s *Server) applyDropContinuousQueryCommand(m *messaging.Message) error {
// RunContinuousQueries will run any continuous queries that are due to run and write the
// results back into the database
func (s *Server) RunContinuousQueries() error {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
for _, d := range s.databases {
for _, c := range d.continuousQueries {
@ -4090,8 +4092,8 @@ func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) {
}
func (s *Server) URL() url.URL {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
if n := s.dataNodes[s.id]; n != nil {
return *n.URL
}

View File

@ -90,7 +90,9 @@ func (s *Stats) Walk(f func(string, int64)) {
defer s.mu.RUnlock()
for k, v := range s.m {
v.mu.RLock()
f(k, v.i)
v.mu.RUnlock()
}
}