diff --git a/broker.go b/broker.go index 8a1797de67..87024519b6 100644 --- a/broker.go +++ b/broker.go @@ -3,6 +3,7 @@ package influxdb import ( "fmt" "log" + "math/rand" "net/http" "net/url" "time" @@ -30,9 +31,6 @@ type Broker struct { done chan struct{} - // send CQ processing requests to the same data node - currentCQProcessingNode *url.URL - // variables to control when to trigger processing and when to timeout TriggerInterval time.Duration TriggerTimeout time.Duration @@ -81,42 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) { } func (b *Broker) runContinuousQueries() { - next := 0 - for { - // if the current node hasn't been set it's our first time or we're reset. move to the next one - if b.currentCQProcessingNode == nil { - topic := b.Broker.Topic(BroadcastTopicID) - if topic == nil { - log.Println("broker cq: no topics currently available.") - return // don't have any topics to get data urls from, give it up - } - dataURLs := topic.DataURLs() - if len(dataURLs) == 0 { - log.Println("broker cq: no data nodes currently available.") - return // don't have any data urls to try, give it up - } - next = next % len(dataURLs) - u := dataURLs[next] - b.currentCQProcessingNode = &u - next++ - } + topic := b.Broker.Topic(BroadcastTopicID) + if topic == nil { + log.Println("broker cq: no topics currently available.") + return // don't have any topics to get data urls from, give it up + } + dataURLs := topic.DataURLs() + if len(dataURLs) == 0 { + log.Println("broker cq: no data nodes currently available.") + return // don't have any data urls to try, give it up + } + rand.Seed(time.Now().UnixNano()) + // get a set of random indexes so we can randomly distribute cq load over nodes + ri := rand.Perm(len(dataURLs)) + for _, i := range ri { + u := dataURLs[i] // if no error, we're all good - err := b.requestContinuousQueryProcessing() + err := b.requestContinuousQueryProcessing(u) if err == nil { return } - log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode, err.Error()) + log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error()) - // reset and let the loop try the next data node in the cluster - b.currentCQProcessingNode = nil + // let the loop try the next data node in the cluster <-time.After(DefaultFailureSleep) } } -func (b *Broker) requestContinuousQueryProcessing() error { +func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error { // Send request. - cqURL := copyURL(b.currentCQProcessingNode) cqURL.Path = "/process_continuous_queries" cqURL.Scheme = "http" client := &http.Client{