Implement continuous queries with group by intervals
* Update defaults on Broker for when to trigger CQ runs * Add config settings for how often to calculate and recalculate CQ results * Update Server to have CQ settings * Update AST to fold removed time literals when setting time range * Add periodic CQ running functionality to serverpull/1285/head
parent
4217778a08
commit
f2d132b361
|
@ -32,9 +32,9 @@ type dataNodeStore interface {
|
|||
// NewBroker returns a new instance of a Broker with default values.
|
||||
func NewBroker() *Broker {
|
||||
b := &Broker{
|
||||
TriggerInterval: 1 * time.Second,
|
||||
TriggerTimeout: 2 * time.Second,
|
||||
TriggerFailurePause: 100 * time.Millisecond,
|
||||
TriggerInterval: 5 * time.Second,
|
||||
TriggerTimeout: 20 * time.Second,
|
||||
TriggerFailurePause: 1 * time.Second,
|
||||
}
|
||||
b.Broker = messaging.NewBroker()
|
||||
return b
|
||||
|
|
|
@ -107,7 +107,30 @@ type Config struct {
|
|||
} `toml:"logging"`
|
||||
|
||||
ContinuousQuery struct {
|
||||
}
|
||||
// when continuous queries are run we'll automatically recompute previous intervals
|
||||
// in case lagged data came in. Set to zero if you never have lagged data. We do
|
||||
// it this way because invalidating previously computed intervals would be insanely hard
|
||||
// and expensive.
|
||||
RecomputePreviousN int `toml:"recompute_previous_n"`
|
||||
// The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan
|
||||
// setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN
|
||||
// and have this set to 10m, then we'd only compute the previous two intervals for any
|
||||
// CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window
|
||||
RecomputeNoOlderThan Duration `toml:"recompute_no_older_than"`
|
||||
// ComputeRunsPerInterval will determine how many times the current and previous N intervals
|
||||
// will be computed. The group by time will be divided by this and it will get computed this many times:
|
||||
// group by time seconds / runs per interval
|
||||
// This will give partial results for current group by intervals and will determine how long it will
|
||||
// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
|
||||
// will be a minute past the previous 10m bucket of time before lagged data is picked up
|
||||
ComputeRunsPerInterval int `toml:"compute_runs_per_interval"`
|
||||
// ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller
|
||||
// group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting
|
||||
// to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN).
|
||||
// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
|
||||
// than 10m will get computed 10 times for each interval.
|
||||
ComputeNoMoreThan Duration `toml:"compute_no_more_than"`
|
||||
} `toml:"continuous_queries"`
|
||||
}
|
||||
|
||||
// NewConfig returns an instance of Config with reasonable defaults.
|
||||
|
@ -124,6 +147,12 @@ func NewConfig() *Config {
|
|||
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
|
||||
c.Admin.Enabled = true
|
||||
c.Admin.Port = 8083
|
||||
c.Cluster.WriteBufferSize = 1000
|
||||
c.Cluster.MaxResponseBufferSize = 100
|
||||
c.ContinuousQuery.RecomputePreviousN = 2
|
||||
c.ContinuousQuery.RecomputeNoOlderThan = Duration(10 * time.Minute)
|
||||
c.ContinuousQuery.ComputeRunsPerInterval = 10
|
||||
c.ContinuousQuery.ComputeNoMoreThan = Duration(2 * time.Minute)
|
||||
|
||||
// Detect hostname (or set to localhost).
|
||||
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
|
||||
|
|
|
@ -55,7 +55,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
}
|
||||
|
||||
// Open server, initialize or join as necessary.
|
||||
s := openServer(config.DataDir(), config.DataURL(), b, initializing, configExists, joinURLs, logWriter)
|
||||
s := openServer(config, b, initializing, configExists, joinURLs, logWriter)
|
||||
s.SetAuthenticationEnabled(config.Authentication.Enabled)
|
||||
|
||||
// Enable retention policy enforcement if requested.
|
||||
|
@ -229,16 +229,21 @@ func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
|
|||
}
|
||||
|
||||
// creates and initializes a server.
|
||||
func openServer(path string, u *url.URL, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
// Ignore if there's no existing server and we're not initializing or joining.
|
||||
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
|
||||
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create and open the server.
|
||||
s := influxdb.NewServer()
|
||||
s.SetLogOutput(w)
|
||||
if err := s.Open(path); err != nil {
|
||||
s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN
|
||||
s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan)
|
||||
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
|
||||
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)
|
||||
|
||||
if err := s.Open(config.Data.Dir); err != nil {
|
||||
log.Fatalf("failed to open data server: %v", err.Error())
|
||||
}
|
||||
|
||||
|
@ -247,7 +252,7 @@ func openServer(path string, u *url.URL, b *influxdb.Broker, initializing, confi
|
|||
if len(joinURLs) == 0 {
|
||||
initializeServer(s, b, w)
|
||||
} else {
|
||||
joinServer(s, u, joinURLs)
|
||||
joinServer(s, config.DataURL(), joinURLs)
|
||||
openServerClient(s, joinURLs, w)
|
||||
}
|
||||
} else if !configExists {
|
||||
|
|
336
handler.go
336
handler.go
|
@ -1,336 +0,0 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bmizerany/pat"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
)
|
||||
|
||||
// TODO: Standard response headers (see: HeaderHandler)
|
||||
// TODO: Compression (see: CompressionHeaderHandler)
|
||||
|
||||
// TODO: Check HTTP response codes: 400, 401, 403, 409.
|
||||
|
||||
// getUsernameAndPassword returns the username and password encoded in
|
||||
// a request. The credentials may be present as URL query params, or as
|
||||
// a Basic Authentication header.
|
||||
func getUsernameAndPassword(r *http.Request) (string, string, error) {
|
||||
q := r.URL.Query()
|
||||
username, password := q.Get("u"), q.Get("p")
|
||||
if username != "" && password != "" {
|
||||
return username, password, nil
|
||||
}
|
||||
auth := r.Header.Get("Authorization")
|
||||
if auth == "" {
|
||||
return "", "", nil
|
||||
}
|
||||
fields := strings.Split(auth, " ")
|
||||
if len(fields) != 2 {
|
||||
return "", "", fmt.Errorf("invalid Basic Authentication header")
|
||||
}
|
||||
bs, err := base64.StdEncoding.DecodeString(fields[1])
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("invalid Base64 encoding")
|
||||
}
|
||||
fields = strings.Split(string(bs), ":")
|
||||
if len(fields) != 2 {
|
||||
return "", "", fmt.Errorf("invalid Basic Authentication value")
|
||||
}
|
||||
return fields[0], fields[1], nil
|
||||
}
|
||||
|
||||
// Handler represents an HTTP handler for the InfluxDB server.
|
||||
type Handler struct {
|
||||
server *Server
|
||||
mux *pat.PatternServeMux
|
||||
|
||||
// Whether endpoints require authentication.
|
||||
AuthenticationEnabled bool
|
||||
|
||||
// The InfluxDB verion returned by the HTTP response header.
|
||||
Version string
|
||||
}
|
||||
|
||||
// NewHandler returns a new instance of Handler.
|
||||
func NewHandler(s *Server) *Handler {
|
||||
h := &Handler{
|
||||
server: s,
|
||||
mux: pat.New(),
|
||||
}
|
||||
|
||||
// Query serving route.
|
||||
h.mux.Get("/query", h.makeAuthenticationHandler(h.serveQuery))
|
||||
|
||||
// Data-ingest route.
|
||||
h.mux.Post("/write", h.makeAuthenticationHandler(h.serveWrite))
|
||||
|
||||
// Data node routes.
|
||||
h.mux.Get("/data_nodes", h.makeAuthenticationHandler(h.serveDataNodes))
|
||||
h.mux.Post("/data_nodes", h.makeAuthenticationHandler(h.serveCreateDataNode))
|
||||
h.mux.Del("/data_nodes/:id", h.makeAuthenticationHandler(h.serveDeleteDataNode))
|
||||
|
||||
// Utilities
|
||||
h.mux.Get("/metastore", h.makeAuthenticationHandler(h.serveMetastore))
|
||||
h.mux.Get("/ping", h.makeAuthenticationHandler(h.servePing))
|
||||
h.mux.Get("/process_continuous_queries", h.makeAuthenticationHandler(h.serveProcessContinuousQueries))
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// ServeHTTP responds to HTTP request to the handler.
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Add("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Add("Access-Control-Max-Age", "2592000")
|
||||
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
|
||||
w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept")
|
||||
w.Header().Add("X-Influxdb-Version", h.Version)
|
||||
|
||||
// If this is a CORS OPTIONS request then send back okie-dokie.
|
||||
if r.Method == "OPTIONS" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise handle it via pat.
|
||||
h.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// makeAuthenticationHandler takes a custom handler and returns a standard handler, ensuring that
|
||||
// if user credentials are passed in, an attempt is made to authenticate that user. If authentication
|
||||
// fails, an error is returned to the user.
|
||||
//
|
||||
// There is one exception: if there are no users in the system, authentication is not required. This
|
||||
// is to facilitate bootstrapping of a system with authentication enabled.
|
||||
func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.Request, *User)) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var user *User
|
||||
if h.AuthenticationEnabled && len(h.server.Users()) > 0 {
|
||||
username, password, err := getUsernameAndPassword(r)
|
||||
if err != nil {
|
||||
h.error(w, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if username == "" {
|
||||
h.error(w, "username required", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
user, err = h.server.Authenticate(username, password)
|
||||
if err != nil {
|
||||
h.error(w, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
fn(w, r, user)
|
||||
}
|
||||
}
|
||||
|
||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
q := r.URL.Query()
|
||||
p := influxql.NewParser(strings.NewReader(q.Get("q")))
|
||||
db := q.Get("db")
|
||||
|
||||
// Parse query from query string.
|
||||
query, err := p.ParseQuery()
|
||||
if err != nil {
|
||||
h.error(w, "error parsing query: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Execute query. One result will return for each statement.
|
||||
results := h.server.ExecuteQuery(query, db, u)
|
||||
|
||||
// If any statement errored then set the response status code.
|
||||
if results.Error() != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Write resultset.
|
||||
_ = json.NewEncoder(w).Encode(results)
|
||||
}
|
||||
|
||||
type batchWrite struct {
|
||||
Points []Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// serveWrite receives incoming series data and writes it to the database.
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
var br batchWrite
|
||||
|
||||
dec := json.NewDecoder(r.Body)
|
||||
dec.UseNumber()
|
||||
|
||||
var writeError = func(result Result, statusCode int) {
|
||||
w.WriteHeader(statusCode)
|
||||
w.Header().Add("content-type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&result)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if err := dec.Decode(&br); err != nil {
|
||||
if err.Error() == "EOF" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
writeError(Result{Err: err}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if br.Database == "" {
|
||||
writeError(Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if h.server.databases[br.Database] == nil {
|
||||
writeError(Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO corylanou: Check if user can write to specified database
|
||||
//if !user_can_write(br.Database) {
|
||||
//writeError(&Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", u.Name)}, http.StatusUnauthorized)
|
||||
//return
|
||||
//}
|
||||
|
||||
for _, p := range br.Points {
|
||||
if p.Timestamp.IsZero() {
|
||||
p.Timestamp = br.Timestamp
|
||||
}
|
||||
if len(br.Tags) > 0 {
|
||||
for k, _ := range br.Tags {
|
||||
if p.Tags[k] == "" {
|
||||
p.Tags[k] = br.Tags[k]
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []Point{p}); err != nil {
|
||||
writeError(Result{Err: err}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serveMetastore returns a copy of the metastore.
|
||||
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
// Set headers.
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", `attachment; filename="meta"`)
|
||||
|
||||
if err := h.server.CopyMetastore(w); err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
// servePing returns a simple response to let the client know the server is running.
|
||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request, u *User) {}
|
||||
|
||||
// serveDataNodes returns a list of all data nodes in the cluster.
|
||||
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
// Generate a list of objects for encoding to the API.
|
||||
a := make([]*dataNodeJSON, 0)
|
||||
for _, n := range h.server.DataNodes() {
|
||||
a = append(a, &dataNodeJSON{
|
||||
ID: n.ID,
|
||||
URL: n.URL.String(),
|
||||
})
|
||||
}
|
||||
|
||||
w.Header().Add("content-type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(a)
|
||||
}
|
||||
|
||||
// serveCreateDataNode creates a new data node in the cluster.
|
||||
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, _ *User) {
|
||||
// Read in data node from request body.
|
||||
var n dataNodeJSON
|
||||
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
|
||||
h.error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse the URL.
|
||||
u, err := url.Parse(n.URL)
|
||||
if err != nil {
|
||||
h.error(w, "invalid data node url", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the data node.
|
||||
if err := h.server.CreateDataNode(u); err == ErrDataNodeExists {
|
||||
h.error(w, err.Error(), http.StatusConflict)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Retrieve data node reference.
|
||||
node := h.server.DataNodeByURL(u)
|
||||
|
||||
// Create a new replica on the broker.
|
||||
if err := h.server.client.CreateReplica(node.ID); err != nil {
|
||||
h.error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
// Write new node back to client.
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
w.Header().Add("content-type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()})
|
||||
}
|
||||
|
||||
// serveDeleteDataNode removes an existing node.
|
||||
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
// Parse node id.
|
||||
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
|
||||
if err != nil {
|
||||
h.error(w, "invalid node id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete the node.
|
||||
if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound {
|
||||
h.error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
type dataNodeJSON struct {
|
||||
ID uint64 `json:"id"`
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// error returns an error to the client in a standard format.
|
||||
func (h *Handler) error(w http.ResponseWriter, error string, code int) {
|
||||
// TODO: Return error as JSON.
|
||||
http.Error(w, error, code)
|
||||
}
|
||||
|
||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
if err := h.server.RunContinuousQueries(); err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
|
@ -728,7 +728,10 @@ func (s *SelectStatement) SetTimeRange(start, end time.Time) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Condition = expr
|
||||
|
||||
// fold out any previously replaced time dimensios and set the condition
|
||||
s.Condition = Reduce(expr, nil)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
205
server.go
205
server.go
|
@ -108,6 +108,19 @@ type Server struct {
|
|||
Logger *log.Logger
|
||||
|
||||
authenticationEnabled bool
|
||||
|
||||
// continuous query settings
|
||||
RecomputePreviousN int
|
||||
RecomputeNoOlderThan time.Duration
|
||||
ComputeRunsPerInterval int
|
||||
ComputeNoMoreThan time.Duration
|
||||
|
||||
// This is the last time this data node has run continuous queries.
|
||||
// Keep this state in memory so if a broker makes a request in another second
|
||||
// to compute, it won't rerun CQs that have already been run. If this data node
|
||||
// is just getting the request after being off duty for running CQs then
|
||||
// it will recompute all of them
|
||||
lastContinuousQueryRun time.Time
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
|
@ -2954,26 +2967,31 @@ func HashPassword(password string) ([]byte, error) {
|
|||
// each incoming event.
|
||||
type ContinuousQuery struct {
|
||||
Query string `json:"query"`
|
||||
cq *influxql.CreateContinuousQueryStatement
|
||||
// TODO: ParsedQuery *parser.SelectQuery
|
||||
|
||||
mu sync.Mutex
|
||||
cq *influxql.CreateContinuousQueryStatement
|
||||
lastRun time.Time
|
||||
}
|
||||
|
||||
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
|
||||
func NewContinuousQuery(q string) (*ContinuousQuery, error) {
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cq, ok := stmt.(*influxql.CreateContinuousQueryStatement); ok {
|
||||
return &ContinuousQuery{
|
||||
Query: q,
|
||||
cq: cq,
|
||||
}, nil
|
||||
cq, ok := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if !ok {
|
||||
return nil, errors.New("query isn't a continuous query")
|
||||
}
|
||||
|
||||
return nil, errors.New("query isn't a continuous query")
|
||||
return &ContinuousQuery{
|
||||
Query: q,
|
||||
cq: cq,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// applyCreateContinuousQueryCommand adds the continuous query to the database object and saves it to the metastore
|
||||
func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
||||
fmt.Println("applyCreateContinuousQueryCommand")
|
||||
var c createContinuousQueryCommand
|
||||
|
@ -3007,10 +3025,181 @@ func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// RunContinuousQueries will run any continuous queries that are due to run and write the
|
||||
// results back into the database
|
||||
func (s *Server) RunContinuousQueries() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, d := range s.databases {
|
||||
for _, c := range d.continuousQueries {
|
||||
if s.shouldRunContinuousQuery(c) {
|
||||
// go func(cq *ContinuousQuery) {
|
||||
s.runContinuousQuery(c)
|
||||
// }(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
|
||||
// lastRunTime of the CQ and the rules for when to run set through the config to determine
|
||||
// if this CQ should be run
|
||||
func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
|
||||
cq.mu.Lock()
|
||||
defer cq.mu.Unlock()
|
||||
|
||||
// if it's not aggregated we don't run it
|
||||
if !cq.cq.Source.Aggregated() {
|
||||
return false
|
||||
}
|
||||
|
||||
// since it's aggregated we need to figure how often it should be run
|
||||
interval, err := cq.cq.Source.GroupByInterval()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// determine how often we should run this continuous query.
|
||||
// group by time / the number of times to compute
|
||||
computeEvery := time.Duration(interval.Nanoseconds()/int64(s.ComputeRunsPerInterval)) * time.Nanosecond
|
||||
// make sure we're running no more frequently than the setting in the config
|
||||
if computeEvery < s.ComputeNoMoreThan {
|
||||
computeEvery = s.ComputeNoMoreThan
|
||||
}
|
||||
|
||||
// if we've passed the amount of time since the last run, do it up
|
||||
if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// runContinuousQuery will execute a continuous query
|
||||
// TODO: make this fan out to the cluster instead of running all the queries on this single data node
|
||||
func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
|
||||
cq.mu.Lock()
|
||||
defer cq.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
cq.lastRun = now
|
||||
|
||||
interval, err := cq.cq.Source.GroupByInterval()
|
||||
if err != nil || interval == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
startTime := now.Round(interval)
|
||||
if startTime.UnixNano() > now.UnixNano() {
|
||||
startTime = startTime.Add(-interval)
|
||||
}
|
||||
|
||||
if err := cq.cq.Source.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
|
||||
log.Printf("cq error setting time range: %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
|
||||
}
|
||||
|
||||
for i := 0; i < s.RecomputePreviousN; i++ {
|
||||
// if we're already more time past the previous window than we're going to look back, stop
|
||||
if now.Sub(startTime) > s.RecomputeNoOlderThan {
|
||||
return
|
||||
}
|
||||
newStartTime := startTime.Add(-interval)
|
||||
|
||||
if err := cq.cq.Source.SetTimeRange(newStartTime, startTime); err != nil {
|
||||
log.Printf("cq error setting time range: %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
|
||||
}
|
||||
|
||||
startTime = newStartTime
|
||||
}
|
||||
}
|
||||
|
||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||
log.Printf("cq run: %s\n", cq.cq.String())
|
||||
|
||||
e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Execute plan.
|
||||
ch, err := e.Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all rows from channel and write them in
|
||||
// TODO paul: fill in db and retention policy when CQ parsing gets updated
|
||||
db := ""
|
||||
retentionPolicy := ""
|
||||
for row := range ch {
|
||||
points, err := s.convertRowToPoints(row)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO corylanou: implement batch writing
|
||||
for _, p := range points {
|
||||
_, err := s.WriteSeries(db, retentionPolicy, []Point{*p})
|
||||
if err != nil {
|
||||
log.Printf("cq write error: %s on: %s\n", err, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in
|
||||
func (s *Server) convertRowToPoints(row *influxql.Row) ([]*Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
for i, c := range row.Columns {
|
||||
if c == "time" {
|
||||
timeIndex = i
|
||||
} else {
|
||||
fieldIndexes[c] = i
|
||||
}
|
||||
}
|
||||
|
||||
if timeIndex == -1 {
|
||||
return nil, errors.New("cq error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]*Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
p := &Point{
|
||||
Name: row.Name,
|
||||
Tags: row.Tags,
|
||||
Timestamp: v[timeIndex].(time.Time),
|
||||
Values: vals,
|
||||
}
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
// createContinuousQueryCommand is the raft command for creating a continuous query on a database
|
||||
type createContinuousQueryCommand struct {
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
|
|
@ -1123,12 +1123,9 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
|
|||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
|
||||
fmt.Println("Parsed Database: ", cq.Database)
|
||||
fmt.Println(cq.String())
|
||||
queries := s.ContinuousQueries("foo")
|
||||
cqObj, _ := influxdb.NewContinuousQuery(q)
|
||||
expected := []*influxdb.ContinuousQuery{cqObj}
|
||||
time.Sleep(time.Second)
|
||||
if !reflect.DeepEqual(queries, expected) {
|
||||
t.Fatalf("query not saved:\n\texp: %s\ngot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
|
||||
}
|
||||
|
@ -1143,13 +1140,14 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
|
|||
|
||||
// Ensure the server prevents a duplicate named continuous query from being created
|
||||
func TestServer_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
|
||||
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure the server returns an error when creating a continuous query on a database that doesn't exist
|
||||
func TestServer_CreateCreateContinuousQuery_ErrDatabaseNotFound(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure the server returns an error when creating a continuous query on a retention policy that doesn't exist
|
||||
|
@ -1163,6 +1161,58 @@ func TestServer_CreateCreateContinuousQuery_ErrRetentionPolicyNotFound(t *testin
|
|||
}
|
||||
|
||||
// Create on an RP that doesn't exist
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure
|
||||
func TestServer_RunContinuousQueries(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create the "foo" database.
|
||||
if err := s.CreateDatabase("foo"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.RecomputePreviousN = 2
|
||||
s.RecomputeNoOlderThan = 4 * time.Second
|
||||
s.ComputeRunsPerInterval = 5
|
||||
s.ComputeNoMoreThan = 2 * time.Second
|
||||
|
||||
// create and check
|
||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(5s) END"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing query %s", err.Error())
|
||||
}
|
||||
cq := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if err := s.CreateContinuousQuery(cq); err != nil {
|
||||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
|
||||
// TODO: figure out how to actually test this
|
||||
t.Skip("pending")
|
||||
// fmt.Println("CQ 1")
|
||||
// s.RunContinuousQueries()
|
||||
// fmt.Println("CQ 2")
|
||||
// s.RunContinuousQueries()
|
||||
// time.Sleep(time.Second * 2)
|
||||
// fmt.Println("CQ 3")
|
||||
// s.RunContinuousQueries()
|
||||
// fmt.Println("CQ 4")
|
||||
// s.RunContinuousQueries()
|
||||
// time.Sleep(time.Second * 3)
|
||||
// fmt.Println("CQ 5")
|
||||
// s.RunContinuousQueries()
|
||||
// time.Sleep(time.Second * 3)
|
||||
// fmt.Println("CQ 6")
|
||||
// s.RunContinuousQueries()
|
||||
// time.Sleep(time.Second * 3)
|
||||
// fmt.Println("CQ 7")
|
||||
// s.RunContinuousQueries()
|
||||
}
|
||||
|
||||
func mustMarshalJSON(v interface{}) string {
|
||||
|
|
Loading…
Reference in New Issue