diff --git a/broker.go b/broker.go index cf08797cdf..1f5296cccb 100644 --- a/broker.go +++ b/broker.go @@ -6,6 +6,7 @@ import ( "math/rand" "net/http" "net/url" + "sync" "time" "github.com/influxdb/influxdb/messaging" @@ -27,6 +28,7 @@ const ( // Broker represents an InfluxDB specific messaging broker. type Broker struct { + mu sync.RWMutex *messaging.Broker client *http.Client @@ -54,12 +56,18 @@ func NewBroker() *Broker { // RunContinuousQueryLoop starts running continuous queries on a background goroutine. func (b *Broker) RunContinuousQueryLoop() { + b.mu.Lock() + defer b.mu.Unlock() + b.done = make(chan struct{}) go b.continuousQueryLoop(b.done) } // Close closes the broker. func (b *Broker) Close() error { + b.mu.Lock() + defer b.mu.Unlock() + if b.done != nil { close(b.done) b.done = nil @@ -121,6 +129,8 @@ func (b *Broker) runContinuousQueries() { } func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error { + b.mu.RLock() + defer b.mu.RUnlock() // Send request. cqURL.Path = "/data/process_continuous_queries" cqURL.Scheme = "http"