feat(cmd/influx/write): specify --rate-limit as COUNT(B|kB|MB)/TIME(s|sec|m|min)

pull/19660/head
Pavel Zavora 2020-10-02 05:07:37 +02:00
parent b2a51593b3
commit f8ffcb8a65
2 changed files with 110 additions and 6 deletions

View File

@ -9,6 +9,8 @@ import (
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"github.com/fujiwara/shapeio"
@ -42,7 +44,7 @@ type writeFlagsType struct {
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
RateLimit float64
RateLimit string
}
var writeFlags writeFlagsType
@ -93,7 +95,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmd.PersistentFlags().Float64Var(&writeFlags.RateLimit, "rate-limit", 0.0, "How many megabytes per minute the write will allow. Defaults to zero, which disables throttling.")
cmd.PersistentFlags().StringVar(&writeFlags.RateLimit, "rate-limit", "", "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling.")
cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
@ -246,12 +248,16 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
r = csvReader
}
// throttle reader if requested
if writeFlags.RateLimit > 0.0 {
rateLimit, err := ToBytesPerSecond(writeFlags.RateLimit)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
if rateLimit > 0.0 {
// LineReader ensures that original reader is consumed in the smallest possible
// units (at most one protocol line) to avoid bigger pauses in throttling
r = csv2lp.NewLineReader(r)
throttledReader := shapeio.NewReaderWithContext(r, ctx)
throttledReader.SetRateLimit(writeFlags.RateLimit * 1024 * 1024 / 60) // convert from MB/minute to bytes/sec
throttledReader.SetRateLimit(rateLimit)
r = throttledReader
}
@ -374,3 +380,43 @@ func isCharacterDevice(reader io.Reader) bool {
}
return (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice
}
var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
// ToBytesPerSecond converts rate from string to number. The supplied string
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m.
func ToBytesPerSecond(rateLimit string) (float64, error) {
// ignore all spaces
strVal := strings.ReplaceAll(rateLimit, " ", "")
if len(strVal) == 0 {
return 0, nil
}
matches := rateLimitRegexp.FindStringSubmatch(strVal)
if matches == nil {
return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp)
}
bytes, err := strconv.ParseFloat(matches[1], 64)
if err != nil {
return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err)
}
bytes = bytes * bytesUnitMultiplier[matches[2]]
var time float64
if len(matches[3]) == 0 {
time = 1 // number is not specified, for example 5kbs or 1Mb/s
} else {
int64Val, err := strconv.ParseUint(matches[3], 10, 32)
if err != nil {
return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err)
}
if int64Val <= 0 {
return 0, fmt.Errorf("invalid rate limit %q: possitive time expected but %v supplied", strVal, matches[3])
}
time = float64(int64Val)
}
time = time * timeUnitMultiplier[matches[4]]
return bytes / time, nil
}

View File

@ -238,7 +238,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
name: "read data from CSV file + transform to line protocol + throttle read to 1MB/min",
flags: writeFlagsType{
Files: []string{csvFile1},
RateLimit: 1.0,
RateLimit: "1MBs",
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
@ -254,7 +254,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
defer closer.Close()
require.Nil(t, err)
require.NotNil(t, reader)
if !test.lpData && test.flags.RateLimit == 0.0 {
if !test.lpData && len(test.flags.RateLimit) == 0 {
csvToLineReader, ok := reader.(*csv2lp.CsvToLineReader)
require.True(t, ok)
require.Equal(t, csvToLineReader.LineNumber, test.firstLineCorrection)
@ -572,3 +572,61 @@ func Test_writeFlags_errorsFile(t *testing.T) {
require.Nil(t, err)
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}
func Test_ToBytesPerSecond(t *testing.T) {
var tests = []struct {
in string
out float64
error string
}{
{
in: "5 MB / 5 min",
out: float64(5*1024*1024) / float64(5*60),
},
{
in: "17kBs",
out: float64(17 * 1024),
},
{
in: "1B/m",
out: float64(1) / float64(60),
},
{
in: "1B/2sec",
out: float64(1) / float64(2),
},
{
in: "",
out: 0,
},
{
in: "1B/munite",
error: `invalid rate limit "1B/munite": it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional`,
},
{
in: ".B/s",
error: `invalid rate limit ".B/s": '.' is not count of bytes:`,
},
{
in: "1B0s",
error: `invalid rate limit "1B0s": possitive time expected but 0 supplied`,
},
{
in: "1MB/42949672950s",
error: `invalid rate limit "1MB/42949672950s": time is out of range`,
},
}
for _, test := range tests {
t.Run(test.in, func(t *testing.T) {
bytesPerSec, err := ToBytesPerSecond(test.in)
if len(test.error) == 0 {
require.Equal(t, test.out, bytesPerSec)
require.Nil(t, err)
} else {
require.NotNil(t, err)
// contains is used, since the error messages contains root cause that may evolve with go versions
require.Contains(t, fmt.Sprintf("%s", err), test.error)
}
})
}
}