8426: updated code based on pull-request feedback
parent
e4b4cfb7a1
commit
b12ee4e6ee
|
|
@ -4,11 +4,11 @@
|
|||
|
||||
#### `[collectd]` Section
|
||||
|
||||
* `parse-multivalue-plugin` was added with a default of `split`. When set to `split`, multivalue plugin data will be split into separate measurements. When set to `join`, multivalue plugin will be stored as a single multi-value measurement.
|
||||
* `parse-multivalue-plugin` was added with a default of `split`. When set to `split`, multivalue plugin data (e.g. df free:5000,used:1000) will be split into separate measurements (e.g., (df_free, value=5000) (df_used, value=1000)). When set to `join`, multivalue plugin will be stored as a single multi-value measurement (e.g., (df, free=5000,used=1000)).
|
||||
|
||||
### Features
|
||||
|
||||
- [#8426] (https://github.com/influxdata/influxdb/issues/8426): Add `parse-multivalue-plugin` to allow users to choose how multivalue plugins should be handled by the collectd service.
|
||||
- [#8426](https://github.com/influxdata/influxdb/issues/8426): Add `parse-multivalue-plugin` to allow users to choose how multivalue plugins should be handled by the collectd service.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
@ -57,10 +57,6 @@ The following new configuration options are available.
|
|||
|
||||
* `query-stats-enabled` was added with a default of `false`. When set to `true`, continuous query execution statistics are written to the default monitor store.
|
||||
|
||||
#### `[collectd]` Section
|
||||
|
||||
* `parse-multivalue-plugin` was added with a default of `split`. When set to `split`, multivalue plugin data will be split into separate measurements. When set to `join`, multivalue plugin will be stored as a single multi-value measurement.
|
||||
|
||||
### Features
|
||||
|
||||
- [#8143](https://github.com/influxdata/influxdb/pull/8143): Add WAL sync delay
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ Each collectd input allows the binding address, target database, and target rete
|
|||
|
||||
Each collectd input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default batch size is 1000, pending batch factor is 5, with a batch timeout of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.
|
||||
|
||||
Multi-value plugins can be handled two ways. "split" will parse and store the multi-value plugin data into separate measurements, while "join" will parse and store the multi-value plugin as a single multi-value measurement. "split" is the default behavior for backward compatability with previous versions of influxdb.
|
||||
Multi-value plugins can be handled two ways. Setting parse-multivalue-plugin to "split" will parse and store the multi-value plugin data (e.g., df free:5000,used:1000) into separate measurements (e.g., (df_free, value=5000) (df_used, value=1000)), while "join" will parse and store the multi-value plugin as a single multi-value measurement (e.g., (df, free=5000,used=1000)). "split" is the default behavior for backward compatability with previous versions of influxdb.
|
||||
|
||||
The path to the collectd types database file may also be set.
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ const (
|
|||
// DefaultAuthFile is the default location of the user/password file.
|
||||
DefaultAuthFile = "/etc/collectd/auth_file"
|
||||
|
||||
// DefaultParseMultiValuePlugin is false, defaulting to version <1.2 where plugin values were split into separate rows
|
||||
// DefaultParseMultiValuePlugin is "split", defaulting to version <1.2 where plugin values were split into separate rows
|
||||
DefaultParseMultiValuePlugin = "split"
|
||||
)
|
||||
|
||||
|
|
@ -138,7 +138,7 @@ func (c *Config) Validate() error {
|
|||
switch c.ParseMultiValuePlugin {
|
||||
case "split", "join":
|
||||
default:
|
||||
return errors.New("Invalid value for parse-multivalue-plugin. Valid options are \"split\" and \"join\"")
|
||||
return errors.New(`Invalid value for parse-multivalue-plugin. Valid options are "split" and "join"`)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -390,17 +390,17 @@ func (s *Service) writePoints() {
|
|||
}
|
||||
}
|
||||
|
||||
// UnmarshalValueListPacked is an alternative to the original UnmarshalValueList
|
||||
// UnmarshalValueListPacked is an alternative to the original UnmarshalValueList.
|
||||
// The difference is that the original provided measurements like (PLUGIN_DSNAME, ["value",xxx])
|
||||
// while this one will provide measurements like (PLUGIN, {["DSNAME",xxx]})
|
||||
// this effectively joins collectd data that should go together, such as:
|
||||
// (df, {["used",1000],["free",2500]})
|
||||
// while this one will provide measurements like (PLUGIN, {["DSNAME",xxx]}).
|
||||
// This effectively joins collectd data that should go together, such as:
|
||||
// (df, {["used",1000],["free",2500]}).
|
||||
func (s *Service) UnmarshalValueListPacked(vl *api.ValueList) []models.Point {
|
||||
timestamp := vl.Time.UTC()
|
||||
|
||||
var name = vl.Identifier.Plugin
|
||||
tags := make(map[string]string)
|
||||
fields := make(map[string]interface{})
|
||||
tags := make(map[string]string, 4)
|
||||
fields := make(map[string]interface{}, len(vl.Values))
|
||||
|
||||
if vl.Identifier.Host != "" {
|
||||
tags["host"] = vl.Identifier.Host
|
||||
|
|
@ -415,9 +415,9 @@ func (s *Service) UnmarshalValueListPacked(vl *api.ValueList) []models.Point {
|
|||
tags["type_instance"] = vl.Identifier.TypeInstance
|
||||
}
|
||||
|
||||
for i := range vl.Values {
|
||||
for i, v := range vl.Values {
|
||||
fieldName := vl.DSName(i)
|
||||
switch value := vl.Values[i].(type) {
|
||||
switch value := v.(type) {
|
||||
case api.Gauge:
|
||||
fields[fieldName] = float64(value)
|
||||
case api.Derive:
|
||||
|
|
@ -431,9 +431,9 @@ func (s *Service) UnmarshalValueListPacked(vl *api.ValueList) []models.Point {
|
|||
if err != nil {
|
||||
s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err))
|
||||
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// return as an array for compatability with rest of API
|
||||
return []models.Point{p}
|
||||
}
|
||||
|
||||
|
|
@ -445,8 +445,8 @@ func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point {
|
|||
for i := range vl.Values {
|
||||
var name string
|
||||
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
|
||||
tags := make(map[string]string)
|
||||
fields := make(map[string]interface{})
|
||||
tags := make(map[string]string, 4)
|
||||
fields := make(map[string]interface{}, 1)
|
||||
|
||||
// Convert interface back to actual type, then to float64
|
||||
switch value := vl.Values[i].(type) {
|
||||
|
|
|
|||
|
|
@ -231,16 +231,18 @@ func TestService_BatchSize(t *testing.T) {
|
|||
t.Fatalf("only sent %d of %d bytes", n, len(testData))
|
||||
}
|
||||
|
||||
points := []models.Point{}
|
||||
var points []models.Point
|
||||
timer := time.NewTimer(time.Second)
|
||||
Loop:
|
||||
for {
|
||||
timer.Reset(time.Second)
|
||||
select {
|
||||
case p := <-pointCh:
|
||||
points = append(points, p)
|
||||
if len(points) == totalPoints {
|
||||
break Loop
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
case <-timer.C:
|
||||
t.Logf("exp %d points, got %d", totalPoints, len(points))
|
||||
t.Fatal("timed out waiting for points from collectd service")
|
||||
}
|
||||
|
|
@ -260,7 +262,8 @@ func TestService_BatchSize(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test that the collectd service correctly batches points by BatchSize.
|
||||
// Test that the parse-multi-value-plugin config works properly.
|
||||
// The other tests already verify the 'split' config, so this only runs the 'join' test.
|
||||
func TestService_ParseMultiValuePlugin(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
@ -295,17 +298,19 @@ func TestService_ParseMultiValuePlugin(t *testing.T) {
|
|||
t.Fatalf("only sent %d of %d bytes", n, len(testData))
|
||||
}
|
||||
|
||||
points := []models.Point{}
|
||||
var points []models.Point
|
||||
|
||||
timer := time.NewTimer(time.Second)
|
||||
Loop:
|
||||
for {
|
||||
timer.Reset(time.Second)
|
||||
select {
|
||||
case p := <-pointCh:
|
||||
points = append(points, p)
|
||||
if len(points) == totalPoints {
|
||||
break Loop
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
case <-timer.C:
|
||||
t.Logf("exp %d points, got %d", totalPoints, len(points))
|
||||
t.Fatal("timed out waiting for points from collectd service")
|
||||
}
|
||||
|
|
@ -355,16 +360,18 @@ func TestService_BatchDuration(t *testing.T) {
|
|||
t.Fatalf("only sent %d of %d bytes", n, len(testData))
|
||||
}
|
||||
|
||||
points := []models.Point{}
|
||||
var points []models.Point
|
||||
timer := time.NewTimer(time.Second)
|
||||
Loop:
|
||||
for {
|
||||
timer.Reset(time.Second)
|
||||
select {
|
||||
case p := <-pointCh:
|
||||
points = append(points, p)
|
||||
if len(points) == totalPoints {
|
||||
break Loop
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
case <-timer.C:
|
||||
t.Logf("exp %d points, got %d", totalPoints, len(points))
|
||||
t.Fatal("timed out waiting for points from collectd service")
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue