refactor runContinousQueries

pull/2200/head
Cory LaNou 2015-04-08 15:14:59 -06:00
parent 3c91700da2
commit 913f8955bd
1 changed files with 20 additions and 28 deletions

View File

@ -3,6 +3,7 @@ package influxdb
import (
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"time"
@ -30,9 +31,6 @@ type Broker struct {
done chan struct{}
// send CQ processing requests to the same data node
currentCQProcessingNode *url.URL
// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
@ -81,42 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) {
}
func (b *Broker) runContinuousQueries() {
next := 0
for {
// if the current node hasn't been set it's our first time or we're reset. move to the next one
if b.currentCQProcessingNode == nil {
topic := b.Broker.Topic(BroadcastTopicID)
if topic == nil {
log.Println("broker cq: no topics currently available.")
return // don't have any topics to get data urls from, give it up
}
dataURLs := topic.DataURLs()
if len(dataURLs) == 0 {
log.Println("broker cq: no data nodes currently available.")
return // don't have any data urls to try, give it up
}
next = next % len(dataURLs)
u := dataURLs[next]
b.currentCQProcessingNode = &u
next++
}
topic := b.Broker.Topic(BroadcastTopicID)
if topic == nil {
log.Println("broker cq: no topics currently available.")
return // don't have any topics to get data urls from, give it up
}
dataURLs := topic.DataURLs()
if len(dataURLs) == 0 {
log.Println("broker cq: no data nodes currently available.")
return // don't have any data urls to try, give it up
}
rand.Seed(time.Now().UnixNano())
// get a set of random indexes so we can randomly distribute cq load over nodes
ri := rand.Perm(len(dataURLs))
for _, i := range ri {
u := dataURLs[i]
// if no error, we're all good
err := b.requestContinuousQueryProcessing()
err := b.requestContinuousQueryProcessing(u)
if err == nil {
return
}
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode, err.Error())
log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error())
// reset and let the loop try the next data node in the cluster
b.currentCQProcessingNode = nil
// let the loop try the next data node in the cluster
<-time.After(DefaultFailureSleep)
}
}
func (b *Broker) requestContinuousQueryProcessing() error {
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error {
// Send request.
cqURL := copyURL(b.currentCQProcessingNode)
cqURL.Path = "/process_continuous_queries"
cqURL.Scheme = "http"
client := &http.Client{