fix(platform): fix the pointer alloc for telegraf config

pull/10616/head
Kelvin Wang 2018-12-18 21:07:29 -05:00
parent 5797324cfc
commit 726cf3cdd6
5 changed files with 257 additions and 60 deletions

View File

@ -190,10 +190,11 @@ func (c *Client) UpdateTelegrafConfig(ctx context.Context, id platform.ID, tc *p
tc.LastModBy = userID
pErr = c.putTelegrafConfig(ctx, tx, tc)
if pErr != nil {
pErr.Op = op
err = pErr
return &platform.Error{
Err: pErr,
}
}
return err
return nil
})
return tc, err
}

View File

@ -131,10 +131,10 @@ type telegrafConfigEncode struct {
// telegrafPluginEncode is the helper struct for json encoding.
type telegrafPluginEncode struct {
// Name of the telegraf plugin, exp "docker"
Name string `json:"name"`
Type plugins.Type `json:"type"`
Comment string `json:"comment"`
Config TelegrafPluginConfig `json:"config"`
Name string `json:"name"`
Type plugins.Type `json:"type"`
Comment string `json:"comment"`
Config plugins.Config `json:"config"`
}
// telegrafConfigDecode is the helper struct for json decoding.
@ -161,8 +161,8 @@ type telegrafPluginDecode struct {
// TelegrafPlugin is the general wrapper of the telegraf plugin config
type TelegrafPlugin struct {
Comment string `json:"comment"`
Config TelegrafPluginConfig `json:"config"`
Comment string `json:"comment"`
Config plugins.Config `json:"config"`
}
// TelegrafAgentConfig is based telegraf/internal/config AgentConfig.
@ -171,18 +171,6 @@ type TelegrafAgentConfig struct {
Interval int64 `json:"collectionInterval"`
}
// TelegrafPluginConfig interface for all plugins.
type TelegrafPluginConfig interface {
// TOML encodes to toml string
TOML() string
// UnmarshalTOML decodes the parsed data to the object
UnmarshalTOML(data interface{}) error
// Type is the plugin type
Type() plugins.Type
// PluginName is the string value of telegraf plugin package name.
PluginName() string
}
// errors
const (
ErrTelegrafPluginNameUnmatch = "the telegraf plugin is name %s doesn't match the config %s"
@ -268,22 +256,25 @@ func (tc *TelegrafConfig) UnmarshalTOML(data interface{}) error {
func (tc *TelegrafConfig) parseTOMLPluginConfig(typ, name string, configData interface{}) error {
var ok bool
var p TelegrafPluginConfig
var tpFn func() plugins.Config
switch typ {
case "inputs":
p, ok = availableInputPlugins[name]
tpFn, ok = availableInputPlugins[name]
case "outputs":
p, ok = availableOutputPlugins[name]
tpFn, ok = availableOutputPlugins[name]
default:
return &Error{
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, typ),
}
}
if !ok {
return &Error{
Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, name, typ),
}
}
p := tpFn()
if err := p.UnmarshalTOML(configData); err != nil {
return err
}
@ -314,13 +305,14 @@ func (tc *TelegrafConfig) UnmarshalJSON(b []byte) error {
func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error) {
op := "unmarshal telegraf config raw plugin"
for k, pr := range tcd.Plugins {
var config TelegrafPluginConfig
var tpFn func() plugins.Config
var config plugins.Config
var ok bool
switch pr.Type {
case plugins.Input:
config, ok = availableInputPlugins[pr.Name]
tpFn, ok = availableInputPlugins[pr.Name]
case plugins.Output:
config, ok = availableOutputPlugins[pr.Name]
tpFn, ok = availableOutputPlugins[pr.Name]
default:
return &Error{
Code: EInvalid,
@ -329,6 +321,7 @@ func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error)
}
}
if ok {
config = tpFn()
if err = json.Unmarshal(pr.Config, config); err != nil {
return &Error{
Code: EInvalid,
@ -352,30 +345,30 @@ func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error)
return nil
}
var availableInputPlugins = map[string]TelegrafPluginConfig{
"cpu": &inputs.CPUStats{},
"disk": &inputs.DiskStats{},
"diskio": &inputs.DiskIO{},
"docker": &inputs.Docker{},
"file": &inputs.File{},
"kernel": &inputs.Kernel{},
"kubernetes": &inputs.Kubernetes{},
"logparser": &inputs.LogParserPlugin{},
"mem": &inputs.MemStats{},
"net_response": &inputs.NetResponse{},
"net": &inputs.NetIOStats{},
"ngnix": &inputs.Nginx{},
"processes": &inputs.Processes{},
"procstats": &inputs.Procstat{},
"prometheus": &inputs.Prometheus{},
"redis": &inputs.Redis{},
"swap": &inputs.SwapStats{},
"syslog": &inputs.Syslog{},
"system": &inputs.SystemStats{},
"tail": &inputs.Tail{},
var availableInputPlugins = map[string](func() plugins.Config){
"cpu": func() plugins.Config { return &inputs.CPUStats{} },
"disk": func() plugins.Config { return &inputs.DiskStats{} },
"diskio": func() plugins.Config { return &inputs.DiskIO{} },
"docker": func() plugins.Config { return &inputs.Docker{} },
"file": func() plugins.Config { return &inputs.File{} },
"kernel": func() plugins.Config { return &inputs.Kernel{} },
"kubernetes": func() plugins.Config { return &inputs.Kubernetes{} },
"logparser": func() plugins.Config { return &inputs.LogParserPlugin{} },
"mem": func() plugins.Config { return &inputs.MemStats{} },
"net_response": func() plugins.Config { return &inputs.NetResponse{} },
"net": func() plugins.Config { return &inputs.NetIOStats{} },
"ngnix": func() plugins.Config { return &inputs.Nginx{} },
"processes": func() plugins.Config { return &inputs.Processes{} },
"procstats": func() plugins.Config { return &inputs.Procstat{} },
"prometheus": func() plugins.Config { return &inputs.Prometheus{} },
"redis": func() plugins.Config { return &inputs.Redis{} },
"swap": func() plugins.Config { return &inputs.SwapStats{} },
"syslog": func() plugins.Config { return &inputs.Syslog{} },
"system": func() plugins.Config { return &inputs.SystemStats{} },
"tail": func() plugins.Config { return &inputs.Tail{} },
}
var availableOutputPlugins = map[string]TelegrafPluginConfig{
"file": &outputs.File{},
"influxdb_v2": &outputs.InfluxDBV2{},
var availableOutputPlugins = map[string](func() plugins.Config){
"file": func() plugins.Config { return &outputs.File{} },
"influxdb_v2": func() plugins.Config { return &outputs.InfluxDBV2{} },
}

View File

@ -10,3 +10,15 @@ const (
Processor Type = "processor" // Processor is a processor plugin.
Aggregator Type = "aggregator" // Aggregator is an aggregator plugin.
)
// Config interface for all plugins.
type Config interface {
// TOML encodes to toml string
TOML() string
// UnmarshalTOML decodes the parsed data to the object
UnmarshalTOML(data interface{}) error
// Type is the plugin type
Type() Type
// PluginName is the string value of telegraf plugin package name.
PluginName() string
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"sort"
"testing"
"github.com/BurntSushi/toml"
@ -12,9 +13,28 @@ import (
"github.com/influxdata/platform/telegraf/plugins/outputs"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform/telegraf/plugins/inputs"
)
var telegrafCmpOptions = cmp.Options{
cmpopts.IgnoreUnexported(
inputs.CPUStats{},
inputs.Kubernetes{},
inputs.File{},
outputs.File{},
outputs.InfluxDBV2{},
unsupportedPlugin{},
),
cmp.Transformer("Sort", func(in []*TelegrafConfig) []*TelegrafConfig {
out := append([]*TelegrafConfig(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].ID > out[j].ID
})
return out
}),
}
type unsupportedPluginType struct {
Field string `json:"field"`
}
@ -55,6 +75,79 @@ func (u *unsupportedPlugin) UnmarshalTOML(data interface{}) error {
return nil
}
func TestTelegrafConfigJSONDecodeWithoutID(t *testing.T) {
s := `{
"name":"config 2",
"agent": {
"collectionInterval": 120000
},
"plugins": [
{
"name":"cpu",
"type":"input",
"comment": "cpu collect cpu metrics",
"config":{}
},
{
"name":"kubernetes",
"type":"input",
"config":{
"url":"http://1.1.1.1:12"
}
},
{
"name": "influxdb_v2",
"type": "output",
"comment": "3",
"config": {
"urls": [
"http://127.0.0.1:9999"
],
"token": "token1",
"organization": "org",
"bucket": "bucket"
}
}
]
}`
want := &TelegrafConfig{
Name: "config 2",
Agent: TelegrafAgentConfig{
Interval: 120000,
},
Plugins: []TelegrafPlugin{
{
Comment: "cpu collect cpu metrics",
Config: &inputs.CPUStats{},
},
{
Config: &inputs.Kubernetes{
URL: "http://1.1.1.1:12",
},
},
{
Comment: "3",
Config: &outputs.InfluxDBV2{
URLs: []string{
"http://127.0.0.1:9999",
},
Token: "token1",
Organization: "org",
Bucket: "bucket",
},
},
},
}
got := new(TelegrafConfig)
err := json.Unmarshal([]byte(s), got)
if err != nil {
t.Fatal("json decode error", err.Error())
}
if diff := cmp.Diff(got, want, telegrafCmpOptions...); diff != "" {
t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff)
}
}
func TestTelegrafConfigJSON(t *testing.T) {
id1, _ := IDFromString("020f755c3c082000")
id2, _ := IDFromString("020f755c3c082002")
@ -153,9 +246,8 @@ func TestTelegrafConfigJSON(t *testing.T) {
t.Fatalf("%s decode failed, got err: %v, should be %v", c.name, err, c.err)
}
// use DeepEqual to ignore unexported field panic
if c.err == nil && !reflect.DeepEqual(got, c.cfg) {
t.Fatalf("%s decode failed, want: %v, got: %v", c.name, c.cfg, got)
if diff := cmp.Diff(got, c.cfg, telegrafCmpOptions...); c.err == nil && diff != "" {
t.Errorf("failed %s, telegraf configs are different -got/+want\ndiff %s", c.name, diff)
}
}
}

View File

@ -3,13 +3,13 @@ package testing
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/telegraf/plugins/inputs"
@ -24,11 +24,14 @@ type TelegrafConfigFields struct {
}
var telegrafCmpOptions = cmp.Options{
cmp.Comparer(func(a, b *platform.TelegrafConfig) bool {
x, _ := json.Marshal(a)
y, _ := json.Marshal(b)
return bytes.Equal(x, y)
}),
cmpopts.IgnoreUnexported(
inputs.CPUStats{},
inputs.MemStats{},
inputs.Kubernetes{},
inputs.File{},
outputs.File{},
outputs.InfluxDBV2{},
),
cmp.Transformer("Sort", func(in []*platform.TelegrafConfig) []*platform.TelegrafConfig {
out := append([]*platform.TelegrafConfig(nil), in...)
sort.Slice(out, func(i, j int) bool {
@ -1138,6 +1141,102 @@ func UpdateTelegrafConfig(
},
},
},
{
name: "config update",
fields: TelegrafConfigFields{
TelegrafConfigs: []*platform.TelegrafConfig{
{
ID: MustIDBase16(oneID),
Name: "tc1",
LastModBy: MustIDBase16(threeID),
Plugins: []platform.TelegrafPlugin{
{
Config: &inputs.CPUStats{},
},
},
},
{
ID: MustIDBase16(twoID),
Name: "tc2",
LastModBy: MustIDBase16(threeID),
Plugins: []platform.TelegrafPlugin{
{
Comment: "comment1",
Config: &inputs.File{
Files: []string{"f1", "f2"},
},
},
{
Comment: "comment2",
Config: &inputs.Kubernetes{
URL: "http://1.2.3.4",
},
},
{
Config: &inputs.Kubernetes{
URL: "123",
},
},
},
},
},
},
args: args{
userID: MustIDBase16(fourID),
now: now,
id: MustIDBase16(twoID),
telegrafConfig: &platform.TelegrafConfig{
Name: "tc2",
LastModBy: MustIDBase16(threeID),
Plugins: []platform.TelegrafPlugin{
{
Comment: "comment1",
Config: &inputs.File{
Files: []string{"f1", "f2", "f3"},
},
},
{
Comment: "comment2",
Config: &inputs.Kubernetes{
URL: "http://1.2.3.5",
},
},
{
Config: &inputs.Kubernetes{
URL: "1234",
},
},
},
},
},
wants: wants{
telegrafConfig: &platform.TelegrafConfig{
ID: MustIDBase16(twoID),
Name: "tc2",
LastModBy: MustIDBase16(fourID),
LastMod: now,
Plugins: []platform.TelegrafPlugin{
{
Comment: "comment1",
Config: &inputs.File{
Files: []string{"f1", "f2", "f3"},
},
},
{
Comment: "comment2",
Config: &inputs.Kubernetes{
URL: "http://1.2.3.5",
},
},
{
Config: &inputs.Kubernetes{
URL: "1234",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {