2016-11-03 06:10:02 +00:00
|
|
|
package kapacitor
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/influxdata/chronograf"
|
2017-05-05 21:14:02 +00:00
|
|
|
"github.com/influxdata/chronograf/uuid"
|
2016-11-03 06:10:02 +00:00
|
|
|
client "github.com/influxdata/kapacitor/client/v1"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Prefix is put in front of the generated ID of every kapacitor task
// created by this package.
const Prefix = "chronograf-v1-"
|
|
|
|
|
2017-05-05 21:14:02 +00:00
|
|
|
// Client communicates to kapacitor
|
|
|
|
type Client struct {
|
|
|
|
URL string
|
|
|
|
Username string
|
|
|
|
Password string
|
|
|
|
ID chronograf.ID
|
|
|
|
Ticker chronograf.Ticker
|
|
|
|
kapaClient func(url, username, password string) (KapaClient, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// KapaClient represents a connection to a kapacitor instance
|
|
|
|
type KapaClient interface {
|
|
|
|
CreateTask(opt client.CreateTaskOptions) (client.Task, error)
|
|
|
|
Task(link client.Link, opt *client.TaskOptions) (client.Task, error)
|
|
|
|
ListTasks(opt *client.ListTasksOptions) ([]client.Task, error)
|
|
|
|
UpdateTask(link client.Link, opt client.UpdateTaskOptions) (client.Task, error)
|
|
|
|
DeleteTask(link client.Link) error
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewClient creates a client that interfaces with Kapacitor tasks
|
|
|
|
func NewClient(url, username, password string) *Client {
|
|
|
|
return &Client{
|
|
|
|
URL: url,
|
|
|
|
Username: username,
|
|
|
|
Password: password,
|
|
|
|
ID: &uuid.V4{},
|
|
|
|
Ticker: &Alert{},
|
|
|
|
kapaClient: NewKapaClient,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-11-03 06:10:02 +00:00
|
|
|
// Task represents a running kapacitor task
|
|
|
|
type Task struct {
|
|
|
|
ID string // Kapacitor ID
|
|
|
|
Href string // Kapacitor relative URI
|
2016-11-10 17:27:42 +00:00
|
|
|
HrefOutput string // Kapacitor relative URI to HTTPOutNode
|
2017-04-06 01:04:42 +00:00
|
|
|
Rule chronograf.AlertRule // Rule is the rule that represents this Task
|
2016-11-03 06:10:02 +00:00
|
|
|
TICKScript chronograf.TICKScript // TICKScript is the running script
|
|
|
|
}
|
|
|
|
|
2016-11-03 22:27:58 +00:00
|
|
|
// Href returns the link to a kapacitor task given an id
|
2016-11-03 06:42:52 +00:00
|
|
|
func (c *Client) Href(ID string) string {
|
|
|
|
return fmt.Sprintf("/kapacitor/v1/tasks/%s", ID)
|
|
|
|
}
|
|
|
|
|
2016-11-10 18:56:34 +00:00
|
|
|
// HrefOutput returns the link to a kapacitor task httpOut Node given an id
|
|
|
|
func (c *Client) HrefOutput(ID string) string {
|
|
|
|
return fmt.Sprintf("/kapacitor/v1/tasks/%s/%s", ID, HTTPEndpoint)
|
|
|
|
}
|
|
|
|
|
2016-11-03 22:27:58 +00:00
|
|
|
// Create builds and POSTs a tickscript to kapacitor
|
2016-11-03 06:10:02 +00:00
|
|
|
func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2016-11-03 06:10:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
id, err := c.ID.Generate()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
script, err := c.Ticker.Generate(rule)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
kapaID := Prefix + id
|
2017-04-06 01:04:42 +00:00
|
|
|
rule.ID = kapaID
|
2016-11-03 06:10:02 +00:00
|
|
|
task, err := kapa.CreateTask(client.CreateTaskOptions{
|
|
|
|
ID: kapaID,
|
2016-11-03 22:27:58 +00:00
|
|
|
Type: toTask(rule.Query),
|
2016-11-03 06:10:02 +00:00
|
|
|
DBRPs: []client.DBRP{{Database: rule.Query.Database, RetentionPolicy: rule.Query.RetentionPolicy}},
|
|
|
|
TICKscript: string(script),
|
|
|
|
Status: client.Enabled,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &Task{
|
|
|
|
ID: kapaID,
|
|
|
|
Href: task.Link.Href,
|
2016-11-10 18:56:34 +00:00
|
|
|
HrefOutput: c.HrefOutput(kapaID),
|
2016-11-03 06:10:02 +00:00
|
|
|
TICKScript: script,
|
2017-05-23 22:12:40 +00:00
|
|
|
Rule: c.Reverse(kapaID, script),
|
2016-11-03 06:10:02 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2016-11-03 22:27:58 +00:00
|
|
|
// Delete removes tickscript task from kapacitor
|
2016-11-03 06:10:02 +00:00
|
|
|
func (c *Client) Delete(ctx context.Context, href string) error {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2016-11-03 06:10:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return kapa.DeleteTask(client.Link{Href: href})
|
|
|
|
}
|
|
|
|
|
2016-11-10 18:56:34 +00:00
|
|
|
func (c *Client) updateStatus(ctx context.Context, href string, status client.TaskStatus) (*Task, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2016-11-10 18:56:34 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
opts := client.UpdateTaskOptions{
|
|
|
|
Status: status,
|
|
|
|
}
|
|
|
|
|
|
|
|
task, err := kapa.UpdateTask(client.Link{Href: href}, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &Task{
|
|
|
|
ID: task.ID,
|
|
|
|
Href: task.Link.Href,
|
|
|
|
HrefOutput: c.HrefOutput(task.ID),
|
|
|
|
TICKScript: chronograf.TICKScript(task.TICKscript),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Disable changes the tickscript status to disabled for a given href.
|
|
|
|
func (c *Client) Disable(ctx context.Context, href string) (*Task, error) {
|
|
|
|
return c.updateStatus(ctx, href, client.Disabled)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Enable changes the tickscript status to disabled for a given href.
|
|
|
|
func (c *Client) Enable(ctx context.Context, href string) (*Task, error) {
|
|
|
|
return c.updateStatus(ctx, href, client.Enabled)
|
|
|
|
}
|
|
|
|
|
2017-02-10 19:48:42 +00:00
|
|
|
// AllStatus returns the status of all tasks in kapacitor
|
|
|
|
func (c *Client) AllStatus(ctx context.Context) (map[string]string, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2017-02-10 19:48:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only get the status, id and link section back
|
|
|
|
opts := &client.ListTasksOptions{
|
|
|
|
Fields: []string{"status"},
|
|
|
|
}
|
|
|
|
tasks, err := kapa.ListTasks(opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
taskStatuses := map[string]string{}
|
|
|
|
for _, task := range tasks {
|
|
|
|
taskStatuses[task.ID] = task.Status.String()
|
|
|
|
}
|
|
|
|
|
|
|
|
return taskStatuses, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Status returns the status of a task in kapacitor
|
|
|
|
func (c *Client) Status(ctx context.Context, href string) (string, error) {
|
2017-06-20 18:08:39 +00:00
|
|
|
s, err := c.status(ctx, href)
|
2017-02-10 19:48:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
2017-06-20 18:08:39 +00:00
|
|
|
|
|
|
|
return s.String(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Client) status(ctx context.Context, href string) (client.TaskStatus, error) {
|
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2017-02-10 19:48:42 +00:00
|
|
|
task, err := kapa.Task(client.Link{Href: href}, nil)
|
|
|
|
if err != nil {
|
2017-06-20 18:08:39 +00:00
|
|
|
return 0, err
|
2017-02-10 19:48:42 +00:00
|
|
|
}
|
|
|
|
|
2017-06-20 18:08:39 +00:00
|
|
|
return task.Status, nil
|
2017-02-10 19:48:42 +00:00
|
|
|
}
|
|
|
|
|
2017-04-06 01:04:42 +00:00
|
|
|
// All returns all tasks in kapacitor
|
|
|
|
func (c *Client) All(ctx context.Context) (map[string]chronograf.AlertRule, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2017-04-06 01:04:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only get the status, id and link section back
|
|
|
|
opts := &client.ListTasksOptions{}
|
|
|
|
tasks, err := kapa.ListTasks(opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
alerts := map[string]chronograf.AlertRule{}
|
|
|
|
for _, task := range tasks {
|
|
|
|
script := chronograf.TICKScript(task.TICKscript)
|
|
|
|
if rule, err := Reverse(script); err != nil {
|
|
|
|
alerts[task.ID] = chronograf.AlertRule{
|
2017-04-06 04:19:06 +00:00
|
|
|
ID: task.ID,
|
2017-04-06 01:04:42 +00:00
|
|
|
Name: task.ID,
|
|
|
|
TICKScript: script,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
rule.ID = task.ID
|
|
|
|
rule.TICKScript = script
|
|
|
|
alerts[task.ID] = rule
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return alerts, nil
|
|
|
|
}
|
|
|
|
|
2017-05-23 22:12:40 +00:00
|
|
|
// Reverse builds a chronograf.AlertRule and its QueryConfig from a tickscript
|
|
|
|
func (c *Client) Reverse(id string, script chronograf.TICKScript) chronograf.AlertRule {
|
|
|
|
rule, err := Reverse(script)
|
|
|
|
if err != nil {
|
|
|
|
return chronograf.AlertRule{
|
|
|
|
ID: id,
|
|
|
|
Name: id,
|
|
|
|
Query: nil,
|
|
|
|
TICKScript: script,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
rule.ID = id
|
|
|
|
rule.TICKScript = script
|
|
|
|
return rule
|
|
|
|
}
|
|
|
|
|
2017-04-06 01:04:42 +00:00
|
|
|
// Get returns a single alert in kapacitor
|
|
|
|
func (c *Client) Get(ctx context.Context, id string) (chronograf.AlertRule, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2017-04-06 01:04:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return chronograf.AlertRule{}, err
|
|
|
|
}
|
|
|
|
href := c.Href(id)
|
|
|
|
task, err := kapa.Task(client.Link{Href: href}, nil)
|
|
|
|
if err != nil {
|
|
|
|
return chronograf.AlertRule{}, chronograf.ErrAlertNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
script := chronograf.TICKScript(task.TICKscript)
|
2017-05-23 22:12:40 +00:00
|
|
|
return c.Reverse(task.ID, script), nil
|
2017-04-06 01:04:42 +00:00
|
|
|
}
|
|
|
|
|
2016-11-03 22:27:58 +00:00
|
|
|
// Update changes the tickscript of a given id.
|
2016-11-03 06:10:02 +00:00
|
|
|
func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertRule) (*Task, error) {
|
2017-05-05 21:14:02 +00:00
|
|
|
kapa, err := c.kapaClient(c.URL, c.Username, c.Password)
|
2016-11-03 06:10:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
script, err := c.Ticker.Generate(rule)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-06-20 18:08:39 +00:00
|
|
|
prevStatus, err := c.status(ctx, href)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2016-11-10 18:56:34 +00:00
|
|
|
// We need to disable the kapacitor task followed by enabling it during update.
|
2016-11-03 06:10:02 +00:00
|
|
|
opts := client.UpdateTaskOptions{
|
|
|
|
TICKscript: string(script),
|
2016-11-10 18:56:34 +00:00
|
|
|
Status: client.Disabled,
|
2016-11-03 22:27:58 +00:00
|
|
|
Type: toTask(rule.Query),
|
2016-11-03 06:10:02 +00:00
|
|
|
DBRPs: []client.DBRP{
|
|
|
|
{
|
|
|
|
Database: rule.Query.Database,
|
|
|
|
RetentionPolicy: rule.Query.RetentionPolicy,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
task, err := kapa.UpdateTask(client.Link{Href: href}, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-06-20 18:08:39 +00:00
|
|
|
// Now enable the task if previously enabled
|
|
|
|
if prevStatus == client.Enabled {
|
|
|
|
if _, err := c.Enable(ctx, href); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-11-10 18:56:34 +00:00
|
|
|
}
|
|
|
|
|
2016-11-03 06:10:02 +00:00
|
|
|
return &Task{
|
|
|
|
ID: task.ID,
|
|
|
|
Href: task.Link.Href,
|
2016-11-10 18:56:34 +00:00
|
|
|
HrefOutput: c.HrefOutput(task.ID),
|
2016-11-03 06:10:02 +00:00
|
|
|
TICKScript: script,
|
2017-05-23 22:12:40 +00:00
|
|
|
Rule: c.Reverse(task.ID, script),
|
2016-11-03 06:10:02 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2017-05-05 21:14:02 +00:00
|
|
|
func toTask(q *chronograf.QueryConfig) client.TaskType {
|
|
|
|
if q == nil || q.RawText == nil || *q.RawText == "" {
|
|
|
|
return client.StreamTask
|
|
|
|
}
|
|
|
|
return client.BatchTask
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewKapaClient creates a Kapacitor client connection
|
|
|
|
func NewKapaClient(url, username, password string) (KapaClient, error) {
|
2016-11-03 06:10:02 +00:00
|
|
|
var creds *client.Credentials
|
2017-05-05 21:14:02 +00:00
|
|
|
if username != "" {
|
2016-11-03 06:10:02 +00:00
|
|
|
creds = &client.Credentials{
|
|
|
|
Method: client.UserAuthentication,
|
2017-05-05 21:14:02 +00:00
|
|
|
Username: username,
|
|
|
|
Password: password,
|
2016-11-03 06:10:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return client.New(client.Config{
|
2017-05-05 21:14:02 +00:00
|
|
|
URL: url,
|
2016-11-03 06:10:02 +00:00
|
|
|
Credentials: creds,
|
|
|
|
})
|
|
|
|
}
|