Merge pull request #1097 from influxdata/feat/json_telegraf_config

add json telegraf config
pull/10616/head
kelwang 2018-10-16 12:55:51 -04:00 committed by GitHub
commit 1f98dc2d77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 639 additions and 82 deletions

View File

@ -4489,7 +4489,7 @@ components:
enum: [input, output, processor, aggregator]
comment:
type: string
configs:
config:
oneOf:
- $ref: '#/components/schemas/TelegrafPluginInput'
- $ref: '#/components/schemas/TelegrafPluginInputDocker'

View File

@ -2,7 +2,13 @@ package platform
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/influxdata/platform/telegraf/plugins"
"github.com/influxdata/platform/telegraf/plugins/inputs"
"github.com/influxdata/platform/telegraf/plugins/outputs"
)
// TelegrafConfigStore represents a service for managing telegraf config data.
@ -17,12 +23,12 @@ type TelegrafConfigStore interface {
// Additional options provide pagination & sorting.
FindTelegrafConfigs(ctx context.Context, filter UserResourceMappingFilter, opt ...FindOptions) ([]*TelegrafConfig, int, error)
// CreateTelegrafConfigFilter creates a new telegraf config and sets b.ID with the new identifier.
CreateTelegrafConfigFilter(ctx context.Context, filter *UserResourceMappingFilter) error
// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier.
CreateTelegrafConfig(ctx context.Context, tc *TelegrafConfig, userID ID, now time.Time) error
// UpdateTelegrafConfig updates a single telegraf config.
// Returns the new telegraf config after update.
UpdateTelegrafConfig(ctx context.Context, id ID, tc *TelegrafConfig) (*TelegrafConfig, error)
UpdateTelegrafConfig(ctx context.Context, id ID, tc *TelegrafConfig, userID ID, now time.Time) (*TelegrafConfig, error)
// DeleteTelegrafConfig removes a telegraf config by ID.
DeleteTelegrafConfig(ctx context.Context, id ID) error
@ -30,6 +36,18 @@ type TelegrafConfigStore interface {
// TelegrafConfig stores telegraf config for one telegraf instance.
type TelegrafConfig struct {
ID ID
Name string
Created time.Time
LastMod time.Time
LastModBy ID
Agent TelegrafAgentConfig
Plugins []TelegrafPlugin
}
// telegrafConfigEncode is the helper struct for json encoding.
type telegrafConfigEncode struct {
ID ID `json:"id"`
Name string `json:"name"`
Created time.Time `json:"created"`
@ -38,37 +56,176 @@ type TelegrafConfig struct {
Agent TelegrafAgentConfig `json:"agent"`
Plugins []TelegrafPlugin `json:"plugins"`
Plugins []telegrafPluginEncode `json:"plugins"`
}
// telegrafPluginEncode is the helper struct for json encoding.
type telegrafPluginEncode struct {
// Name of the telegraf plugin, exp "docker"
Name string `json:"name"`
Type plugins.Type `json:"type"`
Comment string `json:"comment"`
Config TelegrafPluginConfig `json:"config"`
}
// telegrafConfigDecode is the helper struct for json decoding.
type telegrafConfigDecode struct {
ID ID `json:"id"`
Name string `json:"name"`
Created time.Time `json:"created"`
LastMod time.Time `json:"last_modified"`
LastModBy ID `json:"last_modified_by"`
Agent TelegrafAgentConfig `json:"agent"`
Plugins []telegrafPluginDecode `json:"plugins"`
}
// telegrafPluginDecode is the helper struct for json decoding.
type telegrafPluginDecode struct {
// Name of the telegraf plugin, exp "docker"
Name string `json:"name"`
Type plugins.Type `json:"type"`
Comment string `json:"comment"`
Config json.RawMessage `json:"config"`
}
// TelegrafPlugin is the general wrapper of the telegraf plugin config
type TelegrafPlugin struct {
// Name of the telegraf plugin, exp "docker"
Name string `json:"name"`
Type TelegrafPluginType `json:"type"`
Comment string `json:"comment"`
Configs []TelegrafAgentConfig `json:"configs"`
Comment string `json:"comment"`
Config TelegrafPluginConfig `json:"config"`
}
// TelegrafAgentConfig is based telegraf/internal/config AgentConfig.
type TelegrafAgentConfig struct {
// Interval at which to gather information
Interval time.Duration `json:"collectionInterval"`
// Interval at which to gather information in miliseconds.
Interval int64 `json:"collectionInterval"`
}
// TelegrafPluginConfig interface for all plugins.
type TelegrafPluginConfig interface {
// TOML encodes to toml string
TOML() string
// Type is the plugin type
Type() plugins.Type
// PluginName is the string value of telegraf plugin package name.
PluginName() string
}
// TelegrafPluginType is a string enum type: input/output/processor/aggregator.
type TelegrafPluginType string
// telegraf plugin types
// errors
const (
TelegrafPluginTypeInput TelegrafPluginType = "input" // TelegrafPluginTypeInput is an input plugin.
TelegrafPluginTypeOutput TelegrafPluginType = "output" // TelegrafPluginTypeOutput is an output plugin.
TelegrafPluginTypeProcessor TelegrafPluginType = "processor" // TelegrafPluginTypeProcessor is a processor plugin.
TelegrafPluginTypeAggregator TelegrafPluginType = "aggregator" // TelegrafPluginTypeAggregator is an aggregator plugin.
ErrTelegrafPluginNameUnmatch = "the telegraf plugin is name %s doesn't match the config %s"
ErrNoTelegrafPlugins = "there is no telegraf plugin in the config"
ErrUnsupportTelegrafPluginType = "unsupported telegraf plugin type %s"
ErrUnsupportTelegrafPluginName = "unsupported telegraf plugin %s, type %s"
)
// MarshalJSON implement the json.Marshaler interface.
func (tc *TelegrafConfig) MarshalJSON() ([]byte, error) {
tce := new(telegrafConfigEncode)
*tce = telegrafConfigEncode{
ID: tc.ID,
Name: tc.Name,
Agent: tc.Agent,
Created: tc.Created,
LastMod: tc.LastMod,
LastModBy: tc.LastModBy,
Plugins: make([]telegrafPluginEncode, len(tc.Plugins)),
}
for k, p := range tc.Plugins {
tce.Plugins[k] = telegrafPluginEncode{
Name: p.Config.PluginName(),
Type: p.Config.Type(),
Comment: p.Comment,
Config: p.Config,
}
}
return json.Marshal(tce)
}
// UnmarshalJSON implement the json.Unmarshaler interface.
func (tc *TelegrafConfig) UnmarshalJSON(b []byte) error {
tcd := new(telegrafConfigDecode)
if err := json.Unmarshal(b, tcd); err != nil {
return err
}
*tc = TelegrafConfig{
ID: tcd.ID,
Name: tcd.Name,
Created: tcd.Created,
LastMod: tcd.LastMod,
LastModBy: tcd.LastModBy,
Agent: tcd.Agent,
Plugins: make([]TelegrafPlugin, len(tcd.Plugins)),
}
return decodePluginRaw(tcd, tc)
}
func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error) {
op := "unmarshal telegraf config raw plugin"
for k, pr := range tcd.Plugins {
var config TelegrafPluginConfig
var ok bool
switch pr.Type {
case plugins.Input:
config, ok = availableInputPlugins[pr.Name]
case plugins.Output:
config, ok = availableOutputPlugins[pr.Name]
default:
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, pr.Type),
Op: op,
}
}
if ok {
if err = json.Unmarshal(pr.Config, config); err != nil {
return &Error{
Code: EInvalid,
Err: err,
Op: op,
}
}
tc.Plugins[k] = TelegrafPlugin{
Comment: pr.Comment,
Config: config,
}
continue
}
return &Error{
Code: EInvalid,
Op: op,
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, pr.Name, pr.Type),
}
}
return nil
}
var availableInputPlugins = map[string]TelegrafPluginConfig{
"cpu": &inputs.CPUStats{},
"disk": &inputs.DiskStats{},
"diskio": &inputs.DiskIO{},
"docker": &inputs.Docker{},
"file": &inputs.File{},
"kernel": &inputs.Kernel{},
"kubernetes": &inputs.Kubernetes{},
"logparser": &inputs.LogParserPlugin{},
"mem": &inputs.MemStats{},
"net_response": &inputs.NetResponse{},
"net": &inputs.NetIOStats{},
"ngnix": &inputs.Nginx{},
"processes": &inputs.Processes{},
"procstats": &inputs.Procstat{},
"prometheus": &inputs.Prometheus{},
"redis": &inputs.Redis{},
"swap": &inputs.SwapStats{},
"syslog": &inputs.Syslog{},
"system": &inputs.SystemStats{},
"tail": &inputs.Tail{},
}
var availableOutputPlugins = map[string]TelegrafPluginConfig{
"file": &outputs.File{},
"influxdb_v2": &outputs.InfluxDBV2{},
}

View File

@ -0,0 +1,9 @@
package inputs
import "github.com/influxdata/platform/telegraf/plugins"
type baseInput int
func (b baseInput) Type() plugins.Type {
return plugins.Input
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// CPUStats is based on telegraf CPUStats.
type CPUStats struct{}
type CPUStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (c *CPUStats) PluginName() string {
return "cpu"
}
// TOML encodes to toml string
func (c *CPUStats) TOML() string {
return `[[inputs.cpu]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, c.PluginName())
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// DiskStats is based on telegraf DiskStats.
type DiskStats struct{}
type DiskStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (d *DiskStats) PluginName() string {
return "disk"
}
// TOML encodes to toml string
func (d *DiskStats) TOML() string {
return `[[inputs.disk]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, d.PluginName())
}

View File

@ -1,10 +1,21 @@
package inputs
// DiskIO is based on telegraf DiskIO.
type DiskIO struct{}
import (
"fmt"
)
// TOML encodes to toml string
func (d *DiskIO) TOML() string {
return `[[inputs.diskio]]
`
// DiskIO is based on telegraf DiskIO.
type DiskIO struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (d *DiskIO) PluginName() string {
return "diskio"
}
// TOML encodes to toml string.
func (d *DiskIO) TOML() string {
return fmt.Sprintf(`[[inputs.%s]]
`, d.PluginName())
}

View File

@ -6,16 +6,22 @@ import (
// Docker is based on telegraf Docker plugin.
type Docker struct {
baseInput
Endpoint string `json:"endpoint"`
}
// PluginName is based on telegraf plugin name.
func (d *Docker) PluginName() string {
return "docker"
}
// TOML encodes to toml string
func (d *Docker) TOML() string {
return fmt.Sprintf(`[[inputs.docker]]
return fmt.Sprintf(`[[inputs.%s]]
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
## exp: unix:///var/run/docker.sock
endpoint = "%s"
`, d.Endpoint)
`, d.PluginName(), d.Endpoint)
}

View File

@ -8,16 +8,22 @@ import (
// File is based on telegraf input File plugin.
type File struct {
baseInput
Files []string `json:"files"`
}
// PluginName is based on telegraf plugin name.
func (f *File) PluginName() string {
return "file"
}
// TOML encodes to toml string
func (f *File) TOML() string {
s := make([]string, len(f.Files))
for k, v := range f.Files {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[inputs.file]]
return fmt.Sprintf(`[[inputs.%s]]
## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
@ -25,5 +31,5 @@ func (f *File) TOML() string {
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only read the apache log file
files = [%s]
`, strings.Join(s, ", "))
`, f.PluginName(), strings.Join(s, ", "))
}

View File

@ -1,10 +1,23 @@
package inputs
import "testing"
import (
"testing"
"github.com/influxdata/platform/telegraf/plugins"
)
// local plugin
type telegrafPluginConfig interface {
TOML() string
Type() plugins.Type
PluginName() string
}
func TestType(t *testing.T) {
b := baseInput(0)
if b.Type() != plugins.Input {
t.Fatalf("input plugins type should be input, got %s", b.Type())
}
}
func TestTOML(t *testing.T) {
@ -18,6 +31,13 @@ func TestTOML(t *testing.T) {
&CPUStats{}: "[[inputs.cpu]]\n",
&DiskStats{}: "[[inputs.disk]]\n",
&DiskIO{}: "[[inputs.diskio]]\n",
&Docker{}: `[[inputs.docker]]
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
## exp: unix:///var/run/docker.sock
endpoint = ""
`,
&File{}: `[[inputs.file]]
## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
@ -100,6 +120,15 @@ func TestTOML(t *testing.T) {
{
name: "standard testing",
plugins: map[telegrafPluginConfig]string{
&Docker{
Endpoint: "unix:///var/run/docker.sock",
}: `[[inputs.docker]]
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
## exp: unix:///var/run/docker.sock
endpoint = "unix:///var/run/docker.sock"
`,
&File{
Files: []string{
"/var/log/**.log",

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// Kernel is based on telegraf Kernel.
type Kernel struct{}
type Kernel struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (k *Kernel) PluginName() string {
return "kernel"
}
// TOML encodes to toml string
func (k *Kernel) TOML() string {
return `[[inputs.kernel]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, k.PluginName())
}

View File

@ -4,14 +4,20 @@ import "fmt"
// Kubernetes is based on telegraf Kubernetes plugin
type Kubernetes struct {
baseInput
URL string `json:"url"`
}
// PluginName is based on telegraf plugin name.
func (k *Kubernetes) PluginName() string {
return "kubernetes"
}
// TOML encodes to toml string.
func (k *Kubernetes) TOML() string {
return fmt.Sprintf(`[[inputs.kubernetes]]
return fmt.Sprintf(`[[inputs.%s]]
## URL for the kubelet
## exp: http://1.1.1.1:10255
url = "%s"
`, k.URL)
`, k.PluginName(), k.URL)
}

View File

@ -8,16 +8,22 @@ import (
// LogParserPlugin is based on telegraf LogParserPlugin.
type LogParserPlugin struct {
baseInput
Files []string `json:"files"`
}
// PluginName is based on telegraf plugin name.
func (l *LogParserPlugin) PluginName() string {
return "logparser"
}
// TOML encodes to toml string
func (l *LogParserPlugin) TOML() string {
s := make([]string, len(l.Files))
for k, v := range l.Files {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[inputs.logparser]]
return fmt.Sprintf(`[[inputs.%s]]
## Log files to parse.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
@ -25,5 +31,5 @@ func (l *LogParserPlugin) TOML() string {
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = [%s]
`, strings.Join(s, ", "))
`, l.PluginName(), strings.Join(s, ", "))
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// MemStats is based on telegraf MemStats.
type MemStats struct{}
type MemStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (m *MemStats) PluginName() string {
return "mem"
}
// TOML encodes to toml string
func (m *MemStats) TOML() string {
return `[[inputs.mem]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, m.PluginName())
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// NetIOStats is based on telegraf NetIOStats.
type NetIOStats struct{}
type NetIOStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (n *NetIOStats) PluginName() string {
return "net"
}
// TOML encodes to toml string
func (n *NetIOStats) TOML() string {
return `[[inputs.net]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, n.PluginName())
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// NetResponse is based on telegraf NetResponse.
type NetResponse struct{}
type NetResponse struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (n *NetResponse) PluginName() string {
return "net_response"
}
// TOML encodes to toml string
func (n *NetResponse) TOML() string {
return `[[inputs.net_response]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, n.PluginName())
}

View File

@ -8,18 +8,24 @@ import (
// Nginx is based on telegraf nginx plugin.
type Nginx struct {
baseInput
URLs []string `json:"urls,omitempty"`
}
// PluginName is based on telegraf plugin name.
func (n *Nginx) PluginName() string {
return "nginx"
}
// TOML encodes to toml string
func (n *Nginx) TOML() string {
s := make([]string, len(n.URLs))
for k, v := range n.URLs {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[inputs.nginx]]
return fmt.Sprintf(`[[inputs.%s]]
# An array of Nginx stub_status URI to gather stats.
# exp http://localhost/server_status
urls = [%s]
`, strings.Join(s, ", "))
`, n.PluginName(), strings.Join(s, ", "))
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// Processes is based on telegraf Processes.
type Processes struct{}
type Processes struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (p *Processes) PluginName() string {
return "processes"
}
// TOML encodes to toml string
func (p *Processes) TOML() string {
return `[[inputs.processes]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, p.PluginName())
}

View File

@ -4,13 +4,19 @@ import "fmt"
// Procstat is based on telegraf procstat input plugin.
type Procstat struct {
baseInput
Exe string `json:"exe,omitempty"`
}
// PluginName is based on telegraf plugin name.
func (p *Procstat) PluginName() string {
return "procstat"
}
// TOML encodes to toml string.
func (p *Procstat) TOML() string {
return fmt.Sprintf(`[[inputs.procstat]]
return fmt.Sprintf(`[[inputs.%s]]
## executable name (ie, pgrep <exe>)
exe = "%s"
`, p.Exe)
`, p.PluginName(), p.Exe)
}

View File

@ -8,17 +8,23 @@ import (
// Prometheus is based on telegraf Prometheus plugin.
type Prometheus struct {
baseInput
URLs []string `json:"urls,omitempty"`
}
// PluginName is based on telegraf plugin name.
func (p *Prometheus) PluginName() string {
return "prometheus"
}
// TOML encodes to toml string
func (p *Prometheus) TOML() string {
s := make([]string, len(p.URLs))
for k, v := range p.URLs {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[inputs.prometheus]]
return fmt.Sprintf(`[[inputs.%s]]
## An array of urls to scrape metrics from.
urls = [%s]
`, strings.Join(s, ", "))
`, p.PluginName(), strings.Join(s, ", "))
}

View File

@ -8,10 +8,16 @@ import (
// Redis is based on telegraf Redis plugin.
type Redis struct {
baseInput
Servers []string `json:"servers"`
Password string `json:"password"`
}
// PluginName is based on telegraf plugin name.
func (r *Redis) PluginName() string {
return "redis"
}
// TOML encodes to toml string
func (r *Redis) TOML() string {
s := make([]string, len(r.Servers))
@ -22,7 +28,7 @@ func (r *Redis) TOML() string {
if r.Password != "" {
password = fmt.Sprintf(` password = "%s"`, r.Password)
}
return fmt.Sprintf(`[[inputs.redis]]
return fmt.Sprintf(`[[inputs.%s]]
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
@ -36,5 +42,5 @@ func (r *Redis) TOML() string {
## specify server password
%s
`, strings.Join(s, ", "), password)
`, r.PluginName(), strings.Join(s, ", "), password)
}

View File

@ -1,10 +1,21 @@
package inputs
// SwapStats is based on telegraf SwapStats.
type SwapStats struct{}
import (
"fmt"
)
// TOML encodes to toml string
func (c *SwapStats) TOML() string {
return `[[inputs.swap]]
`
// SwapStats is based on telegraf SwapStats.
type SwapStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (s *SwapStats) PluginName() string {
return "swap"
}
// TOML encodes to toml string.
func (s *SwapStats) TOML() string {
return fmt.Sprintf(`[[inputs.%s]]
`, s.PluginName())
}

View File

@ -6,16 +6,22 @@ import (
// Syslog is based on telegraf Syslog plugin.
type Syslog struct {
baseInput
Address string `json:"server"`
}
// PluginName is based on telegraf plugin name.
func (s *Syslog) PluginName() string {
return "syslog"
}
// TOML encodes to toml string
func (s *Syslog) TOML() string {
return fmt.Sprintf(`[[inputs.syslog]]
return fmt.Sprintf(`[[inputs.%s]]
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = "%s"
`, s.Address)
`, s.PluginName(), s.Address)
}

View File

@ -1,10 +1,21 @@
package inputs
import (
"fmt"
)
// SystemStats is based on telegraf SystemStats.
type SystemStats struct{}
type SystemStats struct {
baseInput
}
// PluginName is based on telegraf plugin name.
func (s *SystemStats) PluginName() string {
return "system"
}
// TOML encodes to toml string
func (s *SystemStats) TOML() string {
return `[[inputs.system]]
`
return fmt.Sprintf(`[[inputs.%s]]
`, s.PluginName())
}

View File

@ -8,16 +8,22 @@ import (
// Tail is based on telegraf Tail plugin.
type Tail struct {
baseInput
Files []string `json:"files"`
}
// PluginName is based on telegraf plugin name.
func (t *Tail) PluginName() string {
return "tail"
}
// TOML encodes to toml string
func (t *Tail) TOML() string {
s := make([]string, len(t.Files))
for k, v := range t.Files {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[inputs.tail]]
return fmt.Sprintf(`[[inputs.%s]]
## files to tail.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
@ -28,5 +34,5 @@ func (t *Tail) TOML() string {
## See https://github.com/gobwas/glob for more examples
##
files = [%s]
`, strings.Join(s, ", "))
`, t.PluginName(), strings.Join(s, ", "))
}

View File

@ -0,0 +1,9 @@
package outputs
import "github.com/influxdata/platform/telegraf/plugins"
type baseOutput int
func (b baseOutput) Type() plugins.Type {
return plugins.Output
}

View File

@ -8,6 +8,7 @@ import (
// File is based on telegraf file output plugin.
type File struct {
baseOutput
Files []FileConfig `json:"files"`
}
@ -17,6 +18,11 @@ type FileConfig struct {
Path string `json:"path"`
}
// PluginName is based on telegraf plugin name.
func (f *File) PluginName() string {
return "file"
}
// TOML encodes to toml string.
func (f *File) TOML() string {
s := make([]string, len(f.Files))
@ -27,8 +33,8 @@ func (f *File) TOML() string {
}
s[k] = strconv.Quote(v.Path)
}
return fmt.Sprintf(`[[outputs.file]]
return fmt.Sprintf(`[[outputs.%s]]
## Files to write to, "stdout" is a specially handled file.
files = [%s]
`, strings.Join(s, ", "))
`, f.PluginName(), strings.Join(s, ", "))
}

View File

@ -8,19 +8,25 @@ import (
// InfluxDBV2 is based on telegraf influxdb_v2 output plugin.
type InfluxDBV2 struct {
baseOutput
URLs []string `toml:"urls"`
Token string `toml:"token"`
Organization string `toml:"organization"`
Bucket string `toml:"bucket"`
}
// PluginName is based on telegraf plugin name.
func (i *InfluxDBV2) PluginName() string {
return "influxdb_v2"
}
// TOML encodes to toml string.
func (i *InfluxDBV2) TOML() string {
s := make([]string, len(i.URLs))
for k, v := range i.URLs {
s[k] = strconv.Quote(v)
}
return fmt.Sprintf(`[[outputs.influxdb_v2]]
return fmt.Sprintf(`[[outputs.%s]]
## The URLs of the InfluxDB cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
@ -36,5 +42,5 @@ func (i *InfluxDBV2) TOML() string {
## Destination bucket to write into.
bucket = "%s"
`, strings.Join(s, ", "), i.Token, i.Organization, i.Bucket)
`, i.PluginName(), strings.Join(s, ", "), i.Token, i.Organization, i.Bucket)
}

View File

@ -1,10 +1,23 @@
package outputs
import "testing"
import (
"testing"
"github.com/influxdata/platform/telegraf/plugins"
)
// local plugin
type telegrafPluginConfig interface {
TOML() string
Type() plugins.Type
PluginName() string
}
func TestType(t *testing.T) {
b := baseOutput(0)
if b.Type() != plugins.Output {
t.Fatalf("output plugins type should be output, got %s", b.Type())
}
}
func TestTOML(t *testing.T) {

12
telegraf/plugins/type.go Normal file
View File

@ -0,0 +1,12 @@
package plugins
// Type is a telegraf plugin type.
type Type string
// available types.
const (
Input Type = "input" // Input is an input plugin.
Output Type = "output" // Output is an output plugin.
Processor Type = "processor" // Processor is a processor plugin.
Aggregator Type = "aggregator" // Aggregator is an aggregator plugin.
)

146
telegraf_test.go Normal file
View File

@ -0,0 +1,146 @@
package platform
import (
"encoding/json"
"fmt"
"reflect"
"testing"
"github.com/influxdata/platform/telegraf/plugins"
"github.com/influxdata/platform/telegraf/plugins/outputs"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/telegraf/plugins/inputs"
)
type unsupportedPluginType struct {
Field string `json:"field"`
}
func (u *unsupportedPluginType) TOML() string {
return ""
}
func (u *unsupportedPluginType) PluginName() string {
return "bad_type"
}
func (u *unsupportedPluginType) Type() plugins.Type {
return plugins.Aggregator
}
type unsupportedPlugin struct {
Field string `json:"field"`
}
func (u *unsupportedPlugin) TOML() string {
return ""
}
func (u *unsupportedPlugin) PluginName() string {
return "kafka"
}
func (u *unsupportedPlugin) Type() plugins.Type {
return plugins.Output
}
func TestTelegrafConfigJSON(t *testing.T) {
id1, _ := IDFromString("020f755c3c082000")
id2, _ := IDFromString("020f755c3c082002")
cases := []struct {
name string
cfg *TelegrafConfig
err error
json string
}{
{
name: "regular config",
cfg: &TelegrafConfig{
ID: *id1,
Name: "n1",
LastModBy: *id2,
Agent: TelegrafAgentConfig{
Interval: 4000,
},
Plugins: []TelegrafPlugin{
{
Comment: "comment1",
Config: &inputs.File{
Files: []string{"f1", "f2"},
},
},
{
Comment: "comment2",
Config: &inputs.CPUStats{},
},
{
Comment: "comment3",
Config: &outputs.File{Files: []outputs.FileConfig{
{Typ: "stdout"},
}},
},
},
},
},
{
name: "unsupported plugin type",
cfg: &TelegrafConfig{
ID: *id1,
Name: "n1",
LastModBy: *id2,
Plugins: []TelegrafPlugin{
{
Comment: "comment3",
Config: &unsupportedPluginType{
Field: "f1",
},
},
},
},
err: &Error{
Code: EInvalid,
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, "aggregator"),
Op: "unmarshal telegraf config raw plugin",
},
},
{
name: "unsupported plugin",
cfg: &TelegrafConfig{
ID: *id1,
Name: "n1",
LastModBy: *id2,
Plugins: []TelegrafPlugin{
{
Config: &unsupportedPlugin{
Field: "f2",
},
},
},
},
err: &Error{
Code: EInvalid,
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, "kafka", plugins.Output),
Op: "unmarshal telegraf config raw plugin",
},
},
}
for _, c := range cases {
result, err := json.Marshal(c.cfg)
// encode testing
if err != nil {
t.Fatalf("%s encode failed, want cfg: %v, should be nil", c.name, err)
}
got := new(TelegrafConfig)
err = json.Unmarshal([]byte(result), got)
if diff := cmp.Diff(err, c.err); diff != "" {
t.Fatalf("%s decode failed, got err: %v, should be %v", c.name, err, c.err)
}
// use DeepEqual to ignore unexported field panic
if c.err == nil && !reflect.DeepEqual(got, c.cfg) {
t.Fatalf("%s decode failed, want: %v, got: %v", c.name, c.cfg, got)
}
}
}