feat(cmd/influx/write): add --errors-file option #18742

pull/18779/head
Pavel Zavora 2020-06-30 08:11:21 +02:00
parent 05c8a00b8d
commit 90a3a7c8d8
2 changed files with 42 additions and 0 deletions

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
@ -38,6 +39,7 @@ type writeFlagsType struct {
SkipHeader int
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
}
var writeFlags writeFlagsType
@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows")
cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
}
}
// create writer for errors-file, if supplied
var errorsFile *csv.Writer
var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
if writeFlags.ErrorsFile != "" {
writer, err := os.Create(writeFlags.ErrorsFile)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err)
}
closers = append(closers, writer)
errorsFile = csv.NewWriter(writer)
rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
log.Println(lineError)
errorsFile.Comma = source.Comma()
errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
if err := errorsFile.Write(row); err != nil {
log.Printf("Unable to write to error-file: %v\n", err)
}
errorsFile.Flush() // flush is required
}
}
// concatenate readers
r := io.MultiReader(readers...)
if writeFlags.Format == inputFormatCsv {
@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName)
// change LineNumber to report file/stdin line numbers properly
csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers)
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
return r, csv2lp.MultiCloser(closers...), nil

View File

@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string {
func createTempFile(suffix string, contents []byte) string {
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
file.Close() // Close immediatelly, since we need only a file name
if err != nil {
log.Fatal(err)
return "unknown.file"
@ -545,3 +546,19 @@ func Test_fluxWriteF(t *testing.T) {
require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n"))
})
}
// Test_writeFlags_errorsFile tests that rejected rows are written to errors file
func Test_writeFlags_errorsFile(t *testing.T) {
defer removeTempFiles()
errorsFile := createTempFile("errors", []byte{})
stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
out := bytes.Buffer{}
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)})
command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile})
err := command.Execute()
require.Nil(t, err)
require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
errorLines, err := ioutil.ReadFile(errorsFile)
require.Nil(t, err)
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}