influxdb/broker.go

131 lines
3.3 KiB
Go

package influxdb
import (
"fmt"
"log"
"net/http"
"time"
"github.com/influxdb/influxdb/messaging"
)
// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker
done chan struct{}
// send CQ processing requests to the same data node
currentCQProcessingNode *messaging.Replica
// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}
const (
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
// in the cluster to run any continuous queries that should be run.
DefaultContinuousQueryCheckTime = 1 * time.Second
// DefaultDataNodeTimeout is how long the broker will wait before timing out on a data node
// that it has requested process continuous queries.
DefaultDataNodeTimeout = 1 * time.Second
// DefaultFailureSleep is how long the broker will sleep before trying the next data node in
// the cluster if the current data node failed to respond
DefaultFailureSleep = 100 * time.Millisecond
)
// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
TriggerInterval: 5 * time.Second,
TriggerTimeout: 20 * time.Second,
TriggerFailurePause: 1 * time.Second,
}
b.Broker = messaging.NewBroker()
return b
}
// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
}
// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
close(b.done)
b.done = nil
}
return b.Broker.Close()
}
func (b *Broker) continuousQueryLoop(done chan struct{}) {
for {
// Check if broker is currently leader.
if b.Broker.IsLeader() {
b.runContinuousQueries()
}
// Sleep until either the broker is closed or we need to run continuous queries again
select {
case <-done:
return
case <-time.After(DefaultContinuousQueryCheckTime):
}
}
}
func (b *Broker) runContinuousQueries() {
next := 0
for {
// if the current node hasn't been set it's our first time or we're reset. move to the next one
if b.currentCQProcessingNode == nil {
dataNodes := b.Broker.Replicas()
if len(dataNodes) == 0 {
return // don't have any nodes to try, give it up
}
next = next % len(dataNodes)
b.currentCQProcessingNode = dataNodes[next]
next++
}
// if no error, we're all good
err := b.requestContinuousQueryProcessing()
if err == nil {
return
}
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error())
// reset and let the loop try the next data node in the cluster
b.currentCQProcessingNode = nil
<-time.After(DefaultFailureSleep)
}
}
func (b *Broker) requestContinuousQueryProcessing() error {
// Send request.
cqURL := copyURL(b.currentCQProcessingNode.URL)
cqURL.Path = "/process_continuous_queries"
cqURL.Scheme = "http"
client := &http.Client{
Timeout: DefaultDataNodeTimeout,
}
resp, err := client.Post(cqURL.String(), "application/octet-stream", nil)
if err != nil {
return err
}
defer resp.Body.Close()
// Check if created.
if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("request returned status %s", resp.Status)
}
return nil
}