diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8ae7c66e08..c75d36ffca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
 
 ### Features
 
+1. [18779](https://github.com/influxdata/influxdb/pull/18779): Add new processing options and enhancements to influx write.
 1. [19246](https://github.com/influxdata/influxdb/pull/19246): Redesign load data page to increase discovery and ease of use
 1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
 1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
diff --git a/cmd/influx/write.go b/cmd/influx/write.go
index 3abbdb0fa9..888bd7a2e8 100644
--- a/cmd/influx/write.go
+++ b/cmd/influx/write.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"encoding/csv"
 	"fmt"
 	"io"
 	"log"
@@ -38,6 +39,7 @@ type writeFlagsType struct {
 	SkipHeader                 int
 	IgnoreDataTypeInColumnName bool
 	Encoding                   string
+	ErrorsFile                 string
 }
 
 var writeFlags writeFlagsType
@@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
 	cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
 	cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
 	cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
+	cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
 
 	cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
 	cmdDryRun.Args = cobra.MaximumNArgs(1)
@@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
 		}
 	}
 
+	// create writer for errors-file, if supplied
+	var errorsFile *csv.Writer
+	var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
+	if writeFlags.ErrorsFile != "" {
+		writer, err := os.Create(writeFlags.ErrorsFile)
+		if err != nil {
+			return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err)
+		}
+		closers = append(closers, writer)
+		errorsFile = csv.NewWriter(writer)
+		rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
+			log.Println(lineError)
+			errorsFile.Comma = source.Comma()
+			errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
+			if err := errorsFile.Write(row); err != nil {
+				log.Printf("Unable to write to errors-file: %v\n", err)
+			}
+			errorsFile.Flush() // flush is required
+		}
+	}
+
 	// concatenate readers
 	r := io.MultiReader(readers...)
 	if writeFlags.Format == inputFormatCsv {
@@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
 		}
 		csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName)
 		// change LineNumber to report file/stdin line numbers properly
 		csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers)
+		csvReader.RowSkipped = rowSkippedListener
 		r = csvReader
 	}
 	return r, csv2lp.MultiCloser(closers...), nil
diff --git a/cmd/influx/write_test.go b/cmd/influx/write_test.go
index e56198f172..36dee249d0 100644
--- a/cmd/influx/write_test.go
+++ b/cmd/influx/write_test.go
@@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string {
 
 func createTempFile(suffix string, contents []byte) string {
 	file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
 	if err != nil {
 		log.Fatal(err)
 		return "unknown.file"
 	}
+	file.Close() // Close immediately, since we only need the file name
@@ -545,3 +546,19 @@ func Test_fluxWriteF(t *testing.T) {
 		require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n"))
 	})
 }
+
+// Test_writeFlags_errorsFile tests that rejected rows are written to the errors file
+func Test_writeFlags_errorsFile(t *testing.T) {
+	defer removeTempFiles()
+	errorsFile := createTempFile("errors", []byte{})
+	stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
+	out := bytes.Buffer{}
+	command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)})
+	command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile})
+	err := command.Execute()
+	require.Nil(t, err)
+	require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
+	errorLines, err := ioutil.ReadFile(errorsFile)
+	require.Nil(t, err)
+	require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
+}
diff --git a/pkg/csv2lp/README.md b/pkg/csv2lp/README.md
index fdee92859e..42a07940a5 100644
--- a/pkg/csv2lp/README.md
+++ b/pkg/csv2lp/README.md
@@ -139,6 +139,11 @@ Existing [data types](https://v2.docs.influxdata.com/v2.0/reference/syntax/annot
 - `#constant` annotation adds a constant column to the data, so you can set measurement, time, field or tag of every row you import
   - the format of a constant annotation row is `#constant,datatype,name,value`, it contains a supported datatype, a column name, and a constant value
   - _column name_ can be omitted for _dateTime_ or _measurement_ columns, so the annotation can be simply `#constant,measurement,cpu`
+- `#concat` annotation adds a new column that is concatenated from existing columns according to a template
+  - the format of a concat annotation row is `#concat,datatype,name,template`, it contains a supported datatype, a column name, and a template value
+  - the `template` is a string with `${columnName}` placeholders, in which the placeholders are replaced by values of existing columns
+  - for example: `#concat,string,fullName,${firstName} ${lastName}`
+  - _column name_ can be omitted for _dateTime_ or _measurement_ columns
 - `#timezone` annotation specifies the time zone of the data using an offset, which is either `+hhmm` or `-hhmm` or `Local` to use the local/computer time zone. Examples: _#timezone,+0100_ _#timezone -0500_ _#timezone Local_
 
 #### Data type with data format
@@ -158,6 +163,9 @@ All data types can include the format that is used to parse column data. It is t
 - note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
   - `#constant,"double:,.",myColumn,"1.234,011"`
 - `long:format` and `unsignedLong:format` support the same format as `double`, but everything after and including a fraction character is ignored
+  - the format can be prepended with `strict` to fail when a fraction digit is present, for example:
+    - `1000.000` is `1000` when parsed as `long`, but fails when parsed as `long:strict`
+    - `1_000,000` is `1000` when parsed as `long:,_`, but fails when parsed as `long:strict,_`
 - `boolean:truthy:falsy`
   - `truthy` and `falsy` are comma-separated lists of values, they can be empty to assume all values as truthy/falsy; for example `boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет`
   - a `boolean` data type (without the format) parses column values that start with any of _tTyY1_ as `true` values, _fFnN0_ as `false` values and fails on other values
diff --git a/pkg/csv2lp/csv2lp.go b/pkg/csv2lp/csv2lp.go
index 25bf3f7c9a..4299b577ec 100644
--- a/pkg/csv2lp/csv2lp.go
+++ b/pkg/csv2lp/csv2lp.go
@@ -17,7 +17,21 @@ type CsvLineError struct {
 }
 
 func (e CsvLineError) Error() string {
-	return fmt.Sprintf("line %d: %v", e.Line, e.Err)
+	if e.Line > 0 {
+		return fmt.Sprintf("line %d: %v", e.Line, e.Err)
+	}
+	return fmt.Sprintf("%v", e.Err)
+}
+
+// CreateRowColumnError wraps an existing error to add line and column coordinates
+func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError {
+	return CsvLineError{
+		Line: line,
+		Err: CsvColumnError{
+			Column: columnLabel,
+			Err:    err,
+		},
+	}
 }
 
 // CsvToLineReader represents state of transformation from csv data to line protocol reader
@@ -34,6 +48,8 @@ type CsvToLineReader struct {
 	dataRowAdded bool
 	// log CSV data errors to stderr and continue with CSV processing
 	skipRowOnError bool
+	// RowSkipped is called when a row is skipped because of a data parsing error
+	RowSkipped func(source *CsvToLineReader, lineError error, row []string)
 	// reader results
 	buffer []byte
@@ -54,6 +70,11 @@ func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
 	return state
 }
 
+// Comma returns the field delimiter used in the input CSV file
+func (state *CsvToLineReader) Comma() rune {
+	return state.csv.Comma
+}
+
 // Read implements io.Reader that returns protocol lines
 func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
 	// state1: finished
@@ -98,13 +119,17 @@ func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
 		if state.Table.AddRow(row) {
 			var err error
 			state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
-			state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row)
+			state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row, state.LineNumber)
 			if !state.dataRowAdded && state.logTableDataColumns {
 				log.Println(state.Table.DataColumnsInfo())
 			}
 			state.dataRowAdded = true
 			if err != nil {
 				lineError := CsvLineError{state.LineNumber, err}
+				if state.RowSkipped != nil {
+					state.RowSkipped(state, lineError, row)
+					continue
+				}
 				if state.skipRowOnError {
 					log.Println(lineError)
 					continue
diff --git a/pkg/csv2lp/csv2lp_test.go b/pkg/csv2lp/csv2lp_test.go
index 01d5885367..4f091efb51 100644
--- a/pkg/csv2lp/csv2lp_test.go
+++ b/pkg/csv2lp/csv2lp_test.go
@@ -204,6 +204,68 @@ func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
 	require.Equal(t, messages, 2)
 }
 
+// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to the configured RowSkipped listener
+func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
+	var buf bytes.Buffer
+	log.SetOutput(&buf)
+	oldFlags := log.Flags()
+	log.SetFlags(0)
+	defer func() {
+		log.SetOutput(os.Stderr)
+		log.SetFlags(oldFlags)
+	}()
+
+	type ActualArguments = struct {
+		src *CsvToLineReader
+		err error
+		row []string
+	}
+	type ExpectedArguments = struct {
+		errorString string
+		row         []string
+	}
+
+	csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"
+	calledArgs := []ActualArguments{}
+	expectedArgs := []ExpectedArguments{
+		{
+			"line 3: column '_measurement': no measurement supplied",
+			[]string{"", "1"},
+		},
+		{
+			"line 4: column 'a': '2.1' cannot fit into long data type",
+			[]string{"cpu", "2.1"},
+		},
+		{
+			"line 5: column 'a': strconv.ParseInt:",
+			[]string{"cpu", "3a"},
+		},
+	}
+
+	reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
+	reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
+		// make a copy of _row
+		row := make([]string, len(_row))
+		copy(row, _row)
+		// remember for comparison
+		calledArgs = append(calledArgs, ActualArguments{
+			src, err, row,
+		})
+	}
+	// read all the data
+	ioutil.ReadAll(reader)
+
+	out := buf.String()
+	require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
+
+	require.Len(t, calledArgs, 3)
+	for i, expected := range expectedArgs {
+		require.Equal(t, reader, calledArgs[i].src)
+		require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
+		require.Equal(t, expected.row, calledArgs[i].row)
+	}
+}
+
 // Test_CsvLineError tests CsvLineError error format
 func Test_CsvLineError(t *testing.T) {
 	var tests = []struct {
@@ -218,6 +280,10 @@ func Test_CsvLineError(t *testing.T) {
 			CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
 			"line 2: column 'a': cause",
 		},
+		{
+			CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}},
+			"column 'a': cause",
+		},
 	}
 	for _, test := range tests {
 		require.Equal(t, test.value, test.err.Error())
diff --git a/pkg/csv2lp/csv_annotations.go b/pkg/csv2lp/csv_annotations.go
index 8b6fa12f8d..43ae923f6d 100644
--- a/pkg/csv2lp/csv_annotations.go
+++ b/pkg/csv2lp/csv_annotations.go
@@ -2,6 +2,7 @@ package csv2lp
 
 import (
 	"fmt"
+	"log"
 	"regexp"
 	"strconv"
 	"strings"
@@ -33,9 +34,8 @@ func (a annotationComment) matches(comment string) bool {
 	return strings.HasPrefix(strings.ToLower(comment), a.prefix)
 }
 
-// constantSetupTable setups the supplied CSV table from #constant annotation
-func constantSetupTable(table *CsvTable, row []string) error {
-	// adds a virtual column with contsant value to all data rows
+func createConstantOrConcatColumn(table *CsvTable, row []string, annotationName string) CsvTableColumn {
+	// adds a virtual column with constant value to all data rows
 	// supported types of constant annotation rows are:
 	// 1. "#constant,datatype,label,defaultValue"
 	// 2. "#constant,measurement,value"
"#constant,measurement,value" @@ -72,17 +72,61 @@ func constantSetupTable(table *CsvTable, row []string) error { if col.DefaultValue == "" && col.Label != "" { // type 2,3,5,6 col.DefaultValue = col.Label - col.Label = "#constant " + col.DataType + col.Label = annotationName + " " + col.DataType } else if col.Label == "" { - // setup a label if no label is supplied fo focused error messages - col.Label = "#constant " + col.DataType + // setup a label if no label is supplied for focused error messages + col.Label = annotationName + " " + col.DataType } } // add a virtual column to the table + return col +} + +// constantSetupTable setups the supplied CSV table from #constant annotation +func constantSetupTable(table *CsvTable, row []string) error { + col := createConstantOrConcatColumn(table, row, "#constant") + // add a virtual column to the table table.extraColumns = append(table.extraColumns, &col) return nil } +// computedReplacer is used to replace value in computed columns +var computedReplacer *regexp.Regexp = regexp.MustCompile(`\$\{[^}]+\}`) + +// concatSetupTable setups the supplied CSV table from #concat annotation +func concatSetupTable(table *CsvTable, row []string) error { + col := createConstantOrConcatColumn(table, row, "#concat") + template := col.DefaultValue + col.ComputeValue = func(row []string) string { + return computedReplacer.ReplaceAllStringFunc(template, func(text string) string { + columnLabel := text[2 : len(text)-1] // ${columnLabel} + if placeholderColumn := table.Column(columnLabel); placeholderColumn != nil { + return placeholderColumn.Value(row) + } + log.Printf("WARNING: column %s: column '%s' cannot be replaced, no such column available", col.Label, columnLabel) + return "" + }) + } + // add a virtual column to the table + table.extraColumns = append(table.extraColumns, &col) + // add validator to report error when no placeholder column is available + table.validators = append(table.validators, func(table *CsvTable) error { + placeholders := computedReplacer.FindAllString(template, len(template)) + for _, placeholder := range placeholders { + columnLabel := placeholder[2 : len(placeholder)-1] // ${columnLabel} + if placeholderColumn := table.Column(columnLabel); placeholderColumn == nil { + return CsvColumnError{ + Column: col.Label, + Err: fmt.Errorf("'%s' references an uknown column '%s', available columns are: %v", + template, columnLabel, strings.Join(table.ColumnLabels(), ",")), + } + } + } + return nil + }) + return nil +} + // supportedAnnotations contains all supported CSV annotations comments var supportedAnnotations = []annotationComment{ { @@ -131,6 +175,10 @@ var supportedAnnotations = []annotationComment{ return nil }, }, + { + prefix: "#concat", + setupTable: concatSetupTable, + }, } // ignoreLeadingComment returns a value without '#anyComment ' prefix diff --git a/pkg/csv2lp/csv_annotations_test.go b/pkg/csv2lp/csv_annotations_test.go index 0b4e9e1a29..2313f856ba 100644 --- a/pkg/csv2lp/csv_annotations_test.go +++ b/pkg/csv2lp/csv_annotations_test.go @@ -1,7 +1,10 @@ package csv2lp import ( + "bytes" "fmt" + "log" + "os" "strconv" "strings" "testing" @@ -140,6 +143,93 @@ func Test_ConstantAnnotation(t *testing.T) { } } +// Test_ConcatAnnotation tests #concat annotation +func Test_ConcatAnnotation(t *testing.T) { + subject := annotation("#concat") + require.True(t, subject.matches("#Concat")) + require.True(t, subject.isTableAnnotation()) + var tests = []struct { + value []string + expectLabel string + expectValue string + expectLinePart int 
+	}{
+		// all possible specifications
+		{[]string{"#concat "}, "", "", 0}, // means literally nothing
+		{[]string{"#concat measurement", "a"}, "_", "a", linePartMeasurement},
+		{[]string{"#concat measurement", "a", "b"}, "_", "b", linePartMeasurement},
+		{[]string{"#concat measurement", "a", ""}, "_", "a", linePartMeasurement},
+		{[]string{"#concat tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
+		{[]string{"#concat", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
+		{[]string{"#concat field", "fName", "fVal"}, "fName", "fVal", linePartField},
+		{[]string{"#concat", "field", "fName", "fVal"}, "fName", "fVal", linePartField},
+		{[]string{"dateTime", "1"}, "_", "1", linePartTime},
+		{[]string{"dateTime", "1", "2"}, "_", "2", linePartTime},
+		{[]string{"dateTime", "", "2"}, "_", "2", linePartTime},
+		{[]string{"dateTime", "3", ""}, "_", "3", linePartTime},
+		{[]string{"long", "fN", "fV"}, "fN", "fV", 0},
+		// concat values
+		{[]string{"string", "fN", "$-${b}-${a}"}, "fN", "$-2-1", 0},
+	}
+	exampleRow := []string{"1", "2"}
+	for i, test := range tests {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			table := &CsvTable{columns: []*CsvTableColumn{
+				{Label: "a", Index: 0},
+				{Label: "b", Index: 1},
+			}}
+			subject.setupTable(table, test.value)
+			// validator
+			require.Equal(t, 1, len(table.validators))
+			require.Equal(t, table.validators[0](table), nil)
+			// columns
+			require.Equal(t, 1, len(table.extraColumns))
+			col := table.extraColumns[0]
+			require.Equal(t, test.expectLinePart, col.LinePart)
+			require.Greater(t, 0, col.Index)
+			if test.expectLabel != "_" {
+				require.Equal(t, test.expectLabel, col.Label)
+			} else {
+				require.NotEqual(t, "", col.Label)
+			}
+			require.Equal(t, test.expectValue, col.Value(exampleRow))
+		})
+	}
+	t.Run("concat template references unknown column", func(t *testing.T) {
+		var buf bytes.Buffer
+		log.SetOutput(&buf)
+		oldFlags := log.Flags()
+		log.SetFlags(0)
+		oldPrefix := log.Prefix()
+		prefix := "::PREFIX::"
+		log.SetPrefix(prefix)
+		defer func() {
+			log.SetOutput(os.Stderr)
+			log.SetFlags(oldFlags)
+			log.SetPrefix(oldPrefix)
+		}()
+
+		table := &CsvTable{columns: []*CsvTableColumn{
+			{Label: "x", Index: 0},
+		}}
+		subject.setupTable(table, []string{"string", "fN", "a${y}-${x}z"})
+		require.Equal(t, 1, len(table.validators))
+		require.NotNil(t, table.validators[0](table))
+		require.Equal(t,
+			"column 'fN': 'a${y}-${x}z' references an unknown column 'y', available columns are: x",
+			table.validators[0](table).Error())
+		// columns
+		require.Equal(t, 1, len(table.extraColumns))
+		col := table.extraColumns[0]
+		require.Greater(t, 0, col.Index)
+		require.Equal(t, "a-1z", col.Value(exampleRow))
+		// a warning is printed to the console
+		require.Equal(t,
+			"::PREFIX::WARNING: column fN: column 'y' cannot be replaced, no such column available",
+			strings.TrimSpace(buf.String()))
+	})
+}
+
 // Test_TimeZoneAnnotation tests #timezone annotation
 func Test_TimeZoneAnnotation(t *testing.T) {
 	subject := annotation("#timezone")
diff --git a/pkg/csv2lp/csv_table.go b/pkg/csv2lp/csv_table.go
index 9c98fd2574..af720ed7cd 100644
--- a/pkg/csv2lp/csv_table.go
+++ b/pkg/csv2lp/csv_table.go
@@ -46,7 +46,9 @@ type CsvTableColumn struct {
 	// TimeZone of dateTime column, applied when parsing dateTime DataType
 	TimeZone *time.Location
 	// ParseF is an optional function used to convert column's string value to interface{}
-	ParseF func(string) (interface{}, error)
+	ParseF func(value string) (interface{}, error)
+	// ComputeValue is an optional function used to compute the column value out of row data
+	ComputeValue func(row []string) string
 
 	// escapedLabel contains escaped label that can be directly used in line protocol
 	escapedLabel string
@@ -63,6 +65,9 @@ func (c *CsvTableColumn) LineLabel() string {
 
 // Value returns the value of the column for the supplied row
 func (c *CsvTableColumn) Value(row []string) string {
 	if c.Index < 0 || c.Index >= len(row) {
+		if c.ComputeValue != nil {
+			return c.ComputeValue(row)
+		}
 		return c.DefaultValue
 	}
 	val := row[c.Index]
@@ -126,9 +131,18 @@ func (c *CsvTableColumn) setupDataType(columnValue string) {
 	// setup column data type
 	c.DataType = columnValue
 
-	// setup custom parsing of bool data type
+	// setup custom parsing
 	if c.DataType == boolDatatype && c.DataFormat != "" {
 		c.ParseF = createBoolParseFn(c.DataFormat)
+		return
+	}
+	if c.DataType == longDatatype && strings.HasPrefix(c.DataFormat, "strict") {
+		c.ParseF = createStrictLongParseFn(c.DataFormat[6:])
+		return
+	}
+	if c.DataType == uLongDatatype && strings.HasPrefix(c.DataFormat, "strict") {
+		c.ParseF = createStrictUnsignedLongParseFn(c.DataFormat[6:])
+		return
 	}
 }
 
@@ -163,6 +177,8 @@ type CsvTable struct {
 	ignoreDataTypeInColumnName bool
 	// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
 	timeZone *time.Location
+	// validators validate table structure right before processing data rows
+	validators []func(*CsvTable) error
 
 	/* cached columns are initialized before reading the data rows using the computeLineProtocolColumns fn */
 	// cachedMeasurement is a required column that read (line protocol) measurement
@@ -193,6 +209,7 @@ func (t *CsvTable) DataColumnsInfo() string {
 		return ""
 	}
 	var builder = strings.Builder{}
+	t.computeLineProtocolColumns() // ensure that cached columns are initialized
 	builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
 	builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
 	for _, col := range t.cachedTags {
@@ -391,7 +408,7 @@ func (t *CsvTable) recomputeLineProtocolColumns() {
 
 // CreateLine produces a protocol line out of the supplied row or returns error
 func (t *CsvTable) CreateLine(row []string) (line string, err error) {
 	buffer := make([]byte, 100)[:0]
-	buffer, err = t.AppendLine(buffer, row)
+	buffer, err = t.AppendLine(buffer, row, -1)
 	if err != nil {
 		return "", err
 	}
@@ -399,7 +416,7 @@ func (t *CsvTable) CreateLine(row []string) (line string, err error) {
 }
 
 // AppendLine appends a protocol line to the supplied buffer using a CSV row and returns appended buffer or an error if any
-func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
+func (t *CsvTable) AppendLine(buffer []byte, row []string, lineNumber int) ([]byte, error) {
 	if t.computeLineProtocolColumns() {
 		// validate column data types
 		if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) {
@@ -416,6 +433,11 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
 				}
 			}
 		}
+		for _, v := range t.validators {
+			if err := v(t); err != nil {
+				return buffer, err
+			}
+		}
 	}
 
 	if t.cachedMeasurement == nil {
@@ -447,7 +469,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
 		buffer = append(buffer, escapeTag(field)...)
 		buffer = append(buffer, '=')
 		var err error
-		buffer, err = appendConverted(buffer, value, t.cachedFieldValue)
+		buffer, err = appendConverted(buffer, value, t.cachedFieldValue, lineNumber)
 		if err != nil {
 			return buffer, CsvColumnError{
 				t.cachedFieldName.Label,
@@ -468,7 +490,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
 		buffer = append(buffer, field.LineLabel()...)
 		buffer = append(buffer, '=')
 		var err error
-		buffer, err = appendConverted(buffer, value, field)
+		buffer, err = appendConverted(buffer, value, field, lineNumber)
 		if err != nil {
 			return buffer, CsvColumnError{
 				field.Label,
@@ -491,7 +513,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
 	}
 	buffer = append(buffer, ' ')
 	var err error
-	buffer, err = appendConverted(buffer, timeVal, t.cachedTime)
+	buffer, err = appendConverted(buffer, timeVal, t.cachedTime, lineNumber)
 	if err != nil {
 		return buffer, CsvColumnError{
 			t.cachedTime.Label,
@@ -518,6 +540,15 @@ func (t *CsvTable) Columns() []*CsvTableColumn {
 	return t.columns
 }
 
+// ColumnLabels returns available column labels
+func (t *CsvTable) ColumnLabels() []string {
+	labels := make([]string, len(t.columns))
+	for i, col := range t.columns {
+		labels[i] = col.Label
+	}
+	return labels
+}
+
 // Measurement returns measurement column or nil
 func (t *CsvTable) Measurement() *CsvTableColumn {
 	t.computeLineProtocolColumns()
diff --git a/pkg/csv2lp/csv_table_test.go b/pkg/csv2lp/csv_table_test.go
index 70d93f47e1..45585fecf2 100644
--- a/pkg/csv2lp/csv_table_test.go
+++ b/pkg/csv2lp/csv_table_test.go
@@ -104,6 +104,7 @@ func Test_CsvTable_FluxQueryResult(t *testing.T) {
 			require.Equal(t, table.Tags()[0].Label, "cpu")
 			require.Equal(t, table.Tags()[1].Label, "host")
 			require.Equal(t, len(table.Fields()), 0)
+			require.Contains(t, table.ColumnLabels(), "_measurement")
 		}
 	}
 }
@@ -332,46 +333,108 @@ func Test_ConstantAnnotations(t *testing.T) {
 	}
 }
 
+// Test_ConcatAnnotations tests processing of concat annotations
+func Test_ConcatAnnotations(t *testing.T) {
+	var tests = []struct {
+		name string
+		csv  string
+		line string
+	}{
+		{
+			"measurement_1",
+			"#concat measurement,cpu\n" +
+				"a,b\n" +
+				"1,1",
+			"cpu a=1,b=1",
+		},
+		{
+			"measurement_2",
+			"#concat,measurement,${a}${b}\n" +
+				"#constant,tag,cpu,cpu1\n" +
+				"#constant,long,of,0\n" +
+				"#constant,dateTime,,2\n" +
+				"a,b\n" +
+				"1,1",
+			"11,cpu=cpu1 a=1,b=1,of=0i 2",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			rows := readCsv(t, test.csv)
+			table := CsvTable{}
+			var lines []string
+			for _, row := range rows {
+				rowProcessed := table.AddRow(row)
+				if rowProcessed {
+					line, err := table.CreateLine(row)
+					if err != nil && test.line != "" {
+						require.Nil(t, err.Error())
+					}
+					lines = append(lines, line)
+				}
+			}
+			require.Equal(t, []string{test.line}, lines)
+		})
+	}
+}
+
 // Test_DataTypeInColumnName tests specification of column data type in the header row
 func Test_DataTypeInColumnName(t *testing.T) {
 	var tests = []struct {
 		csv                        string
 		line                       string
 		ignoreDataTypeInColumnName bool
+		error                      string
 	}{
 		{
-			"m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
+			csv: "m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
 				"cpu,,",
-			`cpu c=true`,
-			false,
+			line: `cpu c=true`,
 		},
 		{
-			"m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
+			csv: "m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
 				"cpu,1,1,x,y",
-			`cpu a=true,b=false,c=true,d=false`,
-			false,
+			line: `cpu a=true,b=false,c=true,d=false`,
 		},
 		{
-			"#constant measurement,cpu\n" +
+			csv: "#constant measurement,cpu\n" +
 				"a|long,b|string\n" +
 				"1,1",
-			`cpu a=1i,b="1"`,
-			false,
+			line: `cpu a=1i,b="1"`,
 		},
 		{
-			"#constant measurement,cpu\n" +
+			csv: "#constant measurement,cpu\n" +
 				"a|long,b|string\n" +
 				"1,1",
-			`cpu a|long=1,b|string=1`,
-			true,
+			line:                       `cpu a|long=1,b|string=1`,
+			ignoreDataTypeInColumnName: true,
 		},
 		{
-			"#constant measurement,cpu\n" +
+			csv: "#constant measurement,cpu\n" +
 				"#datatype long,string\n" +
 				"a|long,b|string\n" +
 				"1,1",
-			`cpu a|long=1i,b|string="1"`,
-			true,
+			line:                       `cpu a|long=1i,b|string="1"`,
+			ignoreDataTypeInColumnName: true,
+		},
+		{
+			csv: "#constant measurement,cpu\n" +
+				"a|long:strict: ,b|unsignedLong:strict: \n" +
+				"1 2,1 2",
+			line: `cpu a=12i,b=12u`,
+		},
+		{
+			csv: "#constant measurement,cpu\n" +
+				"a|long:strict\n" +
+				"1.1,1",
+			error: "column 'a': '1.1' cannot fit into long data type",
+		},
+		{
+			csv: "#constant measurement,cpu\n" +
+				"a|unsignedLong:strict\n" +
+				"1.1,1",
+			error: "column 'a': '1.1' cannot fit into unsignedLong data type",
 		},
 	}
 
@@ -385,8 +448,12 @@ func Test_DataTypeInColumnName(t *testing.T) {
 			rowProcessed := table.AddRow(row)
 			if rowProcessed {
 				line, err := table.CreateLine(row)
-				if err != nil && test.line != "" {
-					require.Nil(t, err.Error())
+				if err != nil {
+					if test.error == "" {
+						require.Nil(t, err.Error())
+					} else {
+						require.Equal(t, test.error, err.Error())
+					}
 				}
 				lines = append(lines, line)
 			}
@@ -434,6 +501,10 @@ func Test_CsvTable_dataErrors(t *testing.T) {
 			"error_no_measurement_data",
 			"_measurement,col1\n,2",
 		},
+		{
+			"error_derived_column_missing_reference",
+			"#concat string,d,${col1}${col2}\n_measurement,col1\nm,2",
+		},
 	}
 
 	for _, test := range tests {
diff --git a/pkg/csv2lp/data_conversion.go b/pkg/csv2lp/data_conversion.go
index d11f8e4b56..12a036afb3 100644
--- a/pkg/csv2lp/data_conversion.go
+++ b/pkg/csv2lp/data_conversion.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log"
 	"math"
 	"strconv"
 	"strings"
@@ -82,15 +83,16 @@ func escapeString(val string) string {
 	return val
 }
 
-// normalizeNumberString normalizes the supplied value with the help of the format supplied.
+// normalizeNumberString normalizes the supplied value according to the supplied format.
 // This normalization is intended to convert number strings of different locales to a strconv-parseable value.
 //
 // The format's first character is a fraction delimiter character. Next characters in the format
 // are simply removed, they are typically used to visually separate groups in large numbers.
-// The removeFaction parameter controls whether the returned value can contain also the fraction part.
+// The removeFraction parameter controls whether the returned value can also contain the fraction part.
+// An empty format means ". \n\t\r_"
 //
 // For example, to get a strconv-parseable float from a Spanish value '3.494.826.157,123', use format ",." .
-func normalizeNumberString(value string, format string, removeFraction bool) string {
+func normalizeNumberString(value string, format string, removeFraction bool) (normalized string, truncated bool) {
 	if len(format) == 0 {
 		format = ". \n\t\r_"
\n\t\r_" } @@ -110,20 +112,20 @@ func normalizeNumberString(value string, format string, removeFraction bool) str } if c == fractionRune { if removeFraction { - break ForAllCharacters + return retVal.String(), true } retVal.WriteByte('.') - } else { - retVal.WriteRune(c) + continue } + retVal.WriteRune(c) } - return retVal.String() + return retVal.String(), false } - return value + return value, false } -func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) { +func toTypedValue(val string, column *CsvTableColumn, lineNumber int) (interface{}, error) { dataType := column.DataType dataFormat := column.DataFormat if column.ParseF != nil { @@ -159,7 +161,8 @@ func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) { case durationDatatype: return time.ParseDuration(val) case doubleDatatype: - return strconv.ParseFloat(normalizeNumberString(val, dataFormat, false), 64) + normalized, _ := normalizeNumberString(val, dataFormat, false) + return strconv.ParseFloat(normalized, 64) case boolDatatype: switch { case len(val) == 0: @@ -172,9 +175,21 @@ func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) { return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'") } case longDatatype: - return strconv.ParseInt(normalizeNumberString(val, dataFormat, true), 10, 64) + normalized, truncated := normalizeNumberString(val, dataFormat, true) + if truncated { + error := CreateRowColumnError(lineNumber, column.Label, + fmt.Errorf("'%s' truncated to '%s' to fit into long data type", val, normalized)) + log.Printf("WARNING: %v\n", error) + } + return strconv.ParseInt(normalized, 10, 64) case uLongDatatype: - return strconv.ParseUint(normalizeNumberString(val, dataFormat, true), 10, 64) + normalized, truncated := normalizeNumberString(val, dataFormat, true) + if truncated { + error := CreateRowColumnError(lineNumber, column.Label, + fmt.Errorf("'%s' truncated to '%s' to fit into unsignedLong data type", val, normalized)) + log.Printf("WARNING: %v\n", error) + } + return strconv.ParseUint(normalized, 10, 64) case base64BinaryDataType: return base64.StdEncoding.DecodeString(val) default: @@ -230,11 +245,11 @@ func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) { } } -func appendConverted(buffer []byte, val string, column *CsvTableColumn) ([]byte, error) { +func appendConverted(buffer []byte, val string, column *CsvTableColumn, lineNumber int) ([]byte, error) { if len(column.DataType) == 0 { // keep the value as it is return append(buffer, val...), nil } - typedVal, err := toTypedValue(val, column) + typedVal, err := toTypedValue(val, column, lineNumber) if err != nil { return buffer, err } @@ -302,3 +317,25 @@ func createBoolParseFn(format string) func(string) (interface{}, error) { return nil, fmt.Errorf("unsupported boolean value: %s must one of %v or one of %v", val, truthy, falsy) } } + +// createStrictLongParseFn returns a function that converts a string value to long and fails also when a fraction digit is detected +func createStrictLongParseFn(dataFormat string) func(string) (interface{}, error) { + return func(val string) (interface{}, error) { + normalized, truncated := normalizeNumberString(val, dataFormat, true) + if truncated { + return 0, fmt.Errorf("'%s' cannot fit into long data type", val) + } + return strconv.ParseInt(normalized, 10, 64) + } +} + +// createStrictUnsignedLongParseFn returns a function that converts a string value to unsigned long and 
+func createStrictUnsignedLongParseFn(dataFormat string) func(string) (interface{}, error) {
+	return func(val string) (interface{}, error) {
+		normalized, truncated := normalizeNumberString(val, dataFormat, true)
+		if truncated {
+			return 0, fmt.Errorf("'%s' cannot fit into unsignedLong data type", val)
+		}
+		return strconv.ParseUint(normalized, 10, 64)
+	}
+}
diff --git a/pkg/csv2lp/data_conversion_test.go b/pkg/csv2lp/data_conversion_test.go
index ec1d42d9b8..6bd0aa3d85 100644
--- a/pkg/csv2lp/data_conversion_test.go
+++ b/pkg/csv2lp/data_conversion_test.go
@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"log"
 	"math"
+	"os"
 	"strings"
 	"testing"
 	"time"
@@ -112,9 +113,9 @@ func Test_ToTypedValue(t *testing.T) {
 
 	for i, test := range tests {
 		t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
-			column := &CsvTableColumn{}
+			column := &CsvTableColumn{Label: "test"}
 			column.setupDataType(test.dataType)
-			val, err := toTypedValue(test.value, column)
+			val, err := toTypedValue(test.value, column, 1)
 			if err != nil && test.expect != nil {
 				require.Nil(t, err.Error())
 			}
@@ -143,7 +144,7 @@ func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
 			column := &CsvTableColumn{}
 			column.TimeZone = tz
 			column.setupDataType(test.dataType)
-			val, err := toTypedValue(test.value, column)
+			val, err := toTypedValue(test.value, column, 1)
 			if err != nil && test.expect != nil {
 				require.Nil(t, err.Error())
 			}
@@ -210,9 +211,9 @@ func Test_AppendConverted(t *testing.T) {
 
 	for i, test := range tests {
 		t.Run(fmt.Sprint(i), func(t *testing.T) {
-			column := &CsvTableColumn{}
+			column := &CsvTableColumn{Label: "test"}
 			column.setupDataType(test.dataType)
-			val, err := appendConverted(nil, test.value, column)
+			val, err := appendConverted(nil, test.value, column, 1)
 			if err != nil && test.expect != "" {
 				require.Nil(t, err.Error())
 			}
@@ -246,18 +247,34 @@ func Test_NormalizeNumberString(t *testing.T) {
 		format         string
 		removeFraction bool
 		expect         string
+		truncated      bool
 	}{
-		{"123", "", true, "123"},
-		{"123", ".", true, "123"},
-		{"123.456", ".", true, "123"},
-		{"123.456", ".", false, "123.456"},
-		{"1 2.3,456", ",. ", false, "123.456"},
-		{" 1 2\t3.456 \r\n", "", false, "123.456"},
+		{"123", "", true, "123", false},
+		{"123", ".", true, "123", false},
+		{"123.456", ".", true, "123", true},
+		{"123.456", ".", false, "123.456", false},
+		{"1 2.3,456", ",. ", false, "123.456", false},
+		{" 1 2\t3.456 \r\n", "", false, "123.456", false},
 	}
 
 	for i, test := range tests {
 		t.Run(fmt.Sprint(i), func(t *testing.T) {
-			require.Equal(t, test.expect, normalizeNumberString(test.value, test.format, test.removeFraction))
+			// customize logging to check warnings
+			var buf bytes.Buffer
+			log.SetOutput(&buf)
+			oldFlags := log.Flags()
+			log.SetFlags(0)
+			oldPrefix := log.Prefix()
+			prefix := "::PREFIX::"
+			log.SetPrefix(prefix)
+			defer func() {
+				log.SetOutput(os.Stderr)
+				log.SetFlags(oldFlags)
+				log.SetPrefix(oldPrefix)
+			}()
+			normalized, truncated := normalizeNumberString(test.value, test.format, test.removeFraction)
+			require.Equal(t, test.expect, normalized)
+			require.Equal(t, test.truncated, truncated)
		})
	}
}
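
Reviewer note: the pieces above compose as follows — `#concat` derives a new virtual column from a `${column}` template, `long:strict`/`unsignedLong:strict` turn silent fraction truncation into a row error, and the new `RowSkipped` listener receives each rejected row (the `influx write --errors-file` flag is a thin wrapper around that hook). Below is a minimal sketch of the library-level behavior, not part of this diff: the import path is assumed from the repository layout, and the sample CSV mirrors the test data used in this change set.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/influxdata/influxdb/v2/pkg/csv2lp"
)

func main() {
	// #concat computes the measurement from columns a and b;
	// long:strict rejects values that carry fraction digits.
	csvData := "#concat,measurement,${a}${b}\n" +
		"a|long,b|long:strict\n" +
		"1,2\n" + // -> "12 a=1i,b=2i"
		"1,2.5\n" // rejected: '2.5' cannot fit into long data type

	reader := csv2lp.CsvToLineProtocol(strings.NewReader(csvData)).SkipRowOnError(true)
	// RowSkipped replaces the default stderr logging and receives every
	// rejected row together with its parsing error.
	reader.RowSkipped = func(src *csv2lp.CsvToLineReader, err error, row []string) {
		fmt.Printf("rejected %v: %v\n", row, err)
	}

	lines, _ := ioutil.ReadAll(reader) // converts CSV to line protocol
	fmt.Print(string(lines))
}
```

The CLI equivalent over the same input would be `influx write dryrun --format csv --errors-file rejected.csv`, which (per Test_writeFlags_errorsFile) logs each error and writes a `# error : …` comment line followed by the offending row into the errors file.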