make sure there are CQs before acquiring lease
parent
914a9a1de6
commit
c0df09d544
|
@ -192,11 +192,17 @@ func (s *Service) backgroundLoop() {
|
|||
s.Logger.Println("continuous query service terminating")
|
||||
return
|
||||
case req := <-s.RunCh:
|
||||
if !s.hasContinuousQueries() {
|
||||
continue
|
||||
}
|
||||
if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
|
||||
s.Logger.Printf("running continuous queries by request for time: %v", req.Now)
|
||||
s.runContinuousQueries(req)
|
||||
}
|
||||
case <-time.After(s.RunInterval):
|
||||
if !s.hasContinuousQueries() {
|
||||
continue
|
||||
}
|
||||
if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
|
||||
s.runContinuousQueries(&RunRequest{Now: time.Now()})
|
||||
}
|
||||
|
@ -204,6 +210,23 @@ func (s *Service) backgroundLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// hasContinuousQueries returns true if any CQs exist.
|
||||
func (s *Service) hasContinuousQueries() bool {
|
||||
// Get list of all databases.
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.Logger.Println("error getting databases")
|
||||
return false
|
||||
}
|
||||
// Loop through all databases executing CQs.
|
||||
for _, db := range dbs {
|
||||
if len(db.ContinuousQueries) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// runContinuousQueries gets CQs from the meta store and runs them.
|
||||
func (s *Service) runContinuousQueries(req *RunRequest) {
|
||||
// Get list of all databases.
|
||||
|
|
Loading…
Reference in New Issue