Update kapacitor vendoring to 6b3dc1247fd3e1c2a329b24ea6a0665fa6cf37c1

Signed-off-by: Deniz Kusefoglu <denizk@gmail.com>
pull/2447/head
Chris Goller 2017-11-30 14:39:33 -06:00
parent 4599530296
commit 1f9c3654aa
22 changed files with 939 additions and 20 deletions

Gopkg.lock (generated)

@@ -71,7 +71,7 @@
[[projects]]
name = "github.com/influxdata/kapacitor"
packages = ["client/v1","pipeline","pipeline/tick","services/k8s/client","tick","tick/ast","tick/stateful","udf/agent"]
-revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e"
+revision = "6b3dc1247fd3e1c2a329b24ea6a0665fa6cf37c1"

[[projects]]
name = "github.com/influxdata/usage-client"

@@ -140,6 +140,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
-inputs-digest = "46184c2d3fedb48dad6649bb1a97237bc5eef1f48ee1f4b69373e99783a2a47f"
+inputs-digest = "8dfb14505e667cb0e0402350f84db401dcb54cb73d5d41384fec0b98a1d2ba8c"
solver-name = "gps-cdcl"
solver-version = 1


@@ -75,4 +75,4 @@ required = ["github.com/jteeuwen/go-bindata","github.com/gogo/protobuf/proto","g
[[constraint]]
name = "github.com/influxdata/kapacitor"
-revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e"
+revision = "6b3dc1247fd3e1c2a329b24ea6a0665fa6cf37c1"


@@ -1,5 +1,19 @@
# Changelog

## unreleased

## v1.4.0-rc2 [2017-11-28]

### Features

- [#1622](https://github.com/influxdata/kapacitor/pull/1622): Add support for AWS EC2 autoscaling services.
- [#1566](https://github.com/influxdata/kapacitor/pull/1566): Add BarrierNode to emit BarrierMessage periodically

### Bugfixes

- [#1250](https://github.com/influxdata/kapacitor/issues/1250): Fix VictorOps "data" field being a string instead of actual JSON.
- [#1697](https://github.com/influxdata/kapacitor/issues/1697): Fix panic with MQTT toml configuration generation.

## v1.4.0-rc1 [2017-11-09]
### Features


@@ -107,7 +107,7 @@ go fmt ./...
go vet ./...
```
-For more information on `go vet`, [read the GoDoc](https://godoc.org/golang.org/x/tools/cmd/vet).
+For more information on `go vet`, [read the GoDoc](https://golang.org/pkg/cmd/go/internal/vet/).
Build and Test
--------------


@@ -8,6 +8,7 @@ import (
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
ec2 "github.com/influxdata/kapacitor/services/ec2/client"
k8s "github.com/influxdata/kapacitor/services/k8s/client"
swarm "github.com/influxdata/kapacitor/services/swarm/client"
"github.com/influxdata/kapacitor/tick/ast"
@@ -536,3 +537,96 @@ func (a *swarmAutoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) {
tags[a.outputServiceNameTag] = id.ID()
}
}
/////////////////////////////////////////////
// EC2 implementation of Autoscaler
type ec2Autoscaler struct {
client ec2.Client
groupName string
groupNameTag string
outputGroupNameTag string
}
func newEc2AutoscaleNode(et *ExecutingTask, n *pipeline.Ec2AutoscaleNode, d NodeDiagnostic) (*AutoscaleNode, error) {
client, err := et.tm.EC2Service.Client(n.Cluster)
if err != nil {
return nil, fmt.Errorf("cannot use the EC2Autoscale node, could not create ec2 client: %v", err)
}
outputGroupNameTag := n.OutputGroupNameTag
if outputGroupNameTag == "" {
outputGroupNameTag = n.GroupNameTag
}
a := &ec2Autoscaler{
client: client,
groupName: n.GroupName,
groupNameTag: n.GroupNameTag,
outputGroupNameTag: outputGroupNameTag,
}
return newAutoscaleNode(
et,
d,
n,
a,
int(n.Min),
int(n.Max),
n.IncreaseCooldown,
n.DecreaseCooldown,
n.CurrentField,
n.Replicas,
)
}
type ec2ResourceID string
func (id ec2ResourceID) ID() string {
return string(id)
}
func (a *ec2Autoscaler) ResourceIDFromTags(tags models.Tags) (resourceID, error) {
// Get the name of the resource
var name string
switch {
case a.groupName != "":
name = a.groupName
case a.groupNameTag != "":
t, ok := tags[a.groupNameTag]
if ok {
name = t
}
default:
return nil, errors.New("expected one of GroupName or GroupNameTag to be set")
}
if name == "" {
return nil, errors.New("could not determine the name of the resource")
}
return swarmResourceID(name), nil
}
func (a *ec2Autoscaler) Replicas(id resourceID) (int, error) {
sid := id.ID()
group, err := a.client.Group(sid)
if err != nil {
return 0, errors.Wrapf(err, "failed to get ec2 autoscaleGroup for %q", id)
}
var desiredcapacity int64
for _, resp := range group.AutoScalingGroups {
desiredcapacity = *resp.DesiredCapacity
}
return int(desiredcapacity), nil
}
func (a *ec2Autoscaler) SetReplicas(id resourceID, replicas int) error {
sid := id.ID()
return a.client.UpdateGroup(sid, int64(replicas))
}
func (a *ec2Autoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) {
if a.outputGroupNameTag != "" {
tags[a.outputGroupNameTag] = id.ID()
}
}

vendor/github.com/influxdata/kapacitor/barrier.go (generated, vendored, new file)

@@ -0,0 +1,277 @@
package kapacitor
import (
"errors"
"time"
"sync"
"sync/atomic"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
type BarrierNode struct {
node
b *pipeline.BarrierNode
barrierStopper map[models.GroupID]func()
}
// Create a new BarrierNode, which emits a barrier if data traffic has been idle for the configured amount of time.
func newBarrierNode(et *ExecutingTask, n *pipeline.BarrierNode, d NodeDiagnostic) (*BarrierNode, error) {
if n.Idle == 0 && n.Period == 0 {
return nil, errors.New("barrier node must have either a non zero idle or a non zero period")
}
bn := &BarrierNode{
node: node{Node: n, et: et, diag: d},
b: n,
barrierStopper: map[models.GroupID]func(){},
}
bn.node.runF = bn.runBarrierEmitter
return bn, nil
}
func (n *BarrierNode) runBarrierEmitter([]byte) error {
defer n.stopBarrierEmitter()
consumer := edge.NewGroupedConsumer(n.ins[0], n)
n.statMap.Set(statCardinalityGauge, consumer.CardinalityVar())
return consumer.Consume()
}
func (n *BarrierNode) stopBarrierEmitter() {
for _, stopF := range n.barrierStopper {
stopF()
}
}
func (n *BarrierNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
r, stopF, err := n.newBarrier(group, first)
if err != nil {
return nil, err
}
n.barrierStopper[group.ID] = stopF
return edge.NewReceiverFromForwardReceiverWithStats(
n.outs,
edge.NewTimedForwardReceiver(n.timer, r),
), nil
}
func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (edge.ForwardReceiver, func(), error) {
switch {
case n.b.Idle != 0:
idleBarrier := newIdleBarrier(
first.Name(),
group,
n.b.Idle,
n.outs,
)
return idleBarrier, idleBarrier.Stop, nil
case n.b.Period != 0:
periodicBarrier := newPeriodicBarrier(
first.Name(),
group,
n.b.Period,
n.outs,
)
return periodicBarrier, periodicBarrier.Stop, nil
default:
return nil, nil, errors.New("unreachable code, barrier node should have non-zero idle or non-zero period")
}
}
type idleBarrier struct {
name string
group edge.GroupInfo
idle time.Duration
lastT atomic.Value
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
resetTimerC chan struct{}
}
func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier {
r := &idleBarrier{
name: name,
group: group,
idle: idle,
lastT: atomic.Value{},
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
resetTimerC: make(chan struct{}),
}
r.Init()
return r
}
func (n *idleBarrier) Init() {
n.lastT.Store(time.Time{})
n.wg.Add(1)
go n.idleHandler()
}
func (n *idleBarrier) Stop() {
close(n.stopC)
n.wg.Wait()
}
func (n *idleBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *idleBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}
func (n *idleBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *idleBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}
func (n *idleBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) {
if m.GroupID() == n.group.ID {
n.Stop()
}
return m, nil
}
func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}
func (n *idleBarrier) resetTimer() {
n.resetTimerC <- struct{}{}
}
func (n *idleBarrier) emitBarrier() error {
nowT := time.Now().UTC()
n.lastT.Store(nowT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT))
}
func (n *idleBarrier) idleHandler() {
defer n.wg.Done()
idleTimer := time.NewTimer(n.idle)
for {
select {
case <-n.resetTimerC:
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(n.idle)
case <-idleTimer.C:
n.emitBarrier()
idleTimer.Reset(n.idle)
case <-n.stopC:
idleTimer.Stop()
return
}
}
}
type periodicBarrier struct {
name string
group edge.GroupInfo
lastT atomic.Value
ticker *time.Ticker
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
}
func newPeriodicBarrier(name string, group edge.GroupInfo, period time.Duration, outs []edge.StatsEdge) *periodicBarrier {
r := &periodicBarrier{
name: name,
group: group,
lastT: atomic.Value{},
ticker: time.NewTicker(period),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
}
r.Init()
return r
}
func (n *periodicBarrier) Init() {
n.lastT.Store(time.Time{})
n.wg.Add(1)
go n.periodicEmitter()
}
func (n *periodicBarrier) Stop() {
close(n.stopC)
n.ticker.Stop()
n.wg.Wait()
}
func (n *periodicBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *periodicBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}
func (n *periodicBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *periodicBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}
func (n *periodicBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) {
if m.GroupID() == n.group.ID {
n.Stop()
}
return m, nil
}
func (n *periodicBarrier) Point(m edge.PointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}
func (n *periodicBarrier) emitBarrier() error {
nowT := time.Now().UTC()
n.lastT.Store(nowT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT))
}
func (n *periodicBarrier) periodicEmitter() {
defer n.wg.Done()
for {
select {
case <-n.ticker.C:
n.emitBarrier()
case <-n.stopC:
return
}
}
}


@@ -0,0 +1,57 @@
package pipeline
import (
"errors"
"time"
)
// A BarrierNode will emit a barrier with the current time, according to the system
// clock. Since the BarrierNode emits based on system time, it allows pipelines to be
// forced in the absence of data traffic. The barrier emitted will be based on either
// idle time since the last received message or on a periodic timer based on the system
// clock. Any messages received after an emitted barrier that is older than the last
// emitted barrier will be dropped.
//
// Example:
// stream
// |barrier().idle(5s)
// |window()
// .period(10s)
// .every(5s)
// |top(10, 'value')
// //Post the top 10 results over the last 10s updated every 5s.
// |httpPost('http://example.com/api/top10')
//
type BarrierNode struct {
chainnode
// Emit barrier based on idle time since the last received message.
// Must be greater than zero.
Idle time.Duration
// Emit barrier based on periodic timer. The timer is based on system
// clock rather than message time.
// Must be greater than zero.
Period time.Duration
}
func newBarrierNode(wants EdgeType) *BarrierNode {
return &BarrierNode{
chainnode: newBasicChainNode("barrier", wants, wants),
}
}
// tick:ignore
func (b *BarrierNode) validate() error {
if b.Idle != 0 && b.Period != 0 {
return errors.New("cannot specify both idle and period")
}
if b.Period == 0 && b.Idle <= 0 {
return errors.New("idle must be greater than zero")
}
if b.Period <= 0 && b.Idle == 0 {
return errors.New("period must be greater than zero")
}
return nil
}


@@ -0,0 +1,135 @@
package pipeline
import (
"errors"
"fmt"
"time"
"github.com/influxdata/kapacitor/tick/ast"
)
// EC2AutoscaleNode triggers autoscale events for a group on a AWS Autoscaling group.
// The node also outputs points for the triggered events.
//
// Example:
// // Target 80% cpu per ec2 instance
// var target = 80.0
// var min = 1
// var max = 10
// var period = 5m
// var every = period
// stream
// |from()
// .measurement('cpu')
// .groupBy('host_name','group_name')
// .where(lambda: "cpu" == 'cpu-total')
// |eval(lambda: 100.0 - "usage_idle")
// .as('usage_percent')
// |window()
// .period(period)
// .every(every)
// |mean('usage_percent')
// .as('mean_cpu')
// |groupBy('group_name')
// |sum('mean_cpu')
// .as('total_cpu')
// |ec2Autoscale()
// // Get the group name of the VM(EC2 instance) from "group_name" tag.
// .groupNameTag('group_name')
// .min(min)
// .max(max)
// // Set the desired number of replicas based on target.
// .replicas(lambda: int(ceil("total_cpu" / target)))
// |influxDBOut()
// .database('deployments')
// .measurement('scale_events')
// .precision('s')
//
//
// The above example computes the mean of cpu usage_percent by host_name name and group_name
// Then sum of mean cpu_usage is calculated as total_cpu.
// Using the total_cpu over the last time period a desired number of replicas is computed
// based on the target percentage usage of cpu.
//
// If the desired number of replicas has changed, Kapacitor makes the appropriate API call to AWS autoscaling group
// to update the replicas spec.
//
// Any time the Ec2Autoscale node changes a replica count, it emits a point.
// The point is tagged with the group name,
// using the groupName respectively
// In addition the group by tags will be preserved on the emitted point.
// The point contains two fields: `old`, and `new` representing change in the replicas.
//
// Available Statistics:
//
// * increase_events -- number of times the replica count was increased.
// * decrease_events -- number of times the replica count was decreased.
// * cooldown_drops -- number of times an event was dropped because of a cooldown timer.
// * errors -- number of errors encountered, typically related to communicating with the AWS autoscaling API.
//
type Ec2AutoscaleNode struct {
chainnode
// Cluster is the ID of ec2 autoscale group to use.
// The ID of the cluster is specified in the kapacitor configuration.
Cluster string
// GroupName is the name of the autoscaling group to autoscale.
GroupName string
// GroupName is the name of a tag which contains the name of the autoscaling group to autoscale.
GroupNameTag string
// OutputGroupName is the name of a tag into which the group name will be written for output autoscale events.
// Defaults to the value of GroupNameTag if its not empty.
OutputGroupNameTag string
// CurrentField is the name of a field into which the current replica count will be set as an int.
// If empty no field will be set.
// Useful for computing deltas on the current state.
//
// Example:
// |ec2Autoscale()
// .currentField('replicas')
// // Increase the replicas by 1 if the qps is over the threshold
// .replicas(lambda: if("qps" > threshold, "replicas" + 1, "replicas"))
//
CurrentField string
// The maximum scale factor to set.
// If 0 then there is no upper limit.
// Default: 0, a.k.a no limit.
Max int64
// The minimum scale factor to set.
// Default: 1
Min int64
// Replicas is a lambda expression that should evaluate to the desired number of replicas for the resource.
Replicas *ast.LambdaNode
// Only one increase event can be triggered per resource every IncreaseCooldown interval.
IncreaseCooldown time.Duration
// Only one decrease event can be triggered per resource every DecreaseCooldown interval.
DecreaseCooldown time.Duration
}
func newEc2AutoscaleNode(e EdgeType) *Ec2AutoscaleNode {
k := &Ec2AutoscaleNode{
chainnode: newBasicChainNode("ec2_autoscale", e, StreamEdge),
Min: 1,
}
return k
}
func (n *Ec2AutoscaleNode) validate() error {
if (n.GroupName == "" && n.GroupNameTag == "") ||
(n.GroupName != "" && n.GroupNameTag != "") {
return fmt.Errorf("must specify exactly one of GroupName or GroupNameTag")
}
if n.Min < 1 {
return fmt.Errorf("min must be >= 1, got %d", n.Min)
}
if n.Replicas == nil {
return errors.New("must provide a replicas lambda expression")
}
return nil
}


@@ -417,6 +417,15 @@ func (n *chainnode) Window() *WindowNode {
return w
}
// Create a new Barrier node that emits a BarrierMessage periodically
//
// One BarrierMessage will be emitted every period duration
func (n *chainnode) Barrier() *BarrierNode {
b := newBarrierNode(n.provides)
n.linkChild(b)
return b
}
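In TICKScript this new chaining method surfaces as `|barrier()`. A minimal sketch of the period-based mode described in the comment above; the measurement name and the 10s period are illustrative, not part of this commit:

```
stream
    |from()
        .measurement('cpu')
    |barrier()
        // Emit a BarrierMessage every 10s of wall-clock time,
        // regardless of whether any points arrive.
        .period(10s)
```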
// Create a new node that samples the incoming points or batches.
//
// One point will be emitted every count or duration specified.
@@ -475,6 +484,13 @@ func (n *chainnode) SwarmAutoscale() *SwarmAutoscaleNode {
return k
}
// Create a node that can trigger autoscale events for a ec2 autoscalegroup.
func (n *chainnode) Ec2Autoscale() *Ec2AutoscaleNode {
k := newEc2AutoscaleNode(n.Provides())
n.linkChild(k)
return k
}
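Likewise, `Ec2Autoscale()` is reached from TICKScript as `|ec2Autoscale()`. A minimal sketch assuming a `total_cpu` field computed by upstream aggregation, as in the Ec2AutoscaleNode doc comment earlier in this diff; the measurement, tag name, and 80.0 target are illustrative:

```
stream
    |from()
        .measurement('cpu')
    |ec2Autoscale()
        // Read the autoscaling group name from the 'group_name' tag.
        .groupNameTag('group_name')
        .min(1)
        .max(10)
        // Desired instance count derived from the aggregated load.
        .replicas(lambda: int(ceil("total_cpu" / 80.0)))
```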
// Create a node that tracks duration in a given state.
func (n *chainnode) StateDuration(expression *ast.LambdaNode) *StateDurationNode {
sd := newStateDurationNode(n.provides, expression)


@@ -73,8 +73,7 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
}
for _, h := range a.TcpHandlers {
-n.Dot("tcp").
-Dot("address", h.Address)
+n.Dot("tcp", h.Address)
}
for _, h := range a.EmailHandlers {


@@ -1,6 +1,7 @@
package tick_test
import (
"encoding/json"
"testing"
"time"
@@ -124,8 +125,50 @@
.message('{{ .ID }} is {{ .Level }}')
.details('{{ json . }}')
.history(21)
-.tcp()
-.address('echo:7')
+.tcp('echo:7')
`
PipelineTickTestHelper(t, pipe, want)
}
func TestAlertTCPJSON(t *testing.T) {
pipe, _, from := StreamFrom()
j := `
{
"typeOf": "alert",
"stateChangesOnly": false,
"useFlapping": false,
"message": "",
"details": "",
"post": null,
"tcp": [
{
"address": "echo:7"
}
],
"email": null,
"exec": null,
"log": null,
"victorOps": null,
"pagerDuty": null,
"pushover": null,
"sensu": null,
"slack": null,
"telegram": null,
"hipChat": null,
"alerta": null,
"opsGenie": null,
"talk": null
}`
node := from.Alert()
if err := json.Unmarshal([]byte(j), node); err != nil {
t.Errorf("unable to unmarshal alert %v", err)
}
want := `stream
|from()
|alert()
.id('{{ .Name }}:{{ .Group }}')
.history(21)
.tcp('echo:7')
`
PipelineTickTestHelper(t, pipe, want)
}


@@ -88,6 +88,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) {
return NewJoin(parents).Build(node)
case *pipeline.AlertNode:
return NewAlert(parents).Build(node)
case *pipeline.BarrierNode:
return NewBarrierNode(parents).Build(node)
case *pipeline.CombineNode:
return NewCombine(parents).Build(node)
case *pipeline.DefaultNode:
@@ -96,6 +98,8 @@
return NewDelete(parents).Build(node)
case *pipeline.DerivativeNode:
return NewDerivative(parents).Build(node)
case *pipeline.Ec2AutoscaleNode:
return NewEc2Autoscale(parents).Build(node)
case *pipeline.EvalNode:
return NewEval(parents).Build(node)
case *pipeline.FlattenNode:


@@ -0,0 +1,28 @@
package tick
import (
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)
// BarrierNode converts the window pipeline node into the TICKScript AST
type BarrierNode struct {
Function
}
// NewBarrierNode creates a Barrier function builder
func NewBarrierNode(parents []ast.Node) *BarrierNode {
return &BarrierNode{
Function{
Parents: parents,
},
}
}
// Build creates a window ast.Node
func (n *BarrierNode) Build(b *pipeline.BarrierNode) (ast.Node, error) {
n.Pipe("barrier").
Dot("idle", b.Idle).
Dot("period", b.Period)
return n.prev, n.err
}


@@ -0,0 +1,61 @@
package tick_test
import (
"testing"
"time"
"github.com/influxdata/kapacitor/pipeline"
)
func TestBarrierNode(t *testing.T) {
type args struct {
idle time.Duration
period time.Duration
}
tests := []struct {
name string
args args
want string
}{
{
name: "barrier with idle",
args: args{
idle: time.Second,
},
want: `stream
|from()
|barrier()
.idle(1s)
`,
},
{
name: "barrier with period",
args: args{
period: time.Second,
},
want: `stream
|from()
|barrier()
.period(1s)
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stream := &pipeline.StreamNode{}
pipe := pipeline.CreatePipelineSources(stream)
b := stream.From().Barrier()
b.Idle = tt.args.idle
b.Period = tt.args.period
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}
if got != tt.want {
t.Errorf("%q. TestBarrier() =\n%v\n want\n%v\n", tt.name, got, tt.want)
}
})
}
}


@@ -0,0 +1,37 @@
package tick
import (
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)
// Ec2AutoscaleNode converts the ec2 autoscaling pipeline node into the TICKScript AST
type Ec2AutoscaleNode struct {
Function
}
// NewEc2Autoscale creates a Ec2Autoscale function builder
func NewEc2Autoscale(parents []ast.Node) *Ec2AutoscaleNode {
return &Ec2AutoscaleNode{
Function{
Parents: parents,
},
}
}
// Build creates a Ec2Autoscale ast.Node
func (n *Ec2AutoscaleNode) Build(s *pipeline.Ec2AutoscaleNode) (ast.Node, error) {
n.Pipe("ec2Autoscale").
Dot("cluster", s.Cluster).
Dot("groupName", s.GroupNameTag).
Dot("groupNameTag", s.GroupNameTag).
Dot("outputGroupNameTag", s.OutputGroupNameTag).
Dot("currentField", s.CurrentField).
Dot("max", s.Max).
Dot("min", s.Min).
Dot("replicas", s.Replicas).
Dot("increaseCooldown", s.IncreaseCooldown).
Dot("decreaseCooldown", s.DecreaseCooldown)
return n.prev, n.err
}


@@ -0,0 +1,115 @@
package tick_test
import (
"testing"
"time"
"github.com/influxdata/kapacitor/tick/ast"
)
func TestEc2Autoscale(t *testing.T) {
type args struct {
cluster string
groupName string
groupNameTag string
outputGroupNameTag string
currentField string
max int64
min int64
replicas *ast.LambdaNode
increaseCooldown time.Duration
decreaseCooldown time.Duration
}
tests := []struct {
name string
args args
want string
}{
{
name: "upgrade mutalisk_autoscale to guardian_autoscale",
args: args{
cluster: "zerg",
groupName: "mutalisk_autoscale",
groupNameTag: "mutalisk_autoscale",
outputGroupNameTag: "guardian_autoscale",
currentField: "hitPoints",
max: 10,
min: 5,
replicas: &ast.LambdaNode{
Expression: &ast.FunctionNode{
Type: ast.GlobalFunc,
Func: "if",
Args: []ast.Node{
&ast.BinaryNode{
Operator: ast.TokenGreater,
Left: &ast.ReferenceNode{
Reference: "greater spire",
},
Right: &ast.NumberNode{
IsInt: true,
Int64: 1,
Base: 10,
},
},
&ast.BinaryNode{
Operator: ast.TokenPlus,
Left: &ast.ReferenceNode{
Reference: "replicas",
},
Right: &ast.NumberNode{
IsInt: true,
Int64: 1,
Base: 10,
},
},
&ast.ReferenceNode{
Reference: "replicas",
},
},
},
},
increaseCooldown: 6670 * time.Millisecond,
decreaseCooldown: 2500 * time.Millisecond,
},
want: `stream
|from()
|ec2Autoscale()
.cluster('zerg')
.groupName('mutalisk_autoscale')
.groupNameTag('mutalisk_autoscale')
.outputGroupNameTag('guardian_autoscale')
.currentField('hitPoints')
.max(10)
.min(5)
.replicas(lambda: if("greater spire" > 1, "replicas" + 1, "replicas"))
.increaseCooldown(6670ms)
.decreaseCooldown(2500ms)
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pipe, _, from := StreamFrom()
n := from.Ec2Autoscale()
n.Cluster = tt.args.cluster
n.GroupName = tt.args.groupName
n.GroupNameTag = tt.args.groupNameTag
n.OutputGroupNameTag = tt.args.outputGroupNameTag
n.CurrentField = tt.args.currentField
n.Max = tt.args.max
n.Min = tt.args.min
n.Replicas = tt.args.replicas
n.IncreaseCooldown = tt.args.increaseCooldown
n.DecreaseCooldown = tt.args.decreaseCooldown
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}
if got != tt.want {
t.Errorf("%q. TestEc2Autoscale() =\n%v\n want\n%v\n", tt.name, got, tt.want)
t.Log(got)
}
})
}
}


@@ -23,7 +23,7 @@ func NewSwarmAutoscale(parents []ast.Node) *SwarmAutoscaleNode {
func (n *SwarmAutoscaleNode) Build(s *pipeline.SwarmAutoscaleNode) (ast.Node, error) {
n.Pipe("swarmAutoscale").
Dot("cluster", s.Cluster).
-Dot("servceName", s.ServiceNameTag).
+Dot("serviceName", s.ServiceNameTag).
Dot("serviceNameTag", s.ServiceNameTag).
Dot("outputServiceNameTag", s.OutputServiceNameTag).
Dot("currentField", s.CurrentField).


@@ -75,7 +75,7 @@ func TestSwarmAutoscale(t *testing.T) {
|from()
|swarmAutoscale()
.cluster('zerg')
-.servceName('mutalisk')
+.serviceName('mutalisk')
.serviceNameTag('mutalisk')
.outputServiceNameTag('guardian')
.currentField('hitPoints')


@@ -15,9 +15,9 @@ import (
// The `every` property of `window` defines the frequency at which the window
// is emitted to the next node in the pipeline.
//
-//The `align` property of `window` defines how to align the window edges.
-//(By default, the edges are defined relative to the first data point the `window`
-//node receives.)
+// The `align` property of `window` defines how to align the window edges.
+// (By default, the edges are defined relative to the first data point the `window`
+// node receives.)
//
// Example:
// stream
@@ -26,7 +26,7 @@ import (
// .every(5m)
// |httpOut('recent')
//
-// his example emits the last `10 minute` period every `5 minutes` to the pipeline's `httpOut` node.
+// This example emits the last `10 minute` period every `5 minutes` to the pipeline's `httpOut` node.
// Because `every` is less than `period`, each time the window is emitted it contains `5 minutes` of
// new data and `5 minutes` of the previous period's data.
//
@@ -139,7 +139,7 @@ func (w *WindowNode) validate() error {
return errors.New("can only align windows based off time, not count")
}
if w.PeriodCount != 0 && w.EveryCount <= 0 {
-return fmt.Errorf("everyCount must be greater than zero")
+return errors.New("everyCount must be greater than zero")
}
return nil
}


@@ -505,12 +505,16 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node,
n, err = newK8sAutoscaleNode(et, t, d)
case *pipeline.SwarmAutoscaleNode:
n, err = newSwarmAutoscaleNode(et, t, d)
case *pipeline.Ec2AutoscaleNode:
n, err = newEc2AutoscaleNode(et, t, d)
case *pipeline.StateDurationNode:
n, err = newStateDurationNode(et, t, d)
case *pipeline.StateCountNode:
n, err = newStateCountNode(et, t, d)
case *pipeline.SideloadNode:
n, err = newSideloadNode(et, t, d)
case *pipeline.BarrierNode:
n, err = newBarrierNode(et, t, d)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}


@@ -19,6 +19,7 @@ import (
"github.com/influxdata/kapacitor/server/vars"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/alerta"
ec2 "github.com/influxdata/kapacitor/services/ec2/client"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/httppost"
@@ -173,6 +174,9 @@
SwarmService interface {
Client(string) (swarm.Client, error)
}
EC2Service interface {
Client(string) (ec2.Client, error)
}
SideloadService interface {
Source(dir string) (sideload.Source, error)


@@ -28,10 +28,11 @@ func newWindowNode(et *ExecutingTask, n *pipeline.WindowNode, d NodeDiagnostic)
return wn, nil
}
-func (n *WindowNode) runWindow([]byte) error {
+func (n *WindowNode) runWindow([]byte) (err error) {
consumer := edge.NewGroupedConsumer(n.ins[0], n)
n.statMap.Set(statCardinalityGauge, consumer.CardinalityVar())
-return consumer.Consume()
+err = consumer.Consume()
+return
}
func (n *WindowNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
@@ -145,9 +146,39 @@ func (w *windowByTime) BatchPoint(edge.BatchPointMessage) (edge.Message, error)
func (w *windowByTime) EndBatch(edge.EndBatchMessage) (edge.Message, error) {
return nil, errors.New("window does not support batch data")
}
-func (w *windowByTime) Barrier(b edge.BarrierMessage) (edge.Message, error) {
-//TODO(nathanielc): Implement barrier messages to flush window
-return b, nil
func (w *windowByTime) Barrier(b edge.BarrierMessage) (msg edge.Message, err error) {
if w.every == 0 {
// Since we are emitting every point we can use a right aligned window (oldest, now]
if !b.Time().Before(w.nextEmit) {
// purge old points
oldest := b.Time().Add(-1 * w.period)
w.buf.purge(oldest, false)
// get current batch
msg = w.batch(b.Time())
// Next emit time is now
w.nextEmit = b.Time()
}
} else {
// Since more points can arrive with the same time we need to use a left aligned window [oldest, now).
if !b.Time().Before(w.nextEmit) {
// purge old points
oldest := w.nextEmit.Add(-1 * w.period)
w.buf.purge(oldest, true)
// get current batch
msg = w.batch(w.nextEmit)
// Determine next emit time.
// This is dependent on the current time not the last time we emitted.
w.nextEmit = b.Time().Add(w.every)
if w.align {
w.nextEmit = w.nextEmit.Truncate(w.every)
}
}
}
return
}
func (w *windowByTime) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
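Because `windowByTime.Barrier` now flushes the buffered window instead of simply forwarding the message, a `barrier()` placed ahead of a `window()` can force periodic emission even when no new points arrive. A sketch of that combination, adapted from the BarrierNode and WindowNode doc comments earlier in this diff; the durations and the `httpOut` endpoint name are illustrative:

```
stream
    |barrier()
        // Emit a barrier if the stream has been idle for 5s.
        .idle(5s)
    |window()
        .period(10s)
        .every(5s)
    // The window now emits on barrier messages as well as on points,
    // so 'recent' stays up to date even during quiet periods.
    |httpOut('recent')
```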