Merge pull request #7493 from influxdata/cjl-7431-remove-cq-endpoint-httpd
Remove ProcessContinousQueries from httpd endpointpull/7507/head
commit
e35178870e
|
@ -51,6 +51,7 @@
|
||||||
- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error.
|
- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error.
|
||||||
- [#7334](https://github.com/influxdata/influxdb/issues/7334): Panic with unread show series iterators during drop database
|
- [#7334](https://github.com/influxdata/influxdb/issues/7334): Panic with unread show series iterators during drop database
|
||||||
- [#7482](https://github.com/influxdata/influxdb/issues/7482): Fix issue where point would be written to wrong shard.
|
- [#7482](https://github.com/influxdata/influxdb/issues/7482): Fix issue where point would be written to wrong shard.
|
||||||
|
- [#7431](https://github.com/influxdata/influxdb/issues/7431): Remove /data/process_continuous_queries endpoint.
|
||||||
|
|
||||||
## v1.0.2 [2016-10-05]
|
## v1.0.2 [2016-10-05]
|
||||||
|
|
||||||
|
|
|
@ -267,13 +267,6 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
|
||||||
srv.Handler.PointsWriter = s.PointsWriter
|
srv.Handler.PointsWriter = s.PointsWriter
|
||||||
srv.Handler.Version = s.buildInfo.Version
|
srv.Handler.Version = s.buildInfo.Version
|
||||||
|
|
||||||
// If a ContinuousQuerier service has been started, attach it.
|
|
||||||
for _, srvc := range s.Services {
|
|
||||||
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
|
|
||||||
srv.Handler.ContinuousQuerier = cqsrvc
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Services = append(s.Services, srv)
|
s.Services = append(s.Services, srv)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6268,15 +6268,6 @@ func TestServer_ContinuousQuery(t *testing.T) {
|
||||||
// Run first test to create CQs.
|
// Run first test to create CQs.
|
||||||
runTest(&test, t)
|
runTest(&test, t)
|
||||||
|
|
||||||
// Trigger CQs to run.
|
|
||||||
u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano())
|
|
||||||
if _, err := s.HTTPPost(u, nil); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for CQs to run. TODO: fix this ugly hack
|
|
||||||
//time.Sleep(time.Second * 5)
|
|
||||||
|
|
||||||
// Setup tests to check the CQ results.
|
// Setup tests to check the CQ results.
|
||||||
test2 := NewTest("db0", "rp1")
|
test2 := NewTest("db0", "rp1")
|
||||||
test2.addQueries([]*Query{
|
test2.addQueries([]*Query{
|
||||||
|
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/influxdata/influxdb/influxql"
|
"github.com/influxdata/influxdb/influxql"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/monitor"
|
"github.com/influxdata/influxdb/monitor"
|
||||||
"github.com/influxdata/influxdb/services/continuous_querier"
|
|
||||||
"github.com/influxdata/influxdb/services/meta"
|
"github.com/influxdata/influxdb/services/meta"
|
||||||
"github.com/influxdata/influxdb/tsdb"
|
"github.com/influxdata/influxdb/tsdb"
|
||||||
"github.com/influxdata/influxdb/uuid"
|
"github.com/influxdata/influxdb/uuid"
|
||||||
|
@ -89,8 +88,6 @@ type Handler struct {
|
||||||
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
|
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
|
||||||
}
|
}
|
||||||
|
|
||||||
ContinuousQuerier continuous_querier.ContinuousQuerier
|
|
||||||
|
|
||||||
Config *Config
|
Config *Config
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
CLFLogger *log.Logger
|
CLFLogger *log.Logger
|
||||||
|
@ -144,11 +141,6 @@ func NewHandler(c Config) *Handler {
|
||||||
"status-head",
|
"status-head",
|
||||||
"HEAD", "/status", false, true, h.serveStatus,
|
"HEAD", "/status", false, true, h.serveStatus,
|
||||||
},
|
},
|
||||||
// TODO: (corylanou) remove this and associated code
|
|
||||||
Route{ // Tell data node to run CQs that should be run
|
|
||||||
"process-continuous-queries",
|
|
||||||
"POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
|
|
||||||
},
|
|
||||||
}...)
|
}...)
|
||||||
|
|
||||||
return h
|
return h
|
||||||
|
@ -184,7 +176,6 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
Values: map[string]interface{}{
|
Values: map[string]interface{}{
|
||||||
statRequest: atomic.LoadInt64(&h.stats.Requests),
|
statRequest: atomic.LoadInt64(&h.stats.Requests),
|
||||||
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
|
|
||||||
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
|
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
|
||||||
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
|
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
|
||||||
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
|
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
|
||||||
|
@ -279,47 +270,6 @@ func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
|
||||||
w.WriteHeader(code)
|
w.WriteHeader(code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
|
||||||
atomic.AddInt64(&h.stats.CQRequests, 1)
|
|
||||||
|
|
||||||
// If the continuous query service isn't configured, return 404.
|
|
||||||
if h.ContinuousQuerier == nil {
|
|
||||||
h.writeHeader(w, http.StatusNotImplemented)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
q := r.URL.Query()
|
|
||||||
|
|
||||||
// Get the database name (blank means all databases).
|
|
||||||
db := q.Get("db")
|
|
||||||
// Get the name of the CQ to run (blank means run all).
|
|
||||||
name := q.Get("name")
|
|
||||||
// Get the time for which the CQ should be evaluated.
|
|
||||||
t := time.Now()
|
|
||||||
var err error
|
|
||||||
s := q.Get("time")
|
|
||||||
if s != "" {
|
|
||||||
t, err = time.Parse(time.RFC3339Nano, s)
|
|
||||||
if err != nil {
|
|
||||||
// Try parsing as an int64 nanosecond timestamp.
|
|
||||||
i, err := strconv.ParseInt(s, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
h.writeHeader(w, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t = time.Unix(0, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pass the request to the CQ service.
|
|
||||||
if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
|
|
||||||
h.writeHeader(w, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
h.writeHeader(w, http.StatusNoContent)
|
|
||||||
}
|
|
||||||
|
|
||||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||||
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||||
atomic.AddInt64(&h.stats.QueryRequests, 1)
|
atomic.AddInt64(&h.stats.QueryRequests, 1)
|
||||||
|
|
|
@ -20,7 +20,6 @@ import (
|
||||||
// statistics gathered by the httpd package.
|
// statistics gathered by the httpd package.
|
||||||
const (
|
const (
|
||||||
statRequest = "req" // Number of HTTP requests served
|
statRequest = "req" // Number of HTTP requests served
|
||||||
statCQRequest = "cqReq" // Number of CQ-execute requests served
|
|
||||||
statQueryRequest = "queryReq" // Number of query requests served
|
statQueryRequest = "queryReq" // Number of query requests served
|
||||||
statWriteRequest = "writeReq" // Number of write requests serverd
|
statWriteRequest = "writeReq" // Number of write requests serverd
|
||||||
statPingRequest = "pingReq" // Number of ping requests served
|
statPingRequest = "pingReq" // Number of ping requests served
|
||||||
|
|
Loading…
Reference in New Issue