diff --git a/messaging/broker.go b/messaging/broker.go index 36092598c5..040ac65848 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -693,7 +693,7 @@ func (fsm *RaftFSM) Apply(e *raft.LogEntry) error { // Topics write their entries to segmented log files which contain a // contiguous range of entries. type Topic struct { - mu sync.Mutex + mu sync.RWMutex id uint64 // unique identifier index uint64 // current index path string // on-disk path @@ -738,15 +738,15 @@ func (t *Topic) Truncated() bool { // Index returns the highest replicated index for the topic. func (t *Topic) Index() uint64 { - t.mu.Lock() - defer t.mu.Unlock() + t.mu.RLock() + defer t.mu.RUnlock() return t.index } // DataURLs returns the data node URLs subscribed to this topic func (t *Topic) DataURLs() []url.URL { - t.mu.Lock() - defer t.mu.Unlock() + t.mu.RLock() + defer t.mu.RUnlock() var urls []url.URL for u, _ := range t.indexByURL { urls = append(urls, u) @@ -756,8 +756,8 @@ func (t *Topic) DataURLs() []url.URL { // IndexForURL returns the highest index replicated for a given data URL. func (t *Topic) IndexForURL(u url.URL) uint64 { - t.mu.Lock() - defer t.mu.Unlock() + t.mu.RLock() + defer t.mu.RUnlock() return t.indexByURL[u] } @@ -770,8 +770,8 @@ func (t *Topic) SetIndexForURL(index uint64, u url.URL) { // SegmentPath returns the path to a segment starting with a given log index. func (t *Topic) SegmentPath(index uint64) string { - t.mu.Lock() - defer t.mu.Unlock() + t.mu.RLock() + defer t.mu.RUnlock() return t.segmentPath(index) }