diff --git a/Gopkg.lock b/Gopkg.lock index 4797cfffc..978e33b0e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -71,7 +71,7 @@ [[projects]] name = "github.com/influxdata/kapacitor" packages = ["client/v1","pipeline","pipeline/tick","services/k8s/client","tick","tick/ast","tick/stateful","udf/agent"] - revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e" + revision = "6b3dc1247fd3e1c2a329b24ea6a0665fa6cf37c1" [[projects]] name = "github.com/influxdata/usage-client" @@ -140,6 +140,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "46184c2d3fedb48dad6649bb1a97237bc5eef1f48ee1f4b69373e99783a2a47f" + inputs-digest = "8dfb14505e667cb0e0402350f84db401dcb54cb73d5d41384fec0b98a1d2ba8c" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 628cdde62..0220d633b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -75,4 +75,4 @@ required = ["github.com/jteeuwen/go-bindata","github.com/gogo/protobuf/proto","g [[constraint]] name = "github.com/influxdata/kapacitor" - revision = "291ca33f5d7b8b277cbb9a7afb65397d1769a99e" + revision = "6b3dc1247fd3e1c2a329b24ea6a0665fa6cf37c1" diff --git a/vendor/github.com/influxdata/kapacitor/CHANGELOG.md b/vendor/github.com/influxdata/kapacitor/CHANGELOG.md index f9fedee72..7a082b279 100644 --- a/vendor/github.com/influxdata/kapacitor/CHANGELOG.md +++ b/vendor/github.com/influxdata/kapacitor/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## unreleased + +## v1.4.0-rc2 [2017-11-28] + +### Features + +- [#1622](https://github.com/influxdata/kapacitor/pull/1622): Add support for AWS EC2 autoscaling services. +- [#1566](https://github.com/influxdata/kapacitor/pull/1566): Add BarrierNode to emit BarrierMessage periodically + +### Bugfixes + +- [#1250](https://github.com/influxdata/kapacitor/issues/1250): Fix VictorOps "data" field being a string instead of actual JSON. +- [#1697](https://github.com/influxdata/kapacitor/issues/1697): Fix panic with MQTT toml configuration generation. + ## v1.4.0-rc1 [2017-11-09] ### Features diff --git a/vendor/github.com/influxdata/kapacitor/CONTRIBUTING.md b/vendor/github.com/influxdata/kapacitor/CONTRIBUTING.md index de27f3ba7..7c40d7caf 100644 --- a/vendor/github.com/influxdata/kapacitor/CONTRIBUTING.md +++ b/vendor/github.com/influxdata/kapacitor/CONTRIBUTING.md @@ -107,7 +107,7 @@ go fmt ./... go vet ./... ``` -For more information on `go vet`, [read the GoDoc](https://godoc.org/golang.org/x/tools/cmd/vet). +For more information on `go vet`, [read the GoDoc](https://golang.org/pkg/cmd/go/internal/vet/). Build and Test -------------- diff --git a/vendor/github.com/influxdata/kapacitor/autoscale.go b/vendor/github.com/influxdata/kapacitor/autoscale.go index 877f898b1..f3d7ade8f 100644 --- a/vendor/github.com/influxdata/kapacitor/autoscale.go +++ b/vendor/github.com/influxdata/kapacitor/autoscale.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" + ec2 "github.com/influxdata/kapacitor/services/ec2/client" k8s "github.com/influxdata/kapacitor/services/k8s/client" swarm "github.com/influxdata/kapacitor/services/swarm/client" "github.com/influxdata/kapacitor/tick/ast" @@ -536,3 +537,96 @@ func (a *swarmAutoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) { tags[a.outputServiceNameTag] = id.ID() } } + +///////////////////////////////////////////// +// EC2 implementation of Autoscaler + +type ec2Autoscaler struct { + client ec2.Client + + groupName string + groupNameTag string + outputGroupNameTag string +} + +func newEc2AutoscaleNode(et *ExecutingTask, n *pipeline.Ec2AutoscaleNode, d NodeDiagnostic) (*AutoscaleNode, error) { + client, err := et.tm.EC2Service.Client(n.Cluster) + if err != nil { + return nil, fmt.Errorf("cannot use the EC2Autoscale node, could not create ec2 client: %v", err) + } + outputGroupNameTag := n.OutputGroupNameTag + if outputGroupNameTag == "" { + outputGroupNameTag = n.GroupNameTag + } + a := &ec2Autoscaler{ + client: client, + + groupName: n.GroupName, + groupNameTag: n.GroupNameTag, + outputGroupNameTag: outputGroupNameTag, + } + return newAutoscaleNode( + et, + d, + n, + a, + int(n.Min), + int(n.Max), + n.IncreaseCooldown, + n.DecreaseCooldown, + n.CurrentField, + n.Replicas, + ) +} + +type ec2ResourceID string + +func (id ec2ResourceID) ID() string { + return string(id) +} + +func (a *ec2Autoscaler) ResourceIDFromTags(tags models.Tags) (resourceID, error) { + // Get the name of the resource + var name string + switch { + case a.groupName != "": + name = a.groupName + case a.groupNameTag != "": + t, ok := tags[a.groupNameTag] + if ok { + name = t + } + default: + return nil, errors.New("expected one of GroupName or GroupNameTag to be set") + } + if name == "" { + return nil, errors.New("could not determine the name of the resource") + } + return swarmResourceID(name), nil +} + +func (a *ec2Autoscaler) Replicas(id resourceID) (int, error) { + sid := id.ID() + group, err := a.client.Group(sid) + if err != nil { + return 0, errors.Wrapf(err, "failed to get ec2 autoscaleGroup for %q", id) + } + var desiredcapacity int64 + for _, resp := range group.AutoScalingGroups { + desiredcapacity = *resp.DesiredCapacity + } + return int(desiredcapacity), nil + +} + +func (a *ec2Autoscaler) SetReplicas(id resourceID, replicas int) error { + sid := id.ID() + + return a.client.UpdateGroup(sid, int64(replicas)) +} + +func (a *ec2Autoscaler) SetResourceIDOnTags(id resourceID, tags models.Tags) { + if a.outputGroupNameTag != "" { + tags[a.outputGroupNameTag] = id.ID() + } +} diff --git a/vendor/github.com/influxdata/kapacitor/barrier.go b/vendor/github.com/influxdata/kapacitor/barrier.go new file mode 100644 index 000000000..61d0507ca --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/barrier.go @@ -0,0 +1,277 @@ +package kapacitor + +import ( + "errors" + "time" + + "sync" + "sync/atomic" + + "github.com/influxdata/kapacitor/edge" + "github.com/influxdata/kapacitor/models" + "github.com/influxdata/kapacitor/pipeline" +) + +type BarrierNode struct { + node + b *pipeline.BarrierNode + barrierStopper map[models.GroupID]func() +} + +// Create a new BarrierNode, which emits a barrier if data traffic has been idle for the configured amount of time. +func newBarrierNode(et *ExecutingTask, n *pipeline.BarrierNode, d NodeDiagnostic) (*BarrierNode, error) { + if n.Idle == 0 && n.Period == 0 { + return nil, errors.New("barrier node must have either a non zero idle or a non zero period") + } + bn := &BarrierNode{ + node: node{Node: n, et: et, diag: d}, + b: n, + barrierStopper: map[models.GroupID]func(){}, + } + bn.node.runF = bn.runBarrierEmitter + return bn, nil +} + +func (n *BarrierNode) runBarrierEmitter([]byte) error { + defer n.stopBarrierEmitter() + consumer := edge.NewGroupedConsumer(n.ins[0], n) + n.statMap.Set(statCardinalityGauge, consumer.CardinalityVar()) + return consumer.Consume() +} + +func (n *BarrierNode) stopBarrierEmitter() { + for _, stopF := range n.barrierStopper { + stopF() + } +} + +func (n *BarrierNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) { + r, stopF, err := n.newBarrier(group, first) + if err != nil { + return nil, err + } + n.barrierStopper[group.ID] = stopF + return edge.NewReceiverFromForwardReceiverWithStats( + n.outs, + edge.NewTimedForwardReceiver(n.timer, r), + ), nil +} + +func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (edge.ForwardReceiver, func(), error) { + switch { + case n.b.Idle != 0: + idleBarrier := newIdleBarrier( + first.Name(), + group, + n.b.Idle, + n.outs, + ) + return idleBarrier, idleBarrier.Stop, nil + case n.b.Period != 0: + periodicBarrier := newPeriodicBarrier( + first.Name(), + group, + n.b.Period, + n.outs, + ) + return periodicBarrier, periodicBarrier.Stop, nil + default: + return nil, nil, errors.New("unreachable code, barrier node should have non-zero idle or non-zero period") + } +} + +type idleBarrier struct { + name string + group edge.GroupInfo + + idle time.Duration + lastT atomic.Value + wg sync.WaitGroup + outs []edge.StatsEdge + stopC chan struct{} + resetTimerC chan struct{} +} + +func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier { + r := &idleBarrier{ + name: name, + group: group, + idle: idle, + lastT: atomic.Value{}, + wg: sync.WaitGroup{}, + outs: outs, + stopC: make(chan struct{}), + resetTimerC: make(chan struct{}), + } + + r.Init() + + return r +} + +func (n *idleBarrier) Init() { + n.lastT.Store(time.Time{}) + n.wg.Add(1) + + go n.idleHandler() +} + +func (n *idleBarrier) Stop() { + close(n.stopC) + n.wg.Wait() +} + +func (n *idleBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) { + return m, nil +} +func (n *idleBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + n.resetTimer() + return m, nil + } + return nil, nil +} +func (n *idleBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) { + return m, nil +} +func (n *idleBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + n.resetTimer() + return m, nil + } + return nil, nil +} +func (n *idleBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) { + if m.GroupID() == n.group.ID { + n.Stop() + } + return m, nil +} + +func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + n.resetTimer() + return m, nil + } + return nil, nil +} + +func (n *idleBarrier) resetTimer() { + n.resetTimerC <- struct{}{} +} + +func (n *idleBarrier) emitBarrier() error { + nowT := time.Now().UTC() + n.lastT.Store(nowT) + return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT)) +} + +func (n *idleBarrier) idleHandler() { + defer n.wg.Done() + idleTimer := time.NewTimer(n.idle) + for { + select { + case <-n.resetTimerC: + if !idleTimer.Stop() { + <-idleTimer.C + } + idleTimer.Reset(n.idle) + case <-idleTimer.C: + n.emitBarrier() + idleTimer.Reset(n.idle) + case <-n.stopC: + idleTimer.Stop() + return + } + } +} + +type periodicBarrier struct { + name string + group edge.GroupInfo + + lastT atomic.Value + ticker *time.Ticker + wg sync.WaitGroup + outs []edge.StatsEdge + stopC chan struct{} +} + +func newPeriodicBarrier(name string, group edge.GroupInfo, period time.Duration, outs []edge.StatsEdge) *periodicBarrier { + r := &periodicBarrier{ + name: name, + group: group, + lastT: atomic.Value{}, + ticker: time.NewTicker(period), + wg: sync.WaitGroup{}, + outs: outs, + stopC: make(chan struct{}), + } + + r.Init() + + return r +} + +func (n *periodicBarrier) Init() { + n.lastT.Store(time.Time{}) + n.wg.Add(1) + + go n.periodicEmitter() +} + +func (n *periodicBarrier) Stop() { + close(n.stopC) + n.ticker.Stop() + n.wg.Wait() +} + +func (n *periodicBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) { + return m, nil +} +func (n *periodicBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + return m, nil + } + return nil, nil +} +func (n *periodicBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) { + return m, nil +} +func (n *periodicBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + return m, nil + } + return nil, nil +} +func (n *periodicBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) { + if m.GroupID() == n.group.ID { + n.Stop() + } + return m, nil +} + +func (n *periodicBarrier) Point(m edge.PointMessage) (edge.Message, error) { + if !m.Time().Before(n.lastT.Load().(time.Time)) { + return m, nil + } + return nil, nil +} + +func (n *periodicBarrier) emitBarrier() error { + nowT := time.Now().UTC() + n.lastT.Store(nowT) + return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT)) +} + +func (n *periodicBarrier) periodicEmitter() { + defer n.wg.Done() + for { + select { + case <-n.ticker.C: + n.emitBarrier() + case <-n.stopC: + return + } + } +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/barrier.go b/vendor/github.com/influxdata/kapacitor/pipeline/barrier.go new file mode 100644 index 000000000..366bd440f --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/barrier.go @@ -0,0 +1,57 @@ +package pipeline + +import ( + "errors" + "time" +) + +// A BarrierNode will emit a barrier with the current time, according to the system +// clock. Since the BarrierNode emits based on system time, it allows pipelines to be +// forced in the absence of data traffic. The barrier emitted will be based on either +// idle time since the last received message or on a periodic timer based on the system +// clock. Any messages received after an emitted barrier that is older than the last +// emitted barrier will be dropped. +// +// Example: +// stream +// |barrier().idle(5s) +// |window() +// .period(10s) +// .every(5s) +// |top(10, 'value') +// //Post the top 10 results over the last 10s updated every 5s. +// |httpPost('http://example.com/api/top10') +// +type BarrierNode struct { + chainnode + + // Emit barrier based on idle time since the last received message. + // Must be greater than zero. + Idle time.Duration + + // Emit barrier based on periodic timer. The timer is based on system + // clock rather than message time. + // Must be greater than zero. + Period time.Duration +} + +func newBarrierNode(wants EdgeType) *BarrierNode { + return &BarrierNode{ + chainnode: newBasicChainNode("barrier", wants, wants), + } +} + +// tick:ignore +func (b *BarrierNode) validate() error { + if b.Idle != 0 && b.Period != 0 { + return errors.New("cannot specify both idle and period") + } + if b.Period == 0 && b.Idle <= 0 { + return errors.New("idle must be greater than zero") + } + if b.Period <= 0 && b.Idle == 0 { + return errors.New("period must be greater than zero") + } + + return nil +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/ec2_autoscale.go b/vendor/github.com/influxdata/kapacitor/pipeline/ec2_autoscale.go new file mode 100644 index 000000000..c18b4f190 --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/ec2_autoscale.go @@ -0,0 +1,135 @@ +package pipeline + +import ( + "errors" + "fmt" + "time" + + "github.com/influxdata/kapacitor/tick/ast" +) + +// EC2AutoscaleNode triggers autoscale events for a group on a AWS Autoscaling group. +// The node also outputs points for the triggered events. +// +// Example: +// // Target 80% cpu per ec2 instance +// var target = 80.0 +// var min = 1 +// var max = 10 +// var period = 5m +// var every = period +// stream +// |from() +// .measurement('cpu') +// .groupBy('host_name','group_name') +// .where(lambda: "cpu" == 'cpu-total') +// |eval(lambda: 100.0 - "usage_idle") +// .as('usage_percent') +// |window() +// .period(period) +// .every(every) +// |mean('usage_percent') +// .as('mean_cpu') +// |groupBy('group_name') +// |sum('mean_cpu') +// .as('total_cpu') +// |ec2Autoscale() +// // Get the group name of the VM(EC2 instance) from "group_name" tag. +// .groupNameTag('group_name') +// .min(min) +// .max(max) +// // Set the desired number of replicas based on target. +// .replicas(lambda: int(ceil("total_cpu" / target))) +// |influxDBOut() +// .database('deployments') +// .measurement('scale_events') +// .precision('s') +// +// +// The above example computes the mean of cpu usage_percent by host_name name and group_name +// Then sum of mean cpu_usage is calculated as total_cpu. +// Using the total_cpu over the last time period a desired number of replicas is computed +// based on the target percentage usage of cpu. +// +// If the desired number of replicas has changed, Kapacitor makes the appropriate API call to AWS autoscaling group +// to update the replicas spec. +// +// Any time the Ec2Autoscale node changes a replica count, it emits a point. +// The point is tagged with the group name, +// using the groupName respectively +// In addition the group by tags will be preserved on the emitted point. +// The point contains two fields: `old`, and `new` representing change in the replicas. +// +// Available Statistics: +// +// * increase_events -- number of times the replica count was increased. +// * decrease_events -- number of times the replica count was decreased. +// * cooldown_drops -- number of times an event was dropped because of a cooldown timer. +// * errors -- number of errors encountered, typically related to communicating with the AWS autoscaling API. +// +type Ec2AutoscaleNode struct { + chainnode + + // Cluster is the ID of ec2 autoscale group to use. + // The ID of the cluster is specified in the kapacitor configuration. + Cluster string + + // GroupName is the name of the autoscaling group to autoscale. + GroupName string + // GroupName is the name of a tag which contains the name of the autoscaling group to autoscale. + GroupNameTag string + // OutputGroupName is the name of a tag into which the group name will be written for output autoscale events. + // Defaults to the value of GroupNameTag if its not empty. + OutputGroupNameTag string + + // CurrentField is the name of a field into which the current replica count will be set as an int. + // If empty no field will be set. + // Useful for computing deltas on the current state. + // + // Example: + // |ec2Autoscale() + // .currentField('replicas') + // // Increase the replicas by 1 if the qps is over the threshold + // .replicas(lambda: if("qps" > threshold, "replicas" + 1, "replicas")) + // + CurrentField string + + // The maximum scale factor to set. + // If 0 then there is no upper limit. + // Default: 0, a.k.a no limit. + Max int64 + + // The minimum scale factor to set. + // Default: 1 + Min int64 + + // Replicas is a lambda expression that should evaluate to the desired number of replicas for the resource. + Replicas *ast.LambdaNode + + // Only one increase event can be triggered per resource every IncreaseCooldown interval. + IncreaseCooldown time.Duration + // Only one decrease event can be triggered per resource every DecreaseCooldown interval. + DecreaseCooldown time.Duration +} + +func newEc2AutoscaleNode(e EdgeType) *Ec2AutoscaleNode { + k := &Ec2AutoscaleNode{ + chainnode: newBasicChainNode("ec2_autoscale", e, StreamEdge), + Min: 1, + } + return k +} + +func (n *Ec2AutoscaleNode) validate() error { + if (n.GroupName == "" && n.GroupNameTag == "") || + (n.GroupName != "" && n.GroupNameTag != "") { + return fmt.Errorf("must specify exactly one of GroupName or GroupNameTag") + } + if n.Min < 1 { + return fmt.Errorf("min must be >= 1, got %d", n.Min) + } + if n.Replicas == nil { + return errors.New("must provide a replicas lambda expression") + } + return nil +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/node.go b/vendor/github.com/influxdata/kapacitor/pipeline/node.go index 0d2158dac..4050429b6 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/node.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/node.go @@ -417,6 +417,15 @@ func (n *chainnode) Window() *WindowNode { return w } +// Create a new Barrier node that emits a BarrierMessage periodically +// +// One BarrierMessage will be emitted every period duration +func (n *chainnode) Barrier() *BarrierNode { + b := newBarrierNode(n.provides) + n.linkChild(b) + return b +} + // Create a new node that samples the incoming points or batches. // // One point will be emitted every count or duration specified. @@ -475,6 +484,13 @@ func (n *chainnode) SwarmAutoscale() *SwarmAutoscaleNode { return k } +// Create a node that can trigger autoscale events for a ec2 autoscalegroup. +func (n *chainnode) Ec2Autoscale() *Ec2AutoscaleNode { + k := newEc2AutoscaleNode(n.Provides()) + n.linkChild(k) + return k +} + // Create a node that tracks duration in a given state. func (n *chainnode) StateDuration(expression *ast.LambdaNode) *StateDurationNode { sd := newStateDurationNode(n.provides, expression) diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert.go index 7ff634d52..08fd71f52 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert.go @@ -73,8 +73,7 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) { } for _, h := range a.TcpHandlers { - n.Dot("tcp"). - Dot("address", h.Address) + n.Dot("tcp", h.Address) } for _, h := range a.EmailHandlers { diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert_test.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert_test.go index bf469bde5..f7c2529c6 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert_test.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/alert_test.go @@ -1,6 +1,7 @@ package tick_test import ( + "encoding/json" "testing" "time" @@ -124,8 +125,50 @@ func TestAlertTCP(t *testing.T) { .message('{{ .ID }} is {{ .Level }}') .details('{{ json . }}') .history(21) - .tcp() - .address('echo:7') + .tcp('echo:7') +` + PipelineTickTestHelper(t, pipe, want) +} +func TestAlertTCPJSON(t *testing.T) { + pipe, _, from := StreamFrom() + j := ` + { + "typeOf": "alert", + "stateChangesOnly": false, + "useFlapping": false, + "message": "", + "details": "", + "post": null, + "tcp": [ + { + "address": "echo:7" + } + ], + "email": null, + "exec": null, + "log": null, + "victorOps": null, + "pagerDuty": null, + "pushover": null, + "sensu": null, + "slack": null, + "telegram": null, + "hipChat": null, + "alerta": null, + "opsGenie": null, + "talk": null + }` + node := from.Alert() + if err := json.Unmarshal([]byte(j), node); err != nil { + t.Errorf("unable to unmarshal alert %v", err) + } + + want := `stream + |from() + |alert() + .id('{{ .Name }}:{{ .Group }}') + .history(21) + .tcp('echo:7') ` PipelineTickTestHelper(t, pipe, want) } diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/ast.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ast.go index f0472e22f..d5278207b 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/tick/ast.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ast.go @@ -88,6 +88,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) { return NewJoin(parents).Build(node) case *pipeline.AlertNode: return NewAlert(parents).Build(node) + case *pipeline.BarrierNode: + return NewBarrierNode(parents).Build(node) case *pipeline.CombineNode: return NewCombine(parents).Build(node) case *pipeline.DefaultNode: @@ -96,6 +98,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) { return NewDelete(parents).Build(node) case *pipeline.DerivativeNode: return NewDerivative(parents).Build(node) + case *pipeline.Ec2AutoscaleNode: + return NewEc2Autoscale(parents).Build(node) case *pipeline.EvalNode: return NewEval(parents).Build(node) case *pipeline.FlattenNode: diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier.go new file mode 100644 index 000000000..ee7e46464 --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier.go @@ -0,0 +1,28 @@ +package tick + +import ( + "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/tick/ast" +) + +// BarrierNode converts the window pipeline node into the TICKScript AST +type BarrierNode struct { + Function +} + +// NewBarrierNode creates a Barrier function builder +func NewBarrierNode(parents []ast.Node) *BarrierNode { + return &BarrierNode{ + Function{ + Parents: parents, + }, + } +} + +// Build creates a window ast.Node +func (n *BarrierNode) Build(b *pipeline.BarrierNode) (ast.Node, error) { + n.Pipe("barrier"). + Dot("idle", b.Idle). + Dot("period", b.Period) + return n.prev, n.err +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier_test.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier_test.go new file mode 100644 index 000000000..517b5a58f --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/barrier_test.go @@ -0,0 +1,61 @@ +package tick_test + +import ( + "testing" + "time" + + "github.com/influxdata/kapacitor/pipeline" +) + +func TestBarrierNode(t *testing.T) { + type args struct { + idle time.Duration + period time.Duration + } + tests := []struct { + name string + args args + want string + }{ + { + name: "barrier with idle", + args: args{ + idle: time.Second, + }, + want: `stream + |from() + |barrier() + .idle(1s) +`, + }, + { + name: "barrier with period", + args: args{ + period: time.Second, + }, + want: `stream + |from() + |barrier() + .period(1s) +`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stream := &pipeline.StreamNode{} + pipe := pipeline.CreatePipelineSources(stream) + + b := stream.From().Barrier() + b.Idle = tt.args.idle + b.Period = tt.args.period + + got, err := PipelineTick(pipe) + if err != nil { + t.Fatalf("Unexpected error building pipeline %v", err) + } + if got != tt.want { + t.Errorf("%q. TestBarrier() =\n%v\n want\n%v\n", tt.name, got, tt.want) + } + }) + } +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale.go new file mode 100644 index 000000000..7334f8145 --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale.go @@ -0,0 +1,37 @@ +package tick + +import ( + "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/tick/ast" +) + +// Ec2AutoscaleNode converts the ec2 autoscaling pipeline node into the TICKScript AST +type Ec2AutoscaleNode struct { + Function +} + +// NewEc2Autoscale creates a Ec2Autoscale function builder +func NewEc2Autoscale(parents []ast.Node) *Ec2AutoscaleNode { + return &Ec2AutoscaleNode{ + Function{ + Parents: parents, + }, + } +} + +// Build creates a Ec2Autoscale ast.Node +func (n *Ec2AutoscaleNode) Build(s *pipeline.Ec2AutoscaleNode) (ast.Node, error) { + n.Pipe("ec2Autoscale"). + Dot("cluster", s.Cluster). + Dot("groupName", s.GroupNameTag). + Dot("groupNameTag", s.GroupNameTag). + Dot("outputGroupNameTag", s.OutputGroupNameTag). + Dot("currentField", s.CurrentField). + Dot("max", s.Max). + Dot("min", s.Min). + Dot("replicas", s.Replicas). + Dot("increaseCooldown", s.IncreaseCooldown). + Dot("decreaseCooldown", s.DecreaseCooldown) + + return n.prev, n.err +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale_test.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale_test.go new file mode 100644 index 000000000..09e5f6d8e --- /dev/null +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/ec2_autoscale_test.go @@ -0,0 +1,115 @@ +package tick_test + +import ( + "testing" + "time" + + "github.com/influxdata/kapacitor/tick/ast" +) + +func TestEc2Autoscale(t *testing.T) { + type args struct { + cluster string + groupName string + groupNameTag string + outputGroupNameTag string + currentField string + max int64 + min int64 + replicas *ast.LambdaNode + increaseCooldown time.Duration + decreaseCooldown time.Duration + } + tests := []struct { + name string + args args + want string + }{ + { + name: "upgrade mutalisk_autoscale to guardian_autoscale", + args: args{ + cluster: "zerg", + groupName: "mutalisk_autoscale", + groupNameTag: "mutalisk_autoscale", + outputGroupNameTag: "guardian_autoscale", + currentField: "hitPoints", + max: 10, + min: 5, + replicas: &ast.LambdaNode{ + Expression: &ast.FunctionNode{ + Type: ast.GlobalFunc, + Func: "if", + Args: []ast.Node{ + &ast.BinaryNode{ + Operator: ast.TokenGreater, + Left: &ast.ReferenceNode{ + Reference: "greater spire", + }, + Right: &ast.NumberNode{ + IsInt: true, + Int64: 1, + Base: 10, + }, + }, + &ast.BinaryNode{ + Operator: ast.TokenPlus, + Left: &ast.ReferenceNode{ + Reference: "replicas", + }, + Right: &ast.NumberNode{ + IsInt: true, + Int64: 1, + Base: 10, + }, + }, + &ast.ReferenceNode{ + Reference: "replicas", + }, + }, + }, + }, + increaseCooldown: 6670 * time.Millisecond, + decreaseCooldown: 2500 * time.Millisecond, + }, + want: `stream + |from() + |ec2Autoscale() + .cluster('zerg') + .groupName('mutalisk_autoscale') + .groupNameTag('mutalisk_autoscale') + .outputGroupNameTag('guardian_autoscale') + .currentField('hitPoints') + .max(10) + .min(5) + .replicas(lambda: if("greater spire" > 1, "replicas" + 1, "replicas")) + .increaseCooldown(6670ms) + .decreaseCooldown(2500ms) +`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pipe, _, from := StreamFrom() + n := from.Ec2Autoscale() + n.Cluster = tt.args.cluster + n.GroupName = tt.args.groupName + n.GroupNameTag = tt.args.groupNameTag + n.OutputGroupNameTag = tt.args.outputGroupNameTag + n.CurrentField = tt.args.currentField + n.Max = tt.args.max + n.Min = tt.args.min + n.Replicas = tt.args.replicas + n.IncreaseCooldown = tt.args.increaseCooldown + n.DecreaseCooldown = tt.args.decreaseCooldown + + got, err := PipelineTick(pipe) + if err != nil { + t.Fatalf("Unexpected error building pipeline %v", err) + } + if got != tt.want { + t.Errorf("%q. TestEc2Autoscale() =\n%v\n want\n%v\n", tt.name, got, tt.want) + t.Log(got) + } + }) + } +} diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale.go index 5ca40e40d..51df71ad1 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale.go @@ -23,7 +23,7 @@ func NewSwarmAutoscale(parents []ast.Node) *SwarmAutoscaleNode { func (n *SwarmAutoscaleNode) Build(s *pipeline.SwarmAutoscaleNode) (ast.Node, error) { n.Pipe("swarmAutoscale"). Dot("cluster", s.Cluster). - Dot("servceName", s.ServiceNameTag). + Dot("serviceName", s.ServiceNameTag). Dot("serviceNameTag", s.ServiceNameTag). Dot("outputServiceNameTag", s.OutputServiceNameTag). Dot("currentField", s.CurrentField). diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale_test.go b/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale_test.go index a06a7d789..6444d4e0e 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale_test.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/tick/swarm_autoscale_test.go @@ -75,7 +75,7 @@ func TestSwarmAutoscale(t *testing.T) { |from() |swarmAutoscale() .cluster('zerg') - .servceName('mutalisk') + .serviceName('mutalisk') .serviceNameTag('mutalisk') .outputServiceNameTag('guardian') .currentField('hitPoints') diff --git a/vendor/github.com/influxdata/kapacitor/pipeline/window.go b/vendor/github.com/influxdata/kapacitor/pipeline/window.go index 1ecb8d2b7..92eddb1ce 100644 --- a/vendor/github.com/influxdata/kapacitor/pipeline/window.go +++ b/vendor/github.com/influxdata/kapacitor/pipeline/window.go @@ -15,9 +15,9 @@ import ( // The `every` property of `window` defines the frequency at which the window // is emitted to the next node in the pipeline. // -//The `align` property of `window` defines how to align the window edges. -//(By default, the edges are defined relative to the first data point the `window` -//node receives.) +// The `align` property of `window` defines how to align the window edges. +// (By default, the edges are defined relative to the first data point the `window` +// node receives.) // // Example: // stream @@ -26,7 +26,7 @@ import ( // .every(5m) // |httpOut('recent') // -// his example emits the last `10 minute` period every `5 minutes` to the pipeline's `httpOut` node. +// This example emits the last `10 minute` period every `5 minutes` to the pipeline's `httpOut` node. // Because `every` is less than `period`, each time the window is emitted it contains `5 minutes` of // new data and `5 minutes` of the previous period's data. // @@ -139,7 +139,7 @@ func (w *WindowNode) validate() error { return errors.New("can only align windows based off time, not count") } if w.PeriodCount != 0 && w.EveryCount <= 0 { - return fmt.Errorf("everyCount must be greater than zero") + return errors.New("everyCount must be greater than zero") } return nil } diff --git a/vendor/github.com/influxdata/kapacitor/task.go b/vendor/github.com/influxdata/kapacitor/task.go index 3439f499b..ac2dc18f3 100644 --- a/vendor/github.com/influxdata/kapacitor/task.go +++ b/vendor/github.com/influxdata/kapacitor/task.go @@ -505,12 +505,16 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, n, err = newK8sAutoscaleNode(et, t, d) case *pipeline.SwarmAutoscaleNode: n, err = newSwarmAutoscaleNode(et, t, d) + case *pipeline.Ec2AutoscaleNode: + n, err = newEc2AutoscaleNode(et, t, d) case *pipeline.StateDurationNode: n, err = newStateDurationNode(et, t, d) case *pipeline.StateCountNode: n, err = newStateCountNode(et, t, d) case *pipeline.SideloadNode: n, err = newSideloadNode(et, t, d) + case *pipeline.BarrierNode: + n, err = newBarrierNode(et, t, d) default: return nil, fmt.Errorf("unknown pipeline node type %T", p) } diff --git a/vendor/github.com/influxdata/kapacitor/task_master.go b/vendor/github.com/influxdata/kapacitor/task_master.go index f1ef4d4da..98922e30f 100644 --- a/vendor/github.com/influxdata/kapacitor/task_master.go +++ b/vendor/github.com/influxdata/kapacitor/task_master.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/kapacitor/server/vars" alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" + ec2 "github.com/influxdata/kapacitor/services/ec2/client" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/httppost" @@ -173,6 +174,9 @@ type TaskMaster struct { SwarmService interface { Client(string) (swarm.Client, error) } + EC2Service interface { + Client(string) (ec2.Client, error) + } SideloadService interface { Source(dir string) (sideload.Source, error) diff --git a/vendor/github.com/influxdata/kapacitor/window.go b/vendor/github.com/influxdata/kapacitor/window.go index 3eca87bc7..f752579d9 100644 --- a/vendor/github.com/influxdata/kapacitor/window.go +++ b/vendor/github.com/influxdata/kapacitor/window.go @@ -28,10 +28,11 @@ func newWindowNode(et *ExecutingTask, n *pipeline.WindowNode, d NodeDiagnostic) return wn, nil } -func (n *WindowNode) runWindow([]byte) error { +func (n *WindowNode) runWindow([]byte) (err error) { consumer := edge.NewGroupedConsumer(n.ins[0], n) n.statMap.Set(statCardinalityGauge, consumer.CardinalityVar()) - return consumer.Consume() + err = consumer.Consume() + return } func (n *WindowNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) { @@ -145,9 +146,39 @@ func (w *windowByTime) BatchPoint(edge.BatchPointMessage) (edge.Message, error) func (w *windowByTime) EndBatch(edge.EndBatchMessage) (edge.Message, error) { return nil, errors.New("window does not support batch data") } -func (w *windowByTime) Barrier(b edge.BarrierMessage) (edge.Message, error) { - //TODO(nathanielc): Implement barrier messages to flush window - return b, nil +func (w *windowByTime) Barrier(b edge.BarrierMessage) (msg edge.Message, err error) { + if w.every == 0 { + // Since we are emitting every point we can use a right aligned window (oldest, now] + if !b.Time().Before(w.nextEmit) { + // purge old points + oldest := b.Time().Add(-1 * w.period) + w.buf.purge(oldest, false) + + // get current batch + msg = w.batch(b.Time()) + + // Next emit time is now + w.nextEmit = b.Time() + } + } else { + // Since more points can arrive with the same time we need to use a left aligned window [oldest, now). + if !b.Time().Before(w.nextEmit) { + // purge old points + oldest := w.nextEmit.Add(-1 * w.period) + w.buf.purge(oldest, true) + + // get current batch + msg = w.batch(w.nextEmit) + + // Determine next emit time. + // This is dependent on the current time not the last time we emitted. + w.nextEmit = b.Time().Add(w.every) + if w.align { + w.nextEmit = w.nextEmit.Truncate(w.every) + } + } + } + return } func (w *windowByTime) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) { return d, nil