Update kapacitor pipeline with comments

pull/2388/head
Chris Goller 2017-11-17 16:42:54 -06:00
parent 55aacfc26c
commit 9e4ccf5c87
2 changed files with 16 additions and 13 deletions

View File

@ -3,29 +3,22 @@ package kapacitor
import (
"bytes"
"encoding/json"
"strings"
"github.com/influxdata/chronograf"
"github.com/influxdata/kapacitor/pipeline"
totick "github.com/influxdata/kapacitor/pipeline/tick"
"github.com/influxdata/kapacitor/tick"
"github.com/influxdata/kapacitor/tick/stateful"
)
// MarshalTICK converts tickscript to JSON representation
func MarshalTICK(script string) ([]byte, error) {
edge := pipeline.StreamEdge
if strings.Contains(script, "batch") {
edge = pipeline.BatchEdge
}
scope := stateful.NewScope()
predefinedVars := map[string]tick.Var{}
pipeline, err := pipeline.CreatePipeline(script, edge, scope, &deadman{}, predefinedVars)
pipeline, err := newPipeline(chronograf.TICKScript(script))
if err != nil {
return nil, err
}
return json.MarshalIndent(pipeline, "", " ")
}
// UnmarshalTICK converts JSON to tickscript
func UnmarshalTICK(octets []byte) (string, error) {
pipe := &pipeline.Pipeline{}
if err := pipe.Unmarshal(octets); err != nil {

View File

@ -3,6 +3,7 @@ package kapacitor
import (
"bytes"
"fmt"
"strings"
"time"
"github.com/influxdata/chronograf"
@ -33,10 +34,19 @@ func formatTick(tickscript string) (chronograf.TICKScript, error) {
}
func validateTick(script chronograf.TICKScript) error {
_, err := newPipeline(script)
return err
}
func newPipeline(script chronograf.TICKScript) (*pipeline.Pipeline, error) {
edge := pipeline.StreamEdge
if strings.Contains(string(script), "batch") {
edge = pipeline.BatchEdge
}
scope := stateful.NewScope()
predefinedVars := map[string]tick.Var{}
_, err := pipeline.CreatePipeline(string(script), pipeline.StreamEdge, scope, &deadman{}, predefinedVars)
return err
return pipeline.CreatePipeline(string(script), edge, scope, &deadman{}, predefinedVars)
}
// deadman is an empty implementation of a kapacitor DeadmanService to allow CreatePipeline