Merge pull request #1978 from influxdata/feature/kapa-task

Update kapacitor rules API to include dbrps, various dates, error, and executing
pull/10616/head
Andrew Watkins 2017-09-12 09:22:22 -07:00 committed by GitHub
commit b6c68a6f4f
8 changed files with 982 additions and 704 deletions

View File

@ -2,12 +2,14 @@
### Bug Fixes
### Features
1. [#1885](https://github.com/influxdata/chronograf/pull/1885): Add `fill` options to data explorer and dashboard queries
1. [#1978](https://github.com/influxdata/chronograf/pull/1978): Support editing kapacitor TICKScript
### UI Improvements
## v1.3.8.1 [unreleased]
### Bug Fixes
### Features
### UI Improvements
## v1.3.8.0 [2017-09-07]

View File

@ -441,19 +441,32 @@ type SourcesStore interface {
Update(context.Context, Source) error
}
type DBRP struct {
DB string `json:"db"`
RP string `json:"rp"`
}
// AlertRule represents rules for building a tickscript alerting task
type AlertRule struct {
ID string `json:"id,omitempty"` // ID is the unique ID of the alert
TICKScript TICKScript `json:"tickscript"` // TICKScript is the raw tickscript associated with this Alert
Query *QueryConfig `json:"query"` // Query is the filter of data for the alert.
Every string `json:"every"` // Every how often to check for the alerting criteria
Alerts []string `json:"alerts"` // Alerts name all the services to notify (e.g. pagerduty)
AlertNodes []KapacitorNode `json:"alertNodes,omitempty"` // AlertNodes define additional arguments to alerts
Message string `json:"message"` // Message included with alert
Details string `json:"details"` // Details is generally used for the Email alert. If empty will not be added.
Trigger string `json:"trigger"` // Trigger is a type that defines when to trigger the alert
TriggerValues TriggerValues `json:"values"` // Defines the values that cause the alert to trigger
Name string `json:"name"` // Name is the user-defined name for the alert
ID string `json:"id,omitempty"` // ID is the unique ID of the alert
TICKScript TICKScript `json:"tickscript"` // TICKScript is the raw tickscript associated with this Alert
Query *QueryConfig `json:"query"` // Query is the filter of data for the alert.
Every string `json:"every"` // Every how often to check for the alerting criteria
Alerts []string `json:"alerts"` // Alerts name all the services to notify (e.g. pagerduty)
AlertNodes []KapacitorNode `json:"alertNodes,omitempty"` // AlertNodes define additional arguments to alerts
Message string `json:"message"` // Message included with alert
Details string `json:"details"` // Details is generally used for the Email alert. If empty will not be added.
Trigger string `json:"trigger"` // Trigger is a type that defines when to trigger the alert
TriggerValues TriggerValues `json:"values"` // Defines the values that cause the alert to trigger
Name string `json:"name"` // Name is the user-defined name for the alert
Type string `json:"type"` // Represents the task type where stream is data streamed to kapacitor and batch is queried by kapacitor
DBRPs []DBRP `json:"dbrps"` // List of database retention policy pairs the task is allowed to access
Status string `json:"status"` // Represents if this rule is enabled or disabled in kapacitor
Executing bool `json:"executing"` // Whether the task is currently executing
Error string `json:"error"` // Any error encountered when kapacitor executes the task
Created time.Time `json:"created"` // Date the task was first created
Modified time.Time `json:"modified"` // Date the task was last modified
LastEnabled time.Time `json:"last-enabled,omitempty"` // Date the task was last set to status enabled
}
// TICKScript task to be used by kapacitor

View File

@ -57,6 +57,46 @@ type Task struct {
TICKScript chronograf.TICKScript // TICKScript is the running script
}
// NewTask creates a task from a kapacitor client task
func NewTask(task *client.Task) *Task {
dbrps := make([]chronograf.DBRP, len(task.DBRPs))
for i := range task.DBRPs {
dbrps[i].DB = task.DBRPs[i].Database
dbrps[i].RP = task.DBRPs[i].RetentionPolicy
}
script := chronograf.TICKScript(task.TICKscript)
rule, err := Reverse(script)
if err != nil {
rule = chronograf.AlertRule{
Name: task.ID,
Query: nil,
}
}
rule.ID = task.ID
rule.TICKScript = script
rule.Type = task.Type.String()
rule.DBRPs = dbrps
rule.Status = task.Status.String()
rule.Executing = task.Executing
rule.Error = task.Error
rule.Created = task.Created
rule.Modified = task.Modified
rule.LastEnabled = task.LastEnabled
return &Task{
ID: task.ID,
Href: task.Link.Href,
HrefOutput: HrefOutput(task.ID),
Rule: rule,
}
}
// HrefOutput returns the link to a kapacitor task httpOut Node given an id
func HrefOutput(ID string) string {
return fmt.Sprintf("/kapacitor/v1/tasks/%s/%s", ID, HTTPEndpoint)
}
// Href returns the link to a kapacitor task given an id
func (c *Client) Href(ID string) string {
return fmt.Sprintf("/kapacitor/v1/tasks/%s", ID)
@ -64,16 +104,69 @@ func (c *Client) Href(ID string) string {
// HrefOutput returns the link to a kapacitor task httpOut Node given an id
func (c *Client) HrefOutput(ID string) string {
return fmt.Sprintf("/kapacitor/v1/tasks/%s/%s", ID, HTTPEndpoint)
return HrefOutput(ID)
}
// Create builds and POSTs a tickscript to kapacitor
func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task, error) {
var opt *client.CreateTaskOptions
var err error
if rule.Query != nil {
opt, err = c.createFromQueryConfig(rule)
} else {
opt, err = c.createFromTick(rule)
}
if err != nil {
return nil, err
}
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
if err != nil {
return nil, err
}
task, err := kapa.CreateTask(*opt)
if err != nil {
return nil, err
}
return NewTask(&task), nil
}
func (c *Client) createFromTick(rule chronograf.AlertRule) (*client.CreateTaskOptions, error) {
dbrps := make([]client.DBRP, len(rule.DBRPs))
for i := range rule.DBRPs {
dbrps[i] = client.DBRP{
Database: rule.DBRPs[i].DB,
RetentionPolicy: rule.DBRPs[i].RP,
}
}
status := client.Enabled
if rule.Status != "" {
if err := status.UnmarshalText([]byte(rule.Status)); err != nil {
return nil, err
}
}
taskType := client.StreamTask
if rule.Type != "stream" {
if err := taskType.UnmarshalText([]byte(rule.Type)); err != nil {
return nil, err
}
}
return &client.CreateTaskOptions{
ID: rule.ID,
Type: taskType,
DBRPs: dbrps,
TICKscript: string(rule.TICKScript),
Status: status,
}, nil
}
func (c *Client) createFromQueryConfig(rule chronograf.AlertRule) (*client.CreateTaskOptions, error) {
id, err := c.ID.Generate()
if err != nil {
return nil, err
@ -85,24 +178,12 @@ func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task,
}
kapaID := Prefix + id
rule.ID = kapaID
task, err := kapa.CreateTask(client.CreateTaskOptions{
return &client.CreateTaskOptions{
ID: kapaID,
Type: toTask(rule.Query),
DBRPs: []client.DBRP{{Database: rule.Query.Database, RetentionPolicy: rule.Query.RetentionPolicy}},
TICKscript: string(script),
Status: client.Enabled,
})
if err != nil {
return nil, err
}
return &Task{
ID: kapaID,
Href: task.Link.Href,
HrefOutput: c.HrefOutput(kapaID),
TICKScript: script,
Rule: c.Reverse(kapaID, script),
}, nil
}
@ -130,12 +211,7 @@ func (c *Client) updateStatus(ctx context.Context, href string, status client.Ta
return nil, err
}
return &Task{
ID: task.ID,
Href: task.Link.Href,
HrefOutput: c.HrefOutput(task.ID),
TICKScript: chronograf.TICKScript(task.TICKscript),
}, nil
return NewTask(&task), nil
}
// Disable changes the tickscript status to disabled for a given href.
@ -148,30 +224,6 @@ func (c *Client) Enable(ctx context.Context, href string) (*Task, error) {
return c.updateStatus(ctx, href, client.Enabled)
}
// AllStatus returns the status of all tasks in kapacitor
func (c *Client) AllStatus(ctx context.Context) (map[string]string, error) {
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
if err != nil {
return nil, err
}
// Only get the status, id and link section back
opts := &client.ListTasksOptions{
Fields: []string{"status"},
}
tasks, err := kapa.ListTasks(opts)
if err != nil {
return nil, err
}
taskStatuses := map[string]string{}
for _, task := range tasks {
taskStatuses[task.ID] = task.Status.String()
}
return taskStatuses, nil
}
// Status returns the status of a task in kapacitor
func (c *Client) Status(ctx context.Context, href string) (string, error) {
s, err := c.status(ctx, href)
@ -196,7 +248,7 @@ func (c *Client) status(ctx context.Context, href string) (client.TaskStatus, er
}
// All returns all tasks in kapacitor
func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, error) {
func (c *Client) All(ctx context.Context) (map[string]*Task, error) {
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
if err != nil {
return nil, err
@ -209,22 +261,11 @@ func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, erro
return nil, err
}
alerts := map[string]chronograf.AlertRule{}
all := map[string]*Task{}
for _, task := range tasks {
script := chronograf.TICKScript(task.TICKscript)
if rule, err := Reverse(script); err != nil {
alerts[task.ID] = chronograf.AlertRule{
ID: task.ID,
Name: task.ID,
TICKScript: script,
}
} else {
rule.ID = task.ID
rule.TICKScript = script
alerts[task.ID] = rule
}
all[task.ID] = NewTask(&task)
}
return alerts, nil
return all, nil
}
// Reverse builds a chronograf.AlertRule and its QueryConfig from a tickscript
@ -244,19 +285,18 @@ func (c *Client) Reverse(id string, script chronograf.TICKScript) chronograf.Ale
}
// Get returns a single alert in kapacitor
func (c *Client) Get(ctx context.Context, id string) (chronograf.AlertRule, error) {
func (c *Client) Get(ctx context.Context, id string) (*Task, error) {
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
if err != nil {
return chronograf.AlertRule{}, err
return nil, err
}
href := c.Href(id)
task, err := kapa.Task(client.Link{Href: href}, nil)
if err != nil {
return chronograf.AlertRule{}, chronograf.ErrAlertNotFound
return nil, chronograf.ErrAlertNotFound
}
script := chronograf.TICKScript(task.TICKscript)
return c.Reverse(task.ID, script), nil
return NewTask(&task), nil
}
// Update changes the tickscript of a given id.
@ -266,30 +306,19 @@ func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertR
return nil, err
}
script, err := c.Ticker.Generate(rule)
if err != nil {
return nil, err
}
prevStatus, err := c.status(ctx, href)
if err != nil {
return nil, err
}
// We need to disable the kapacitor task followed by enabling it during update.
opts := client.UpdateTaskOptions{
TICKscript: string(script),
Status: client.Disabled,
Type: toTask(rule.Query),
DBRPs: []client.DBRP{
{
Database: rule.Query.Database,
RetentionPolicy: rule.Query.RetentionPolicy,
},
},
var opt *client.UpdateTaskOptions
if rule.Query != nil {
opt, err = c.updateFromQueryConfig(rule)
} else {
opt, err = c.updateFromTick(rule)
}
task, err := kapa.UpdateTask(client.Link{Href: href}, opts)
task, err := kapa.UpdateTask(client.Link{Href: href}, *opt)
if err != nil {
return nil, err
}
@ -301,12 +330,51 @@ func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertR
}
}
return &Task{
ID: task.ID,
Href: task.Link.Href,
HrefOutput: c.HrefOutput(task.ID),
TICKScript: script,
Rule: c.Reverse(task.ID, script),
return NewTask(&task), nil
}
func (c *Client) updateFromQueryConfig(rule chronograf.AlertRule) (*client.UpdateTaskOptions, error) {
script, err := c.Ticker.Generate(rule)
if err != nil {
return nil, err
}
// We need to disable the kapacitor task followed by enabling it during update.
return &client.UpdateTaskOptions{
TICKscript: string(script),
Status: client.Disabled,
Type: toTask(rule.Query),
DBRPs: []client.DBRP{
{
Database: rule.Query.Database,
RetentionPolicy: rule.Query.RetentionPolicy,
},
},
}, nil
}
func (c *Client) updateFromTick(rule chronograf.AlertRule) (*client.UpdateTaskOptions, error) {
dbrps := make([]client.DBRP, len(rule.DBRPs))
for i := range rule.DBRPs {
dbrps[i] = client.DBRP{
Database: rule.DBRPs[i].DB,
RetentionPolicy: rule.DBRPs[i].RP,
}
}
taskType := client.StreamTask
if rule.Type != "stream" {
if err := taskType.UnmarshalText([]byte(rule.Type)); err != nil {
return nil, err
}
}
// We need to disable the kapacitor task followed by enabling it during update.
return &client.UpdateTaskOptions{
TICKscript: string(rule.TICKScript),
Status: client.Disabled,
Type: taskType,
DBRPs: dbrps,
}, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -319,7 +319,7 @@ func (h *Service) KapacitorRulesPost(w http.ResponseWriter, r *http.Request) {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
res := newAlertResponse(task.Rule, task.TICKScript, task.Href, task.HrefOutput, "enabled", srv.SrcID, srv.ID)
res := newAlertResponse(task, srv.SrcID, srv.ID)
w.Header().Add("Location", res.Links.Self)
encodeJSON(w, http.StatusCreated, res, h.Logger)
}
@ -332,22 +332,18 @@ type alertLinks struct {
type alertResponse struct {
chronograf.AlertRule
TICKScript string `json:"tickscript"`
Status string `json:"status"`
Links alertLinks `json:"links"`
Links alertLinks `json:"links"`
}
// newAlertResponse formats task into an alertResponse
func newAlertResponse(rule chronograf.AlertRule, tickScript chronograf.TICKScript, href, hrefOutput string, status string, srcID, kapaID int) alertResponse {
res := alertResponse{
AlertRule: rule,
func newAlertResponse(task *kapa.Task, srcID, kapaID int) *alertResponse {
res := &alertResponse{
AlertRule: task.Rule,
Links: alertLinks{
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/rules/%s", srcID, kapaID, rule.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srcID, kapaID, url.QueryEscape(href)),
Output: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srcID, kapaID, url.QueryEscape(hrefOutput)),
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/rules/%s", srcID, kapaID, task.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srcID, kapaID, url.QueryEscape(task.Href)),
Output: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srcID, kapaID, url.QueryEscape(task.HrefOutput)),
},
TICKScript: string(tickScript),
Status: status,
}
if res.Alerts == nil {
@ -471,7 +467,7 @@ func (h *Service) KapacitorRulesPut(w http.ResponseWriter, r *http.Request) {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
res := newAlertResponse(task.Rule, task.TICKScript, task.Href, task.HrefOutput, "enabled", srv.SrcID, srv.ID)
res := newAlertResponse(task, srv.SrcID, srv.ID)
encodeJSON(w, http.StatusOK, res, h.Logger)
}
@ -523,7 +519,7 @@ func (h *Service) KapacitorRulesStatus(w http.ResponseWriter, r *http.Request) {
}
// Check if the rule exists and is scoped correctly
alert, err := c.Get(ctx, tid)
_, err = c.Get(ctx, tid)
if err != nil {
if err == chronograf.ErrAlertNotFound {
notFound(w, id, h.Logger)
@ -545,7 +541,7 @@ func (h *Service) KapacitorRulesStatus(w http.ResponseWriter, r *http.Request) {
return
}
res := newAlertResponse(alert, task.TICKScript, task.Href, task.HrefOutput, req.Status, srv.SrcID, srv.ID)
res := newAlertResponse(task, srv.SrcID, srv.ID)
encodeJSON(w, http.StatusOK, res, h.Logger)
}
@ -571,35 +567,24 @@ func (h *Service) KapacitorRulesGet(w http.ResponseWriter, r *http.Request) {
}
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
rules, err := c.All(ctx)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
statuses, err := c.AllStatus(ctx)
tasks, err := c.All(ctx)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
res := allAlertsResponse{
Rules: []alertResponse{},
Rules: []*alertResponse{},
}
for _, rule := range rules {
status, ok := statuses[rule.ID]
// The defined rule is not actually in kapacitor
if !ok {
continue
}
ar := newAlertResponse(rule, rule.TICKScript, c.Href(rule.ID), c.HrefOutput(rule.ID), status, srv.SrcID, srv.ID)
for _, task := range tasks {
ar := newAlertResponse(task, srv.SrcID, srv.ID)
res.Rules = append(res.Rules, ar)
}
encodeJSON(w, http.StatusOK, res, h.Logger)
}
type allAlertsResponse struct {
Rules []alertResponse `json:"rules"`
Rules []*alertResponse `json:"rules"`
}
// KapacitorRulesID retrieves specific task
@ -627,7 +612,7 @@ func (h *Service) KapacitorRulesID(w http.ResponseWriter, r *http.Request) {
c := kapa.NewClient(srv.URL, srv.Username, srv.Password)
// Check if the rule exists within scope
rule, err := c.Get(ctx, tid)
task, err := c.Get(ctx, tid)
if err != nil {
if err == chronograf.ErrAlertNotFound {
notFound(w, id, h.Logger)
@ -636,13 +621,8 @@ func (h *Service) KapacitorRulesID(w http.ResponseWriter, r *http.Request) {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
status, err := c.Status(ctx, c.Href(rule.ID))
if err != nil {
Error(w, http.StatusInternalServerError, err.Error(), h.Logger)
return
}
res := newAlertResponse(rule, rule.TICKScript, c.Href(rule.ID), c.HrefOutput(rule.ID), status, srv.SrcID, srv.ID)
res := newAlertResponse(task, srv.SrcID, srv.ID)
encodeJSON(w, http.StatusOK, res, h.Logger)
}

View File

@ -86,15 +86,23 @@ func Test_KapacitorRulesGet(t *testing.T) {
"/chronograf/v1/sources/1/kapacitors/1/rules",
[]chronograf.AlertRule{
{
ID: "cpu_alert",
Name: "cpu_alert",
ID: "cpu_alert",
Name: "cpu_alert",
Status: "enabled",
Type: "stream",
DBRPs: []chronograf.DBRP{{DB: "telegraf", RP: "autogen"}},
TICKScript: tickScript,
},
},
[]chronograf.AlertRule{
{
ID: "cpu_alert",
Name: "cpu_alert",
Alerts: []string{},
ID: "cpu_alert",
Name: "cpu_alert",
Status: "enabled",
Type: "stream",
DBRPs: []chronograf.DBRP{{DB: "telegraf", RP: "autogen"}},
Alerts: []string{},
TICKScript: tickScript,
},
},
},
@ -125,6 +133,13 @@ func Test_KapacitorRulesGet(t *testing.T) {
"id": task.ID,
"script": tickScript,
"status": "enabled",
"type": "stream",
"dbrps": []chronograf.DBRP{
{
DB: "telegraf",
RP: "autogen",
},
},
"link": map[string]interface{}{
"rel": "self",
"href": "/kapacitor/v1/tasks/cpu_alert",
@ -196,9 +211,7 @@ func Test_KapacitorRulesGet(t *testing.T) {
frame := struct {
Rules []struct {
chronograf.AlertRule
TICKScript json.RawMessage `json:"tickscript"`
Status json.RawMessage `json:"status"`
Links json.RawMessage `json:"links"`
Links json.RawMessage `json:"links"`
} `json:"rules"`
}{}

File diff suppressed because it is too large Load Diff

View File

@ -2749,11 +2749,11 @@ escape-html@~1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/escape-html/-/escape-html-1.0.3.tgz#0258eae4d3d0c0974de1c169188ef0051d1d1988"
escape-string-regexp@1.0.2, escape-string-regexp@^1.0.2:
escape-string-regexp@1.0.2, escape-string-regexp@^1.0.0, escape-string-regexp@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.2.tgz#4dbc2fe674e71949caf3fb2695ce7f2dc1d9a8d1"
escape-string-regexp@^1.0.0, escape-string-regexp@^1.0.5:
escape-string-regexp@^1.0.5:
version "1.0.5"
resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz#1b61c0562190a8dff6ae3bb2cf0200ca130b86d4"
@ -4512,17 +4512,17 @@ lodash.words@^3.0.0:
dependencies:
lodash._root "^3.0.0"
lodash@4.x.x, lodash@^4.0.0, lodash@^4.0.1, lodash@^4.1.0, lodash@^4.16.4, lodash@^4.17.2, lodash@^4.2.0, lodash@^4.2.1, lodash@^4.3.0, lodash@^4.5.0, lodash@^4.5.1:
version "4.17.3"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.3.tgz#557ed7d2a9438cac5fd5a43043ca60cb455e01f7"
lodash@4.x.x, lodash@^4.0.1, lodash@^4.1.0, lodash@^4.16.4, lodash@^4.17.4, lodash@^4.5.0:
version "4.17.4"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.4.tgz#78203a4d1c328ae1d86dca6460e369b57f4055ae"
lodash@^3.8.0:
version "3.10.1"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-3.10.1.tgz#5bf45e8e49ba4189e17d482789dfd15bd140b7b6"
lodash@^4.17.4:
version "4.17.4"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.4.tgz#78203a4d1c328ae1d86dca6460e369b57f4055ae"
lodash@^4.0.0, lodash@^4.17.2, lodash@^4.2.0, lodash@^4.2.1, lodash@^4.3.0, lodash@^4.5.1:
version "4.17.3"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.3.tgz#557ed7d2a9438cac5fd5a43043ca60cb455e01f7"
lodash@~4.16.4:
version "4.16.6"