Update kapacitor to latest json-pipeline fixes

pull/2388/head
Chris Goller 2017-11-17 15:49:25 -06:00
parent f033af1b24
commit 34ee885316
22 changed files with 788 additions and 124 deletions

4
Gopkg.lock generated
View File

@ -71,7 +71,7 @@
[[projects]]
name = "github.com/influxdata/kapacitor"
packages = ["client/v1","pipeline","pipeline/tick","services/k8s/client","tick","tick/ast","tick/stateful","udf/agent"]
revision = "599f07a300c4178cb5b5f8c07415cf8fc67b8528"
revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e"
[[projects]]
name = "github.com/influxdata/usage-client"
@ -140,6 +140,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "85a5451fc9e0596e486a676204eb2de0b12900522341ee0804cf9ec86fb2765e"
inputs-digest = "46184c2d3fedb48dad6649bb1a97237bc5eef1f48ee1f4b69373e99783a2a47f"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -75,4 +75,4 @@ required = ["github.com/jteeuwen/go-bindata","github.com/gogo/protobuf/proto","g
[[constraint]]
name = "github.com/influxdata/kapacitor"
revision = "599f07a300c4178cb5b5f8c07415cf8fc67b8528"
revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e"

View File

@ -1,6 +1,6 @@
# Changelog
## Unreleased
## v1.4.0-rc1 [2017-11-09]
### Features
- [#1408](https://github.com/influxdata/kapacitor/issues/1408): Add Previous state
@ -26,9 +26,12 @@
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.
- [#1606](https://github.com/influxdata/kapacitor/pull/1606): Update Go version to 1.9.1
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API.
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API. API is released as a technical preview.
- [#1605](https://github.com/influxdata/kapacitor/issues/1605): Add support for {{ .Duration }} on Alert Message property.
- [#1644](https://github.com/influxdata/kapacitor/issues/1644): Add support for [JSON lines](https://en.wikipedia.org/wiki/JSON_Streaming#Line_delimited_JSON) for steaming HTTP logs.
- [#1637](https://github.com/influxdata/kapacitor/issues/1637): Add new node Sideload, that allows loading data from files into the stream of data. Data can be loaded using a hierarchy.
- [#1667](https://github.com/influxdata/kapacitor/pull/1667): Promote Alert API to stable v1 path.
- [#1668](https://github.com/influxdata/kapacitor/pull/1668): Change WARN level logs to INFO level.
### Bugfixes
@ -43,6 +46,12 @@
- [#1623](https://github.com/influxdata/kapacitor/issues/1623): Fix k8s incluster master api dns resolution
- [#1630](https://github.com/influxdata/kapacitor/issues/1630): Remove the pidfile after the server has exited.
- [#1641](https://github.com/influxdata/kapacitor/issues/1641): Logs API writes multiple http headers.
- [#1657](https://github.com/influxdata/kapacitor/issues/1657): Fix missing dependency in rpm package.
- [#1660](https://github.com/influxdata/kapacitor/pull/1660): Force tar owner/group to be root.
- [#1663](https://github.com/influxdata/kapacitor/pull/1663): Fixed install/remove of kapacitor on non-systemd Debian/Ubuntu systems.
Fixes packaging to not enable services on RHEL systems.
Fixes issues with recusive symlinks on systemd systems.
- [#1662](https://github.com/influxdata/kapacitor/issues/1662): Fix invalid default MQTT config.
## v1.3.3 [2017-08-11]

View File

@ -33,7 +33,7 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO
# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.9.1
ENV GO_VERSION 1.9.2
ENV GO_ARCH 386
RUN wget -q https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \

View File

@ -43,7 +43,7 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO
# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.9.1
ENV GO_VERSION 1.9.2
ENV GO_ARCH amd64
RUN wget -q https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \

View File

@ -242,7 +242,7 @@ def package_python_udf(version, dist_dir):
fname = "python-kapacitor_udf-{}.tar.gz".format(version)
outfile = os.path.join(dist_dir, fname)
tar_cmd = ['tar', '-cz', '-C', './udf/agent/py', '--transform', 's/^./kapacitor_udf-{}/'.format(version), '-f']
tar_cmd = ['tar', '-cz', '-C', './udf/agent/py', '--owner=root', '--group=root', '--transform', 's/^./kapacitor_udf-{}/'.format(version), '-f']
tar_cmd.append(outfile)
exclude_list = ['*.pyc', '*.pyo', '__pycache__']
for e in exclude_list:
@ -724,7 +724,7 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
package_arch)
current_location = os.path.join(os.getcwd(), current_location)
if package_type == 'tar':
tar_command = "cd {} && tar -cvzf {}.tar.gz ./*".format(package_build_root, name)
tar_command = "cd {} && tar -cvzf {}.tar.gz --owner=root --group=root ./*".format(package_build_root, name)
run(tar_command, shell=True)
run("mv {}.tar.gz {}".format(os.path.join(package_build_root, name), current_location), shell=True)
outfile = os.path.join(current_location, name + ".tar.gz")
@ -748,7 +748,7 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
package_build_root,
current_location)
if package_type == "rpm":
fpm_command += "--depends coreutils --rpm-posttrans {}".format(POSTINST_SCRIPT)
fpm_command += "--depends coreutils --depends shadow-utils --rpm-posttrans {}".format(POSTINST_SCRIPT)
out = run(fpm_command, shell=True)
matches = re.search(':path=>"(.*)"', out)
outfile = None

View File

@ -9,6 +9,7 @@
* [Alerts](#alerts)
* [Configuration](#configuration)
* [Storage](#storage)
* [Logs](#logs)
* [Testing Services](#testing-services)
* [Miscellaneous](#miscellaneous)
@ -1407,12 +1408,6 @@ GET /kapacitor/v1/replays
Kapacitor can generate and handle alerts.
The API allows you to see the current state of any alert and to configure various handlers for the alerts.
>NOTE: All API endpoints related to alerts are in a technical preview.
Meaning that they are subject to change in the future until the technical preview is completed.
As such the URL for the endpoints uses the base path `/kapacitor/v1preview`.
Once the technical preview is deemed complete the endpoint paths will be promoted to use the v1 `/kapacitor/v1` base path.
### Topics
Alerts are grouped into topics.
@ -1422,7 +1417,7 @@ You can either specify the alert topic in the TICKscript or one will be generate
### Creating and Removing Topics
Topics are created dynamically when they referenced in TICKscripts or in handlers.
To delete a topic make a `DELETE` request to `/kapacitor/v1preview/alerts/topics/<topic id>`.
To delete a topic make a `DELETE` request to `/kapacitor/v1/alerts/topics/<topic id>`.
This will delete all known events and state for the topic.
>NOTE: Since topics are dynamically created, a topic may return after having deleted it, if a new event is created for the topic.
@ -1431,11 +1426,11 @@ This will delete all known events and state for the topic.
#### Example
```
DELETE /kapacitor/v1preview/alerts/topics/system
DELETE /kapacitor/v1/alerts/topics/system
```
### List Topics
To query the list of available topics make a GET requests to `/kapacitor/v1preview/alerts/topics`.
To query the list of available topics make a GET requests to `/kapacitor/v1/alerts/topics`.
| Query Parameter | Default | Purpose |
| --------------- | ------- | ------- |
@ -1448,24 +1443,24 @@ To query the list of available topics make a GET requests to `/kapacitor/v1previ
Get all topics.
```
GET /kapacitor/v1preview/alerts/topics
GET /kapacitor/v1/alerts/topics
```
```
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics"},
"topics": [
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link": {"rel":"self","href":"/kapacitor/v/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"id": "system",
"level":"CRITICAL"
},
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/app"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/app/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/app/handlers"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/app"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/app/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/app/handlers"},
"id": "app",
"level":"OK"
}
@ -1477,17 +1472,17 @@ Get all topics in a WARNING or CRITICAL state.
```
GET /kapacitor/v1preview/alerts/topics?min-level=WARNING
GET /kapacitor/v1/alerts/topics?min-level=WARNING
```
```
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics"},
"topics": [
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"id": "system",
"level":"CRITICAL"
}
@ -1497,27 +1492,27 @@ GET /kapacitor/v1preview/alerts/topics?min-level=WARNING
### Topic State
To query the state of a topic make a GET request to `/kapacitor/v1preview/alerts/topics/<topic id>`.
To query the state of a topic make a GET request to `/kapacitor/v1/alerts/topics/<topic id>`.
#### Example
```
GET /kapacitor/v1preview/alerts/topics/system
GET /kapacitor/v1/alerts/topics/system
```
```
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system"},
"id": "system",
"level":"CRITICAL"
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/system/handlers"},
}
```
### List Topic Events
To query all the events within a topic make a GET request to `/kapacitor/v1preview/alerts/topics/<topic id>/events`.
To query all the events within a topic make a GET request to `/kapacitor/v1/alerts/topics/<topic id>/events`.
| Query Parameter | Default | Purpose |
| --------------- | ------- | ------- |
@ -1526,16 +1521,16 @@ To query all the events within a topic make a GET request to `/kapacitor/v1previ
#### Example
```
GET /kapacitor/v1preview/alerts/topics/system/events
GET /kapacitor/v1/alerts/topics/system/events
```
```
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events"},
"topic": "system",
"events": [
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/cpu"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/cpu"},
"id": "cpu",
"state": {
"level": "WARNING",
@ -1545,7 +1540,7 @@ GET /kapacitor/v1preview/alerts/topics/system/events
}
},
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/mem"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/mem"},
"id": "mem",
"state": {
"level": "CRITICAL",
@ -1560,17 +1555,17 @@ GET /kapacitor/v1preview/alerts/topics/system/events
### Topic Event
You can query a specific event within a topic by making a GET request to `/kapacitor/v1preview/alerts/topics/<topic id>/events/<event id>`.
You can query a specific event within a topic by making a GET request to `/kapacitor/v1/alerts/topics/<topic id>/events/<event id>`.
#### Example
```
GET /kapacitor/v1preview/alerts/topics/system/events/cpu
GET /kapacitor/v1/alerts/topics/system/events/cpu
```
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/cpu"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/cpu"},
"id": "cpu",
"state": {
"level": "WARNING",
@ -1584,7 +1579,7 @@ GET /kapacitor/v1preview/alerts/topics/system/events/cpu
### List Topic Handlers
Handlers are created within a topic.
You can get a list of handlers configured for a topic by making a GET request to `/kapacitor/v1preview/alerts/topics/<topic id>/handlers`.
You can get a list of handlers configured for a topic by making a GET request to `/kapacitor/v1/alerts/topics/<topic id>/handlers`.
| Query Parameter | Default | Purpose |
| --------------- | ------- | ------- |
@ -1598,16 +1593,16 @@ You can get a list of handlers configured for a topic by making a GET request to
Get the handlers for the `system` topic.
```
GET /kapacitor/v1preview/alerts/topics/system/handlers
GET /kapacitor/v1/alerts/topics/system/handlers
```
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"topic": "system",
"handlers": [
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id":"slack",
"actions": [{
"kind":"slack",
@ -1617,7 +1612,7 @@ GET /kapacitor/v1preview/alerts/topics/system/handlers
}]
},
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/smtp"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/smtp"},
"id":"smtp",
"actions": [{
"kind":"smtp"
@ -1631,12 +1626,12 @@ This `main:alert_cpu:alert5` topic represents an auto-generated topic from a tas
Anonymous handlers cannot be listed or modified via the API.
```
GET /kapacitor/v1preview/alerts/topics/main:alert_cpu:alert5/handlers
GET /kapacitor/v1/alerts/topics/main:alert_cpu:alert5/handlers
```
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"topic": "main:alert_cpu:alert5",
"handlers": null
}
@ -1644,17 +1639,17 @@ GET /kapacitor/v1preview/alerts/topics/main:alert_cpu:alert5/handlers
### Get a Handler
To query information about a specific handler make a GET request to `/kapacitor/v1preview/alerts/topics/<topic id>/handlers/<handler id>`.
To query information about a specific handler make a GET request to `/kapacitor/v1/alerts/topics/<topic id>/handlers/<handler id>`.
#### Example
```
GET /kapacitor/v1preview/alerts/topics/system/handlers/slack
GET /kapacitor/v1/alerts/topics/system/handlers/slack
```
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id":"slack",
"actions": [{
"kind":"slack",
@ -1667,10 +1662,10 @@ GET /kapacitor/v1preview/alerts/topics/system/handlers/slack
### Create a Handler
To create a new handler make a POST request to `/kapacitor/v1preview/alerts/topics/system/handlers`.
To create a new handler make a POST request to `/kapacitor/v1/alerts/topics/system/handlers`.
```
POST /kapacitor/v1preview/alerts/topics/system/handlers
POST /kapacitor/v1/alerts/topics/system/handlers
{
"id":"slack",
"actions": [{
@ -1685,7 +1680,7 @@ POST /kapacitor/v1preview/alerts/topics/system/handlers
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"actions": [{
"kind":"slack",
@ -1698,7 +1693,7 @@ POST /kapacitor/v1preview/alerts/topics/system/handlers
### Update a Handler
To update an existing handler you can either make a PUT or PATCH request to `/kapacitor/v1preview/alerts/topics/system/handlers/<handler id>`.
To update an existing handler you can either make a PUT or PATCH request to `/kapacitor/v1/alerts/topics/system/handlers/<handler id>`.
Using PUT will replace the entire handler, by using PATCH specific parts of the handler can be modified.
@ -1709,7 +1704,7 @@ PATCH will apply JSON patch object to the existing handler, see [rfc6902](https:
Update the topics and actions for a handler using the PATCH method.
```
PATCH /kapacitor/v1preview/alerts/topics/system/handlers/slack
PATCH /kapacitor/v1/alerts/topics/system/handlers/slack
[
{"op":"replace", "path":"/topics", "value":["system", "test"]},
{"op":"replace", "path":"/actions/0/options/channel", "value":"#testing_alerts"}
@ -1718,7 +1713,7 @@ PATCH /kapacitor/v1preview/alerts/topics/system/handlers/slack
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"actions": [
{
@ -1734,7 +1729,7 @@ PATCH /kapacitor/v1preview/alerts/topics/system/handlers/slack
Replace an entire handler using the PUT method.
```
PUT /kapacitor/v1preview/alerts/topics/system/handlers/slack
PUT /kapacitor/v1/alerts/topics/system/handlers/slack
{
"id": "slack",
"actions": [
@ -1750,7 +1745,7 @@ PUT /kapacitor/v1preview/alerts/topics/system/handlers/slack
```
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"actions": [
{
@ -1765,10 +1760,10 @@ PUT /kapacitor/v1preview/alerts/topics/system/handlers/slack
### Remove a Handler
To remove an existing handler make a DELETE request to `/kapacitor/v1preview/alerts/topics/system/handlers/<handler id>`.
To remove an existing handler make a DELETE request to `/kapacitor/v1/alerts/topics/system/handlers/<handler id>`.
```
DELETE /kapacitor/v1preview/alerts/topics/system/handlers/<handler id>
DELETE /kapacitor/v1/alerts/topics/system/handlers/<handler id>
```
@ -2212,6 +2207,47 @@ POST /kapacitor/v1/storage/stores/tasks
| 400 | Unknown action |
| 404 | The specified store does not exist |
## Logs
The logging API is being release under [Technical Preview](#technical-preview).
Kapacitor allows users to retrieve the kapacitor logs remotely via HTTP using
[Chunked Transfer Encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding).
The logs may be queried using key-value pairs correspoding to the log entry.
These key-value are specified as query parameter.
The logging API will return logs in two formats: [logfmt](https://brandur.org/logfmt) and JSON.
To receive logs in JSON format, you must specify `Content-Type: application/json`. If we receive
any content type other than `application/json`, we will return the logs in logfmt format.
Each chunk returned to the client will contain a single complete log followed by a `\n`.
### Example
#### Logs as JSON
```
GET /kapacitor/v1preview/logs?task=mytask
Content-Type: application/json
```
returns the following
```
{"ts":"2017-11-08T17:40:47.183-05:00","lvl":"info","msg":"created log session","service":"sessions","id":"7021fb9d-467e-482f-870c-d811aa9e74b7","content-type":"application/json","tags":"nil"}
{"ts":"2017-11-08T17:40:47.183-05:00","lvl":"info","msg":"created log session","service":"sessions","id":"7021fb9d-467e-482f-870c-d811aa9e74b7","content-type":"application/json","tags":"nil"}
{"ts":"2017-11-08T17:40:47.183-05:00","lvl":"info","msg":"created log session","service":"sessions","id":"7021fb9d-467e-482f-870c-d811aa9e74b7","content-type":"application/json","tags":"nil"}
```
#### Logs as logfmt
```
GET /kapacitor/v1preview/logs?task=mytask
```
returns the following
```
ts=2017-11-08T17:42:47.014-05:00 lvl=info msg="created log session" service=sessions id=ce4d7819-1e38-4bf4-ba54-78b0a8769b7e content-type=
ts=2017-11-08T17:42:47.014-05:00 lvl=info msg="created log session" service=sessions id=ce4d7819-1e38-4bf4-ba54-78b0a8769b7e content-type=
ts=2017-11-08T17:42:47.014-05:00 lvl=info msg="created log session" service=sessions id=ce4d7819-1e38-4bf4-ba54-78b0a8769b7e content-type=
```
## Testing Services
Kapacitor makes use of various service integrations.
@ -2357,6 +2393,22 @@ GET /kapacitor/v1/ping
| ---- | ------- |
| 204 | Success |
### Sideload Reload
You can trigger a reload of all sideload sources by making a POST request to `kapacitor/v1/sideload/reload`, with an empty body.
#### Example
```
POST /kapacitor/v1/sideload/reload
```
#### Response
| Code | Meaning |
| ---- | ------- |
| 204 | Success |
### Debug Vars

View File

@ -33,7 +33,7 @@ const (
basePreviewPath = "/kapacitor/v1preview"
pingPath = basePath + "/ping"
logLevelPath = basePath + "/loglevel"
logsPath = basePath + "/logs"
logsPath = basePreviewPath + "/logs"
debugVarsPath = basePath + "/debug/vars"
tasksPath = basePath + "/tasks"
templatesPath = basePath + "/templates"
@ -46,7 +46,7 @@ const (
replayQueryPath = basePath + "/replays/query"
configPath = basePath + "/config"
serviceTestsPath = basePath + "/service-tests"
alertsPath = basePreviewPath + "/alerts"
alertsPath = basePath + "/alerts"
topicsPath = alertsPath + "/topics"
topicEventsPath = "events"
topicHandlersPath = "handlers"

View File

@ -2517,13 +2517,13 @@ func Test_DoServiceTest(t *testing.T) {
func Test_Topic(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"id": "system",
"level":"CRITICAL",
"collected": 5
@ -2544,9 +2544,9 @@ func Test_Topic(t *testing.T) {
}
exp := client.Topic{
ID: "system",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1preview/alerts/topics/system/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1preview/alerts/topics/system/handlers"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1/alerts/topics/system/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1/alerts/topics/system/handlers"},
Level: "CRITICAL",
Collected: 5,
}
@ -2557,23 +2557,23 @@ func Test_Topic(t *testing.T) {
func Test_ListTopics(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics?min-level=WARNING&pattern=%2A" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics?min-level=WARNING&pattern=%2A" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics"},
"topics": [
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/system/handlers"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/system/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/system/handlers"},
"id": "system",
"level":"CRITICAL"
},
{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/app"},
"events-link" : {"rel":"events","href":"/kapacitor/v1preview/alerts/topics/app/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1preview/alerts/topics/app/handlers"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/app"},
"events-link" : {"rel":"events","href":"/kapacitor/v1/alerts/topics/app/events"},
"handlers-link": {"rel":"handlers","href":"/kapacitor/v1/alerts/topics/app/handlers"},
"id": "app",
"level":"WARNING"
}
@ -2597,20 +2597,20 @@ func Test_ListTopics(t *testing.T) {
t.Fatal(err)
}
exp := client.Topics{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics"},
Topics: []client.Topic{
{
ID: "system",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1preview/alerts/topics/system/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1preview/alerts/topics/system/handlers"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1/alerts/topics/system/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1/alerts/topics/system/handlers"},
Level: "CRITICAL",
},
{
ID: "app",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/app"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1preview/alerts/topics/app/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1preview/alerts/topics/app/handlers"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/app"},
EventsLink: client.Link{Relation: "events", Href: "/kapacitor/v1/alerts/topics/app/events"},
HandlersLink: client.Link{Relation: "handlers", Href: "/kapacitor/v1/alerts/topics/app/handlers"},
Level: "WARNING",
},
},
@ -2622,7 +2622,7 @@ func Test_ListTopics(t *testing.T) {
func Test_DeleteTopic(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system" &&
r.Method == "DELETE" {
w.WriteHeader(http.StatusNoContent)
} else {
@ -2643,11 +2643,11 @@ func Test_DeleteTopic(t *testing.T) {
func Test_TopicEvent(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/events/cpu" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/events/cpu" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/cpu"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/cpu"},
"id": "cpu",
"state": {
"level": "WARNING",
@ -2672,7 +2672,7 @@ func Test_TopicEvent(t *testing.T) {
}
exp := client.TopicEvent{
ID: "cpu",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/events/cpu"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/events/cpu"},
State: client.EventState{
Message: "cpu is WARNING",
Time: time.Date(2016, 12, 1, 0, 0, 0, 0, time.UTC),
@ -2687,15 +2687,15 @@ func Test_TopicEvent(t *testing.T) {
func Test_ListTopicEvents(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/events?min-level=OK" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/events?min-level=OK" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link": {"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events?min-level=OK"},
"link": {"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events?min-level=OK"},
"topic": "system",
"events": [
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/cpu"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/cpu"},
"id": "cpu",
"state": {
"level": "WARNING",
@ -2705,7 +2705,7 @@ func Test_ListTopicEvents(t *testing.T) {
}
},
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/events/mem"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/events/mem"},
"id": "mem",
"state": {
"level": "CRITICAL",
@ -2731,12 +2731,12 @@ func Test_ListTopicEvents(t *testing.T) {
t.Fatal(err)
}
exp := client.TopicEvents{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/events?min-level=OK"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/events?min-level=OK"},
Topic: "system",
Events: []client.TopicEvent{
{
ID: "cpu",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/events/cpu"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/events/cpu"},
State: client.EventState{
Message: "cpu is WARNING",
Time: time.Date(2016, 12, 1, 0, 0, 0, 0, time.UTC),
@ -2746,7 +2746,7 @@ func Test_ListTopicEvents(t *testing.T) {
},
{
ID: "mem",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/events/mem"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/events/mem"},
State: client.EventState{
Message: "mem is CRITICAL",
Time: time.Date(2016, 12, 1, 0, 10, 0, 0, time.UTC),
@ -2762,15 +2762,15 @@ func Test_ListTopicEvents(t *testing.T) {
}
func Test_ListTopicHandlers(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers?pattern=" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers?pattern=" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers?pattern="},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers?pattern="},
"topic": "system",
"handlers": [
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id":"slack",
"kind":"slack",
"options":{
@ -2778,7 +2778,7 @@ func Test_ListTopicHandlers(t *testing.T) {
}
},
{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/smtp"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/smtp"},
"id":"smtp",
"kind":"smtp"
}
@ -2799,12 +2799,12 @@ func Test_ListTopicHandlers(t *testing.T) {
t.Fatal(err)
}
exp := client.TopicHandlers{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers?pattern="},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers?pattern="},
Topic: "system",
Handlers: []client.TopicHandler{
{
ID: "slack",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/slack"},
Kind: "slack",
Options: map[string]interface{}{
"channel": "#alerts",
@ -2812,7 +2812,7 @@ func Test_ListTopicHandlers(t *testing.T) {
},
{
ID: "smtp",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/smtp"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/smtp"},
Kind: "smtp",
},
},
@ -2823,11 +2823,11 @@ func Test_ListTopicHandlers(t *testing.T) {
}
func Test_TopicHandler(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers/slack" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers/slack" &&
r.Method == "GET" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id":"slack",
"topic": "system",
"kind":"slack",
@ -2851,7 +2851,7 @@ func Test_TopicHandler(t *testing.T) {
}
exp := client.TopicHandler{
ID: "slack",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/slack"},
Kind: "slack",
Options: map[string]interface{}{
"channel": "#alerts",
@ -2872,12 +2872,12 @@ func Test_CreateTopicHandler(t *testing.T) {
"channel": "#alerts",
},
}
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers" &&
r.Method == "POST" &&
reflect.DeepEqual(expOptions, options) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"kind":"slack",
"options": {
@ -2906,7 +2906,7 @@ func Test_CreateTopicHandler(t *testing.T) {
}
exp := client.TopicHandler{
ID: "slack",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/slack"},
Kind: "slack",
Options: map[string]interface{}{
"channel": "#alerts",
@ -2932,12 +2932,12 @@ func Test_PatchTopicHandler(t *testing.T) {
Value: "#testing_alerts",
},
}
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers/slack" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers/slack" &&
r.Method == "PATCH" &&
reflect.DeepEqual(expPatch, patch) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"kind":"slack",
"options": {
@ -2971,7 +2971,7 @@ func Test_PatchTopicHandler(t *testing.T) {
}
exp := client.TopicHandler{
ID: "slack",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/slack"},
Kind: "slack",
Options: map[string]interface{}{
"channel": "#testing_alerts",
@ -2992,12 +2992,12 @@ func Test_ReplaceTopicHandler(t *testing.T) {
"channel": "#testing_alerts",
},
}
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers/slack" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers/slack" &&
r.Method == "PUT" &&
reflect.DeepEqual(expOptions, options) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{
"link":{"rel":"self","href":"/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
"link":{"rel":"self","href":"/kapacitor/v1/alerts/topics/system/handlers/slack"},
"id": "slack",
"kind":"slack",
"options": {
@ -3026,7 +3026,7 @@ func Test_ReplaceTopicHandler(t *testing.T) {
}
exp := client.TopicHandler{
ID: "slack",
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/system/handlers/slack"},
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/alerts/topics/system/handlers/slack"},
Kind: "slack",
Options: map[string]interface{}{
"channel": "#testing_alerts",
@ -3038,7 +3038,7 @@ func Test_ReplaceTopicHandler(t *testing.T) {
}
func Test_DeleteTopicHandler(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() == "/kapacitor/v1preview/alerts/topics/system/handlers/slack" &&
if r.URL.String() == "/kapacitor/v1/alerts/topics/system/handlers/slack" &&
r.Method == "DELETE" {
w.WriteHeader(http.StatusNoContent)
} else {

View File

@ -126,6 +126,20 @@ func (p *Pipeline) MarshalJSON() ([]byte, error) {
}{}
for _, n := range p.sorted {
// we skip all noop nodes
if _, ok := n.(*NoOpNode); ok {
continue
}
// With a stats node we "fake" a parent to hook it correctly into the graph
if stat, ok := n.(*StatsNode); ok {
raw.Edges = append(raw.Edges,
Edge{
Parent: stat.SourceNode.ID(),
Child: stat.ID(),
})
}
raw.Nodes = append(raw.Nodes, n)
for _, parent := range n.Parents() {
raw.Edges = append(raw.Edges,
@ -166,6 +180,7 @@ func init() {
"stateDuration": func(parent chainnodeAlias) Node { return parent.StateDuration(nil) },
"stateCount": func(parent chainnodeAlias) Node { return parent.StateCount(nil) },
"shift": func(parent chainnodeAlias) Node { return parent.Shift(0) },
"sideload": func(parent chainnodeAlias) Node { return parent.Sideload() },
"sample": func(parent chainnodeAlias) Node { return parent.Sample(0) },
"log": func(parent chainnodeAlias) Node { return parent.Log() },
"kapacitorLoopback": func(parent chainnodeAlias) Node { return parent.KapacitorLoopback() },
@ -481,6 +496,10 @@ func isChainNode(node Node) (chainnodeAlias, bool) {
if ok {
return &alert.AlertNodeData.chainnode, true
}
shift, ok := node.(*ShiftNode)
if ok {
return &shift.chainnode, true
}
return nil, false
}
@ -527,6 +546,7 @@ type chainnodeAlias interface {
Sample(interface{}) *SampleNode
SetName(string)
Shift(time.Duration) *ShiftNode
Sideload() *SideloadNode
Spread(string) *InfluxQLNode
StateCount(*ast.LambdaNode) *StateCountNode
StateDuration(*ast.LambdaNode) *StateDurationNode

View File

@ -488,3 +488,10 @@ func (n *chainnode) StateCount(expression *ast.LambdaNode) *StateCountNode {
n.linkChild(sc)
return sc
}
// Create a node that can load data from external sources
func (n *chainnode) Sideload() *SideloadNode {
s := newSideloadNode(n.provides)
n.linkChild(s)
return s
}

View File

@ -2,6 +2,7 @@ package pipeline
import (
"encoding/json"
"reflect"
"testing"
"time"
@ -106,8 +107,7 @@ func MarshalIndentTestHelper(t *testing.T, node interface{}, wantErr bool, want
return
}
if string(got) != want {
t.Log(string(got))
t.Errorf("error = %s, want %s", string(got), want)
t.Errorf("unexpected JSON\ngot:\n%s\nwant:\n%s\n", string(got), want)
}
}
@ -123,3 +123,14 @@ func MarshalTestHelper(t *testing.T, node interface{}, wantErr bool, want string
t.Errorf("error = %s, want %s", string(got), want)
}
}
func UnmarshalJSONTestHelper(t *testing.T, input []byte, node interface{}, wantErr bool, want interface{}) {
t.Helper()
err := json.Unmarshal(input, node)
if (err != nil) != wantErr {
t.Errorf("UnmarshalJSON() error = %v, wantErr %v", err, wantErr)
return
}
if !wantErr && !reflect.DeepEqual(node, want) {
t.Errorf("UnmarshalJSON() =\ngot:\n%#+v\nwant:\n%#+v\n", node, want)
}
}

View File

@ -0,0 +1,109 @@
package pipeline
import (
"encoding/json"
"fmt"
)
// Sideload adds fields and tags to points based on hierarchical data from various sources.
//
// Example:
// |sideload()
// .source('file:///path/to/dir')
// .order('host/{{.host}}.yml', 'hostgroup/{{.hostgroup}}.yml')
// .field('cpu_threshold', 0.0)
// .tag('foo', 'unknown')
//
// Add a field `cpu_threshold` and a tag `foo` to each point based on the value loaded from the hierarchical source.
// The list of templates in the `.order()` property are evaluated using the points tags.
// The files paths are checked then checked in order for the specified keys and the first value that is found is used.
type SideloadNode struct {
chainnode
// Source for the data, currently only `file://` based sources are supported
Source string `json:"source"`
// Order is a list of paths that indicate the hierarchical order.
// The paths are relative to the source and can have `{}` that will be replaced with the tag value from the point.
// This allows for values to be overridden based on a hierarchy of tags.
// tick:ignore
OrderList []string `tick:"Order" json:"order"`
// Fields is a list of fields to load.
// tick:ignore
Fields map[string]interface{} `tick:"Field" json:"fields"`
// Tags is a list of tags to load.
// tick:ignore
Tags map[string]string `tick:"Tag" json:"tags"`
}
func newSideloadNode(wants EdgeType) *SideloadNode {
return &SideloadNode{
chainnode: newBasicChainNode("sideload", wants, wants),
Fields: make(map[string]interface{}),
Tags: make(map[string]string),
}
}
// Order is a list of paths that indicate the hierarchical order.
// The paths are relative to the source and can have template markers like `{{.tagname}}` that will be replaced with the tag value of the point.
// The paths are then searched in order for the keys and the first value that is found is used.
// This allows for values to be overridden based on a hierarchy of tags.
// tick:property
func (n *SideloadNode) Order(order ...string) *SideloadNode {
n.OrderList = order
return n
}
// Field is the name of a field to load from the source and its default value.
// The type loaded must match the type of the default value.
// Otherwise an error is recorded and the default value is used.
// tick:property
func (n *SideloadNode) Field(f string, v interface{}) *SideloadNode {
n.Fields[f] = v
return n
}
// Tag is the name of a tag to load from the source and its default value.
// The loaded values must be strings, otherwise an error is recorded and the default value is used.
// tick:property
func (n *SideloadNode) Tag(t string, v string) *SideloadNode {
n.Tags[t] = v
return n
}
// MarshalJSON converts SideloadNode to JSON
func (n *SideloadNode) MarshalJSON() ([]byte, error) {
type Alias SideloadNode
var raw = &struct {
TypeOf
*Alias
}{
TypeOf: TypeOf{
Type: "sideload",
ID: n.ID(),
},
Alias: (*Alias)(n),
}
return json.Marshal(raw)
}
// UnmarshalJSON converts JSON to an SideloadNode
func (n *SideloadNode) UnmarshalJSON(data []byte) error {
type Alias SideloadNode
var raw = &struct {
TypeOf
*Alias
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "sideload" {
return fmt.Errorf("error unmarshaling node %d of type %s as SideloadNode", raw.ID, raw.Type)
}
n.setID(raw.ID)
return nil
}

View File

@ -0,0 +1,137 @@
package pipeline
import (
"testing"
)
func TestSideloadNode_MarshalJSON(t *testing.T) {
type fields struct {
Source string
Order []string
Fields map[string]interface{}
Tags map[string]string
}
tests := []struct {
name string
fields fields
want string
wantErr bool
}{
{
name: "all fields set",
fields: fields{
Source: "file:///src",
Order: []string{"a", "b", "c"},
Fields: map[string]interface{}{
"f1": 42.0,
"f2": "",
},
Tags: map[string]string{
"t1": "k1",
"t2": "",
},
},
want: `{
"typeOf": "sideload",
"id": "0",
"source": "file:///src",
"order": [
"a",
"b",
"c"
],
"fields": {
"f1": 42,
"f2": ""
},
"tags": {
"t1": "k1",
"t2": ""
}
}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := newSideloadNode(StreamEdge)
w.setID(0)
w.Source = tt.fields.Source
w.OrderList = tt.fields.Order
w.Fields = tt.fields.Fields
w.Tags = tt.fields.Tags
MarshalIndentTestHelper(t, w, tt.wantErr, tt.want)
})
}
}
func TestSideloadNode_UnmarshalJSON(t *testing.T) {
tests := []struct {
name string
input string
want *SideloadNode
wantErr bool
}{
{
name: "all fields set",
input: `{
"typeOf": "sideload",
"id": "0",
"source": "file:///src",
"order": ["a", "b", "c"],
"fields": {
"f1": 42.0,
"f2": ""
},
"tags": {
"t1": "k1",
"t2": ""
}
}`,
want: &SideloadNode{
Source: "file:///src",
OrderList: []string{"a", "b", "c"},
Fields: map[string]interface{}{
"f1": 42.0,
"f2": "",
},
Tags: map[string]string{
"t1": "k1",
"t2": "",
},
},
},
{
name: "set id correctly",
input: `{"typeOf":"sideload","id":"5"}`,
want: &SideloadNode{
chainnode: chainnode{
node: node{
id: 5,
},
},
},
},
{
name: "invalid data",
input: `{"typeOf":"sideload","id":"0", "source": 56.0}`,
wantErr: true,
},
{
name: "invalid node type",
input: `{"typeOf":"invalid","id"0"}`,
wantErr: true,
},
{
name: "invalid id type",
input: `{"typeOf":"window","id":"invalid"}`,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &SideloadNode{}
UnmarshalJSONTestHelper(t, []byte(tt.input), w, tt.wantErr, tt.want)
})
}
}

View File

@ -124,6 +124,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) {
return NewSample(parents).Build(node)
case *pipeline.ShiftNode:
return NewShift(parents).Build(node)
case *pipeline.SideloadNode:
return NewSideload(parents).Build(node)
case *pipeline.StateCountNode:
return NewStateCount(parents).Build(node)
case *pipeline.StateDurationNode:

View File

@ -0,0 +1,53 @@
package tick
import (
"sort"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)
// SideloadNode converts the Sideload pipeline node into the TICKScript AST
type SideloadNode struct {
Function
}
// NewSideload creates a Sideload function builder
func NewSideload(parents []ast.Node) *SideloadNode {
return &SideloadNode{
Function{
Parents: parents,
},
}
}
// Build creates a Sideload ast.Node
func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) {
n.Pipe("sideload")
n.Dot("source", d.Source)
order := make([]interface{}, len(d.OrderList))
for i := range d.OrderList {
order[i] = d.OrderList[i]
}
n.Dot("order", order...)
var fieldKeys []string
for k := range d.Fields {
fieldKeys = append(fieldKeys, k)
}
sort.Strings(fieldKeys)
for _, k := range fieldKeys {
n.Dot("field", k, d.Fields[k])
}
var tagKeys []string
for k := range d.Tags {
tagKeys = append(tagKeys, k)
}
sort.Strings(tagKeys)
for _, k := range tagKeys {
n.Dot("tag", k, d.Tags[k])
}
return n.prev, n.err
}

View File

@ -0,0 +1,28 @@
package tick_test
import (
"testing"
)
func TestSideload(t *testing.T) {
pipe, _, from := StreamFrom()
def := from.Sideload()
def.Source = "file:///tmpdir"
def.Order("a", "b", "c")
def.Field("judgement", "plantiff")
def.Field("finance", "loan")
def.Tag("vocabulary", "volcano")
def.Tag("make", "toyota")
want := `stream
|from()
|sideload()
.source('file:///tmpdir')
.order('a', 'b', 'c')
.field('finance', 'loan')
.field('judgement', 'plantiff')
.tag('make', 'toyota')
.tag('vocabulary', 'volcano')
`
PipelineTickTestHelper(t, pipe, want)
}

View File

@ -113,7 +113,7 @@ func PipelineTickTestHelper(t *testing.T, pipe *pipeline.Pipeline, want string)
}
if got != want {
t.Errorf("%v, want %v", got, want)
t.Errorf("unexpected TICKscript:\ngot:\n%v\nwant:\n%v\n", got, want)
t.Log(got) // print is helpful to get the correct format.
}
}

228
vendor/github.com/influxdata/kapacitor/sideload.go generated vendored Normal file
View File

@ -0,0 +1,228 @@
package kapacitor
import (
"fmt"
"strconv"
"text/template"
text "text/template"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/sideload"
"github.com/pkg/errors"
)
type SideloadNode struct {
node
s *pipeline.SideloadNode
source sideload.Source
orderTmpls []orderTmpl
order []string
bufferPool *bufpool.Pool
}
// Create a new SideloadNode which loads fields and tags from external sources.
func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnostic) (*SideloadNode, error) {
sn := &SideloadNode{
node: node{Node: n, et: et, diag: d},
s: n,
bufferPool: bufpool.New(),
order: make([]string, len(n.OrderList)),
orderTmpls: make([]orderTmpl, len(n.OrderList)),
}
src, err := et.tm.SideloadService.Source(n.Source)
if err != nil {
return nil, err
}
sn.source = src
for i, o := range n.OrderList {
op, err := newOrderTmpl(o, sn.bufferPool)
if err != nil {
return nil, err
}
sn.orderTmpls[i] = op
}
sn.node.runF = sn.runSideload
sn.node.stopF = sn.stopSideload
return sn, nil
}
func (n *SideloadNode) runSideload([]byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
edge.NewReceiverFromForwardReceiverWithStats(
n.outs,
edge.NewTimedForwardReceiver(n.timer, n),
),
)
return consumer.Consume()
}
func (n *SideloadNode) stopSideload() {
n.source.Close()
}
type orderTmpl struct {
raw string
tmpl *text.Template
bufferPool *bufpool.Pool
}
func newOrderTmpl(text string, bp *bufpool.Pool) (orderTmpl, error) {
t, err := template.New("order").Parse(text)
if err != nil {
return orderTmpl{}, err
}
return orderTmpl{
raw: text,
tmpl: t,
bufferPool: bp,
}, nil
}
func (t orderTmpl) Path(tags models.Tags) (string, error) {
buf := t.bufferPool.Get()
defer t.bufferPool.Put(buf)
err := t.tmpl.Execute(buf, tags)
if err != nil {
return "", err
}
return buf.String(), nil
}
func (n *SideloadNode) doSideload(p edge.FieldsTagsTimeSetter) {
for i, o := range n.orderTmpls {
p, err := o.Path(p.Tags())
if err != nil {
n.diag.Error("failed to evaluate order template", err, keyvalue.KV("order", o.raw))
return
}
n.order[i] = p
}
if len(n.s.Fields) > 0 {
fields := p.Fields().Copy()
for key, dflt := range n.s.Fields {
value := n.source.Lookup(n.order, key)
if value == nil {
// Use default
fields[key] = dflt
} else {
v, err := convertType(value, dflt)
if err != nil {
n.diag.Error("failed to load key", err, keyvalue.KV("key", key), keyvalue.KV("expected", fmt.Sprintf("%T", dflt)), keyvalue.KV("got", fmt.Sprintf("%T", value)))
fields[key] = dflt
} else {
fields[key] = v
}
}
}
p.SetFields(fields)
}
if len(n.s.Tags) > 0 {
tags := p.Tags().Copy()
for key, dflt := range n.s.Tags {
value := n.source.Lookup(n.order, key)
if value == nil {
tags[key] = dflt
} else {
v, err := convertType(value, dflt)
if err != nil {
n.diag.Error("failed to load key", err, keyvalue.KV("key", key), keyvalue.KV("expected", "string"), keyvalue.KV("got", fmt.Sprintf("%T", value)))
tags[key] = dflt
} else {
tags[key] = v.(string)
}
}
}
p.SetTags(tags)
}
}
func (n *SideloadNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
begin = begin.ShallowCopy()
return begin, nil
}
func (n *SideloadNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
bp = bp.ShallowCopy()
n.doSideload(bp)
return bp, nil
}
func (n *SideloadNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
return end, nil
}
func (n *SideloadNode) Point(p edge.PointMessage) (edge.Message, error) {
p = p.ShallowCopy()
n.doSideload(p)
return p, nil
}
func (n *SideloadNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (n *SideloadNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func convertType(src, dflt interface{}) (interface{}, error) {
switch dflt.(type) {
case int64:
switch src := src.(type) {
case int64:
return src, nil
case float64:
i := int64(src)
if float64(i) == src {
return i, nil
}
case string:
i, err := strconv.Atoi(src)
if err != nil {
return nil, errors.Wrap(err, "cannot convert string to int64")
}
return i, nil
}
case float64:
switch src := src.(type) {
case int64:
return float64(src), nil
case float64:
return src, nil
case string:
f, err := strconv.ParseFloat(src, 64)
if err != nil {
return nil, errors.Wrap(err, "cannot convert string to float64")
}
return f, nil
}
case bool:
switch src := src.(type) {
case bool:
return src, nil
case string:
b, err := strconv.ParseBool(src)
if err != nil {
return nil, errors.Wrap(err, "cannot convert string to bool")
}
return b, nil
}
case string:
switch src := src.(type) {
case int64:
return strconv.FormatInt(src, 10), nil
case float64:
return strconv.FormatFloat(src, 'f', -1, 64), nil
case bool:
return strconv.FormatBool(src), nil
case string:
return src, nil
}
}
return nil, fmt.Errorf("cannot convert value of type %T to type %T", src, dflt)
}

View File

@ -509,6 +509,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node,
n, err = newStateDurationNode(et, t, d)
case *pipeline.StateCountNode:
n, err = newStateCountNode(et, t, d)
case *pipeline.SideloadNode:
n, err = newSideloadNode(et, t, d)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/sideload"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/snmptrap"
@ -173,6 +174,10 @@ type TaskMaster struct {
Client(string) (swarm.Client, error)
}
SideloadService interface {
Source(dir string) (sideload.Source, error)
}
Commander command.Commander
DefaultRetentionPolicy string
@ -263,6 +268,7 @@ func (tm *TaskMaster) New(id string) *TaskMaster {
n.TimingService = tm.TimingService
n.K8sService = tm.K8sService
n.Commander = tm.Commander
n.SideloadService = tm.SideloadService
return n
}

View File

@ -226,7 +226,7 @@ func (j JSONNode) NodeList(field string) ([]Node, error) {
if err != nil {
return nil, err
}
list, ok := l.([]map[string]interface{})
list, ok := l.([]interface{})
if !ok {
return nil, fmt.Errorf("field %s is not a list of values but is %T", field, l)
}