diff --git a/cmd/influx/write.go b/cmd/influx/write.go index 3abbdb0fa9..47b47ccc0e 100644 --- a/cmd/influx/write.go +++ b/cmd/influx/write.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/csv" "fmt" "io" "log" @@ -38,6 +39,7 @@ type writeFlagsType struct { SkipHeader int IgnoreDataTypeInColumnName bool Encoding string + ErrorsFile string } var writeFlags writeFlagsType @@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command { cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name") cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin") + cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows") cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false) cmdDryRun.Args = cobra.MaximumNArgs(1) @@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob } } + // create writer for errors-file, if supplied + var errorsFile *csv.Writer + var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string) + if writeFlags.ErrorsFile != "" { + writer, err := os.Create(writeFlags.ErrorsFile) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err) + } + closers = append(closers, writer) + errorsFile = csv.NewWriter(writer) + rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) { + log.Println(lineError) + errorsFile.Comma = source.Comma() + errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)}) + if err := errorsFile.Write(row); err != nil { + log.Printf("Unable to write to error-file: %v\n", err) + } + errorsFile.Flush() // flush is required + } + } + // concatenate readers r := io.MultiReader(readers...) if writeFlags.Format == inputFormatCsv { @@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName) // change LineNumber to report file/stdin line numbers properly csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers) + csvReader.RowSkipped = rowSkippedListener r = csvReader } return r, csv2lp.MultiCloser(closers...), nil diff --git a/cmd/influx/write_test.go b/cmd/influx/write_test.go index e56198f172..36dee249d0 100644 --- a/cmd/influx/write_test.go +++ b/cmd/influx/write_test.go @@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string { func createTempFile(suffix string, contents []byte) string { file, err := ioutil.TempFile("", "influx_writeTest*."+suffix) + file.Close() // Close immediatelly, since we need only a file name if err != nil { log.Fatal(err) return "unknown.file" @@ -545,3 +546,19 @@ func Test_fluxWriteF(t *testing.T) { require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n")) }) } + +// Test_writeFlags_errorsFile tests that rejected rows are written to errors file +func Test_writeFlags_errorsFile(t *testing.T) { + defer removeTempFiles() + errorsFile := createTempFile("errors", []byte{}) + stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1" + out := bytes.Buffer{} + command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)}) + command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile}) + err := command.Execute() + require.Nil(t, err) + require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n")) + errorLines, err := ioutil.ReadFile(errorsFile) + require.Nil(t, err) + require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n")) +}