diff --git a/cmd/influx/write.go b/cmd/influx/write.go
index cf28586993..3798e559ef 100644
--- a/cmd/influx/write.go
+++ b/cmd/influx/write.go
@@ -9,6 +9,8 @@ import (
 	"net/http"
 	"net/url"
 	"os"
+	"regexp"
+	"strconv"
 	"strings"
 
 	"github.com/fujiwara/shapeio"
@@ -42,7 +44,7 @@ type writeFlagsType struct {
 	IgnoreDataTypeInColumnName bool
 	Encoding                   string
 	ErrorsFile                 string
-	RateLimit                  float64
+	RateLimit                  string
 }
 
 var writeFlags writeFlagsType
@@ -93,7 +95,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
 	cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
 	cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
 	cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
-	cmd.PersistentFlags().Float64Var(&writeFlags.RateLimit, "rate-limit", 0.0, "How many megabytes per minute the write will allow. Defaults to zero, which disables throttling.")
+	cmd.PersistentFlags().StringVar(&writeFlags.RateLimit, "rate-limit", "", "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling.")
 
 	cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
 	cmdDryRun.Args = cobra.MaximumNArgs(1)
@@ -246,12 +248,16 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
 		r = csvReader
 	}
 	// throttle reader if requested
-	if writeFlags.RateLimit > 0.0 {
+	rateLimit, err := ToBytesPerSecond(writeFlags.RateLimit)
+	if err != nil {
+		return nil, csv2lp.MultiCloser(closers...), err
+	}
+	if rateLimit > 0.0 {
 		// LineReader ensures that original reader is consumed in the smallest possible
 		// units (at most one protocol line) to avoid bigger pauses in throttling
 		r = csv2lp.NewLineReader(r)
 		throttledReader := shapeio.NewReaderWithContext(r, ctx)
-		throttledReader.SetRateLimit(writeFlags.RateLimit * 1024 * 1024 / 60) // convert from MB/minute to bytes/sec
+		throttledReader.SetRateLimit(rateLimit)
 		r = throttledReader
 	}
 
@@ -374,3 +380,43 @@ func isCharacterDevice(reader io.Reader) bool {
 	}
 	return (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice
 }
+
+var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
+var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
+var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
+
+// ToBytesPerSecond converts a rate limit string to bytes per second. The format is
+// COUNT(B|kB|MB)/TIME(s|sec|m|min); the "/" and the numeric TIME are optional, units are case-sensitive.
+// Spaces are ignored and "" yields 0 (no throttling). Examples: "5 MB / 5 min", "17kBs", "5.1MB5m".
+func ToBytesPerSecond(rateLimit string) (float64, error) {
+	// ignore all spaces
+	strVal := strings.ReplaceAll(rateLimit, " ", "")
+	if len(strVal) == 0 {
+		return 0, nil
+	}
+
+	matches := rateLimitRegexp.FindStringSubmatch(strVal)
+	if matches == nil {
+		return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, regexp: %v", strVal, rateLimitRegexp)
+	}
+	byteCount, err := strconv.ParseFloat(matches[1], 64) // the regexp also admits "" and ".", which fail here
+	if err != nil {
+		return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %w", strVal, matches[1], err)
+	}
+	byteCount *= bytesUnitMultiplier[matches[2]]
+	var seconds float64
+	if len(matches[3]) == 0 {
+		seconds = 1 // TIME count is not specified, for example 5kBs or 1MB/s
+	} else {
+		timeCount, err := strconv.ParseUint(matches[3], 10, 32)
+		if err != nil {
+			return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %w", strVal, err)
+		}
+		if timeCount == 0 { // a zero divisor would produce an infinite rate
+			return 0, fmt.Errorf("invalid rate limit %q: positive time expected but %v supplied", strVal, matches[3])
+		}
+		seconds = float64(timeCount)
+	}
+	seconds *= timeUnitMultiplier[matches[4]]
+	return byteCount / seconds, nil
+}
diff --git a/cmd/influx/write_test.go b/cmd/influx/write_test.go
index 598020daf1..5d687ee385 100644
--- a/cmd/influx/write_test.go
+++ b/cmd/influx/write_test.go
@@ -238,7 +238,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
 			name: "read data from CSV file + transform to line protocol + throttle read to 1MB/min",
 			flags: writeFlagsType{
 				Files:     []string{csvFile1},
-				RateLimit: 1.0,
+				RateLimit: "1MB/min",
 			},
 			lines: []string{
 				"f1 b=f2,c=f3,d=f4",
@@ -254,7 +254,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
 			defer closer.Close()
 			require.Nil(t, err)
 			require.NotNil(t, reader)
-			if !test.lpData && test.flags.RateLimit == 0.0 {
+			if !test.lpData && len(test.flags.RateLimit) == 0 {
 				csvToLineReader, ok := reader.(*csv2lp.CsvToLineReader)
 				require.True(t, ok)
 				require.Equal(t, csvToLineReader.LineNumber, test.firstLineCorrection)
@@ -572,3 +572,61 @@ func Test_writeFlags_errorsFile(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
 }
+
+func Test_ToBytesPerSecond(t *testing.T) {
+	var tests = []struct {
+		in    string
+		out   float64
+		error string
+	}{
+		{
+			in:  "5 MB / 5 min",
+			out: float64(5*1024*1024) / float64(5*60),
+		},
+		{
+			in:  "17kBs",
+			out: float64(17 * 1024),
+		},
+		{
+			in:  "1B/m",
+			out: float64(1) / float64(60),
+		},
+		{
+			in:  "1B/2sec",
+			out: float64(1) / float64(2),
+		},
+		{
+			in:  "",
+			out: 0,
+		},
+		{
+			in:    "1B/munite",
+			error: `invalid rate limit "1B/munite": it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional`,
+		},
+		{
+			in:    ".B/s",
+			error: `invalid rate limit ".B/s": '.' is not count of bytes:`,
+		},
+		{
+			in:    "1B0s",
+			error: `invalid rate limit "1B0s": positive time expected but 0 supplied`,
+		},
+		{
+			in:    "1MB/42949672950s",
+			error: `invalid rate limit "1MB/42949672950s": time is out of range`,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.in, func(t *testing.T) {
+			bytesPerSec, err := ToBytesPerSecond(test.in)
+			if len(test.error) == 0 {
+				require.NoError(t, err)
+				require.Equal(t, test.out, bytesPerSec)
+			} else {
+				require.Error(t, err)
+				// Contains is used because the message embeds a root cause that may change between Go versions
+				require.Contains(t, err.Error(), test.error)
+			}
+		})
+	}
+}