From 684b4e19a2e2d9488e3c9f313f4b88c46af3f894 Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Mon, 15 Oct 2018 20:38:36 -0400 Subject: [PATCH] add json telegraf config --- http/swagger.yml | 2 +- telegraf.go | 195 ++++++++++++++++++++--- telegraf/plugins/inputs/base.go | 9 ++ telegraf/plugins/inputs/cpu.go | 17 +- telegraf/plugins/inputs/disk.go | 17 +- telegraf/plugins/inputs/diskio.go | 23 ++- telegraf/plugins/inputs/docker.go | 10 +- telegraf/plugins/inputs/file.go | 10 +- telegraf/plugins/inputs/inputs_test.go | 31 +++- telegraf/plugins/inputs/kernel.go | 17 +- telegraf/plugins/inputs/kubernetes.go | 10 +- telegraf/plugins/inputs/logparser.go | 10 +- telegraf/plugins/inputs/mem.go | 17 +- telegraf/plugins/inputs/net.go | 17 +- telegraf/plugins/inputs/net_response.go | 17 +- telegraf/plugins/inputs/ngnix.go | 10 +- telegraf/plugins/inputs/processes.go | 17 +- telegraf/plugins/inputs/procstats.go | 10 +- telegraf/plugins/inputs/prometheus.go | 10 +- telegraf/plugins/inputs/redis.go | 10 +- telegraf/plugins/inputs/swap.go | 23 ++- telegraf/plugins/inputs/syslog.go | 10 +- telegraf/plugins/inputs/system.go | 17 +- telegraf/plugins/inputs/tail.go | 10 +- telegraf/plugins/outputs/base.go | 9 ++ telegraf/plugins/outputs/file.go | 10 +- telegraf/plugins/outputs/influxdb_v2.go | 10 +- telegraf/plugins/outputs/outputs_test.go | 15 +- telegraf/plugins/type.go | 12 ++ telegraf_test.go | 146 +++++++++++++++++ 30 files changed, 639 insertions(+), 82 deletions(-) create mode 100644 telegraf/plugins/inputs/base.go create mode 100644 telegraf/plugins/outputs/base.go create mode 100644 telegraf/plugins/type.go create mode 100644 telegraf_test.go diff --git a/http/swagger.yml b/http/swagger.yml index 14c3e74154..7d592d5b46 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -4489,7 +4489,7 @@ components: enum: [input, output, processor, aggregator] comment: type: string - configs: + config: oneOf: - $ref: '#/components/schemas/TelegrafPluginInput' - $ref: '#/components/schemas/TelegrafPluginInputDocker' diff --git a/telegraf.go b/telegraf.go index 707c53ef45..720b75f0e9 100644 --- a/telegraf.go +++ b/telegraf.go @@ -2,7 +2,13 @@ package platform import ( "context" + "encoding/json" + "fmt" "time" + + "github.com/influxdata/platform/telegraf/plugins" + "github.com/influxdata/platform/telegraf/plugins/inputs" + "github.com/influxdata/platform/telegraf/plugins/outputs" ) // TelegrafConfigStore represents a service for managing telegraf config data. @@ -17,12 +23,12 @@ type TelegrafConfigStore interface { // Additional options provide pagination & sorting. FindTelegrafConfigs(ctx context.Context, filter UserResourceMappingFilter, opt ...FindOptions) ([]*TelegrafConfig, int, error) - // CreateTelegrafConfigFilter creates a new telegraf config and sets b.ID with the new identifier. - CreateTelegrafConfigFilter(ctx context.Context, filter *UserResourceMappingFilter) error + // CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier. + CreateTelegrafConfig(ctx context.Context, tc *TelegrafConfig, userID ID, now time.Time) error // UpdateTelegrafConfig updates a single telegraf config. // Returns the new telegraf config after update. - UpdateTelegrafConfig(ctx context.Context, id ID, tc *TelegrafConfig) (*TelegrafConfig, error) + UpdateTelegrafConfig(ctx context.Context, id ID, tc *TelegrafConfig, userID ID, now time.Time) (*TelegrafConfig, error) // DeleteTelegrafConfig removes a telegraf config by ID. DeleteTelegrafConfig(ctx context.Context, id ID) error @@ -30,6 +36,18 @@ type TelegrafConfigStore interface { // TelegrafConfig stores telegraf config for one telegraf instance. type TelegrafConfig struct { + ID ID + Name string + Created time.Time + LastMod time.Time + LastModBy ID + + Agent TelegrafAgentConfig + Plugins []TelegrafPlugin +} + +// telegrafConfigEncode is the helper struct for json encoding. +type telegrafConfigEncode struct { ID ID `json:"id"` Name string `json:"name"` Created time.Time `json:"created"` @@ -38,37 +56,176 @@ type TelegrafConfig struct { Agent TelegrafAgentConfig `json:"agent"` - Plugins []TelegrafPlugin `json:"plugins"` + Plugins []telegrafPluginEncode `json:"plugins"` +} + +// telegrafPluginEncode is the helper struct for json encoding. +type telegrafPluginEncode struct { + // Name of the telegraf plugin, exp "docker" + Name string `json:"name"` + Type plugins.Type `json:"type"` + Comment string `json:"comment"` + Config TelegrafPluginConfig `json:"config"` +} + +// telegrafConfigDecode is the helper struct for json decoding. +type telegrafConfigDecode struct { + ID ID `json:"id"` + Name string `json:"name"` + Created time.Time `json:"created"` + LastMod time.Time `json:"last_modified"` + LastModBy ID `json:"last_modified_by"` + + Agent TelegrafAgentConfig `json:"agent"` + + Plugins []telegrafPluginDecode `json:"plugins"` +} + +// telegrafPluginDecode is the helper struct for json decoding. +type telegrafPluginDecode struct { + // Name of the telegraf plugin, exp "docker" + Name string `json:"name"` + Type plugins.Type `json:"type"` + Comment string `json:"comment"` + Config json.RawMessage `json:"config"` } // TelegrafPlugin is the general wrapper of the telegraf plugin config type TelegrafPlugin struct { - // Name of the telegraf plugin, exp "docker" - Name string `json:"name"` - Type TelegrafPluginType `json:"type"` - Comment string `json:"comment"` - Configs []TelegrafAgentConfig `json:"configs"` + Comment string `json:"comment"` + Config TelegrafPluginConfig `json:"config"` } // TelegrafAgentConfig is based telegraf/internal/config AgentConfig. type TelegrafAgentConfig struct { - // Interval at which to gather information - Interval time.Duration `json:"collectionInterval"` + // Interval at which to gather information in miliseconds. + Interval int64 `json:"collectionInterval"` } // TelegrafPluginConfig interface for all plugins. type TelegrafPluginConfig interface { // TOML encodes to toml string TOML() string + // Type is the plugin type + Type() plugins.Type + // PluginName is the string value of telegraf plugin package name. + PluginName() string } -// TelegrafPluginType is a string enum type: input/output/processor/aggregator. -type TelegrafPluginType string - -// telegraf plugin types +// errors const ( - TelegrafPluginTypeInput TelegrafPluginType = "input" // TelegrafPluginTypeInput is an input plugin. - TelegrafPluginTypeOutput TelegrafPluginType = "output" // TelegrafPluginTypeOutput is an output plugin. - TelegrafPluginTypeProcessor TelegrafPluginType = "processor" // TelegrafPluginTypeProcessor is a processor plugin. - TelegrafPluginTypeAggregator TelegrafPluginType = "aggregator" // TelegrafPluginTypeAggregator is an aggregator plugin. + ErrTelegrafPluginNameUnmatch = "the telegraf plugin is name %s doesn't match the config %s" + ErrNoTelegrafPlugins = "there is no telegraf plugin in the config" + ErrUnsupportTelegrafPluginType = "unsupported telegraf plugin type %s" + ErrUnsupportTelegrafPluginName = "unsupported telegraf plugin %s, type %s" ) + +// MarshalJSON implement the json.Marshaler interface. +func (tc *TelegrafConfig) MarshalJSON() ([]byte, error) { + tce := new(telegrafConfigEncode) + *tce = telegrafConfigEncode{ + ID: tc.ID, + Name: tc.Name, + Agent: tc.Agent, + Created: tc.Created, + LastMod: tc.LastMod, + LastModBy: tc.LastModBy, + Plugins: make([]telegrafPluginEncode, len(tc.Plugins)), + } + for k, p := range tc.Plugins { + tce.Plugins[k] = telegrafPluginEncode{ + Name: p.Config.PluginName(), + Type: p.Config.Type(), + Comment: p.Comment, + Config: p.Config, + } + } + return json.Marshal(tce) +} + +// UnmarshalJSON implement the json.Unmarshaler interface. +func (tc *TelegrafConfig) UnmarshalJSON(b []byte) error { + tcd := new(telegrafConfigDecode) + if err := json.Unmarshal(b, tcd); err != nil { + return err + } + *tc = TelegrafConfig{ + ID: tcd.ID, + Name: tcd.Name, + Created: tcd.Created, + LastMod: tcd.LastMod, + LastModBy: tcd.LastModBy, + Agent: tcd.Agent, + Plugins: make([]TelegrafPlugin, len(tcd.Plugins)), + } + return decodePluginRaw(tcd, tc) +} + +func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error) { + op := "unmarshal telegraf config raw plugin" + for k, pr := range tcd.Plugins { + var config TelegrafPluginConfig + var ok bool + switch pr.Type { + case plugins.Input: + config, ok = availableInputPlugins[pr.Name] + case plugins.Output: + config, ok = availableOutputPlugins[pr.Name] + default: + return &Error{ + Code: EInvalid, + Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, pr.Type), + Op: op, + } + } + if ok { + if err = json.Unmarshal(pr.Config, config); err != nil { + return &Error{ + Code: EInvalid, + Err: err, + Op: op, + } + } + tc.Plugins[k] = TelegrafPlugin{ + Comment: pr.Comment, + Config: config, + } + continue + } + return &Error{ + Code: EInvalid, + Op: op, + Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, pr.Name, pr.Type), + } + + } + return nil +} + +var availableInputPlugins = map[string]TelegrafPluginConfig{ + "cpu": &inputs.CPUStats{}, + "disk": &inputs.DiskStats{}, + "diskio": &inputs.DiskIO{}, + "docker": &inputs.Docker{}, + "file": &inputs.File{}, + "kernel": &inputs.Kernel{}, + "kubernetes": &inputs.Kubernetes{}, + "logparser": &inputs.LogParserPlugin{}, + "mem": &inputs.MemStats{}, + "net_response": &inputs.NetResponse{}, + "net": &inputs.NetIOStats{}, + "ngnix": &inputs.Nginx{}, + "processes": &inputs.Processes{}, + "procstats": &inputs.Procstat{}, + "prometheus": &inputs.Prometheus{}, + "redis": &inputs.Redis{}, + "swap": &inputs.SwapStats{}, + "syslog": &inputs.Syslog{}, + "system": &inputs.SystemStats{}, + "tail": &inputs.Tail{}, +} + +var availableOutputPlugins = map[string]TelegrafPluginConfig{ + "file": &outputs.File{}, + "influxdb_v2": &outputs.InfluxDBV2{}, +} diff --git a/telegraf/plugins/inputs/base.go b/telegraf/plugins/inputs/base.go new file mode 100644 index 0000000000..1afbc7fc60 --- /dev/null +++ b/telegraf/plugins/inputs/base.go @@ -0,0 +1,9 @@ +package inputs + +import "github.com/influxdata/platform/telegraf/plugins" + +type baseInput int + +func (b baseInput) Type() plugins.Type { + return plugins.Input +} diff --git a/telegraf/plugins/inputs/cpu.go b/telegraf/plugins/inputs/cpu.go index 86d24e9208..db217fa35a 100644 --- a/telegraf/plugins/inputs/cpu.go +++ b/telegraf/plugins/inputs/cpu.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // CPUStats is based on telegraf CPUStats. -type CPUStats struct{} +type CPUStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (c *CPUStats) PluginName() string { + return "cpu" +} // TOML encodes to toml string func (c *CPUStats) TOML() string { - return `[[inputs.cpu]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, c.PluginName()) } diff --git a/telegraf/plugins/inputs/disk.go b/telegraf/plugins/inputs/disk.go index 7c1cc8bb2e..ac44582203 100644 --- a/telegraf/plugins/inputs/disk.go +++ b/telegraf/plugins/inputs/disk.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // DiskStats is based on telegraf DiskStats. -type DiskStats struct{} +type DiskStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (d *DiskStats) PluginName() string { + return "disk" +} // TOML encodes to toml string func (d *DiskStats) TOML() string { - return `[[inputs.disk]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, d.PluginName()) } diff --git a/telegraf/plugins/inputs/diskio.go b/telegraf/plugins/inputs/diskio.go index 09c5d39d23..3a1ea45e0a 100644 --- a/telegraf/plugins/inputs/diskio.go +++ b/telegraf/plugins/inputs/diskio.go @@ -1,10 +1,21 @@ package inputs -// DiskIO is based on telegraf DiskIO. -type DiskIO struct{} +import ( + "fmt" +) -// TOML encodes to toml string -func (d *DiskIO) TOML() string { - return `[[inputs.diskio]] -` +// DiskIO is based on telegraf DiskIO. +type DiskIO struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (d *DiskIO) PluginName() string { + return "diskio" +} + +// TOML encodes to toml string. +func (d *DiskIO) TOML() string { + return fmt.Sprintf(`[[inputs.%s]] +`, d.PluginName()) } diff --git a/telegraf/plugins/inputs/docker.go b/telegraf/plugins/inputs/docker.go index 14b6f44e35..a5c2e9949d 100644 --- a/telegraf/plugins/inputs/docker.go +++ b/telegraf/plugins/inputs/docker.go @@ -6,16 +6,22 @@ import ( // Docker is based on telegraf Docker plugin. type Docker struct { + baseInput Endpoint string `json:"endpoint"` } +// PluginName is based on telegraf plugin name. +func (d *Docker) PluginName() string { + return "docker" +} + // TOML encodes to toml string func (d *Docker) TOML() string { - return fmt.Sprintf(`[[inputs.docker]] + return fmt.Sprintf(`[[inputs.%s]] ## Docker Endpoint ## To use TCP, set endpoint = "tcp://[ip]:[port]" ## To use environment variables (ie, docker-machine), set endpoint = "ENV" ## exp: unix:///var/run/docker.sock endpoint = "%s" -`, d.Endpoint) +`, d.PluginName(), d.Endpoint) } diff --git a/telegraf/plugins/inputs/file.go b/telegraf/plugins/inputs/file.go index e98d2b8af5..1bd77d477d 100644 --- a/telegraf/plugins/inputs/file.go +++ b/telegraf/plugins/inputs/file.go @@ -8,16 +8,22 @@ import ( // File is based on telegraf input File plugin. type File struct { + baseInput Files []string `json:"files"` } +// PluginName is based on telegraf plugin name. +func (f *File) PluginName() string { + return "file" +} + // TOML encodes to toml string func (f *File) TOML() string { s := make([]string, len(f.Files)) for k, v := range f.Files { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[inputs.file]] + return fmt.Sprintf(`[[inputs.%s]] ## Files to parse each interval. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: @@ -25,5 +31,5 @@ func (f *File) TOML() string { ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/apache.log -> only read the apache log file files = [%s] -`, strings.Join(s, ", ")) +`, f.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/inputs/inputs_test.go b/telegraf/plugins/inputs/inputs_test.go index f557df2fbc..aa781f3b8c 100644 --- a/telegraf/plugins/inputs/inputs_test.go +++ b/telegraf/plugins/inputs/inputs_test.go @@ -1,10 +1,23 @@ package inputs -import "testing" +import ( + "testing" + + "github.com/influxdata/platform/telegraf/plugins" +) // local plugin type telegrafPluginConfig interface { TOML() string + Type() plugins.Type + PluginName() string +} + +func TestType(t *testing.T) { + b := baseInput(0) + if b.Type() != plugins.Input { + t.Fatalf("input plugins type should be input, got %s", b.Type()) + } } func TestTOML(t *testing.T) { @@ -18,6 +31,13 @@ func TestTOML(t *testing.T) { &CPUStats{}: "[[inputs.cpu]]\n", &DiskStats{}: "[[inputs.disk]]\n", &DiskIO{}: "[[inputs.diskio]]\n", + &Docker{}: `[[inputs.docker]] + ## Docker Endpoint + ## To use TCP, set endpoint = "tcp://[ip]:[port]" + ## To use environment variables (ie, docker-machine), set endpoint = "ENV" + ## exp: unix:///var/run/docker.sock + endpoint = "" +`, &File{}: `[[inputs.file]] ## Files to parse each interval. ## These accept standard unix glob matching rules, but with the addition of @@ -100,6 +120,15 @@ func TestTOML(t *testing.T) { { name: "standard testing", plugins: map[telegrafPluginConfig]string{ + &Docker{ + Endpoint: "unix:///var/run/docker.sock", + }: `[[inputs.docker]] + ## Docker Endpoint + ## To use TCP, set endpoint = "tcp://[ip]:[port]" + ## To use environment variables (ie, docker-machine), set endpoint = "ENV" + ## exp: unix:///var/run/docker.sock + endpoint = "unix:///var/run/docker.sock" +`, &File{ Files: []string{ "/var/log/**.log", diff --git a/telegraf/plugins/inputs/kernel.go b/telegraf/plugins/inputs/kernel.go index 12b06292b0..96ac2ae8ac 100644 --- a/telegraf/plugins/inputs/kernel.go +++ b/telegraf/plugins/inputs/kernel.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // Kernel is based on telegraf Kernel. -type Kernel struct{} +type Kernel struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (k *Kernel) PluginName() string { + return "kernel" +} // TOML encodes to toml string func (k *Kernel) TOML() string { - return `[[inputs.kernel]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, k.PluginName()) } diff --git a/telegraf/plugins/inputs/kubernetes.go b/telegraf/plugins/inputs/kubernetes.go index b8a9973b5e..2f13ed41af 100644 --- a/telegraf/plugins/inputs/kubernetes.go +++ b/telegraf/plugins/inputs/kubernetes.go @@ -4,14 +4,20 @@ import "fmt" // Kubernetes is based on telegraf Kubernetes plugin type Kubernetes struct { + baseInput URL string `json:"url"` } +// PluginName is based on telegraf plugin name. +func (k *Kubernetes) PluginName() string { + return "kubernetes" +} + // TOML encodes to toml string. func (k *Kubernetes) TOML() string { - return fmt.Sprintf(`[[inputs.kubernetes]] + return fmt.Sprintf(`[[inputs.%s]] ## URL for the kubelet ## exp: http://1.1.1.1:10255 url = "%s" -`, k.URL) +`, k.PluginName(), k.URL) } diff --git a/telegraf/plugins/inputs/logparser.go b/telegraf/plugins/inputs/logparser.go index 96f0d3269f..cdbb07b880 100644 --- a/telegraf/plugins/inputs/logparser.go +++ b/telegraf/plugins/inputs/logparser.go @@ -8,16 +8,22 @@ import ( // LogParserPlugin is based on telegraf LogParserPlugin. type LogParserPlugin struct { + baseInput Files []string `json:"files"` } +// PluginName is based on telegraf plugin name. +func (l *LogParserPlugin) PluginName() string { + return "logparser" +} + // TOML encodes to toml string func (l *LogParserPlugin) TOML() string { s := make([]string, len(l.Files)) for k, v := range l.Files { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[inputs.logparser]] + return fmt.Sprintf(`[[inputs.%s]] ## Log files to parse. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: @@ -25,5 +31,5 @@ func (l *LogParserPlugin) TOML() string { ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/apache.log -> only tail the apache log file files = [%s] -`, strings.Join(s, ", ")) +`, l.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/inputs/mem.go b/telegraf/plugins/inputs/mem.go index eda138a8a8..ad21c07e47 100644 --- a/telegraf/plugins/inputs/mem.go +++ b/telegraf/plugins/inputs/mem.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // MemStats is based on telegraf MemStats. -type MemStats struct{} +type MemStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (m *MemStats) PluginName() string { + return "mem" +} // TOML encodes to toml string func (m *MemStats) TOML() string { - return `[[inputs.mem]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, m.PluginName()) } diff --git a/telegraf/plugins/inputs/net.go b/telegraf/plugins/inputs/net.go index d75d5257b2..4e09bab513 100644 --- a/telegraf/plugins/inputs/net.go +++ b/telegraf/plugins/inputs/net.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // NetIOStats is based on telegraf NetIOStats. -type NetIOStats struct{} +type NetIOStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (n *NetIOStats) PluginName() string { + return "net" +} // TOML encodes to toml string func (n *NetIOStats) TOML() string { - return `[[inputs.net]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, n.PluginName()) } diff --git a/telegraf/plugins/inputs/net_response.go b/telegraf/plugins/inputs/net_response.go index b1f6ca5a26..ecb78d1414 100644 --- a/telegraf/plugins/inputs/net_response.go +++ b/telegraf/plugins/inputs/net_response.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // NetResponse is based on telegraf NetResponse. -type NetResponse struct{} +type NetResponse struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (n *NetResponse) PluginName() string { + return "net_response" +} // TOML encodes to toml string func (n *NetResponse) TOML() string { - return `[[inputs.net_response]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, n.PluginName()) } diff --git a/telegraf/plugins/inputs/ngnix.go b/telegraf/plugins/inputs/ngnix.go index 474afe90b8..d9cbecdbcf 100644 --- a/telegraf/plugins/inputs/ngnix.go +++ b/telegraf/plugins/inputs/ngnix.go @@ -8,18 +8,24 @@ import ( // Nginx is based on telegraf nginx plugin. type Nginx struct { + baseInput URLs []string `json:"urls,omitempty"` } +// PluginName is based on telegraf plugin name. +func (n *Nginx) PluginName() string { + return "nginx" +} + // TOML encodes to toml string func (n *Nginx) TOML() string { s := make([]string, len(n.URLs)) for k, v := range n.URLs { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[inputs.nginx]] + return fmt.Sprintf(`[[inputs.%s]] # An array of Nginx stub_status URI to gather stats. # exp http://localhost/server_status urls = [%s] -`, strings.Join(s, ", ")) +`, n.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/inputs/processes.go b/telegraf/plugins/inputs/processes.go index a23448eefb..e0e078b66f 100644 --- a/telegraf/plugins/inputs/processes.go +++ b/telegraf/plugins/inputs/processes.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // Processes is based on telegraf Processes. -type Processes struct{} +type Processes struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (p *Processes) PluginName() string { + return "processes" +} // TOML encodes to toml string func (p *Processes) TOML() string { - return `[[inputs.processes]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, p.PluginName()) } diff --git a/telegraf/plugins/inputs/procstats.go b/telegraf/plugins/inputs/procstats.go index 7d0c9777b0..ebdd291452 100644 --- a/telegraf/plugins/inputs/procstats.go +++ b/telegraf/plugins/inputs/procstats.go @@ -4,13 +4,19 @@ import "fmt" // Procstat is based on telegraf procstat input plugin. type Procstat struct { + baseInput Exe string `json:"exe,omitempty"` } +// PluginName is based on telegraf plugin name. +func (p *Procstat) PluginName() string { + return "procstat" +} + // TOML encodes to toml string. func (p *Procstat) TOML() string { - return fmt.Sprintf(`[[inputs.procstat]] + return fmt.Sprintf(`[[inputs.%s]] ## executable name (ie, pgrep ) exe = "%s" -`, p.Exe) +`, p.PluginName(), p.Exe) } diff --git a/telegraf/plugins/inputs/prometheus.go b/telegraf/plugins/inputs/prometheus.go index 02763f7e38..3dc75ecf12 100644 --- a/telegraf/plugins/inputs/prometheus.go +++ b/telegraf/plugins/inputs/prometheus.go @@ -8,17 +8,23 @@ import ( // Prometheus is based on telegraf Prometheus plugin. type Prometheus struct { + baseInput URLs []string `json:"urls,omitempty"` } +// PluginName is based on telegraf plugin name. +func (p *Prometheus) PluginName() string { + return "prometheus" +} + // TOML encodes to toml string func (p *Prometheus) TOML() string { s := make([]string, len(p.URLs)) for k, v := range p.URLs { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[inputs.prometheus]] + return fmt.Sprintf(`[[inputs.%s]] ## An array of urls to scrape metrics from. urls = [%s] -`, strings.Join(s, ", ")) +`, p.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/inputs/redis.go b/telegraf/plugins/inputs/redis.go index a2ddeb6661..2793222fa5 100644 --- a/telegraf/plugins/inputs/redis.go +++ b/telegraf/plugins/inputs/redis.go @@ -8,10 +8,16 @@ import ( // Redis is based on telegraf Redis plugin. type Redis struct { + baseInput Servers []string `json:"servers"` Password string `json:"password"` } +// PluginName is based on telegraf plugin name. +func (r *Redis) PluginName() string { + return "redis" +} + // TOML encodes to toml string func (r *Redis) TOML() string { s := make([]string, len(r.Servers)) @@ -22,7 +28,7 @@ func (r *Redis) TOML() string { if r.Password != "" { password = fmt.Sprintf(` password = "%s"`, r.Password) } - return fmt.Sprintf(`[[inputs.redis]] + return fmt.Sprintf(`[[inputs.%s]] ## specify servers via a url matching: ## [protocol://][:password]@address[:port] ## e.g. @@ -36,5 +42,5 @@ func (r *Redis) TOML() string { ## specify server password %s -`, strings.Join(s, ", "), password) +`, r.PluginName(), strings.Join(s, ", "), password) } diff --git a/telegraf/plugins/inputs/swap.go b/telegraf/plugins/inputs/swap.go index 58787e74b0..64e245e6fa 100644 --- a/telegraf/plugins/inputs/swap.go +++ b/telegraf/plugins/inputs/swap.go @@ -1,10 +1,21 @@ package inputs -// SwapStats is based on telegraf SwapStats. -type SwapStats struct{} +import ( + "fmt" +) -// TOML encodes to toml string -func (c *SwapStats) TOML() string { - return `[[inputs.swap]] -` +// SwapStats is based on telegraf SwapStats. +type SwapStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (s *SwapStats) PluginName() string { + return "swap" +} + +// TOML encodes to toml string. +func (s *SwapStats) TOML() string { + return fmt.Sprintf(`[[inputs.%s]] +`, s.PluginName()) } diff --git a/telegraf/plugins/inputs/syslog.go b/telegraf/plugins/inputs/syslog.go index bb98ea00b4..952bdb059d 100644 --- a/telegraf/plugins/inputs/syslog.go +++ b/telegraf/plugins/inputs/syslog.go @@ -6,16 +6,22 @@ import ( // Syslog is based on telegraf Syslog plugin. type Syslog struct { + baseInput Address string `json:"server"` } +// PluginName is based on telegraf plugin name. +func (s *Syslog) PluginName() string { + return "syslog" +} + // TOML encodes to toml string func (s *Syslog) TOML() string { - return fmt.Sprintf(`[[inputs.syslog]] + return fmt.Sprintf(`[[inputs.%s]] ## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514 ## Protocol, address and port to host the syslog receiver. ## If no host is specified, then localhost is used. ## If no port is specified, 6514 is used (RFC5425#section-4.1). server = "%s" -`, s.Address) +`, s.PluginName(), s.Address) } diff --git a/telegraf/plugins/inputs/system.go b/telegraf/plugins/inputs/system.go index c6bdb61864..8cf673911e 100644 --- a/telegraf/plugins/inputs/system.go +++ b/telegraf/plugins/inputs/system.go @@ -1,10 +1,21 @@ package inputs +import ( + "fmt" +) + // SystemStats is based on telegraf SystemStats. -type SystemStats struct{} +type SystemStats struct { + baseInput +} + +// PluginName is based on telegraf plugin name. +func (s *SystemStats) PluginName() string { + return "system" +} // TOML encodes to toml string func (s *SystemStats) TOML() string { - return `[[inputs.system]] -` + return fmt.Sprintf(`[[inputs.%s]] +`, s.PluginName()) } diff --git a/telegraf/plugins/inputs/tail.go b/telegraf/plugins/inputs/tail.go index a4178e4eb4..beb00dc945 100644 --- a/telegraf/plugins/inputs/tail.go +++ b/telegraf/plugins/inputs/tail.go @@ -8,16 +8,22 @@ import ( // Tail is based on telegraf Tail plugin. type Tail struct { + baseInput Files []string `json:"files"` } +// PluginName is based on telegraf plugin name. +func (t *Tail) PluginName() string { + return "tail" +} + // TOML encodes to toml string func (t *Tail) TOML() string { s := make([]string, len(t.Files)) for k, v := range t.Files { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[inputs.tail]] + return fmt.Sprintf(`[[inputs.%s]] ## files to tail. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: @@ -28,5 +34,5 @@ func (t *Tail) TOML() string { ## See https://github.com/gobwas/glob for more examples ## files = [%s] -`, strings.Join(s, ", ")) +`, t.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/outputs/base.go b/telegraf/plugins/outputs/base.go new file mode 100644 index 0000000000..c9b103fe98 --- /dev/null +++ b/telegraf/plugins/outputs/base.go @@ -0,0 +1,9 @@ +package outputs + +import "github.com/influxdata/platform/telegraf/plugins" + +type baseOutput int + +func (b baseOutput) Type() plugins.Type { + return plugins.Output +} diff --git a/telegraf/plugins/outputs/file.go b/telegraf/plugins/outputs/file.go index 97429a1f04..a0bb3e2d49 100644 --- a/telegraf/plugins/outputs/file.go +++ b/telegraf/plugins/outputs/file.go @@ -8,6 +8,7 @@ import ( // File is based on telegraf file output plugin. type File struct { + baseOutput Files []FileConfig `json:"files"` } @@ -17,6 +18,11 @@ type FileConfig struct { Path string `json:"path"` } +// PluginName is based on telegraf plugin name. +func (f *File) PluginName() string { + return "file" +} + // TOML encodes to toml string. func (f *File) TOML() string { s := make([]string, len(f.Files)) @@ -27,8 +33,8 @@ func (f *File) TOML() string { } s[k] = strconv.Quote(v.Path) } - return fmt.Sprintf(`[[outputs.file]] + return fmt.Sprintf(`[[outputs.%s]] ## Files to write to, "stdout" is a specially handled file. files = [%s] -`, strings.Join(s, ", ")) +`, f.PluginName(), strings.Join(s, ", ")) } diff --git a/telegraf/plugins/outputs/influxdb_v2.go b/telegraf/plugins/outputs/influxdb_v2.go index e50e08064e..071a26e900 100644 --- a/telegraf/plugins/outputs/influxdb_v2.go +++ b/telegraf/plugins/outputs/influxdb_v2.go @@ -8,19 +8,25 @@ import ( // InfluxDBV2 is based on telegraf influxdb_v2 output plugin. type InfluxDBV2 struct { + baseOutput URLs []string `toml:"urls"` Token string `toml:"token"` Organization string `toml:"organization"` Bucket string `toml:"bucket"` } +// PluginName is based on telegraf plugin name. +func (i *InfluxDBV2) PluginName() string { + return "influxdb_v2" +} + // TOML encodes to toml string. func (i *InfluxDBV2) TOML() string { s := make([]string, len(i.URLs)) for k, v := range i.URLs { s[k] = strconv.Quote(v) } - return fmt.Sprintf(`[[outputs.influxdb_v2]] + return fmt.Sprintf(`[[outputs.%s]] ## The URLs of the InfluxDB cluster nodes. ## ## Multiple URLs can be specified for a single cluster, only ONE of the @@ -36,5 +42,5 @@ func (i *InfluxDBV2) TOML() string { ## Destination bucket to write into. bucket = "%s" -`, strings.Join(s, ", "), i.Token, i.Organization, i.Bucket) +`, i.PluginName(), strings.Join(s, ", "), i.Token, i.Organization, i.Bucket) } diff --git a/telegraf/plugins/outputs/outputs_test.go b/telegraf/plugins/outputs/outputs_test.go index 3f316a429c..e8b67b3655 100644 --- a/telegraf/plugins/outputs/outputs_test.go +++ b/telegraf/plugins/outputs/outputs_test.go @@ -1,10 +1,23 @@ package outputs -import "testing" +import ( + "testing" + + "github.com/influxdata/platform/telegraf/plugins" +) // local plugin type telegrafPluginConfig interface { TOML() string + Type() plugins.Type + PluginName() string +} + +func TestType(t *testing.T) { + b := baseOutput(0) + if b.Type() != plugins.Output { + t.Fatalf("output plugins type should be output, got %s", b.Type()) + } } func TestTOML(t *testing.T) { diff --git a/telegraf/plugins/type.go b/telegraf/plugins/type.go new file mode 100644 index 0000000000..b29ef661d3 --- /dev/null +++ b/telegraf/plugins/type.go @@ -0,0 +1,12 @@ +package plugins + +// Type is a telegraf plugin type. +type Type string + +// available types. +const ( + Input Type = "input" // Input is an input plugin. + Output Type = "output" // Output is an output plugin. + Processor Type = "processor" // Processor is a processor plugin. + Aggregator Type = "aggregator" // Aggregator is an aggregator plugin. +) diff --git a/telegraf_test.go b/telegraf_test.go new file mode 100644 index 0000000000..bb58765952 --- /dev/null +++ b/telegraf_test.go @@ -0,0 +1,146 @@ +package platform + +import ( + "encoding/json" + "fmt" + "reflect" + "testing" + + "github.com/influxdata/platform/telegraf/plugins" + "github.com/influxdata/platform/telegraf/plugins/outputs" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/platform/telegraf/plugins/inputs" +) + +type unsupportedPluginType struct { + Field string `json:"field"` +} + +func (u *unsupportedPluginType) TOML() string { + return "" +} + +func (u *unsupportedPluginType) PluginName() string { + return "bad_type" +} + +func (u *unsupportedPluginType) Type() plugins.Type { + return plugins.Aggregator +} + +type unsupportedPlugin struct { + Field string `json:"field"` +} + +func (u *unsupportedPlugin) TOML() string { + return "" +} + +func (u *unsupportedPlugin) PluginName() string { + return "kafka" +} + +func (u *unsupportedPlugin) Type() plugins.Type { + return plugins.Output +} + +func TestTelegrafConfigJSON(t *testing.T) { + id1, _ := IDFromString("020f755c3c082000") + id2, _ := IDFromString("020f755c3c082002") + cases := []struct { + name string + cfg *TelegrafConfig + err error + json string + }{ + { + name: "regular config", + cfg: &TelegrafConfig{ + ID: *id1, + Name: "n1", + LastModBy: *id2, + Agent: TelegrafAgentConfig{ + Interval: 4000, + }, + Plugins: []TelegrafPlugin{ + { + Comment: "comment1", + Config: &inputs.File{ + Files: []string{"f1", "f2"}, + }, + }, + { + Comment: "comment2", + Config: &inputs.CPUStats{}, + }, + + { + Comment: "comment3", + Config: &outputs.File{Files: []outputs.FileConfig{ + {Typ: "stdout"}, + }}, + }, + }, + }, + }, + { + name: "unsupported plugin type", + cfg: &TelegrafConfig{ + ID: *id1, + Name: "n1", + LastModBy: *id2, + Plugins: []TelegrafPlugin{ + { + Comment: "comment3", + Config: &unsupportedPluginType{ + Field: "f1", + }, + }, + }, + }, + err: &Error{ + Code: EInvalid, + Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, "aggregator"), + Op: "unmarshal telegraf config raw plugin", + }, + }, + { + name: "unsupported plugin", + cfg: &TelegrafConfig{ + ID: *id1, + Name: "n1", + LastModBy: *id2, + Plugins: []TelegrafPlugin{ + { + Config: &unsupportedPlugin{ + Field: "f2", + }, + }, + }, + }, + err: &Error{ + Code: EInvalid, + Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, "kafka", plugins.Output), + Op: "unmarshal telegraf config raw plugin", + }, + }, + } + for _, c := range cases { + result, err := json.Marshal(c.cfg) + // encode testing + if err != nil { + t.Fatalf("%s encode failed, want cfg: %v, should be nil", c.name, err) + } + got := new(TelegrafConfig) + err = json.Unmarshal([]byte(result), got) + if diff := cmp.Diff(err, c.err); diff != "" { + t.Fatalf("%s decode failed, got err: %v, should be %v", c.name, err, c.err) + } + + // use DeepEqual to ignore unexported field panic + if c.err == nil && !reflect.DeepEqual(got, c.cfg) { + t.Fatalf("%s decode failed, want: %v, got: %v", c.name, c.cfg, got) + } + } +}