parent
98af8535bd
commit
a18107ed7a
10
broker.go
10
broker.go
|
@ -6,6 +6,7 @@ import (
|
|||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/messaging"
|
||||
|
@ -27,6 +28,7 @@ const (
|
|||
|
||||
// Broker represents an InfluxDB specific messaging broker.
|
||||
type Broker struct {
|
||||
mu sync.RWMutex
|
||||
*messaging.Broker
|
||||
client *http.Client
|
||||
|
||||
|
@ -54,12 +56,18 @@ func NewBroker() *Broker {
|
|||
|
||||
// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
|
||||
func (b *Broker) RunContinuousQueryLoop() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.done = make(chan struct{})
|
||||
go b.continuousQueryLoop(b.done)
|
||||
}
|
||||
|
||||
// Close closes the broker.
|
||||
func (b *Broker) Close() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.done != nil {
|
||||
close(b.done)
|
||||
b.done = nil
|
||||
|
@ -121,6 +129,8 @@ func (b *Broker) runContinuousQueries() {
|
|||
}
|
||||
|
||||
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
// Send request.
|
||||
cqURL.Path = "/data/process_continuous_queries"
|
||||
cqURL.Scheme = "http"
|
||||
|
|
Loading…
Reference in New Issue