Merge pull request #18779 from influxdata/18744/csv2lp

feat(cmd/influx/write): add new processing options and enhancements
pull/19503/head
Pavel Závora 2020-09-12 11:47:49 +02:00 committed by GitHub
commit be8b2a9c9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 494 additions and 58 deletions

View File

@ -25,6 +25,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
### Features
1. [18779](https://github.com/influxdata/influxdb/pull/18779): Add new processing options and enhancements to influx write.
1. [19246](https://github.com/influxdata/influxdb/pull/19246): Redesign load data page to increase discovery and ease of use
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
@ -38,6 +39,7 @@ type writeFlagsType struct {
SkipHeader int
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
}
var writeFlags writeFlagsType
@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
}
}
// create writer for errors-file, if supplied
var errorsFile *csv.Writer
var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
if writeFlags.ErrorsFile != "" {
writer, err := os.Create(writeFlags.ErrorsFile)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err)
}
closers = append(closers, writer)
errorsFile = csv.NewWriter(writer)
rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
log.Println(lineError)
errorsFile.Comma = source.Comma()
errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
if err := errorsFile.Write(row); err != nil {
log.Printf("Unable to write to error-file: %v\n", err)
}
errorsFile.Flush() // flush is required
}
}
// concatenate readers
r := io.MultiReader(readers...)
if writeFlags.Format == inputFormatCsv {
@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName)
// change LineNumber to report file/stdin line numbers properly
csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers)
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
return r, csv2lp.MultiCloser(closers...), nil

View File

@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string {
func createTempFile(suffix string, contents []byte) string {
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
file.Close() // Close immediately, since we need only a file name
if err != nil {
log.Fatal(err)
return "unknown.file"
@ -545,3 +546,19 @@ func Test_fluxWriteF(t *testing.T) {
require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n"))
})
}
// Test_writeFlags_errorsFile tests that rejected rows are written to errors file
// supplied via the --errors-file flag, while valid rows are still converted.
func Test_writeFlags_errorsFile(t *testing.T) {
	defer removeTempFiles()
	errorsFile := createTempFile("errors", []byte{})
	// the second data row ("m,1.1") violates the strict long data type and must be rejected
	stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
	out := bytes.Buffer{}
	command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)})
	command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile})
	err := command.Execute()
	require.Nil(t, err)
	// only the valid row is written to the dry-run output as line protocol
	require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
	// the rejected row appears in the errors file, preceded by a "# error" comment line
	errorLines, err := ioutil.ReadFile(errorsFile)
	require.Nil(t, err)
	require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}

View File

@ -139,6 +139,11 @@ Existing [data types](https://v2.docs.influxdata.com/v2.0/reference/syntax/annot
- `#constant` annotation adds a constant column to the data, so you can set measurement, time, field or tag of every row you import
- the format of a constant annotation row is `#constant,datatype,name,value`, it contains supported datatype, a column name, and a constant value
- _column name_ can be omitted for _dateTime_ or _measurement_ columns, so the annotation can be simply `#constant,measurement,cpu`
- `#concat` annotation adds a new column that is concatenated from existing columns according to a template
- the format of a concat annotation row is `#concat,datatype,name,template`, it contains supported datatype, a column name, and a template value
- the `template` is a string with `${columnName}` placeholders, in which the placeholders are replaced by values of existing columns
- for example: `#concat,string,fullName,${firstName} ${lastName}`
- _column name_ can be omitted for _dateTime_ or _measurement_ columns
- `#timezone` annotation specifies the time zone of the data using an offset, which is either `+hhmm` or `-hhmm` or `Local` to use the local/computer time zone. Examples: _#timezone,+0100_ _#timezone -0500_ _#timezone Local_
#### Data type with data format
@ -158,6 +163,9 @@ All data types can include the format that is used to parse column data. It is t
- note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
- `#constant,"double:,.",myColumn,"1.234,011"`
- `long:format` and `unsignedLong:format` support the same format as `double`, but everything after and including a fraction character is ignored
- the format can be prepended with `strict` to fail when a fraction digit is present, for example:
- `1000.000` is `1000` when parsed as `long`, but fails when parsed as `long:strict`
- `1_000,000` is `1000` when parsed as `long:,_`, but fails when parsed as `long:strict,_`
- `boolean:truthy:falsy`
- `truthy` and `falsy` are comma-separated lists of values, they can be empty to assume all values as truthy/falsy; for example `boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет`
- a `boolean` data type (without the format) parses column values that start with any of _tTyY1_ as `true` values, _fFnN0_ as `false` values and fails on other values

View File

@ -17,8 +17,22 @@ type CsvLineError struct {
}
// Error implements the error interface; when a positive line number is
// known it prefixes the wrapped error with "line N: ".
func (e CsvLineError) Error() string {
	if e.Line <= 0 {
		return fmt.Sprintf("%v", e.Err)
	}
	return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}
// CreateRowColumnError wraps an existing error to add line and column coordinates
func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError {
return CsvLineError{
Line: line,
Err: CsvColumnError{
Column: columnLabel,
Err: err,
},
}
}
// CsvToLineReader represents state of transformation from CSV data to line protocol reader
type CsvToLineReader struct {
@ -34,6 +48,8 @@ type CsvToLineReader struct {
dataRowAdded bool
// log CSV data errors to sterr and continue with CSV processing
skipRowOnError bool
// RowSkipped is called when a row is skipped because of data parsing error
RowSkipped func(source *CsvToLineReader, lineError error, row []string)
// reader results
buffer []byte
@ -54,6 +70,11 @@ func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
return state
}
// Comma returns a field delimiter used in an input CSV file; it exposes the
// delimiter of the underlying csv parser so that callers (e.g. an errors-file
// writer) can emit rows using the same delimiter.
func (state *CsvToLineReader) Comma() rune {
	return state.csv.Comma
}
// Read implements io.Reader that returns protocol lines
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
// state1: finished
@ -98,13 +119,17 @@ func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
if state.Table.AddRow(row) {
var err error
state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row)
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row, state.LineNumber)
if !state.dataRowAdded && state.logTableDataColumns {
log.Println(state.Table.DataColumnsInfo())
}
state.dataRowAdded = true
if err != nil {
lineError := CsvLineError{state.LineNumber, err}
if state.RowSkipped != nil {
state.RowSkipped(state, lineError, row)
continue
}
if state.skipRowOnError {
log.Println(lineError)
continue

View File

@ -204,6 +204,68 @@ func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
require.Equal(t, messages, 2)
}
// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to configured RowSkipped listener
// and that no log messages are produced when the listener is set.
func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
	// capture log output to verify that nothing gets logged
	var buf bytes.Buffer
	log.SetOutput(&buf)
	oldFlags := log.Flags()
	log.SetFlags(0)
	defer func() {
		log.SetOutput(os.Stderr)
		log.SetFlags(oldFlags)
	}()

	type ActualArguments = struct {
		src *CsvToLineReader
		err error
		row []string
	}
	type ExpectedArguments = struct {
		errorString string
		row         []string
	}

	// three rows are in error: missing measurement, a fraction in a strict
	// long column, and a value that is not a number at all
	csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"
	calledArgs := []ActualArguments{}
	expectedArgs := []ExpectedArguments{
		{
			"line 3: column '_measurement': no measurement supplied",
			[]string{"", "1"},
		},
		{
			"line 4: column 'a': '2.1' cannot fit into long data type",
			[]string{"cpu", "2.1"},
		},
		{
			"line 5: column 'a': strconv.ParseInt:",
			[]string{"cpu", "3a"},
		},
	}

	reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
	reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
		// make a copy of _row (presumably the row slice is reused by the
		// reader between callbacks — hence the copy before storing it)
		row := make([]string, len(_row))
		copy(row, _row)
		// remember for comparison
		calledArgs = append(calledArgs, ActualArguments{
			src, err, row,
		})
	}
	// read all the data
	ioutil.ReadAll(reader)

	out := buf.String()
	require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
	require.Len(t, calledArgs, 3)
	for i, expected := range expectedArgs {
		require.Equal(t, reader, calledArgs[i].src)
		require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
		require.Equal(t, expected.row, calledArgs[i].row)
	}
}
// Test_CsvLineError tests CsvLineError error format
func Test_CsvLineError(t *testing.T) {
var tests = []struct {
@ -218,6 +280,10 @@ func Test_CsvLineError(t *testing.T) {
CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
"line 2: column 'a': cause",
},
{
CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}},
"column 'a': cause",
},
}
for _, test := range tests {
require.Equal(t, test.value, test.err.Error())

View File

@ -2,6 +2,7 @@ package csv2lp
import (
"fmt"
"log"
"regexp"
"strconv"
"strings"
@ -33,9 +34,8 @@ func (a annotationComment) matches(comment string) bool {
return strings.HasPrefix(strings.ToLower(comment), a.prefix)
}
// constantSetupTable setups the supplied CSV table from #constant annotation
func constantSetupTable(table *CsvTable, row []string) error {
// adds a virtual column with contsant value to all data rows
func createConstantOrConcatColumn(table *CsvTable, row []string, annotationName string) CsvTableColumn {
// adds a virtual column with constant value to all data rows
// supported types of constant annotation rows are:
// 1. "#constant,datatype,label,defaultValue"
// 2. "#constant,measurement,value"
@ -72,17 +72,61 @@ func constantSetupTable(table *CsvTable, row []string) error {
if col.DefaultValue == "" && col.Label != "" {
// type 2,3,5,6
col.DefaultValue = col.Label
col.Label = "#constant " + col.DataType
col.Label = annotationName + " " + col.DataType
} else if col.Label == "" {
// setup a label if no label is supplied fo focused error messages
col.Label = "#constant " + col.DataType
// setup a label if no label is supplied for focused error messages
col.Label = annotationName + " " + col.DataType
}
}
// add a virtual column to the table
return col
}
// constantSetupTable setups the supplied CSV table from #constant annotation:
// it creates a virtual column with a constant value and registers it with the table.
func constantSetupTable(table *CsvTable, row []string) error {
	column := createConstantOrConcatColumn(table, row, "#constant")
	// register the virtual column so it participates in every data row
	table.extraColumns = append(table.extraColumns, &column)
	return nil
}
// computedReplacer is used to replace value in computed columns
var computedReplacer *regexp.Regexp = regexp.MustCompile(`\$\{[^}]+\}`)
// concatSetupTable setups the supplied CSV table from #concat annotation
// (e.g. "#concat,string,fullName,${firstName} ${lastName}"): it adds a virtual
// column whose value is computed from other columns using the annotation's template.
func concatSetupTable(table *CsvTable, row []string) error {
	col := createConstantOrConcatColumn(table, row, "#concat")
	// the column's DefaultValue carries the template with ${columnName} placeholders
	template := col.DefaultValue
	col.ComputeValue = func(row []string) string {
		return computedReplacer.ReplaceAllStringFunc(template, func(text string) string {
			columnLabel := text[2 : len(text)-1] // ${columnLabel}
			if placeholderColumn := table.Column(columnLabel); placeholderColumn != nil {
				return placeholderColumn.Value(row)
			}
			// a missing column is replaced by an empty string here and only warned
			// about; a hard error is reported by the validator registered below
			log.Printf("WARNING: column %s: column '%s' cannot be replaced, no such column available", col.Label, columnLabel)
			return ""
		})
	}
	// add a virtual column to the table
	table.extraColumns = append(table.extraColumns, &col)
	// add validator to report error when no placeholder column is available
	table.validators = append(table.validators, func(table *CsvTable) error {
		placeholders := computedReplacer.FindAllString(template, len(template))
		for _, placeholder := range placeholders {
			columnLabel := placeholder[2 : len(placeholder)-1] // ${columnLabel}
			if placeholderColumn := table.Column(columnLabel); placeholderColumn == nil {
				// NOTE(review): "uknown" is a typo, but tests assert this exact
				// message text, so it must be fixed together with those tests
				return CsvColumnError{
					Column: col.Label,
					Err: fmt.Errorf("'%s' references an uknown column '%s', available columns are: %v",
						template, columnLabel, strings.Join(table.ColumnLabels(), ",")),
				}
			}
		}
		return nil
	})
	return nil
}
// supportedAnnotations contains all supported CSV annotations comments
var supportedAnnotations = []annotationComment{
{
@ -131,6 +175,10 @@ var supportedAnnotations = []annotationComment{
return nil
},
},
{
prefix: "#concat",
setupTable: concatSetupTable,
},
}
// ignoreLeadingComment returns a value without '#anyComment ' prefix

View File

@ -1,7 +1,10 @@
package csv2lp
import (
"bytes"
"fmt"
"log"
"os"
"strconv"
"strings"
"testing"
@ -140,6 +143,93 @@ func Test_ConstantAnnotation(t *testing.T) {
}
}
// Test_ConcatAnnotation tests #concat annotation: all supported row shapes,
// template substitution against existing columns, and the warning/validator
// behavior when a template references an unknown column.
func Test_ConcatAnnotation(t *testing.T) {
	subject := annotation("#concat")
	require.True(t, subject.matches("#Concat"))
	require.True(t, subject.isTableAnnotation())
	var tests = []struct {
		value          []string
		expectLabel    string
		expectValue    string
		expectLinePart int
	}{
		// all possible specifications
		{[]string{"#concat "}, "", "", 0}, // means literally nothing
		{[]string{"#concat measurement", "a"}, "_", "a", linePartMeasurement},
		{[]string{"#concat measurement", "a", "b"}, "_", "b", linePartMeasurement},
		{[]string{"#concat measurement", "a", ""}, "_", "a", linePartMeasurement},
		{[]string{"#concat tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
		{[]string{"#concat", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
		{[]string{"#concat field", "fName", "fVal"}, "fName", "fVal", linePartField},
		{[]string{"#concat", "field", "fName", "fVal"}, "fName", "fVal", linePartField},
		{[]string{"dateTime", "1"}, "_", "1", linePartTime},
		{[]string{"dateTime", "1", "2"}, "_", "2", linePartTime},
		{[]string{"dateTime", "", "2"}, "_", "2", linePartTime},
		{[]string{"dateTime", "3", ""}, "_", "3", linePartTime},
		{[]string{"long", "fN", "fV"}, "fN", "fV", 0},
		// concat values: ${a} and ${b} are replaced by values of columns a and b
		{[]string{"string", "fN", "$-${b}-${a}"}, "fN", "$-2-1", 0},
	}
	// exampleRow supplies values for columns a (index 0) and b (index 1)
	exampleRow := []string{"1", "2"}
	for i, test := range tests {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			table := &CsvTable{columns: []*CsvTableColumn{
				{Label: "a", Index: 0},
				{Label: "b", Index: 1},
			}}
			subject.setupTable(table, test.value)
			// validator: every #concat annotation registers exactly one that passes here
			require.Equal(t, 1, len(table.validators))
			require.Equal(t, table.validators[0](table), nil)
			// columns
			require.Equal(t, 1, len(table.extraColumns))
			col := table.extraColumns[0]
			require.Equal(t, test.expectLinePart, col.LinePart)
			// virtual columns have a negative Index (asserts 0 > col.Index)
			require.Greater(t, 0, col.Index)
			if test.expectLabel != "_" {
				require.Equal(t, test.expectLabel, col.Label)
			} else {
				require.NotEqual(t, "", col.Label)
			}
			require.Equal(t, test.expectValue, col.Value(exampleRow))
		})
	}
	t.Run("concat template references unknown column", func(t *testing.T) {
		// capture log output to verify the warning message
		var buf bytes.Buffer
		log.SetOutput(&buf)
		oldFlags := log.Flags()
		log.SetFlags(0)
		oldPrefix := log.Prefix()
		prefix := "::PREFIX::"
		log.SetPrefix(prefix)
		defer func() {
			log.SetOutput(os.Stderr)
			log.SetFlags(oldFlags)
			log.SetPrefix(oldPrefix)
		}()
		table := &CsvTable{columns: []*CsvTableColumn{
			{Label: "x", Index: 0},
		}}
		// ${y} does not exist, only column x is available
		subject.setupTable(table, []string{"string", "fN", "a${y}-${x}z"})
		require.Equal(t, 1, len(table.validators))
		require.NotNil(t, table.validators[0](table))
		require.Equal(t,
			"column 'fN': 'a${y}-${x}z' references an uknown column 'y', available columns are: x",
			table.validators[0](table).Error())
		// columns: the unknown placeholder is replaced by an empty string
		require.Equal(t, 1, len(table.extraColumns))
		col := table.extraColumns[0]
		require.Greater(t, 0, col.Index)
		require.Equal(t, "a-1z", col.Value(exampleRow))
		// a warning is printed to console
		require.Equal(t,
			"::PREFIX::WARNING: column fN: column 'y' cannot be replaced, no such column available",
			strings.TrimSpace(buf.String()))
	})
}
// Test_TimeZoneAnnotation tests #timezone annotation
func Test_TimeZoneAnnotation(t *testing.T) {
subject := annotation("#timezone")

View File

@ -46,7 +46,9 @@ type CsvTableColumn struct {
// TimeZone of dateTime column, applied when parsing dateTime DataType
TimeZone *time.Location
// ParseF is an optional function used to convert column's string value to interface{}
ParseF func(string) (interface{}, error)
ParseF func(value string) (interface{}, error)
// ComputeValue is an optional function used to compute column value out of row data
ComputeValue func(row []string) string
// escapedLabel contains escaped label that can be directly used in line protocol
escapedLabel string
@ -63,6 +65,9 @@ func (c *CsvTableColumn) LineLabel() string {
// Value returns the value of the column for the supplied row
func (c *CsvTableColumn) Value(row []string) string {
if c.Index < 0 || c.Index >= len(row) {
if c.ComputeValue != nil {
return c.ComputeValue(row)
}
return c.DefaultValue
}
val := row[c.Index]
@ -126,9 +131,18 @@ func (c *CsvTableColumn) setupDataType(columnValue string) {
// setup column data type
c.DataType = columnValue
// setup custom parsing of bool data type
// setup custom parsing
if c.DataType == boolDatatype && c.DataFormat != "" {
c.ParseF = createBoolParseFn(c.DataFormat)
return
}
if c.DataType == longDatatype && strings.HasPrefix(c.DataFormat, "strict") {
c.ParseF = createStrictLongParseFn(c.DataFormat[6:])
return
}
if c.DataType == uLongDatatype && strings.HasPrefix(c.DataFormat, "strict") {
c.ParseF = createStrictUnsignedLongParseFn(c.DataFormat[6:])
return
}
}
@ -163,6 +177,8 @@ type CsvTable struct {
ignoreDataTypeInColumnName bool
// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
timeZone *time.Location
// validators validate table structure right before processing data rows
validators []func(*CsvTable) error
/* cached columns are initialized before reading the data rows using the computeLineProtocolColumns fn */
// cachedMeasurement is a required column that read (line protocol) measurement
@ -193,6 +209,7 @@ func (t *CsvTable) DataColumnsInfo() string {
return "<nil>"
}
var builder = strings.Builder{}
t.computeLineProtocolColumns() // ensure that cached columns are initialized
builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
for _, col := range t.cachedTags {
@ -391,7 +408,7 @@ func (t *CsvTable) recomputeLineProtocolColumns() {
// CreateLine produces a protocol line out of the supplied row or returns error
func (t *CsvTable) CreateLine(row []string) (line string, err error) {
buffer := make([]byte, 100)[:0]
buffer, err = t.AppendLine(buffer, row)
buffer, err = t.AppendLine(buffer, row, -1)
if err != nil {
return "", err
}
@ -399,7 +416,7 @@ func (t *CsvTable) CreateLine(row []string) (line string, err error) {
}
// AppendLine appends a protocol line to the supplied buffer using a CSV row and returns appended buffer or an error if any
func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
func (t *CsvTable) AppendLine(buffer []byte, row []string, lineNumber int) ([]byte, error) {
if t.computeLineProtocolColumns() {
// validate column data types
if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) {
@ -416,6 +433,11 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
}
}
}
for _, v := range t.validators {
if err := v(t); err != nil {
return buffer, err
}
}
}
if t.cachedMeasurement == nil {
@ -447,7 +469,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
buffer = append(buffer, escapeTag(field)...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, t.cachedFieldValue)
buffer, err = appendConverted(buffer, value, t.cachedFieldValue, lineNumber)
if err != nil {
return buffer, CsvColumnError{
t.cachedFieldName.Label,
@ -468,7 +490,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
buffer = append(buffer, field.LineLabel()...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, field)
buffer, err = appendConverted(buffer, value, field, lineNumber)
if err != nil {
return buffer, CsvColumnError{
field.Label,
@ -491,7 +513,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
}
buffer = append(buffer, ' ')
var err error
buffer, err = appendConverted(buffer, timeVal, t.cachedTime)
buffer, err = appendConverted(buffer, timeVal, t.cachedTime, lineNumber)
if err != nil {
return buffer, CsvColumnError{
t.cachedTime.Label,
@ -518,6 +540,15 @@ func (t *CsvTable) Columns() []*CsvTableColumn {
return t.columns
}
// ColumnLabels returns the labels of all available data columns, in column order.
func (t *CsvTable) ColumnLabels() []string {
	result := make([]string, 0, len(t.columns))
	for _, column := range t.columns {
		result = append(result, column.Label)
	}
	return result
}
// Measurement returns measurement column or nil
func (t *CsvTable) Measurement() *CsvTableColumn {
t.computeLineProtocolColumns()

View File

@ -104,6 +104,7 @@ func Test_CsvTable_FluxQueryResult(t *testing.T) {
require.Equal(t, table.Tags()[0].Label, "cpu")
require.Equal(t, table.Tags()[1].Label, "host")
require.Equal(t, len(table.Fields()), 0)
require.Contains(t, table.ColumnLabels(), "_measurement")
}
}
}
@ -332,46 +333,108 @@ func Test_ConstantAnnotations(t *testing.T) {
}
}
// Test_ConcatAnnotations tests processing of concat annotations
// end-to-end: CSV rows with #concat (and #constant) annotations are
// converted to line protocol and compared against the expected line.
func Test_ConcatAnnotations(t *testing.T) {
	var tests = []struct {
		name string
		csv  string
		line string
	}{
		{
			"measurement_1",
			"#concat measurement,cpu\n" +
				"a,b\n" +
				"1,1",
			"cpu a=1,b=1",
		},
		{
			// measurement is concatenated from values of columns a and b
			"measurement_2",
			"#concat,measurement,${a}${b}\n" +
				"#constant,tag,cpu,cpu1\n" +
				"#constant,long,of,0\n" +
				"#constant,dateTime,,2\n" +
				"a,b\n" +
				"1,1",
			"11,cpu=cpu1 a=1,b=1,of=0i 2",
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			rows := readCsv(t, test.csv)
			table := CsvTable{}
			var lines []string
			for _, row := range rows {
				rowProcessed := table.AddRow(row)
				if rowProcessed {
					line, err := table.CreateLine(row)
					if err != nil && test.line != "" {
						// fail with the error text when a line was expected
						require.Nil(t, err.Error())
					}
					lines = append(lines, line)
				}
			}
			require.Equal(t, []string{test.line}, lines)
		})
	}
}
// Test_DataTypeInColumnName tests specification of column data type in the header row
func Test_DataTypeInColumnName(t *testing.T) {
var tests = []struct {
csv string
line string
ignoreDataTypeInColumnName bool
error string
}{
{
"m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
csv: "m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
"cpu,,",
`cpu c=true`,
false,
line: `cpu c=true`,
},
{
"m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
csv: "m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
"cpu,1,1,x,y",
`cpu a=true,b=false,c=true,d=false`,
false,
line: `cpu a=true,b=false,c=true,d=false`,
},
{
"#constant measurement,cpu\n" +
csv: "#constant measurement,cpu\n" +
"a|long,b|string\n" +
"1,1",
`cpu a=1i,b="1"`,
false,
line: `cpu a=1i,b="1"`,
},
{
"#constant measurement,cpu\n" +
csv: "#constant measurement,cpu\n" +
"a|long,b|string\n" +
"1,1",
`cpu a|long=1,b|string=1`,
true,
line: `cpu a|long=1,b|string=1`,
ignoreDataTypeInColumnName: true,
},
{
"#constant measurement,cpu\n" +
csv: "#constant measurement,cpu\n" +
"#datatype long,string\n" +
"a|long,b|string\n" +
"1,1",
`cpu a|long=1i,b|string="1"`,
true,
line: `cpu a|long=1i,b|string="1"`,
ignoreDataTypeInColumnName: true,
},
{
csv: "#constant measurement,cpu\n" +
"a|long:strict: ,b|unsignedLong:strict: \n" +
"1 2,1 2",
line: `cpu a=12i,b=12u`,
},
{
csv: "#constant measurement,cpu\n" +
"a|long:strict\n" +
"1.1,1",
error: "column 'a': '1.1' cannot fit into long data type",
},
{
csv: "#constant measurement,cpu\n" +
"a|unsignedLong:strict\n" +
"1.1,1",
error: "column 'a': '1.1' cannot fit into unsignedLong data type",
},
}
@ -385,8 +448,12 @@ func Test_DataTypeInColumnName(t *testing.T) {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
if err != nil && test.line != "" {
if err != nil {
if test.error == "" {
require.Nil(t, err.Error())
} else {
require.Equal(t, test.error, err.Error())
}
}
lines = append(lines, line)
}
@ -434,6 +501,10 @@ func Test_CsvTable_dataErrors(t *testing.T) {
"error_no_measurement_data",
"_measurement,col1\n,2",
},
{
"error_derived_column_missing reference",
"#concat string,d,${col1}${col2}\n_measurement,col1\nm,2",
},
}
for _, test := range tests {

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"log"
"math"
"strconv"
"strings"
@ -82,15 +83,16 @@ func escapeString(val string) string {
return val
}
// normalizeNumberString normalizes the supplied value with the help of the format supplied.
// normalizeNumberString normalizes the supplied value according to the supplied format.
// This normalization is intended to convert number strings of different locales to a strconv-parseable value.
//
// The format's first character is a fraction delimiter character. Next characters in the format
// are simply removed, they are typically used to visually separate groups in large numbers.
// The removeFaction parameter controls whether the returned value can contain also the fraction part.
// The removeFraction parameter controls whether the returned value can contain also the fraction part.
// An empty format means ". \n\t\r_"
//
// For example, to get a strconv-parseable float from a Spanish value '3.494.826.157,123', use format ",." .
func normalizeNumberString(value string, format string, removeFraction bool) string {
func normalizeNumberString(value string, format string, removeFraction bool) (normalized string, truncated bool) {
if len(format) == 0 {
format = ". \n\t\r_"
}
@ -110,20 +112,20 @@ func normalizeNumberString(value string, format string, removeFraction bool) str
}
if c == fractionRune {
if removeFraction {
break ForAllCharacters
return retVal.String(), true
}
retVal.WriteByte('.')
} else {
continue
}
retVal.WriteRune(c)
}
return retVal.String(), false
}
return value, false
}
return retVal.String()
}
return value
}
func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) {
func toTypedValue(val string, column *CsvTableColumn, lineNumber int) (interface{}, error) {
dataType := column.DataType
dataFormat := column.DataFormat
if column.ParseF != nil {
@ -159,7 +161,8 @@ func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) {
case durationDatatype:
return time.ParseDuration(val)
case doubleDatatype:
return strconv.ParseFloat(normalizeNumberString(val, dataFormat, false), 64)
normalized, _ := normalizeNumberString(val, dataFormat, false)
return strconv.ParseFloat(normalized, 64)
case boolDatatype:
switch {
case len(val) == 0:
@ -172,9 +175,21 @@ func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) {
return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'")
}
case longDatatype:
return strconv.ParseInt(normalizeNumberString(val, dataFormat, true), 10, 64)
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
error := CreateRowColumnError(lineNumber, column.Label,
fmt.Errorf("'%s' truncated to '%s' to fit into long data type", val, normalized))
log.Printf("WARNING: %v\n", error)
}
return strconv.ParseInt(normalized, 10, 64)
case uLongDatatype:
return strconv.ParseUint(normalizeNumberString(val, dataFormat, true), 10, 64)
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
error := CreateRowColumnError(lineNumber, column.Label,
fmt.Errorf("'%s' truncated to '%s' to fit into unsignedLong data type", val, normalized))
log.Printf("WARNING: %v\n", error)
}
return strconv.ParseUint(normalized, 10, 64)
case base64BinaryDataType:
return base64.StdEncoding.DecodeString(val)
default:
@ -230,11 +245,11 @@ func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) {
}
}
func appendConverted(buffer []byte, val string, column *CsvTableColumn) ([]byte, error) {
func appendConverted(buffer []byte, val string, column *CsvTableColumn, lineNumber int) ([]byte, error) {
if len(column.DataType) == 0 { // keep the value as it is
return append(buffer, val...), nil
}
typedVal, err := toTypedValue(val, column)
typedVal, err := toTypedValue(val, column, lineNumber)
if err != nil {
return buffer, err
}
@ -302,3 +317,25 @@ func createBoolParseFn(format string) func(string) (interface{}, error) {
return nil, fmt.Errorf("unsupported boolean value: %s must one of %v or one of %v", val, truthy, falsy)
}
}
// createStrictLongParseFn returns a function that converts a string value to
// long and fails also when a fraction digit is detected; dataFormat describes
// the fraction/separator characters (see normalizeNumberString).
func createStrictLongParseFn(dataFormat string) func(string) (interface{}, error) {
	return func(val string) (interface{}, error) {
		normalized, hasFraction := normalizeNumberString(val, dataFormat, true)
		if !hasFraction {
			return strconv.ParseInt(normalized, 10, 64)
		}
		return 0, fmt.Errorf("'%s' cannot fit into long data type", val)
	}
}
// createStrictUnsignedLongParseFn returns a function that converts a string
// value to unsigned long and fails when a fraction digit is detected;
// dataFormat describes the fraction/separator characters (see normalizeNumberString).
func createStrictUnsignedLongParseFn(dataFormat string) func(string) (interface{}, error) {
	return func(val string) (interface{}, error) {
		normalized, hasFraction := normalizeNumberString(val, dataFormat, true)
		if !hasFraction {
			return strconv.ParseUint(normalized, 10, 64)
		}
		return 0, fmt.Errorf("'%s' cannot fit into unsignedLong data type", val)
	}
}

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"math"
"os"
"strings"
"testing"
"time"
@ -112,9 +113,9 @@ func Test_ToTypedValue(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
column := &CsvTableColumn{}
column := &CsvTableColumn{Label: "test"}
column.setupDataType(test.dataType)
val, err := toTypedValue(test.value, column)
val, err := toTypedValue(test.value, column, 1)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
}
@ -143,7 +144,7 @@ func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
column := &CsvTableColumn{}
column.TimeZone = tz
column.setupDataType(test.dataType)
val, err := toTypedValue(test.value, column)
val, err := toTypedValue(test.value, column, 1)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
}
@ -210,9 +211,9 @@ func Test_AppendConverted(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
column := &CsvTableColumn{}
column := &CsvTableColumn{Label: "test"}
column.setupDataType(test.dataType)
val, err := appendConverted(nil, test.value, column)
val, err := appendConverted(nil, test.value, column, 1)
if err != nil && test.expect != "" {
require.Nil(t, err.Error())
}
@ -246,18 +247,34 @@ func Test_NormalizeNumberString(t *testing.T) {
format string
removeFraction bool
expect string
truncated bool
}{
{"123", "", true, "123"},
{"123", ".", true, "123"},
{"123.456", ".", true, "123"},
{"123.456", ".", false, "123.456"},
{"1 2.3,456", ",. ", false, "123.456"},
{" 1 2\t3.456 \r\n", "", false, "123.456"},
{"123", "", true, "123", false},
{"123", ".", true, "123", false},
{"123.456", ".", true, "123", true},
{"123.456", ".", false, "123.456", false},
{"1 2.3,456", ",. ", false, "123.456", false},
{" 1 2\t3.456 \r\n", "", false, "123.456", false},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, normalizeNumberString(test.value, test.format, test.removeFraction))
// customize logging to check warnings
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
normalized, truncated := normalizeNumberString(test.value, test.format, test.removeFraction)
require.Equal(t, test.expect, normalized)
require.Equal(t, test.truncated, truncated)
})
}
}