chore(pkger): refactor parser telegraf config state out into stateful type

references: #17434
pull/17770/head
Johnny Steenbergen 2020-04-16 11:27:30 -07:00 committed by Johnny Steenbergen
parent c02e83038f
commit f084a93fa2
11 changed files with 215 additions and 137 deletions

View File

@ -7815,8 +7815,8 @@ components:
items:
type: object
properties:
remove:
type: boolean
stateStatus:
type: string
id:
type: string
pkgName:

View File

@ -513,20 +513,8 @@ type (
type DiffTelegraf struct {
DiffIdentifier
New influxdb.TelegrafConfig
Old *influxdb.TelegrafConfig
}
func newDiffTelegraf(t *telegraf) DiffTelegraf {
return DiffTelegraf{
DiffIdentifier: DiffIdentifier{
ID: SafeID(t.ID()),
Remove: t.shouldRemove,
PkgName: t.PkgName(),
},
New: t.config,
Old: t.existing,
}
New influxdb.TelegrafConfig `json:"new"`
Old *influxdb.TelegrafConfig `json:"old"`
}
type (
@ -1082,77 +1070,6 @@ func (r mapperNotificationRules) Len() int {
return len(r)
}
const (
fieldTelegrafConfig = "config"
)
type telegraf struct {
identity
config influxdb.TelegrafConfig
labels sortedLabels
existing *influxdb.TelegrafConfig
}
func (t *telegraf) ID() influxdb.ID {
if t.existing != nil {
return t.existing.ID
}
return t.config.ID
}
func (t *telegraf) Labels() []*label {
return t.labels
}
func (t *telegraf) ResourceType() influxdb.ResourceType {
return KindTelegraf.ResourceType()
}
func (t *telegraf) Exists() bool {
return t.existing != nil
}
func (t *telegraf) summarize() SummaryTelegraf {
cfg := t.config
cfg.Name = t.Name()
return SummaryTelegraf{
PkgName: t.PkgName(),
TelegrafConfig: cfg,
LabelAssociations: toSummaryLabels(t.labels...),
}
}
func (t *telegraf) valid() []validationErr {
var vErrs []validationErr
if t.config.Config == "" {
vErrs = append(vErrs, validationErr{
Field: fieldTelegrafConfig,
Msg: "no config provided",
})
}
if len(vErrs) > 0 {
return []validationErr{
objectValidationErr(fieldSpec, vErrs...),
}
}
return nil
}
type mapperTelegrafs []*telegraf
func (m mapperTelegrafs) Association(i int) labelAssociater {
return m[i]
}
func (m mapperTelegrafs) Len() int {
return len(m)
}
const (
fieldDashCharts = "charts"
)

View File

@ -491,11 +491,6 @@ func TestPkg(t *testing.T) {
kind: KindNotificationRule,
validName: "rule_UUID",
},
{
pkgFile: "testdata/telegraf.yml",
kind: KindTelegraf,
validName: "first_tele_config",
},
}
for _, tt := range tests {

View File

@ -424,11 +424,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
identity: newIdentity,
id: id,
}
case KindTelegraf:
p.mTelegrafs[pkgName] = &telegraf{
identity: newIdentity,
config: influxdb.TelegrafConfig{ID: id},
}
}
}
@ -449,11 +444,6 @@ func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool
return func(id influxdb.ID) {
r.id = id
}, ok
case KindTelegraf:
t, ok := p.mTelegrafs[pkgName]
return func(id influxdb.ID) {
t.config.ID = id
}, ok
default:
return nil, false
}

View File

@ -896,6 +896,54 @@ func (tft taskFluxTranslation) generateTaskOption() string {
return fmt.Sprintf("option task = { %s }", strings.Join(taskOpts, ", "))
}
const (
fieldTelegrafConfig = "config"
)
type telegraf struct {
identity
config influxdb.TelegrafConfig
labels sortedLabels
}
func (t *telegraf) Labels() []*label {
return t.labels
}
func (t *telegraf) ResourceType() influxdb.ResourceType {
return KindTelegraf.ResourceType()
}
func (t *telegraf) summarize() SummaryTelegraf {
cfg := t.config
cfg.Name = t.Name()
return SummaryTelegraf{
PkgName: t.PkgName(),
TelegrafConfig: cfg,
LabelAssociations: toSummaryLabels(t.labels...),
}
}
func (t *telegraf) valid() []validationErr {
var vErrs []validationErr
if t.config.Config == "" {
vErrs = append(vErrs, validationErr{
Field: fieldTelegrafConfig,
Msg: "no config provided",
})
}
if len(vErrs) > 0 {
return []validationErr{
objectValidationErr(fieldSpec, vErrs...),
}
}
return nil
}
const (
fieldArgTypeConstant = "constant"
fieldArgTypeMap = "map"

View File

@ -3543,10 +3543,11 @@ spec:
assert.Equal(t, "display name", actual.TelegrafConfig.Name)
assert.Equal(t, "desc", actual.TelegrafConfig.Description)
require.Len(t, actual.LabelAssociations, 1)
require.Len(t, actual.LabelAssociations, 2)
assert.Equal(t, "label_1", actual.LabelAssociations[0].Name)
assert.Equal(t, "label_2", actual.LabelAssociations[1].Name)
require.Len(t, sum.LabelMappings, 1)
require.Len(t, sum.LabelMappings, 2)
expectedMapping := SummaryLabelMapping{
Status: StateStatusNew,
ResourcePkgName: "first_tele_config",
@ -3556,6 +3557,9 @@ spec:
ResourceType: influxdb.TelegrafsResourceType,
}
assert.Equal(t, expectedMapping, sum.LabelMappings[0])
expectedMapping.LabelPkgName = "label_2"
expectedMapping.LabelName = "label_2"
assert.Equal(t, expectedMapping, sum.LabelMappings[1])
})
})

View File

@ -687,7 +687,6 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
var diff Diff
diff.Dashboards = s.dryRunDashboards(pkg)
diff.Telegrafs = s.dryRunTelegraf(pkg)
diffRules, err := s.dryRunNotificationRules(ctx, orgID, pkg)
if err != nil {
@ -714,6 +713,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
diff.NotificationEndpoints = stateDiff.NotificationEndpoints
diff.Labels = stateDiff.Labels
diff.Tasks = stateDiff.Tasks
diff.Telegrafs = stateDiff.Telegrafs
diff.Variables = stateDiff.Variables
diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...)
@ -897,15 +897,6 @@ func (s *Service) dryRunSecrets(ctx context.Context, orgID influxdb.ID, pkg *Pkg
return nil
}
func (s *Service) dryRunTelegraf(pkg *Pkg) []DiffTelegraf {
telegrafs := pkg.telegrafs()
diffs := make([]DiffTelegraf, 0, len(telegrafs))
for _, t := range telegrafs {
diffs = append(diffs, newDiffTelegraf(t))
}
return diffs
}
func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, vars map[string]*stateVariable) {
existingVars, _ := s.getAllPlatformVariables(ctx, orgID)
@ -952,7 +943,6 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stat
mappers := []labelMappers{
mapperDashboards(pkg.dashboards()),
mapperNotificationRules(pkg.notificationRules()),
mapperTelegrafs(pkg.telegrafs()),
}
diffs := make([]DiffLabelMapping, 0)
@ -1100,6 +1090,17 @@ func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordin
mappings = append(mappings, mm...)
}
for _, t := range state.mTelegrafs {
if IsRemoval(t.stateStatus) {
continue
}
mm, err := s.dryRunResourceLabelMappingV2(ctx, state, stateLabelsByResName, t)
if err != nil {
return nil, err
}
mappings = append(mappings, mm...)
}
for _, v := range state.mVariables {
if IsRemoval(v.stateStatus) {
continue
@ -1325,7 +1326,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
s.applyDashboards(pkg.dashboards()),
s.applyNotificationEndpoints(ctx, userID, state.endpoints()),
s.applyTasks(state.tasks()),
s.applyTelegrafs(pkg.telegrafs()),
s.applyTelegrafs(state.telegrafConfigs()),
},
}
@ -2112,16 +2113,16 @@ func (s *Service) applyTasks(tasks []*stateTask) applier {
}
}
func (s *Service) applyTelegrafs(teles []*telegraf) applier {
func (s *Service) applyTelegrafs(teles []*stateTelegraf) applier {
const resource = "telegrafs"
mutex := new(doMutex)
rollbackTelegrafs := make([]*telegraf, 0, len(teles))
rollbackTelegrafs := make([]*stateTelegraf, 0, len(teles))
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var cfg influxdb.TelegrafConfig
mutex.Do(func() {
teles[i].config.OrgID = orgID
teles[i].orgID = orgID
cfg = teles[i].summarize().TelegrafConfig
})
@ -2134,7 +2135,7 @@ func (s *Service) applyTelegrafs(teles []*telegraf) applier {
}
mutex.Do(func() {
teles[i].config = cfg
teles[i].id = cfg.ID
rollbackTelegrafs = append(rollbackTelegrafs, teles[i])
})
@ -2617,6 +2618,7 @@ func newSummaryFromStatePkg(pkg *Pkg, state *stateCoordinator) Summary {
pkgSum.NotificationEndpoints = stateSum.NotificationEndpoints
pkgSum.Labels = stateSum.Labels
pkgSum.Tasks = stateSum.Tasks
pkgSum.TelegrafConfigs = stateSum.TelegrafConfigs
pkgSum.Variables = stateSum.Variables
// filter out label mappings that are from pgk and replace with those
@ -2626,6 +2628,7 @@ func newSummaryFromStatePkg(pkg *Pkg, state *stateCoordinator) Summary {
influxdb.ChecksResourceType: true,
influxdb.NotificationEndpointResourceType: true,
influxdb.TasksResourceType: true,
influxdb.TelegrafsResourceType: true,
influxdb.VariablesResourceType: true,
}
for _, lm := range pkgSum.LabelMappings {

View File

@ -13,6 +13,7 @@ type stateCoordinator struct {
mEndpoints map[string]*stateEndpoint
mLabels map[string]*stateLabel
mTasks map[string]*stateTask
mTelegrafs map[string]*stateTelegraf
mVariables map[string]*stateVariable
labelMappings []stateLabelMapping
@ -25,6 +26,7 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator {
mEndpoints: make(map[string]*stateEndpoint),
mLabels: make(map[string]*stateLabel),
mTasks: make(map[string]*stateTask),
mTelegrafs: make(map[string]*stateTelegraf),
mVariables: make(map[string]*stateVariable),
}
@ -58,6 +60,12 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator {
stateStatus: StateStatusNew,
}
}
for _, pkgTele := range pkg.telegrafs() {
state.mTelegrafs[pkgTele.PkgName()] = &stateTelegraf{
parserTelegraf: pkgTele,
stateStatus: StateStatusNew,
}
}
for _, pkgVar := range pkg.variables() {
state.mVariables[pkgVar.PkgName()] = &stateVariable{
parserVar: pkgVar,
@ -108,6 +116,14 @@ func (s *stateCoordinator) tasks() []*stateTask {
return out
}
func (s *stateCoordinator) telegrafConfigs() []*stateTelegraf {
out := make([]*stateTelegraf, 0, len(s.mTelegrafs))
for _, t := range s.mTelegrafs {
out = append(out, t)
}
return out
}
func (s *stateCoordinator) variables() []*stateVariable {
out := make([]*stateVariable, 0, len(s.mVariables))
for _, v := range s.mVariables {
@ -153,6 +169,13 @@ func (s *stateCoordinator) diff() Diff {
return diff.Tasks[i].PkgName < diff.Tasks[j].PkgName
})
for _, t := range s.mTelegrafs {
diff.Telegrafs = append(diff.Telegrafs, t.diffTelegraf())
}
sort.Slice(diff.Telegrafs, func(i, j int) bool {
return diff.Telegrafs[i].PkgName < diff.Telegrafs[j].PkgName
})
for _, v := range s.mVariables {
diff.Variables = append(diff.Variables, v.diffVariable())
}
@ -232,6 +255,16 @@ func (s *stateCoordinator) summary() Summary {
return sum.Tasks[i].PkgName < sum.Tasks[j].PkgName
})
for _, t := range s.mTelegrafs {
if IsRemoval(t.stateStatus) {
continue
}
sum.TelegrafConfigs = append(sum.TelegrafConfigs, t.summarize())
}
sort.Slice(sum.TelegrafConfigs, func(i, j int) bool {
return sum.TelegrafConfigs[i].PkgName < sum.TelegrafConfigs[j].PkgName
})
for _, v := range s.mVariables {
if IsRemoval(v.stateStatus) {
continue
@ -320,6 +353,12 @@ func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influx
parserTask: &task{identity: newIdentity},
stateStatus: StateStatusRemove,
}
case KindTelegraf:
s.mTelegrafs[pkgName] = &stateTelegraf{
id: id,
parserTelegraf: &telegraf{identity: newIdentity},
stateStatus: StateStatusRemove,
}
case KindVariable:
s.mVariables[pkgName] = &stateVariable{
id: id,
@ -364,6 +403,12 @@ func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influ
r.id = id
r.stateStatus = StateStatusExists
}, ok
case KindTelegraf:
r, ok := s.mTelegrafs[pkgName]
return func(id influxdb.ID) {
r.id = id
r.stateStatus = StateStatusExists
}, ok
case KindVariable:
r, ok := s.mVariables[pkgName]
return func(id influxdb.ID) {
@ -783,6 +828,58 @@ func (t *stateTask) summarize() SummaryTask {
return sum
}
type stateTelegraf struct {
id, orgID influxdb.ID
stateStatus StateStatus
parserTelegraf *telegraf
existing *influxdb.TelegrafConfig
}
func (t *stateTelegraf) ID() influxdb.ID {
if !IsNew(t.stateStatus) && t.existing != nil {
return t.existing.ID
}
return t.id
}
func (t *stateTelegraf) diffTelegraf() DiffTelegraf {
return DiffTelegraf{
DiffIdentifier: DiffIdentifier{
ID: SafeID(t.ID()),
Remove: IsRemoval(t.stateStatus),
PkgName: t.parserTelegraf.PkgName(),
},
New: t.parserTelegraf.config,
Old: t.existing,
}
}
func (t *stateTelegraf) labels() []*label {
return t.parserTelegraf.labels
}
func (t *stateTelegraf) resourceType() influxdb.ResourceType {
return influxdb.TelegrafsResourceType
}
func (t *stateTelegraf) stateIdentity() stateIdentity {
return stateIdentity{
id: t.ID(),
name: t.parserTelegraf.Name(),
pkgName: t.parserTelegraf.PkgName(),
resourceType: t.resourceType(),
stateStatus: t.stateStatus,
}
}
func (t *stateTelegraf) summarize() SummaryTelegraf {
sum := t.parserTelegraf.summarize()
sum.TelegrafConfig.ID = t.id
sum.TelegrafConfig.OrgID = t.orgID
return sum
}
type stateVariable struct {
id, orgID influxdb.ID
stateStatus StateStatus

View File

@ -913,9 +913,6 @@ func TestService(t *testing.T) {
l.ID = influxdb.ID(fakeLabelSVC.CreateLabelCalls.Count() + 1)
return nil
}
fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, m *influxdb.LabelMapping) error {
return nil
}
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error {
if mapping.ResourceID == 0 {
return errors.New("did not get a resource ID")
@ -1173,19 +1170,22 @@ func TestService(t *testing.T) {
})
t.Run("maps telegrafs with labels", func(t *testing.T) {
testLabelMappingFn(
t,
"testdata/telegraf.yml",
1,
func() []ServiceSetterFn {
fakeTeleSVC := mock.NewTelegrafConfigStore()
fakeTeleSVC.CreateTelegrafConfigF = func(_ context.Context, cfg *influxdb.TelegrafConfig, _ influxdb.ID) error {
cfg.ID = influxdb.ID(rand.Int())
return nil
}
return []ServiceSetterFn{WithTelegrafSVC(fakeTeleSVC)}
},
)
opts := func() []ServiceSetterFn {
fakeTeleSVC := mock.NewTelegrafConfigStore()
fakeTeleSVC.CreateTelegrafConfigF = func(_ context.Context, cfg *influxdb.TelegrafConfig, _ influxdb.ID) error {
cfg.ID = influxdb.ID(rand.Int())
return nil
}
return []ServiceSetterFn{WithTelegrafSVC(fakeTeleSVC)}
}
t.Run("applies successfully", func(t *testing.T) {
testLabelMappingV2ApplyFn(t, "testdata/telegraf.yml", 2, opts)
})
t.Run("deletes new label mappings on error", func(t *testing.T) {
testLabelMappingV2RollbackFn(t, "testdata/telegraf.yml", 1, opts)
})
})
t.Run("maps variables with labels", func(t *testing.T) {
@ -1456,6 +1456,7 @@ func TestService(t *testing.T) {
testfileRunner(t, "testdata/telegraf.yml", func(t *testing.T, pkg *Pkg) {
fakeTeleSVC := mock.NewTelegrafConfigStore()
fakeTeleSVC.CreateTelegrafConfigF = func(_ context.Context, tc *influxdb.TelegrafConfig, userID influxdb.ID) error {
t.Log("called")
if fakeTeleSVC.CreateTelegrafConfigCalls.Count() == 1 {
return errors.New("limit hit")
}
@ -1469,7 +1470,12 @@ func TestService(t *testing.T) {
return nil
}
pkg.mTelegrafs["first_tele_config_copy"] = pkg.mTelegrafs["first_tele_config"]
stubTele := &telegraf{
identity: identity{
name: &references{val: "stub"},
},
}
pkg.mTelegrafs[stubTele.PkgName()] = stubTele
svc := newTestService(WithTelegrafSVC(fakeTeleSVC))

View File

@ -6,6 +6,13 @@
"name": "label_1"
}
},
{
"apiVersion": "influxdata.com/v2alpha1",
"kind": "Label",
"metadata": {
"name": "label_2"
}
},
{
"apiVersion": "influxdata.com/v2alpha1",
"kind": "Telegraf",
@ -19,6 +26,10 @@
{
"kind": "Label",
"name": "label_1"
},
{
"kind": "Label",
"name": "label_2"
}
],
"config": "# Configuration for telegraf agent\n [agent]\n ## Default data collection interval for all inputs\n interval = \"10s\"\n ## Rounds collection interval to 'interval'\n ## ie, if interval=\"10s\" then always collect on :00, :10, :20, etc.\n round_interval = true\n\n ## Telegraf will send metrics to outputs in batches of at most\n ## metric_batch_size metrics.\n ## This controls the size of writes that Telegraf sends to output plugins.\n metric_batch_size = 1000\n\n ## For failed writes, telegraf will cache metric_buffer_limit metrics for each\n ## output, and will flush this buffer on a successful write. Oldest metrics\n ## are dropped first when this buffer fills.\n ## This buffer only fills when writes fail to output plugin(s).\n metric_buffer_limit = 10000\n\n ## Collection jitter is used to jitter the collection by a random amount.\n ## Each plugin will sleep for a random time within jitter before collecting.\n ## This can be used to avoid many plugins querying things like sysfs at the\n ## same time, which can have a measurable effect on the system.\n collection_jitter = \"0s\"\n\n ## Default flushing interval for all outputs. Maximum flush_interval will be\n ## flush_interval + flush_jitter\n flush_interval = \"10s\"\n ## Jitter the flush interval by a random amount. This is primarily to avoid\n ## large write spikes for users running a large number of telegraf instances.\n ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s\n flush_jitter = \"0s\"\n\n ## By default or when set to \"0s\", precision will be set to the same\n ## timestamp order as the collection interval, with the maximum being 1s.\n ## ie, when interval = \"10s\", precision will be \"1s\"\n ## when interval = \"250ms\", precision will be \"1ms\"\n ## Precision will NOT be used for service inputs. It is up to each individual\n ## service input to set the timestamp at the appropriate precision.\n ## Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\".\n precision = \"\"\n\n ## Logging configuration:\n ## Run telegraf with debug log messages.\n debug = false\n ## Run telegraf in quiet mode (error log messages only).\n quiet = false\n ## Specify the log file name. The empty string means to log to stderr.\n logfile = \"\"\n\n ## Override default hostname, if empty use os.Hostname()\n hostname = \"\"\n ## If set to true, do no set the \"host\" tag in the telegraf agent.\n omit_hostname = false\n [[outputs.influxdb_v2]]\n ## The URLs of the InfluxDB cluster nodes.\n ##\n ## Multiple URLs can be specified for a single cluster, only ONE of the\n ## urls will be written to each interval.\n ## urls exp: http://127.0.0.1:9999\n urls = [\"http://localhost:9999\"]\n\n ## Token for authentication.\n token = \"$INFLUX_TOKEN\"\n\n ## Organization is the name of the organization you wish to write to; must exist.\n organization = \"rg\"\n\n ## Destination bucket to write into.\n bucket = \"rucket_3\"\n [[inputs.cpu]]\n ## Whether to report per-cpu stats or not\n percpu = true\n ## Whether to report total system cpu stats or not\n totalcpu = true\n ## If true, collect raw CPU time metrics.\n collect_cpu_time = false\n ## If true, compute and report the sum of all non-idle CPU states.\n report_active = false\n [[inputs.disk]]\n ## By default stats will be gathered for all mount points.\n ## Set mount_points will restrict the stats to only the specified mount points.\n # mount_points = [\"/\"]\n ## Ignore mount points by filesystem type.\n ignore_fs = [\"tmpfs\", \"devtmpfs\", \"devfs\", \"overlay\", \"aufs\", \"squashfs\"]\n [[inputs.diskio]]\n [[inputs.mem]]\n [[inputs.net]]\n [[inputs.processes]]\n [[inputs.swap]]\n [[inputs.system]]"

View File

@ -4,6 +4,11 @@ metadata:
name: label_1
---
apiVersion: influxdata.com/v2alpha1
kind: Label
metadata:
name: label_2
---
apiVersion: influxdata.com/v2alpha1
kind: Telegraf
metadata:
name: first_tele_config
@ -13,6 +18,8 @@ spec:
associations:
- kind: Label
name: label_1
- kind: Label
name: label_2
config: |
# Configuration for telegraf agent
[agent]