399 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			399 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
package influxdb
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/influxdata/influxdb/telegraf/plugins"
 | 
						|
	"github.com/influxdata/influxdb/telegraf/plugins/inputs"
 | 
						|
	"github.com/influxdata/influxdb/telegraf/plugins/outputs"
 | 
						|
)
 | 
						|
 | 
						|
// ErrTelegrafConfigInvalidOrgID is the error message for a missing or invalid organization ID.
 | 
						|
const ErrTelegrafConfigInvalidOrgID = "invalid org ID"
 | 
						|
 | 
						|
// ErrTelegrafConfigNotFound is the error message for a missing telegraf config.
 | 
						|
const ErrTelegrafConfigNotFound = "telegraf configuration not found"
 | 
						|
 | 
						|
// ops for buckets error and buckets op logs.
 | 
						|
var (
 | 
						|
	OpFindTelegrafConfigByID = "FindTelegrafConfigByID"
 | 
						|
	OpFindTelegrafConfigs    = "FindTelegrafConfigs"
 | 
						|
	OpCreateTelegrafConfig   = "CreateTelegrafConfig"
 | 
						|
	OpUpdateTelegrafConfig   = "UpdateTelegrafConfig"
 | 
						|
	OpDeleteTelegrafConfig   = "DeleteTelegrafConfig"
 | 
						|
)
 | 
						|
 | 
						|
// TelegrafConfigStore represents a service for managing telegraf config data.
 | 
						|
type TelegrafConfigStore interface {
 | 
						|
	// UserResourceMappingService must be part of all TelegrafConfigStore service,
 | 
						|
	// for create, search, delete.
 | 
						|
	UserResourceMappingService
 | 
						|
 | 
						|
	// FindTelegrafConfigByID returns a single telegraf config by ID.
 | 
						|
	FindTelegrafConfigByID(ctx context.Context, id ID) (*TelegrafConfig, error)
 | 
						|
 | 
						|
	// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs.
 | 
						|
	// Additional options provide pagination & sorting.
 | 
						|
	FindTelegrafConfigs(ctx context.Context, filter TelegrafConfigFilter, opt ...FindOptions) ([]*TelegrafConfig, int, error)
 | 
						|
 | 
						|
	// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier.
 | 
						|
	CreateTelegrafConfig(ctx context.Context, tc *TelegrafConfig, userID ID) error
 | 
						|
 | 
						|
	// UpdateTelegrafConfig updates a single telegraf config.
 | 
						|
	// Returns the new telegraf config after update.
 | 
						|
	UpdateTelegrafConfig(ctx context.Context, id ID, tc *TelegrafConfig, userID ID) (*TelegrafConfig, error)
 | 
						|
 | 
						|
	// DeleteTelegrafConfig removes a telegraf config by ID.
 | 
						|
	DeleteTelegrafConfig(ctx context.Context, id ID) error
 | 
						|
}
 | 
						|
 | 
						|
// TelegrafConfigFilter represents a set of filter that restrict the returned telegraf configs.
 | 
						|
type TelegrafConfigFilter struct {
 | 
						|
	OrgID        *ID
 | 
						|
	Organization *string
 | 
						|
	UserResourceMappingFilter
 | 
						|
}
 | 
						|
 | 
						|
// TelegrafConfig stores telegraf config for one telegraf instance.
 | 
						|
type TelegrafConfig struct {
 | 
						|
	ID          ID
 | 
						|
	OrgID       ID
 | 
						|
	Name        string
 | 
						|
	Description string
 | 
						|
 | 
						|
	Agent   TelegrafAgentConfig
 | 
						|
	Plugins []TelegrafPlugin
 | 
						|
}
 | 
						|
 | 
						|
// TOML returns the telegraf toml config string.
 | 
						|
func (tc TelegrafConfig) TOML() string {
 | 
						|
	plugins := ""
 | 
						|
	for _, p := range tc.Plugins {
 | 
						|
		plugins += p.Config.TOML()
 | 
						|
	}
 | 
						|
	interval := time.Duration(tc.Agent.Interval * 1000000)
 | 
						|
	return fmt.Sprintf(`# Configuration for telegraf agent
 | 
						|
[agent]
 | 
						|
  ## Default data collection interval for all inputs
 | 
						|
  interval = "%s"
 | 
						|
  ## Rounds collection interval to 'interval'
 | 
						|
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
 | 
						|
  round_interval = true
 | 
						|
 | 
						|
  ## Telegraf will send metrics to outputs in batches of at most
 | 
						|
  ## metric_batch_size metrics.
 | 
						|
  ## This controls the size of writes that Telegraf sends to output plugins.
 | 
						|
  metric_batch_size = 1000
 | 
						|
 | 
						|
  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
 | 
						|
  ## output, and will flush this buffer on a successful write. Oldest metrics
 | 
						|
  ## are dropped first when this buffer fills.
 | 
						|
  ## This buffer only fills when writes fail to output plugin(s).
 | 
						|
  metric_buffer_limit = 10000
 | 
						|
 | 
						|
  ## Collection jitter is used to jitter the collection by a random amount.
 | 
						|
  ## Each plugin will sleep for a random time within jitter before collecting.
 | 
						|
  ## This can be used to avoid many plugins querying things like sysfs at the
 | 
						|
  ## same time, which can have a measurable effect on the system.
 | 
						|
  collection_jitter = "0s"
 | 
						|
 | 
						|
  ## Default flushing interval for all outputs. Maximum flush_interval will be
 | 
						|
  ## flush_interval + flush_jitter
 | 
						|
  flush_interval = "10s"
 | 
						|
  ## Jitter the flush interval by a random amount. This is primarily to avoid
 | 
						|
  ## large write spikes for users running a large number of telegraf instances.
 | 
						|
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
 | 
						|
  flush_jitter = "0s"
 | 
						|
 | 
						|
  ## By default or when set to "0s", precision will be set to the same
 | 
						|
  ## timestamp order as the collection interval, with the maximum being 1s.
 | 
						|
  ##   ie, when interval = "10s", precision will be "1s"
 | 
						|
  ##       when interval = "250ms", precision will be "1ms"
 | 
						|
  ## Precision will NOT be used for service inputs. It is up to each individual
 | 
						|
  ## service input to set the timestamp at the appropriate precision.
 | 
						|
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
 | 
						|
  precision = ""
 | 
						|
 | 
						|
  ## Logging configuration:
 | 
						|
  ## Run telegraf with debug log messages.
 | 
						|
  debug = false
 | 
						|
  ## Run telegraf in quiet mode (error log messages only).
 | 
						|
  quiet = false
 | 
						|
  ## Specify the log file name. The empty string means to log to stderr.
 | 
						|
  logfile = ""
 | 
						|
 | 
						|
  ## Override default hostname, if empty use os.Hostname()
 | 
						|
  hostname = ""
 | 
						|
  ## If set to true, do no set the "host" tag in the telegraf agent.
 | 
						|
  omit_hostname = false
 | 
						|
%s`, interval.String(), plugins)
 | 
						|
}
 | 
						|
 | 
						|
// telegrafConfigEncode is the helper struct for json encoding.
 | 
						|
type telegrafConfigEncode struct {
 | 
						|
	ID          ID     `json:"id"`
 | 
						|
	OrgID       ID     `json:"orgID,omitempty"`
 | 
						|
	Name        string `json:"name"`
 | 
						|
	Description string `json:"description"`
 | 
						|
 | 
						|
	Agent TelegrafAgentConfig `json:"agent"`
 | 
						|
 | 
						|
	Plugins []telegrafPluginEncode `json:"plugins"`
 | 
						|
}
 | 
						|
 | 
						|
// telegrafPluginEncode is the helper struct for json encoding.
 | 
						|
type telegrafPluginEncode struct {
 | 
						|
	// Name of the telegraf plugin, exp "docker"
 | 
						|
	Name    string         `json:"name"`
 | 
						|
	Type    plugins.Type   `json:"type"`
 | 
						|
	Comment string         `json:"comment,omitempty"`
 | 
						|
	Config  plugins.Config `json:"config,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
// telegrafConfigDecode is the helper struct for json decoding.
 | 
						|
type telegrafConfigDecode struct {
 | 
						|
	ID             ID     `json:"id"`
 | 
						|
	OrganizationID ID     `json:"organizationID,omitempty"`
 | 
						|
	OrgID          ID     `json:"orgID,omitempty"`
 | 
						|
	Name           string `json:"name"`
 | 
						|
	Description    string `json:"description"`
 | 
						|
 | 
						|
	Agent TelegrafAgentConfig `json:"agent"`
 | 
						|
 | 
						|
	Plugins []telegrafPluginDecode `json:"plugins"`
 | 
						|
}
 | 
						|
 | 
						|
// telegrafPluginDecode is the helper struct for json decoding.
 | 
						|
type telegrafPluginDecode struct {
 | 
						|
	// Name of the telegraf plugin, exp "docker"
 | 
						|
	Name    string          `json:"name"`
 | 
						|
	Type    plugins.Type    `json:"type"`
 | 
						|
	Comment string          `json:"comment,omitempty"`
 | 
						|
	Config  json.RawMessage `json:"config,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
// TelegrafPlugin is the general wrapper of the telegraf plugin config
 | 
						|
type TelegrafPlugin struct {
 | 
						|
	Comment string         `json:"comment,omitempty"`
 | 
						|
	Config  plugins.Config `json:"config,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
// TelegrafAgentConfig is based telegraf/internal/config AgentConfig.
 | 
						|
type TelegrafAgentConfig struct {
 | 
						|
	// Interval at which to gather information in miliseconds.
 | 
						|
	Interval int64 `json:"collectionInterval"`
 | 
						|
}
 | 
						|
 | 
						|
// errors
 | 
						|
const (
 | 
						|
	ErrTelegrafPluginNameUnmatch   = "the telegraf plugin is name %s doesn't match the config %s"
 | 
						|
	ErrNoTelegrafPlugins           = "there is no telegraf plugin in the config"
 | 
						|
	ErrUnsupportTelegrafPluginType = "unsupported telegraf plugin type %s"
 | 
						|
	ErrUnsupportTelegrafPluginName = "unsupported telegraf plugin %s, type %s"
 | 
						|
)
 | 
						|
 | 
						|
// MarshalJSON implement the json.Marshaler interface.
 | 
						|
func (tc *TelegrafConfig) MarshalJSON() ([]byte, error) {
 | 
						|
	tce := new(telegrafConfigEncode)
 | 
						|
	*tce = telegrafConfigEncode{
 | 
						|
		ID:          tc.ID,
 | 
						|
		OrgID:       tc.OrgID,
 | 
						|
		Name:        tc.Name,
 | 
						|
		Description: tc.Description,
 | 
						|
		Agent:       tc.Agent,
 | 
						|
		Plugins:     make([]telegrafPluginEncode, len(tc.Plugins)),
 | 
						|
	}
 | 
						|
	for k, p := range tc.Plugins {
 | 
						|
		tce.Plugins[k] = telegrafPluginEncode{
 | 
						|
			Name:    p.Config.PluginName(),
 | 
						|
			Type:    p.Config.Type(),
 | 
						|
			Comment: p.Comment,
 | 
						|
			Config:  p.Config,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return json.Marshal(tce)
 | 
						|
}
 | 
						|
 | 
						|
// UnmarshalTOML implements toml.Unmarshaler interface.
 | 
						|
func (tc *TelegrafConfig) UnmarshalTOML(data interface{}) error {
 | 
						|
	dataOk, ok := data.(map[string]interface{})
 | 
						|
	if !ok {
 | 
						|
		return errors.New("blank string")
 | 
						|
	}
 | 
						|
	agent, ok := dataOk["agent"].(map[string]interface{})
 | 
						|
	if !ok {
 | 
						|
		return errors.New("agent is missing")
 | 
						|
	}
 | 
						|
 | 
						|
	intervalStr, ok := agent["interval"].(string)
 | 
						|
	if !ok {
 | 
						|
		return errors.New("agent interval is not string")
 | 
						|
	}
 | 
						|
 | 
						|
	interval, err := time.ParseDuration(intervalStr)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	tc.Agent = TelegrafAgentConfig{
 | 
						|
		Interval: interval.Nanoseconds() / 1000000,
 | 
						|
	}
 | 
						|
 | 
						|
	for tp, ps := range dataOk {
 | 
						|
		if tp == "agent" {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		plugins, ok := ps.(map[string]interface{})
 | 
						|
		if !ok {
 | 
						|
			return &Error{
 | 
						|
				Msg: "bad plugin type",
 | 
						|
			}
 | 
						|
		}
 | 
						|
		for name, configDataArray := range plugins {
 | 
						|
			if configDataArray == nil {
 | 
						|
				if err := tc.parseTOMLPluginConfig(tp, name, configDataArray); err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			for _, configData := range configDataArray.([]map[string]interface{}) {
 | 
						|
				if err := tc.parseTOMLPluginConfig(tp, name, configData); err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (tc *TelegrafConfig) parseTOMLPluginConfig(typ, name string, configData interface{}) error {
 | 
						|
	var ok bool
 | 
						|
	var tpFn func() plugins.Config
 | 
						|
	switch typ {
 | 
						|
	case "inputs":
 | 
						|
		tpFn, ok = availableInputPlugins[name]
 | 
						|
	case "outputs":
 | 
						|
		tpFn, ok = availableOutputPlugins[name]
 | 
						|
	default:
 | 
						|
		return &Error{
 | 
						|
			Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginType, typ),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !ok {
 | 
						|
		return &Error{
 | 
						|
			Msg: fmt.Sprintf(ErrUnsupportTelegrafPluginName, name, typ),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	p := tpFn()
 | 
						|
 | 
						|
	if err := p.UnmarshalTOML(configData); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	tc.Plugins = append(tc.Plugins, TelegrafPlugin{
 | 
						|
		Config: p,
 | 
						|
	})
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// UnmarshalJSON implement the json.Unmarshaler interface.
 | 
						|
func (tc *TelegrafConfig) UnmarshalJSON(b []byte) error {
 | 
						|
	tcd := new(telegrafConfigDecode)
 | 
						|
	if err := json.Unmarshal(b, tcd); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	orgID := tcd.OrgID
 | 
						|
	if !orgID.Valid() {
 | 
						|
		orgID = tcd.OrganizationID
 | 
						|
	}
 | 
						|
	*tc = TelegrafConfig{
 | 
						|
		ID:          tcd.ID,
 | 
						|
		OrgID:       orgID,
 | 
						|
		Name:        tcd.Name,
 | 
						|
		Description: tcd.Description,
 | 
						|
		Agent:       tcd.Agent,
 | 
						|
		Plugins:     make([]TelegrafPlugin, len(tcd.Plugins)),
 | 
						|
	}
 | 
						|
	return decodePluginRaw(tcd, tc)
 | 
						|
}
 | 
						|
 | 
						|
func decodePluginRaw(tcd *telegrafConfigDecode, tc *TelegrafConfig) (err error) {
 | 
						|
	op := "unmarshal telegraf config raw plugin"
 | 
						|
	for k, pr := range tcd.Plugins {
 | 
						|
		var tpFn func() plugins.Config
 | 
						|
		var config plugins.Config
 | 
						|
		var ok bool
 | 
						|
		switch pr.Type {
 | 
						|
		case plugins.Input:
 | 
						|
			tpFn, ok = availableInputPlugins[pr.Name]
 | 
						|
		case plugins.Output:
 | 
						|
			tpFn, ok = availableOutputPlugins[pr.Name]
 | 
						|
		default:
 | 
						|
			return &Error{
 | 
						|
				Code: EInvalid,
 | 
						|
				Msg:  fmt.Sprintf(ErrUnsupportTelegrafPluginType, pr.Type),
 | 
						|
				Op:   op,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if ok {
 | 
						|
			config = tpFn()
 | 
						|
			// if pr.Config if empty, make it a blank obj,
 | 
						|
			// so it will still go to the unmarshalling process to validate.
 | 
						|
			if len(string(pr.Config)) == 0 {
 | 
						|
				pr.Config = []byte("{}")
 | 
						|
			}
 | 
						|
			if err = json.Unmarshal(pr.Config, config); err != nil {
 | 
						|
				return &Error{
 | 
						|
					Code: EInvalid,
 | 
						|
					Err:  err,
 | 
						|
					Op:   op,
 | 
						|
				}
 | 
						|
			}
 | 
						|
			tc.Plugins[k] = TelegrafPlugin{
 | 
						|
				Comment: pr.Comment,
 | 
						|
				Config:  config,
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		return &Error{
 | 
						|
			Code: EInvalid,
 | 
						|
			Op:   op,
 | 
						|
			Msg:  fmt.Sprintf(ErrUnsupportTelegrafPluginName, pr.Name, pr.Type),
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
var availableInputPlugins = map[string](func() plugins.Config){
 | 
						|
	"cpu":          func() plugins.Config { return &inputs.CPUStats{} },
 | 
						|
	"disk":         func() plugins.Config { return &inputs.DiskStats{} },
 | 
						|
	"diskio":       func() plugins.Config { return &inputs.DiskIO{} },
 | 
						|
	"docker":       func() plugins.Config { return &inputs.Docker{} },
 | 
						|
	"file":         func() plugins.Config { return &inputs.File{} },
 | 
						|
	"kernel":       func() plugins.Config { return &inputs.Kernel{} },
 | 
						|
	"kubernetes":   func() plugins.Config { return &inputs.Kubernetes{} },
 | 
						|
	"logparser":    func() plugins.Config { return &inputs.LogParserPlugin{} },
 | 
						|
	"mem":          func() plugins.Config { return &inputs.MemStats{} },
 | 
						|
	"net_response": func() plugins.Config { return &inputs.NetResponse{} },
 | 
						|
	"net":          func() plugins.Config { return &inputs.NetIOStats{} },
 | 
						|
	"nginx":        func() plugins.Config { return &inputs.Nginx{} },
 | 
						|
	"processes":    func() plugins.Config { return &inputs.Processes{} },
 | 
						|
	"procstat":     func() plugins.Config { return &inputs.Procstat{} },
 | 
						|
	"prometheus":   func() plugins.Config { return &inputs.Prometheus{} },
 | 
						|
	"redis":        func() plugins.Config { return &inputs.Redis{} },
 | 
						|
	"swap":         func() plugins.Config { return &inputs.SwapStats{} },
 | 
						|
	"syslog":       func() plugins.Config { return &inputs.Syslog{} },
 | 
						|
	"system":       func() plugins.Config { return &inputs.SystemStats{} },
 | 
						|
	"tail":         func() plugins.Config { return &inputs.Tail{} },
 | 
						|
}
 | 
						|
 | 
						|
var availableOutputPlugins = map[string](func() plugins.Config){
 | 
						|
	"file":        func() plugins.Config { return &outputs.File{} },
 | 
						|
	"influxdb_v2": func() plugins.Config { return &outputs.InfluxDBV2{} },
 | 
						|
}
 |