From 234eac85eb8914041eec15898b86dfbffaf94621 Mon Sep 17 00:00:00 2001
From: Pavel Zavora
Date: Thu, 16 Apr 2020 13:33:55 +0200
Subject: [PATCH] feat(pkg/csv2lp): add csv to line protocol conversion
 library

---
 go.mod                                |   1 +
 pkg/csv2lp/csvAnnotations.go          | 198 +++++++++++
 pkg/csv2lp/csvAnnotations_test.go     | 215 ++++++++++++
 pkg/csv2lp/csvTable.go                | 464 +++++++++++++++++++++++++
 pkg/csv2lp/csvTable_test.go           | 477 ++++++++++++++++++++++++++
 pkg/csv2lp/csvToProtocolLines.go      | 132 +++++++
 pkg/csv2lp/csvToProtocolLines_test.go | 225 ++++++++++++
 pkg/csv2lp/dataConversion.go          | 300 ++++++++++++++++
 pkg/csv2lp/dataConversion_test.go     | 325 ++++++++++++++++++
 pkg/csv2lp/examples_test.go           | 191 +++++++++++
 10 files changed, 2528 insertions(+)
 create mode 100644 pkg/csv2lp/csvAnnotations.go
 create mode 100644 pkg/csv2lp/csvAnnotations_test.go
 create mode 100644 pkg/csv2lp/csvTable.go
 create mode 100644 pkg/csv2lp/csvTable_test.go
 create mode 100644 pkg/csv2lp/csvToProtocolLines.go
 create mode 100644 pkg/csv2lp/csvToProtocolLines_test.go
 create mode 100644 pkg/csv2lp/dataConversion.go
 create mode 100644 pkg/csv2lp/dataConversion_test.go
 create mode 100644 pkg/csv2lp/examples_test.go

diff --git a/go.mod b/go.mod
index 1e8d0cb298..c4a231e169 100644
--- a/go.mod
+++ b/go.mod
@@ -98,6 +98,7 @@ require (
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4
+	golang.org/x/text v0.3.2
 	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
 	golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0
 	google.golang.org/api v0.7.0
diff --git a/pkg/csv2lp/csvAnnotations.go b/pkg/csv2lp/csvAnnotations.go
new file mode 100644
index 0000000000..d4c9788ab5
--- /dev/null
+++ b/pkg/csv2lp/csvAnnotations.go
@@ -0,0 +1,198 @@
+package csv2lp
+
+import (
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// annotationComment represents a parsed CSV annotation
+type annotationComment struct {
+	// prefix in a CSV row that recognizes this annotation
+	prefix string
+	// flag is 0 for an annotation that is used for all data rows,
+	// or a bit (>0) that is unique to the annotation
+	flag uint8
+	// setupColumn sets up metadata that drives how column data
+	// are parsed, mandatory when flag > 0
+	setupColumn func(column *CsvTableColumn, columnValue string)
+	// setupTable sets up metadata that drives how the table data
+	// are parsed, mandatory when flag == 0
+	setupTable func(table *CsvTable, row []string) error
+}
+
+func (a *annotationComment) isTableAnnotation() bool {
+	return a.setupTable != nil
+}
+
+func (a *annotationComment) matches(comment string) bool {
+	return strings.HasPrefix(strings.ToLower(comment), a.prefix)
+}
+
+var supportedAnnotations = []annotationComment{
+	{"#group", 1, func(column *CsvTableColumn, value string) {
+		// standard flux query result annotation
+		if strings.HasSuffix(value, "true") {
+			column.LinePart = linePartTag
+		}
+	}, nil},
+	{"#datatype", 2, func(column *CsvTableColumn, value string) {
+		// standard flux query result annotation
+		setupDataType(column, value)
+	}, nil},
+	{"#default", 4, func(column *CsvTableColumn, value string) {
+		// standard flux query result annotation
+		column.DefaultValue = ignoreLeadingComment(value)
+	}, nil},
+	{"#constant", 0, nil, func(table *CsvTable, row []string) error {
+		// adds a virtual column with a constant value to all data rows
+		// supported types of constant annotation rows are:
+		// 1. "#constant,datatype,label,defaultValue"
+		// 2. "#constant,measurement,value"
+		// 3. "#constant,dateTime,value"
+		// 4. "#constant datatype,label,defaultValue"
+		// 5. "#constant measurement,value"
+		// 6. "#constant dateTime,value"
+		// defaultValue is optional, additional columns are ignored
+		col := CsvTableColumn{}
+		col.Index = -1 // this is a virtual column that never extracts data from data rows
+		// setup column data type
+		setupDataType(&col, row[0])
+		var dataTypeIndex int
+		if len(col.DataType) == 0 && col.LinePart == 0 {
+			// type 1,2,3
+			dataTypeIndex = 1
+			if len(row) > 1 {
+				setupDataType(&col, row[1])
+			}
+		} else {
+			// type 4,5,6
+			dataTypeIndex = 0
+		}
+		// setup label if available
+		if len(row) > dataTypeIndex+1 {
+			col.Label = row[dataTypeIndex+1]
+		}
+		// setup defaultValue if available
+		if len(row) > dataTypeIndex+2 {
+			col.DefaultValue = row[dataTypeIndex+2]
+		}
+		// support type 2,3,5,6 syntax for measurement and timestamp
+		if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
+			if col.DefaultValue == "" && col.Label != "" {
+				// type 2,3,5,6
+				col.DefaultValue = col.Label
+				col.Label = "#constant " + col.DataType
+			} else if col.Label == "" {
+				// setup a label if no label is supplied, for focused error messages
+				col.Label = "#constant " + col.DataType
+			}
+		}
+		// add a virtual column to the table
+		table.extraColumns = append(table.extraColumns, col)
+		return nil
+	}},
+	{"#timezone", 0, nil, func(table *CsvTable, row []string) error {
+		// setup timezone for parsing timestamps, UTC by default
+		val := ignoreLeadingComment(row[0])
+		if val == "" && len(row) > 1 {
+			val = row[1] // #timezone,Local
+		}
+		tz, err := parseTimeZone(val)
+		if err != nil {
+			return fmt.Errorf("#timezone annotation: %v", err)
+		}
+		table.timeZone = tz
+		return nil
+	}},
+}
+
+// setupDataType sets up the data type from a column value
+func setupDataType(column *CsvTableColumn, columnValue string) {
+	// columnValue contains typeName and possibly additional column metadata,
+	// it can be
+	// 1. typeName
+	// 2. typeName:format
+	// 3. typeName|defaultValue
+	// 4. typeName:format|defaultValue
+	// 5. #anycomment (all options above)
+
+	// ignoreLeadingComment is also required to specify a datatype together
+	// with a CSV annotation in the #constant annotation
+	columnValue = ignoreLeadingComment(columnValue)
+
+	// | adds a default value to column
+	pipeIndex := strings.Index(columnValue, "|")
+	if pipeIndex > 1 {
+		if column.DefaultValue == "" {
+			column.DefaultValue = columnValue[pipeIndex+1:]
+			columnValue = columnValue[:pipeIndex]
+		}
+	}
+	// setup column format
+	colonIndex := strings.Index(columnValue, ":")
+	if colonIndex > 1 {
+		column.DataFormat = columnValue[colonIndex+1:]
+		columnValue = columnValue[:colonIndex]
+	}
+
+	// setup column linePart depending on the dataType
+	switch {
+	case columnValue == "tag":
+		column.LinePart = linePartTag
+	case strings.HasPrefix(columnValue, "ignore"):
+		// ignore or ignored
+		column.LinePart = linePartIgnored
+	case columnValue == "dateTime":
+		// dateTime field is used at most once in a protocol line
+		column.LinePart = linePartTime
+	case columnValue == "measurement":
+		column.LinePart = linePartMeasurement
+	case columnValue == "field":
+		column.LinePart = linePartField
+		columnValue = "" // this is a generic field without a data type specified
+	case columnValue == "time": // time is an alias for dateTime
+		column.LinePart = linePartTime
+		columnValue = dateTimeDatatype
+	}
+	// setup column data type
+	column.DataType = columnValue
+
+	// setup custom parsing of bool data type
+	if column.DataType == boolDatatype && column.DataFormat != "" {
+		column.ParseF = createBoolParseFn(column.DataFormat)
+	}
+}
+
+// ignoreLeadingComment returns a value without '#anyComment ' prefix
+func ignoreLeadingComment(value string) string {
+	if len(value) > 0 && value[0] == '#' {
+		pos := strings.Index(value, " ")
+		if pos > 0 {
+			return strings.TrimLeft(value[pos+1:], " ")
+		}
+		return ""
+	}
+	return value
+}
+
+// parseTimeZone tries to parse the supplied timezone indicator as a Location or returns an error
+func parseTimeZone(val string) (*time.Location, error) {
+	switch {
+	case val == "":
+		return time.UTC, nil
+	case strings.ToLower(val) == "local":
+		return time.Local, nil
+	case val[0] == '-' || val[0] == '+':
+		if matched, _ := regexp.MatchString("[+-][0-9][0-9][0-9][0-9]", val); !matched {
+			return nil, fmt.Errorf("timezone '%s' is not +hhmm or -hhmm", val)
+		}
+		intVal, _ := strconv.Atoi(val)
+		offset := (intVal/100)*3600 + (intVal%100)*60
+		return time.FixedZone(val, offset), nil
+	default:
+		return time.LoadLocation(val)
+	}
+}
diff --git a/pkg/csv2lp/csvAnnotations_test.go b/pkg/csv2lp/csvAnnotations_test.go
new file mode 100644
index 0000000000..913b953a3a
--- /dev/null
+++ b/pkg/csv2lp/csvAnnotations_test.go
@@ -0,0 +1,215 @@
+package csv2lp
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func annotation(name string) annotationComment {
+	for _, a := range supportedAnnotations {
+		if a.prefix == name {
+			return a
+		}
+	}
+	panic("no annotation named " + name + " found!")
+}
+
+// TestGroupAnnotation tests the #group annotation
+func TestGroupAnnotation(t *testing.T) {
+	subject := annotation("#group")
+	require.True(t, subject.matches("#Group"))
+	require.False(t, subject.isTableAnnotation())
+	var tests = []struct {
+		value  string
+		expect int
+	}{
+		{"#group true", linePartTag},
+		{"#group false", 0},
+		{"false", 0},
+		{"unknown", 0},
+	}
+	for _, test := range tests {
+		t.Run(test.value, func(t *testing.T) {
+			col := &CsvTableColumn{}
+			subject.setupColumn(col, test.value)
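+			// per the #group handler above, only a value ending in "true" marks
+			// the column as a tag; any other value leaves LinePart at its zero value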
require.Equal(t, test.expect, col.LinePart) + }) + } +} + +// TestDefaultAnnotation tests #default annotation +func TestDefaultAnnotation(t *testing.T) { + subject := annotation("#default") + require.True(t, subject.matches("#Default")) + require.False(t, subject.isTableAnnotation()) + var tests = []struct { + value string + expect string + }{ + {"#default 1", "1"}, + {"#default ", ""}, + {"whatever", "whatever"}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + col := &CsvTableColumn{} + subject.setupColumn(col, test.value) + require.Equal(t, test.expect, col.DefaultValue) + }) + } +} + +// TestDatatypeAnnotation tests #datatype annotation +func TestDatatypeAnnotation(t *testing.T) { + subject := annotation("#datatype") + require.True(t, subject.matches("#dataType")) + require.False(t, subject.isTableAnnotation()) + var tests = []struct { + value string + expectType string + expectFormat string + expectLinePart int + }{ + {"#datatype long", "long", "", 0}, + {"#datatype ", "", "", 0}, + {"#datatype measurement", "_", "", linePartMeasurement}, + {"#datatype tag", "_", "", linePartTag}, + {"#datatype field", "", "", linePartField}, + {"dateTime", "dateTime", "", linePartTime}, + {"dateTime:RFC3339", "dateTime", "RFC3339", linePartTime}, + {"#datatype dateTime:RFC3339", "dateTime", "RFC3339", linePartTime}, + {"whatever:format", "whatever", "format", 0}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + col := &CsvTableColumn{} + subject.setupColumn(col, test.value) + if test.expectType != "_" { + require.Equal(t, test.expectType, col.DataType) + } + require.Equal(t, test.expectFormat, col.DataFormat) + }) + } +} + +// TestConstantAnnotation tests #constant annotation +func TestConstantAnnotation(t *testing.T) { + subject := annotation("#constant") + require.True(t, subject.matches("#Constant")) + require.True(t, subject.isTableAnnotation()) + var tests = []struct { + value []string + expectLabel string + expectDefault string + expectLinePart int + }{ + {[]string{"#constant "}, "", "", 0}, // means literally nothing + {[]string{"#constant measurement", "a"}, "_", "a", linePartMeasurement}, + {[]string{"#constant measurement", "a", "b"}, "_", "b", linePartMeasurement}, + {[]string{"#constant measurement", "a", ""}, "_", "a", linePartMeasurement}, + {[]string{"#constant tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#constant", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#constant field", "fName", "fVal"}, "fName", "fVal", linePartField}, + {[]string{"#constant", "field", "fName", "fVal"}, "fName", "fVal", linePartField}, + {[]string{"dateTime", "1"}, "_", "1", linePartTime}, + {[]string{"dateTime", "1", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "3", ""}, "_", "3", linePartTime}, + {[]string{"long", "fN", "fV"}, "fN", "fV", 0}, + } + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + table := &CsvTable{} + subject.setupTable(table, test.value) + require.Equal(t, 1, len(table.extraColumns)) + col := table.extraColumns[0] + require.Equal(t, test.expectLinePart, col.LinePart) + require.Greater(t, 0, col.Index) + if test.expectLabel != "_" { + require.Equal(t, test.expectLabel, col.Label) + } else { + require.NotEqual(t, "", col.Label) + } + require.Equal(t, test.expectDefault, col.DefaultValue) + }) + } +} + +// TestTimeZoneAnnotation tests #timezone annotation +func 
TestTimeZoneAnnotation(t *testing.T) {
+	subject := annotation("#timezone")
+	require.True(t, subject.matches("#timeZone"))
+	require.True(t, subject.isTableAnnotation())
+	var tests = []struct {
+		value string
+		err   string
+	}{
+		{"#timezone ", ""},
+		{"#timezone EST", ""},
+		{"#timezone,EST", ""},
+		{"#timezone,+0100", ""},
+		{"#timezone,whatever", "#timezone annotation"},
+	}
+	for i, test := range tests {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			table := &CsvTable{}
+			err := subject.setupTable(table, strings.Split(test.value, ","))
+			if test.err == "" {
+				require.Nil(t, err)
+				require.NotNil(t, table.timeZone)
+			} else {
+				require.NotNil(t, err)
+				require.True(t, strings.Contains(fmt.Sprintf("%v", err), test.err))
+			}
+		})
+	}
+}
+
+// Test_parseTimeZone tests the parseTimeZone fn
+func Test_parseTimeZone(t *testing.T) {
+	now := time.Now()
+	_, localOffset := now.Zone()
+	var tests = []struct {
+		value  string
+		offset int
+	}{
+		{"local", localOffset},
+		{"Local", localOffset},
+		{"-0000", 0},
+		{"+0000", 0},
+		{"-0100", -3600},
+		{"+0100", 3600},
+		{"+0130", 3600 + 3600/2},
+		{"", 0},
+		{"-01", -1},
+		{"0000", -1},
+		{"UTC", 0},
+		{"EST", -5 * 3600},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			tz, err := parseTimeZone(test.value)
+			require.NotEqual(t, tz, err) // both cannot be nil
+			if err != nil {
+				require.Nil(t, tz)
+				// fmt.Println(err)
+				if test.offset >= 0 {
+					require.Fail(t, "offset expected")
+				}
+				return
+			}
+			require.NotNil(t, tz)
+			testDate := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day())
+			result, err := time.ParseInLocation("2006-01-02", testDate, tz)
+			require.Nil(t, err)
+			_, offset := result.Zone()
+			require.Equal(t, test.offset, offset)
+		})
+	}
+}
diff --git a/pkg/csv2lp/csvTable.go b/pkg/csv2lp/csvTable.go
new file mode 100644
index 0000000000..8167deb8dd
--- /dev/null
+++ b/pkg/csv2lp/csvTable.go
@@ -0,0 +1,464 @@
+package csv2lp
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"sort"
+	"strings"
+	"time"
+	"unsafe"
+)
+
+// column labels used in flux CSV result
+const (
+	labelFieldName   = "_field"
+	labelFieldValue  = "_value"
+	labelTime        = "_time"
+	labelStart       = "_start"
+	labelStop        = "_stop"
+	labelMeasurement = "_measurement"
+)
+
+// types of column with respect to line protocol
+const (
+	linePartIgnored = iota + 1 // ignored in line protocol
+	linePartMeasurement
+	linePartTag
+	linePartField
+	linePartTime
+)
+
+// CsvTableColumn represents processing metadata about a csv column
+type CsvTableColumn struct {
+	// label such as "_start", "_stop", "_time"
+	Label string
+	// "string", "long", "dateTime" ...
+	DataType string
+	// "RFC3339", "2006-01-02"
+	DataFormat string
+	// column's line part (0 means not determined yet), see linePart constants
+	LinePart int
+	// default value to be used for rows where value is an empty string.
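+	// (for example, a "#default,_result,,," annotation row sets the
+	// DefaultValue of the first column to "_result")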
+	DefaultValue string
+	// index of this column in the table row, -1 indicates a virtual column
+	Index int
+	// TimeZone of dateTime column, applied when parsing dateTime without timeZone in the format
+	TimeZone *time.Location
+	// parse function, when set, is used to convert column's string value to interface{}
+	ParseF func(string) (interface{}, error)
+
+	// escaped label for line protocol
+	escapedLabel string
+}
+
+// LineLabel returns the escaped name of the column so it can then be used as a tag name or field name in line protocol
+func (c *CsvTableColumn) LineLabel() string {
+	if len(c.escapedLabel) > 0 {
+		return c.escapedLabel
+	}
+	return c.Label
+}
+
+// Value returns the value of the column for the supplied row
+func (c *CsvTableColumn) Value(row []string) string {
+	if c.Index < 0 || c.Index >= len(row) {
+		return c.DefaultValue
+	}
+	val := row[c.Index]
+	if len(val) > 0 {
+		return val
+	}
+	return c.DefaultValue
+}
+
+// CsvColumnError indicates a conversion error in a specific column
+type CsvColumnError struct {
+	Column string
+	Err    error
+}
+
+func (e CsvColumnError) Error() string {
+	return fmt.Sprintf("column '%s': %v", e.Column, e.Err)
+}
+
+// CsvTable contains metadata about columns and a state of the CSV processing
+type CsvTable struct {
+	// table columns that extract value from data row
+	columns []CsvTableColumn
+	// partBits is a bitmap that is used to remember that a particular column annotation
+	// (#group, #datatype and #default) was already processed for the table;
+	// it is used to detect start of a new table in CSV flux results, a repeated annotation
+	// is detected and a new CsvTable can be then created
+	partBits uint8
+	// indicates that the table is ready to read table data, which
+	// is after reading annotation and header rows
+	readTableData bool
+	// indicates whether line protocol columns must be re-computed
+	lpColumnsCached bool
+	// extra columns are added by table-wide annotations, such as #constant
+	extraColumns []CsvTableColumn
+	// true to skip parsing of data type in column name
+	ignoreDataTypeInColumnName bool
+	// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
+	timeZone *time.Location
+
+	/* cached columns are initialized before reading the data rows */
+	cachedMeasurement *CsvTableColumn
+	cachedTime        *CsvTableColumn
+	cachedFieldName   *CsvTableColumn
+	cachedFieldValue  *CsvTableColumn
+	cachedFields      []CsvTableColumn
+	cachedTags        []CsvTableColumn
+}
+
+// IgnoreDataTypeInColumnName sets a flag that disables parsing of a data type out of a column name.
+// When true, column names can then contain '|'. By default, a column name can also contain a datatype
+// and a default value when named `name|datatype` or `name|datatype|default`,
+// for example `ready|boolean|true`
+func (t *CsvTable) IgnoreDataTypeInColumnName(val bool) {
+	t.ignoreDataTypeInColumnName = val
+}
+
+// DataColumnsInfo returns a string representation of columns that are used to process CSV data
+func (t *CsvTable) DataColumnsInfo() string {
+	if t == nil {
+		return ""
+	}
+	var builder = strings.Builder{}
+	builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
+	builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
+	for _, col := range t.cachedTags {
+		builder.WriteString(fmt.Sprintf(" tag: %+v\n", col))
+	}
+	for _, col := range t.cachedFields {
+		builder.WriteString(fmt.Sprintf(" field: %+v\n", col))
+	}
+	builder.WriteString(fmt.Sprintf(" time: %+v\n", t.cachedTime))
+	builder.WriteString("}")
+
+	return builder.String()
+}
+
+// NextTable resets the table to a state in which it expects annotations and header rows
+func (t *CsvTable) NextTable() {
+	t.partBits = 0 // no column annotations parsed yet
+	t.readTableData = false
+	t.columns = []CsvTableColumn{}
+	t.extraColumns = []CsvTableColumn{}
+}
+
+// AddRow updates the state of the table with a new header, annotation or data row.
+// Returns true if the row is a data row.
+func (t *CsvTable) AddRow(row []string) bool {
+	// detect data row or table header row
+	if len(row[0]) == 0 || row[0][0] != '#' {
+		if !t.readTableData {
+			// row must be a header row now
+			t.lpColumnsCached = false // line protocol columns change
+			if t.partBits == 0 {
+				// create columns since no column annotations were processed
+				t.columns = make([]CsvTableColumn, len(row))
+				for i := 0; i < len(row); i++ {
+					t.columns[i].Index = i
+				}
+			}
+			// assign column labels for the header row
+			for i := 0; i < len(t.columns); i++ {
+				col := &t.columns[i]
+				if len(col.Label) == 0 && col.Index < len(row) {
+					col.Label = row[col.Index]
+					// assign column data type if possible
+					if len(col.DataType) == 0 && !t.ignoreDataTypeInColumnName {
+						if idx := strings.IndexByte(col.Label, '|'); idx != -1 {
+							setupDataType(col, col.Label[idx+1:])
+							col.Label = col.Label[:idx]
+						}
+					}
+				}
+			}
+			// header row is read, now expect data rows
+			t.readTableData = true
+			return false
+		}
+		return true
+	}
+
+	// process all supported annotations
+	for i := 0; i < len(supportedAnnotations); i++ {
+		supportedAnnotation := &supportedAnnotations[i]
+		if supportedAnnotation.matches(row[0]) {
+			if len(row[0]) > len(supportedAnnotation.prefix) && row[0][len(supportedAnnotation.prefix)] != ' ' {
+				continue // ignoring, not a supported annotation
+			}
+			t.lpColumnsCached = false // line protocol columns change
+			if supportedAnnotation.isTableAnnotation() {
+				// process table-level annotation
+				if err := supportedAnnotation.setupTable(t, row); err != nil {
+					log.Println("WARNING: ", err)
+				}
+				return false
+			}
+			// invariant: !supportedAnnotation.isTableAnnotation()
+			if t.readTableData {
+				// any column annotation stops reading of data rows
+				t.NextTable()
+			}
+			// create new columns upon a new or repeated column annotation
+			if t.partBits == 0 || t.partBits&supportedAnnotation.flag != 0 {
+				t.partBits = supportedAnnotation.flag
+				t.columns = make([]CsvTableColumn, len(row))
+				for i := 0; i < len(row); i++ {
+					t.columns[i].Index = i
+				}
+			} else {
+				t.partBits = t.partBits | supportedAnnotation.flag
+			}
+			// setup columns according to column
annotation + for j := 0; j < len(t.columns); j++ { + col := &t.columns[j] + if col.Index >= len(row) { + continue // missing value + } else { + supportedAnnotation.setupColumn(col, row[col.Index]) + } + } + return false + } + } + // warn about unsupported annotation unless a comment row + if !strings.HasPrefix(row[0], "# ") { + log.Println("WARNING: unsupported annotation: ", row[0]) + } + return false +} + +func (t *CsvTable) computeLineProtocolColumns() bool { + if !t.lpColumnsCached { + t.recomputeLineProtocolColumns() + return true + } + return false +} + +func (t *CsvTable) recomputeLineProtocolColumns() { + // reset results + t.cachedMeasurement = nil + t.cachedTime = nil + t.cachedFieldName = nil + t.cachedFieldValue = nil + t.cachedTags = nil + t.cachedFields = nil + + // having a _field column indicates fields without a line type are ignored + defaultIsField := t.Column(labelFieldName) == nil + + columns := append(append([]CsvTableColumn{}, t.columns...), t.extraColumns...) + for i := 0; i < len(columns); i++ { + col := columns[i] + switch { + case col.Label == labelMeasurement || col.LinePart == linePartMeasurement: + t.cachedMeasurement = &col + case col.Label == labelTime || col.LinePart == linePartTime: + if t.cachedTime != nil && t.cachedTime.Label != labelStart && t.cachedTime.Label != labelStop { + log.Printf("WARNING: at most one dateTime column is expected, '%s' column is ignored\n", t.cachedTime.Label) + } + t.cachedTime = &col + case len(strings.TrimSpace(col.Label)) == 0 || col.LinePart == linePartIgnored: + // ignored columns that are marked to be ignored or without a label + case col.Label == labelFieldName: + t.cachedFieldName = &col + case col.Label == labelFieldValue: + t.cachedFieldValue = &col + case col.LinePart == linePartTag: + col.escapedLabel = escapeTag(col.Label) + t.cachedTags = append(t.cachedTags, col) + case col.LinePart == linePartField: + col.escapedLabel = escapeTag(col.Label) + t.cachedFields = append(t.cachedFields, col) + default: + if defaultIsField { + col.escapedLabel = escapeTag(col.Label) + t.cachedFields = append(t.cachedFields, col) + } + } + } + // line protocol requires sorted tags + if t.cachedTags != nil && len(t.cachedTags) > 0 { + sort.Slice(t.cachedTags, func(i, j int) bool { + return t.cachedTags[i].Label < t.cachedTags[j].Label + }) + } + // setup timezone for timestamp column + if t.cachedTime != nil && t.cachedTime.TimeZone == nil { + t.cachedTime.TimeZone = t.timeZone + } + + t.lpColumnsCached = true // line protocol columns are now fresh +} + +// CreateLine produces a protocol line out of the supplied row or returns error +func (t *CsvTable) CreateLine(row []string) (line string, err error) { + buffer := make([]byte, 100)[:0] + buffer, err = t.AppendLine(buffer, row) + if err != nil { + return "", err + } + return *(*string)(unsafe.Pointer(&buffer)), nil +} + +// AppendLine appends a protocol line to the supplied buffer and returns appended buffer or an error if any +func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) { + if t.computeLineProtocolColumns() { + // validate column data types + if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) { + return buffer, CsvColumnError{ + t.cachedFieldValue.Label, + fmt.Errorf("data type '%s' is not supported", t.cachedFieldValue.DataType), + } + } + for _, c := range t.cachedFields { + if !IsTypeSupported(c.DataType) { + return buffer, CsvColumnError{ + c.Label, + fmt.Errorf("data type '%s' is not supported", c.DataType), + } + } + } + } 
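+	// the line is assembled below in line protocol order:
+	//   measurement[,tag=value ...] field=value[,field=value ...] [timestamp]
+	// e.g. a hypothetical row with measurement "cpu", tag host=h1 and field
+	// usage=2.5 would yield: cpu,host=h1 usage=2.5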
+ + if t.cachedMeasurement == nil { + return buffer, errors.New("no measurement column found") + } + measurement := t.cachedMeasurement.Value(row) + if measurement == "" { + return buffer, CsvColumnError{ + t.cachedMeasurement.Label, + errors.New("no measurement supplied"), + } + } + buffer = append(buffer, escapeMeasurement(measurement)...) + for _, tag := range t.cachedTags { + value := tag.Value(row) + if tag.Index < len(row) && len(value) > 0 { + buffer = append(buffer, ',') + buffer = append(buffer, tag.LineLabel()...) + buffer = append(buffer, '=') + buffer = append(buffer, escapeTag(value)...) + } + } + buffer = append(buffer, ' ') + fieldAdded := false + if t.cachedFieldName != nil && t.cachedFieldValue != nil { + field := t.cachedFieldName.Value(row) + value := t.cachedFieldValue.Value(row) + if len(value) > 0 && len(field) > 0 { + buffer = append(buffer, escapeTag(field)...) + buffer = append(buffer, '=') + var err error + buffer, err = appendConverted(buffer, value, t.cachedFieldValue) + if err != nil { + return buffer, CsvColumnError{ + t.cachedFieldName.Label, + err, + } + } + fieldAdded = true + } + } + for _, field := range t.cachedFields { + value := field.Value(row) + if len(value) > 0 { + if !fieldAdded { + fieldAdded = true + } else { + buffer = append(buffer, ',') + } + buffer = append(buffer, field.LineLabel()...) + buffer = append(buffer, '=') + var err error + buffer, err = appendConverted(buffer, value, &field) + if err != nil { + return buffer, CsvColumnError{ + field.Label, + err, + } + } + } + } + if !fieldAdded { + return buffer, errors.New("no field data found") + } + + if t.cachedTime != nil && t.cachedTime.Index < len(row) { + timeVal := t.cachedTime.Value(row) + if len(timeVal) > 0 { + if len(t.cachedTime.DataType) == 0 { + // assume dateTime data type (number or RFC3339) + t.cachedTime.DataType = dateTimeDatatype + t.cachedTime.DataFormat = "" + } + buffer = append(buffer, ' ') + var err error + buffer, err = appendConverted(buffer, timeVal, t.cachedTime) + if err != nil { + return buffer, CsvColumnError{ + t.cachedTime.Label, + err, + } + } + } + } + return buffer, nil +} + +// Column returns the first column of the supplied label or nil +func (t *CsvTable) Column(label string) *CsvTableColumn { + for i := 0; i < len(t.columns); i++ { + if t.columns[i].Label == label { + return &t.columns[i] + } + } + return nil +} + +// Columns returns available columns +func (t *CsvTable) Columns() []CsvTableColumn { + return t.columns +} + +// Measurement returns measurement column or nil +func (t *CsvTable) Measurement() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedMeasurement +} + +// Time returns time column or nil +func (t *CsvTable) Time() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedTime +} + +// FieldName returns field name column or nil +func (t *CsvTable) FieldName() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFieldName +} + +// FieldValue returns field value column or nil +func (t *CsvTable) FieldValue() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFieldValue +} + +// Tags returns tags +func (t *CsvTable) Tags() []CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedTags +} + +// Fields returns fields +func (t *CsvTable) Fields() []CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFields +} diff --git a/pkg/csv2lp/csvTable_test.go b/pkg/csv2lp/csvTable_test.go new file mode 100644 index 0000000000..5fe6ed04fc --- /dev/null +++ 
b/pkg/csv2lp/csvTable_test.go @@ -0,0 +1,477 @@ +package csv2lp + +import ( + "encoding/csv" + "io" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func readCsv(t *testing.T, data string) [][]string { + reader := csv.NewReader(strings.NewReader(data)) + var rows [][]string + for { + row, err := reader.Read() + reader.FieldsPerRecord = 0 // every row can have different number of fields + if err == io.EOF { + break + } + if err != nil { + t.Log("row: ", row) + t.Log(err) + t.Fail() + } + rows = append(rows, row) + } + return rows +} + +// TestQueryResult validates construction of table columns from Query CSV result +func TestQueryResult(t *testing.T) { + const csvQueryResult = ` +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod + +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod +#unre` + + var lineProtocolQueryResult = []string{ + "cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000", + "cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000", + "cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000", + "cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000", + } + + table := CsvTable{} + rows := readCsv(t, csvQueryResult) + lineProtocolIndex := 0 + for i, row := range rows { + rowProcessed := table.AddRow(row) + if i%6 < 4 { + require.Equal(t, rowProcessed, false, "row %d", i) + } else { + require.Equal(t, rowProcessed, true, "row %d", i) + line, _ := table.CreateLine(row) + require.Equal(t, lineProtocolQueryResult[lineProtocolIndex], line) + lineProtocolIndex++ + if i%6 == 4 { + // verify table + require.GreaterOrEqual(t, len(table.columns), 10) + require.Equal(t, table.columns, table.Columns()) + for j, col := range table.columns { + if j > 0 { + require.Equal(t, col.Index, j) + require.Equal(t, col.Label, rows[i-1][j]) + if len(rows[i-2]) > j { + require.Equal(t, col.DefaultValue, rows[i-2][j]) + } else { + // some traling data are missing + require.Equal(t, col.DefaultValue, "") + } + types := strings.Split(rows[i-3][j], ":") + require.Equal(t, types[0], col.DataType, "row %d, col %d", i-3, j) + if len(types) > 1 { + require.Equal(t, types[1], col.DataFormat, "row %d, col %d", i-3, j) + } + } + } + // verify cached values + table.computeLineProtocolColumns() + require.Equal(t, table.Column("_measurement"), table.cachedMeasurement) + require.Nil(t, table.Column("_no")) + require.NotNil(t, table.cachedMeasurement) + require.NotNil(t, table.cachedFieldName) + require.NotNil(t, table.cachedFieldValue) + 
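+				// the time and tag columns must have been cached as well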
require.NotNil(t, table.cachedTime) + require.NotNil(t, table.cachedTags) + require.Equal(t, table.Measurement().Label, "_measurement") + require.Equal(t, table.FieldName().Label, "_field") + require.Equal(t, table.FieldValue().Label, "_value") + require.Equal(t, table.Time().Label, "_time") + require.Equal(t, len(table.Tags()), 2) + require.Equal(t, table.Tags()[0].Label, "cpu") + require.Equal(t, table.Tags()[1].Label, "host") + require.Equal(t, len(table.Fields()), 0) + } + } + } +} + +//Test_ignoreLeadingComment +func Test_ignoreLeadingComment(t *testing.T) { + var tests = []struct { + value string + expect string + }{ + {"", ""}, + {"a", "a"}, + {" #whatever", " #whatever"}, + {"#whatever", ""}, + {"#whatever ", ""}, + {"#whatever a b ", "a b "}, + {"#whatever a b ", "a b "}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + require.Equal(t, test.expect, ignoreLeadingComment(test.value)) + }) + } + +} + +// TestCsvData checks data that are writen in an annotated CSV file +func TestCsvData(t *testing.T) { + var tests = []struct { + name string + csv string + line string + }{ + { + "simple1", + "_measurement,a,b\ncpu,1,1", + "cpu a=1,b=1", + }, + { + "simple1b", + "_measurement,,a,b\ncpu,whatever,1,1", + "cpu a=1,b=1", + }, + { + "simple2", + "_measurement\ncpu,1,1", + "", // no fields present + }, + { + "simple3", + "_time\n1,1", + "", // no measurement present + }, + { + "annotated1", + "#datatype measurement,,\nmeasurement,a,b\ncpu,1,2", + "cpu a=1,b=2", + }, + { + "annotated2", + "#datatype measurement,tag,field\nmeasurement,a,b\ncpu,1,2", + "cpu,a=1 b=2", + }, + { + "annotated3", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3 2", + }, + { + "annotated3_detectedTime1", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10Z,3", + "cpu,a=1 time=3 1578651010000000000", + }, + { + "annotated3_detectedTime2", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10.0Z,3", + "cpu,a=1 time=3 1578651010000000000", + }, + { + "annotated4", + "#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated5", + "#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated6", + "#datatype measurement,tag,ignore,field\n" + + "#datatypea tag,tag,\n" + // this must be ignored since it not a supported annotation + "measurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated7", + "#datatype measurement,dateTime,\nmeasurement,a,b\ncpu,2020-01-10T10:10:10.0Z,2", + "cpu b=2 1578651010000000000", + }, + { + "annotated8", + "#datatype measurement,,,field\nmeasurement,_field,_value,other\ncpu,a,1,2", + "cpu a=1,other=2", + }, + { + "annotated9_sortedTags", + "#datatype measurement,tag,tag,time,field\nmeasurement,b,a,c,time\ncpu,1,2,3,4", + "cpu,a=2,b=1 time=4 3", + }, + { + "allFieldTypes", + "#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" + + "m,s,d,b,l,ul,dur,by,d1,d2,time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + "allFieldTypes", + "#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" + + "m,s,d,b,l,ul,dur,by,d1,d2,time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + 
"allFieldTypes_ignoreAdditionalDateTimes", + "#datatype ,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime:RFC3339,dateTime:RFC3339Nano,\n" + + "_measurement,s,d,b,l,ul,dur,by,d1,d2,_time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,2020-01-10T10:10:10Z,2020-01-10T10:10:10Z,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + "allExtraDataTypes", + "#datatype measurement,tag,field,ignored,dateTime\n" + + "m,t,f,i,dt\n" + + `cpu,myTag,0,myIgnored,1`, + "cpu,t=myTag f=0 1", + }, + { + "allTypes_escaped", + "#datatype ,string,string,,,,\n" + + `_measurement,s1,s2,"a,","b ",c=` + "\n" + + `"cpu, ","""",\,a,b,c`, + `cpu\,\ s1="\"",s2="\\",a\,=a,b\ =b,c\==c`, + }, + { + "default_values", + "#default cpu,yes,0,1\n#datatype ,tag,,\n_measurement,test,col1,_time\n,,,", + "cpu,test=yes col1=0 1", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +func TestConstantAnnotations(t *testing.T) { + var tests = []struct { + name string + csv string + line string + }{ + { + "measurement_1", + "#constant measurement,cpu\n" + + "a,b\n" + + "1,1", + "cpu a=1,b=1", + }, + { + "measurement_2", + "#constant,measurement,,cpu\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,0\n" + + "#constant,dateTime,,2\n" + + "a,b\n" + + "1,1", + "cpu,cpu=cpu1 a=1,b=1,of=0i 2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +func TestDataTypeInColumnName(t *testing.T) { + var tests = []struct { + csv string + line string + ignoreDataTypeInColumnName bool + }{ + { + "m|measurement,b|boolean:x:,c|boolean:x:|x\n" + + "cpu,,", + `cpu c=true`, + false, + }, + { + "m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" + + "cpu,1,1,x,y", + `cpu a=true,b=false,c=true,d=false`, + false, + }, + { + "#constant measurement,cpu\n" + + "a|long,b|string\n" + + "1,1", + `cpu a=1i,b="1"`, + false, + }, + { + "#constant measurement,cpu\n" + + "a|long,b|string\n" + + "1,1", + `cpu a|long=1,b|string=1`, + true, + }, + { + "#constant measurement,cpu\n" + + "#datatype long,string\n" + + "a|long,b|string\n" + + "1,1", + `cpu a|long=1i,b|string="1"`, + true, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + table.IgnoreDataTypeInColumnName(test.ignoreDataTypeInColumnName) + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +// TestCsvData_dataErrors validates table data errors +func TestCsvData_dataErrors(t *testing.T) { + var tests = []struct { + name string + 
csv string + }{ + { + "error_1_is_not_dateTime:RFC3339", + "#datatype measurement,,\n#datatype ,dateTime:RFC3339,\nmeasurement,a,b\ncpu,1,2", + }, + { + "error_a_fieldValue_is_not_long", + "#datatype measurement,,\n#datatype ,long,\nmeasurement,_value,_field\ncpu,a,count", + }, + { + "error_a_is_not_long", + "#datatype measurement,,\n#datatype ,long,\nmeasurement,a,b\ncpu,a,2", + }, + { + "error_time_is_not_time", + "#datatype measurement,tag,time,field\nmeasurement,a,b,time\ncpu,1,2020-10,3", + }, + { + "error_no_measurement", + "#datatype ,\ncol1,col2\n1,2", + }, + { + "error_unsupportedFieldDataType", + "#datatype ,whatever\n_measurement,col2\na,2", + }, + { + "error_unsupportedFieldValueDataType", + "#datatype ,,whatever\n_measurement,_field,_value\na,1,2", + }, + { + "error_no_measurement_data", + "_measurement,col1\n,2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var errors []error + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + _, err := table.CreateLine(row) + if err != nil { + errors = append(errors, err) + } + } + } + require.Equal(t, 1, len(errors)) + // fmt.Println(errors[0]) + require.NotNil(t, errors[0].Error()) + // LineLabel is the same as Label in all test columns + for _, col := range table.Columns() { + require.Equal(t, col.Label, col.LineLabel()) + } + }) + } +} + +func TestCsvTable_ColumnInfo(t *testing.T) { + data := "#constant,measurement,cpu\n" + + "#constant,tag,xpu,xpu1\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,100\n" + + "#constant,dateTime,2\n" + + "x,y\n" + table := CsvTable{} + for _, row := range readCsv(t, data) { + require.False(t, table.AddRow(row)) + } + table.computeLineProtocolColumns() + columnInfo := "CsvTable{ dataColumns: 2 constantColumns: 5\n" + + " measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF: escapedLabel:}\n" + + " tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF: escapedLabel:cpu}\n" + + " tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF: escapedLabel:xpu}\n" + + " field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF: escapedLabel:x}\n" + + " field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF: escapedLabel:y}\n" + + " field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF: escapedLabel:of}\n" + + " time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF: escapedLabel:}" + + "\n}" + require.Equal(t, columnInfo, table.DataColumnsInfo()) + var table2 *CsvTable + require.Equal(t, "", table2.DataColumnsInfo()) +} diff --git a/pkg/csv2lp/csvToProtocolLines.go b/pkg/csv2lp/csvToProtocolLines.go new file mode 100644 index 0000000000..428c5cd8a9 --- /dev/null +++ b/pkg/csv2lp/csvToProtocolLines.go @@ -0,0 +1,132 @@ +// Package csv2lp transforms CSV data to influxDB line protocol +package csv2lp + +import ( + "encoding/csv" + "fmt" + "io" + "log" + "strings" +) + +// CsvLineError is returned for csv conversion errors +type CsvLineError struct { + // 1 is the first line + Line int + Err error +} + +func (e CsvLineError) Error() string { + return fmt.Sprintf("line %d: %v", e.Line, e.Err) +} + +// CsvToLineReader represents state of 
transformation from csv data to a line protocol reader
+type CsvToLineReader struct {
+	// csv reading
+	csv *csv.Reader
+	// Table collects information about used columns
+	Table CsvTable
+	// LineNumber represents line number of csv.Reader, 1 is the first
+	LineNumber int
+	// when true, log table data columns before reading data rows
+	logTableDataColumns bool
+	// state variable that indicates whether any data row was read
+	dataRowAdded bool
+	// log CSV data errors to stderr and continue with CSV processing
+	skipRowOnError bool
+
+	// reader results
+	buffer     []byte
+	lineBuffer []byte
+	index      int
+	finished   error
+}
+
+// LogTableColumns turns on/off logging of table data columns before reading data rows
+func (state *CsvToLineReader) LogTableColumns(val bool) *CsvToLineReader {
+	state.logTableDataColumns = val
+	return state
+}
+
+// SkipRowOnError controls whether to fail on every CSV conversion error (false) or to log the error and continue (true)
+func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
+	state.skipRowOnError = val
+	return state
+}
+
+func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
+	// state1: finished
+	if state.finished != nil {
+		return 0, state.finished
+	}
+	// state2: some data are in the buffer to copy
+	if len(state.buffer) > state.index {
+		// we have remaining bytes to copy
+		if len(state.buffer)-state.index > len(p) {
+			// copy a part of the buffer
+			copy(p, state.buffer[state.index:state.index+len(p)])
+			state.index += len(p)
+			return len(p), nil
+		}
+		// copy the entire buffer
+		n = len(state.buffer) - state.index
+		copy(p[:n], state.buffer[state.index:])
+		state.buffer = state.buffer[:0]
+		state.index = 0
+		return n, nil
+	}
+	// state3: fill buffer with data to read from
+	for {
+		// Read each record from csv
+		state.LineNumber++
+		row, err := state.csv.Read()
+		if parseError, ok := err.(*csv.ParseError); ok && parseError.Err == csv.ErrFieldCount {
+			// every row can have a different number of columns
+			err = nil
+		}
+
+		if err != nil {
+			state.finished = err
+			return state.Read(p)
+		}
+		if state.LineNumber == 1 && len(row) == 1 && strings.HasPrefix(row[0], "sep=") && len(row[0]) > 4 {
+			// separator specified in the first line
+			state.csv.Comma = rune(row[0][4])
+			continue
+		}
+		if state.Table.AddRow(row) {
+			var err error
+			state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
+			state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row)
+			if !state.dataRowAdded && state.logTableDataColumns {
+				log.Println(state.Table.DataColumnsInfo())
+			}
+			state.dataRowAdded = true
+			if err != nil {
+				lineError := CsvLineError{state.LineNumber, err}
+				if state.skipRowOnError {
+					log.Println(lineError)
+					continue
+				}
+				state.finished = lineError
+				return state.Read(p)
+			}
+
+			state.buffer = append(state.buffer, state.lineBuffer...)
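+			// the newline appended below makes every converted data row
+			// exactly one protocol line in the reader's output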
+ state.buffer = append(state.buffer, '\n') + break + } else { + state.dataRowAdded = false + } + } + return state.Read(p) +} + +// CsvToProtocolLines transforms csv data into line protocol data +func CsvToProtocolLines(reader io.Reader) *CsvToLineReader { + csv := csv.NewReader(reader) + csv.ReuseRecord = true + return &CsvToLineReader{ + csv: csv, + } +} diff --git a/pkg/csv2lp/csvToProtocolLines_test.go b/pkg/csv2lp/csvToProtocolLines_test.go new file mode 100644 index 0000000000..3e0a2d1054 --- /dev/null +++ b/pkg/csv2lp/csvToProtocolLines_test.go @@ -0,0 +1,225 @@ +package csv2lp + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "log" + "os" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestCsvData tests conversion annotated CSV file to line protocol +func Test_CsvToProtocolLines(t *testing.T) { + var tests = []struct { + name string + csv string + lines string + err string + }{ + { + "simple1", + "_measurement,a,b\ncpu,1,1\ncpu,b2\n", + "cpu a=1,b=1\ncpu a=b2\n", + "", + }, + { + "simple1_withSep", + "sep=;\n_measurement;a;b\ncpu;1;1\ncpu;b2\n", + "cpu a=1,b=1\ncpu a=b2\n", + "", + }, + { + "simple2", + "_measurement,a,b\ncpu,1,1\ncpu,\n", + "", + "no field data", + }, + { + "simple3", + "_measurement,a,_time\ncpu,1,1\ncpu,2,invalidTime\n", + "", + "_time", // error in _time column + }, + { + "constant_annotations", + "#constant,measurement,,cpu\n" + + "#constant,tag,xpu,xpu1\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,100\n" + + "#constant,dateTime,,2\n" + + "x,y\n" + + "1,2\n" + + "3,4\n", + "cpu,cpu=cpu1,xpu=xpu1 x=1,y=2,of=100i 2\n" + + "cpu,cpu=cpu1,xpu=xpu1 x=3,y=4,of=100i 2\n", + "", // no error + }, + { + "timezone_annotation-0100", + "#timezone,-0100\n" + + "#constant,measurement,cpu\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "x,y\n" + + "1,2\n", + "cpu x=1,y=2 3600000000000\n", + "", // no error + }, + { + "timezone_annotation_EST", + "#timezone,EST\n" + + "#constant,measurement,cpu\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "x,y\n" + + "1,2\n", + "cpu x=1,y=2 18000000000000\n", + "", // no error + }, + } + bufferSizes := []int{40, 7, 3, 1} + + for _, test := range tests { + for _, bufferSize := range bufferSizes { + t.Run(test.name+"_"+strconv.Itoa(bufferSize), func(t *testing.T) { + reader := CsvToProtocolLines(strings.NewReader(test.csv)) + buffer := make([]byte, bufferSize) + lines := make([]byte, 0, 100) + for { + n, err := reader.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + if test.err != "" { + // fmt.Println(err) + if err := err.Error(); !strings.Contains(err, test.err) { + require.Equal(t, err, test.err) + } + return + } + require.Nil(t, err.Error()) + break + } + lines = append(lines, buffer[:n]...) 
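+					// output accumulates across Read calls; the final result must be
+					// identical for every tested buffer size (40, 7, 3 and 1)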
+ } + if test.err == "" { + require.Equal(t, test.lines, string(lines)) + } else { + require.Fail(t, "error message with '"+test.err+"' expected") + } + }) + } + } +} + +// TestCsvData_LogTableColumns checks correct logging +func TestCsvData_LogTableColumns(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "_measurement,a,b\ncpu,1,1\ncpu,b2\n" + + reader := CsvToProtocolLines(strings.NewReader(csv)).LogTableColumns(true) + require.False(t, reader.skipRowOnError) + require.True(t, reader.logTableDataColumns) + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) + messages := strings.Count(out, prefix) + require.Equal(t, messages, 1) +} + +// TestCsvData_LogTableColumns checks correct logging +func TestCsvData_LogTimeZoneWarning(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "#timezone 1\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "_measurement,a,b\ncpu,1,1" + + reader := CsvToProtocolLines(strings.NewReader(csv)) + bytes, _ := ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) + messages := strings.Count(out, prefix) + require.Equal(t, messages, 1) + require.Equal(t, string(bytes), "cpu a=1,b=1 0\n") +} + +// TestCsvData_LogCsvErrors checks correct logging +func TestCsvData_SkipRowOnError(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "_measurement,a,_time\n,1,1\ncpu,2,2\ncpu,3,3a\n" + + reader := CsvToProtocolLines(strings.NewReader(csv)).SkipRowOnError(true) + require.Equal(t, reader.skipRowOnError, true) + require.Equal(t, reader.logTableDataColumns, false) + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) + messages := strings.Count(out, prefix) + require.Equal(t, messages, 2) +} + +// Test_CsvLineError checks formating of line errors +func Test_CsvLineError(t *testing.T) { + var tests = []struct { + err CsvLineError + value string + }{ + { + CsvLineError{Line: 1, Err: errors.New("cause")}, + "line 1: cause", + }, + { + CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}}, + "line 2: column 'a': cause", + }, + } + for _, test := range tests { + require.Equal(t, test.value, test.err.Error()) + } +} diff --git a/pkg/csv2lp/dataConversion.go b/pkg/csv2lp/dataConversion.go new file mode 100644 index 0000000000..1abce537ec --- /dev/null +++ b/pkg/csv2lp/dataConversion.go @@ -0,0 +1,300 @@ +package csv2lp + +import ( + "encoding/base64" + "errors" + "fmt" + "io" + "math" + "strconv" + "strings" + "time" + + "golang.org/x/text/encoding/ianaindex" +) + +// see https://v2.docs.influxdata.com/v2.0/reference/syntax/annotated-csv/#valid-data-types +const ( + stringDatatype = "string" + doubleDatatype = "double" + boolDatatype = "boolean" + longDatatype = "long" + uLongDatatype = "unsignedLong" + durationDatatype = "duration" + 
base64BinaryDataType = "base64Binary"
+	dateTimeDatatype              = "dateTime"
+	dateTimeDataFormatRFC3339     = "RFC3339"
+	dateTimeDataFormatRFC3339Nano = "RFC3339Nano"
+	dateTimeDataFormatNumber      = "number" // the same as long, but serialized without the i suffix, used for timestamps
+)
+
+var supportedDataTypes map[string]struct{}
+
+func init() {
+	supportedDataTypes = make(map[string]struct{}, 9)
+	supportedDataTypes[stringDatatype] = struct{}{}
+	supportedDataTypes[doubleDatatype] = struct{}{}
+	supportedDataTypes[boolDatatype] = struct{}{}
+	supportedDataTypes[longDatatype] = struct{}{}
+	supportedDataTypes[uLongDatatype] = struct{}{}
+	supportedDataTypes[durationDatatype] = struct{}{}
+	supportedDataTypes[base64BinaryDataType] = struct{}{}
+	supportedDataTypes[dateTimeDatatype] = struct{}{}
+	supportedDataTypes[""] = struct{}{}
+}
+
+// IsTypeSupported returns true if the data type is supported
+func IsTypeSupported(dataType string) bool {
+	_, supported := supportedDataTypes[dataType]
+	return supported
+}
+
+var replaceMeasurement *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ")
+var replaceTag *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ", "=", "\\=")
+var replaceQuoted *strings.Replacer = strings.NewReplacer("\"", "\\\"", "\\", "\\\\")
+
+func escapeMeasurement(val string) string {
+	for i := 0; i < len(val); i++ {
+		if val[i] == ',' || val[i] == ' ' {
+			return replaceMeasurement.Replace(val)
+		}
+	}
+	return val
+}
+func escapeTag(val string) string {
+	for i := 0; i < len(val); i++ {
+		if val[i] == ',' || val[i] == ' ' || val[i] == '=' {
+			return replaceTag.Replace(val)
+		}
+	}
+	return val
+}
+func escapeString(val string) string {
+	for i := 0; i < len(val); i++ {
+		if val[i] == '"' || val[i] == '\\' {
+			return replaceQuoted.Replace(val)
+		}
+	}
+	return val
+}
+
+// normalizeNumberString normalizes the supplied value with the help of the format supplied.
+// This normalization is intended to convert number strings of different locales to a strconv-parseable value.
+//
+// The format's first character is a fraction delimiter character. The next characters in the format
+// are simply removed; they are typically used to visually separate groups in large numbers.
+// The removeFraction parameter controls whether the returned value can also contain the fraction part.
+//
+// For example, to get a strconv-parseable float from a Spanish value '3.494.826.157,123', use format ",." .
+func normalizeNumberString(value string, format string, removeFraction bool) string {
+	if len(format) == 0 {
+		format = ". 
\n\t\r_" + } + if strings.ContainsAny(value, format) { + formatRunes := []rune(format) + fractionRune := formatRunes[0] + ignored := formatRunes[1:] + retVal := strings.Builder{} + retVal.Grow(len(value)) + ForAllCharacters: + for _, c := range value { + // skip ignored characters + for i := 0; i < len(ignored); i++ { + if c == ignored[i] { + continue ForAllCharacters + } + } + if c == fractionRune { + if removeFraction { + break ForAllCharacters + } + retVal.WriteByte('.') + } else { + retVal.WriteRune(c) + } + } + + return retVal.String() + } + return value +} + +func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) { + dataType := column.DataType + dataFormat := column.DataFormat + if column.ParseF != nil { + return column.ParseF(val) + } + switch dataType { + case stringDatatype: + return val, nil + case dateTimeDatatype: + switch dataFormat { + case "": // number or time.RFC3339 + t, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return time.Parse(time.RFC3339, val) + } + return time.Unix(0, t).UTC(), nil + case dateTimeDataFormatRFC3339: + return time.Parse(time.RFC3339, val) + case dateTimeDataFormatRFC3339Nano: + return time.Parse(time.RFC3339Nano, val) + case dateTimeDataFormatNumber: + t, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, err + } + return time.Unix(0, t).UTC(), nil + default: + if column.TimeZone != nil { + return time.ParseInLocation(dataFormat, val, column.TimeZone) + } + return time.Parse(dataFormat, val) + } + case durationDatatype: + return time.ParseDuration(val) + case doubleDatatype: + return strconv.ParseFloat(normalizeNumberString(val, dataFormat, false), 64) + case boolDatatype: + switch { + case len(val) == 0: + return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'") + case val[0] == 't' || val[0] == 'T' || val[0] == 'y' || val[0] == 'Y' || val[0] == '1': + return true, nil + case val[0] == 'f' || val[0] == 'F' || val[0] == 'n' || val[0] == 'N' || val[0] == '0': + return false, nil + default: + return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'") + } + case longDatatype: + return strconv.ParseInt(normalizeNumberString(val, dataFormat, true), 10, 64) + case uLongDatatype: + return strconv.ParseUint(normalizeNumberString(val, dataFormat, true), 10, 64) + case base64BinaryDataType: + return base64.StdEncoding.DecodeString(val) + default: + return nil, fmt.Errorf("unsupported data type '%s'", dataType) + } +} + +func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) { + switch v := value.(type) { + case uint64: + return append(strconv.AppendUint(buffer, v, 10), 'u'), nil + case int64: + return append(strconv.AppendInt(buffer, v, 10), 'i'), nil + case int: + return append(strconv.AppendInt(buffer, int64(v), 10), 'i'), nil + case float64: + if math.IsNaN(v) { + return buffer, errors.New("value is NaN") + } + if math.IsInf(v, 0) { + return buffer, errors.New("value is Infinite") + } + return strconv.AppendFloat(buffer, v, 'f', -1, 64), nil + case float32: + v32 := float64(v) + if math.IsNaN(v32) { + return buffer, errors.New("value is NaN") + } + if math.IsInf(v32, 0) { + return buffer, errors.New("value is Infinite") + } + return strconv.AppendFloat(buffer, v32, 'f', -1, 64), nil + case string: + buffer = append(buffer, '"') + buffer = append(buffer, escapeString(v)...) 
+
+// appendProtocolValue appends the line protocol representation of the supplied value to the buffer
+func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) {
+	switch v := value.(type) {
+	case uint64:
+		return append(strconv.AppendUint(buffer, v, 10), 'u'), nil
+	case int64:
+		return append(strconv.AppendInt(buffer, v, 10), 'i'), nil
+	case int:
+		return append(strconv.AppendInt(buffer, int64(v), 10), 'i'), nil
+	case float64:
+		if math.IsNaN(v) {
+			return buffer, errors.New("value is NaN")
+		}
+		if math.IsInf(v, 0) {
+			return buffer, errors.New("value is Infinite")
+		}
+		return strconv.AppendFloat(buffer, v, 'f', -1, 64), nil
+	case float32:
+		v64 := float64(v)
+		if math.IsNaN(v64) {
+			return buffer, errors.New("value is NaN")
+		}
+		if math.IsInf(v64, 0) {
+			return buffer, errors.New("value is Infinite")
+		}
+		return strconv.AppendFloat(buffer, v64, 'f', -1, 64), nil
+	case string:
+		buffer = append(buffer, '"')
+		buffer = append(buffer, escapeString(v)...)
+		buffer = append(buffer, '"')
+		return buffer, nil
+	case []byte:
+		buf := make([]byte, base64.StdEncoding.EncodedLen(len(v)))
+		base64.StdEncoding.Encode(buf, v)
+		return append(buffer, buf...), nil
+	case bool:
+		if v {
+			return append(buffer, "true"...), nil
+		}
+		return append(buffer, "false"...), nil
+	case time.Time:
+		return strconv.AppendInt(buffer, v.UnixNano(), 10), nil
+	case time.Duration:
+		return append(strconv.AppendInt(buffer, v.Nanoseconds(), 10), 'i'), nil
+	default:
+		return buffer, fmt.Errorf("unsupported value type: %T", v)
+	}
+}
+
+// appendConverted converts the string value according to the column's type and appends its protocol representation to the buffer
+func appendConverted(buffer []byte, val string, column *CsvTableColumn) ([]byte, error) {
+	if len(column.DataType) == 0 { // keep the value as it is
+		return append(buffer, val...), nil
+	}
+	typedVal, err := toTypedValue(val, column)
+	if err != nil {
+		return buffer, err
+	}
+	return appendProtocolValue(buffer, typedVal)
+}
+
+// decodeNop returns the reader unchanged; it is used when no decoding is required
+func decodeNop(reader io.Reader) io.Reader {
+	return reader
+}
+
+// CreateDecoder creates a decoding reader from the supplied encoding to UTF-8, or returns an error
+func CreateDecoder(encoding string) (func(io.Reader) io.Reader, error) {
+	if len(encoding) > 0 && encoding != "UTF-8" {
+		enc, err := ianaindex.IANA.Encoding(encoding)
+		if err != nil {
+			return nil, fmt.Errorf("%v, see https://www.iana.org/assignments/character-sets/character-sets.xhtml", err)
+		}
+		if enc == nil {
+			return nil, fmt.Errorf("unsupported encoding: %s", encoding)
+		}
+		return enc.NewDecoder().Reader, nil
+	}
+	return decodeNop, nil
+}
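+
+// For example, to read windows-1250 encoded input as UTF-8 (input stands for
+// any io.Reader):
+//
+//	decode, err := CreateDecoder("windows-1250")
+//	if err == nil {
+//		utf8Reader := decode(input)
+//	}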
+
+// createBoolParseFn returns a function that converts a string value to a boolean according to the format "true,yes,1:false,no,0"
+func createBoolParseFn(format string) func(string) (interface{}, error) {
+	var err error
+	truthy := []string{}
+	falsy := []string{}
+	if !strings.Contains(format, ":") {
+		err = fmt.Errorf("unsupported boolean format: %s should be in 'true,yes,1:false,no,0' format, but no ':' is present", format)
+	} else {
+		colon := strings.Index(format, ":")
+		t := format[:colon]
+		f := format[colon+1:]
+		if t != "" {
+			truthy = strings.Split(t, ",")
+		}
+		if f != "" {
+			falsy = strings.Split(f, ",")
+		}
+	}
+	return func(val string) (interface{}, error) {
+		if err != nil {
+			return nil, err
+		}
+		for _, s := range falsy {
+			if s == val {
+				return false, nil
+			}
+		}
+		for _, s := range truthy {
+			if s == val {
+				return true, nil
+			}
+		}
+		// an empty truthy or falsy list acts as a catch-all
+		if len(falsy) == 0 {
+			return false, nil
+		}
+		if len(truthy) == 0 {
+			return true, nil
+		}
+
+		return nil, fmt.Errorf("unsupported boolean value: %s must be one of %v or one of %v", val, truthy, falsy)
+	}
+}
diff --git a/pkg/csv2lp/dataConversion_test.go b/pkg/csv2lp/dataConversion_test.go
new file mode 100644
index 0000000000..71434985ae
--- /dev/null
+++ b/pkg/csv2lp/dataConversion_test.go
@@ -0,0 +1,325 @@
+package csv2lp
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"math"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_escapeMeasurement(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", "a"}, {"", ""},
+		{"a,", `a\,`},
+		{"a ", `a\ `},
+		{"a=", `a=`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeMeasurement(test.value))
+		})
+	}
+}
+
+func Test_escapeTag(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", "a"}, {"", ""},
+		{"a,", `a\,`},
+		{"a ", `a\ `},
+		{"a=", `a\=`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeTag(test.value))
+		})
+	}
+}
+
+func Test_escapeString(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", `a`}, {"", ``},
+		{`a"`, `a\"`},
+		{`a\`, `a\\`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeString(test.value))
+		})
+	}
+}
+
+func Test_toTypedValue(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	var tests = []struct {
+		dataType string
+		value    string
+		expect   interface{}
+	}{
+		{"string", "a", "a"},
+		{"double", "1.0", float64(1.0)},
+		{"boolean", "true", true},
+		{"boolean", "True", true},
+		{"boolean", "y", true},
+		{"boolean", "Yes", true},
+		{"boolean", "1", true},
+		{"boolean", "false", false},
+		{"boolean", "False", false},
+		{"boolean", "n", false},
+		{"boolean", "No", false},
+		{"boolean", "0", false},
+		{"boolean", "", nil},
+		{"boolean", "?", nil},
+		{"long", "1", int64(1)},
+		{"unsignedLong", "1", uint64(1)},
+		{"duration", "1ns", time.Duration(1)},
+		{"base64Binary", "YWFh", []byte("aaa")},
+		{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
+		{"dateTime:RFC3339", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.000000002Z", epochTime.Add(time.Duration(2))},
+		{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
+		{"dateTime", "4", epochTime.Add(time.Duration(4))},
+		{"dateTime:2006-01-02", "1970-01-01", epochTime},
+		{"dateTime", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
+		{"double:, .", "200 100.299,0", float64(200100299.0)},
+		{"long:, .", "200 100.299,0", int64(200100299)},
+		{"unsignedLong:, .", "200 100.299,0", uint64(200100299)},
+		{"u.type", "", nil},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
+			column := &CsvTableColumn{}
+			setupDataType(column, test.dataType)
+			val, err := toTypedValue(test.value, column)
+			if err != nil && test.expect != nil {
+				require.NoError(t, err)
+			}
+			require.Equal(t, test.expect, val)
+		})
+	}
+}
+
+func Test_toTypedValue_dateTimeCustomTimeZone(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	tz, _ := parseTimeZone("-0100")
+	var tests = []struct {
+		dataType string
+		value    string
+		expect   interface{}
+	}{
+		{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
+		{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
+		{"dateTime:2006-01-02", "1970-01-01", epochTime.Add(time.Hour)},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
+			column := &CsvTableColumn{}
+			column.TimeZone = tz
+			setupDataType(column, test.dataType)
+			val, err := toTypedValue(test.value, column)
+			if err != nil && test.expect != nil {
+				require.NoError(t, err)
+			}
+			if test.expect == nil {
+				require.Equal(t, test.expect, val)
+			} else {
+				expectTime := test.expect.(time.Time)
+				actualTime := val.(time.Time)
+				require.True(t, expectTime.Equal(actualTime))
+			}
+		})
+	}
+}
+
+func Test_toLineProtocolValue(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	var tests = []struct {
+		value  interface{}
+		expect string
+	}{
+		{uint64(1), "1u"},
+		{int64(1), "1i"},
+		{int(1), "1i"},
+		{float64(1.1), "1.1"},
+		{math.NaN(), ""},
+		{math.Inf(1), ""},
+		{float32(1), "1"},
+		{float32(math.NaN()), ""},
+		{float32(math.Inf(1)), ""},
+		{"a", `"a"`},
+		{[]byte("aaa"), "YWFh"},
+		{true, "true"},
+		{false, "false"},
+		{epochTime, "0"},
+		{time.Duration(100), "100i"},
+		{struct{}{}, ""},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			val, err := appendProtocolValue(nil, test.value)
+			if err != nil && test.expect != "" {
+				require.NoError(t, err)
+			}
+			require.Equal(t, test.expect, string(val))
+		})
+	}
+}
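+
+// The expectations above reflect the line protocol field encoding produced by
+// appendProtocolValue: signed integers get an "i" suffix, unsigned integers a
+// "u" suffix, floats and timestamps are rendered as plain numbers, and strings
+// are double-quoted with escaping applied.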
""}, + {float32(1), "1"}, + {float32(math.NaN()), ""}, + {float32(math.Inf(1)), ""}, + {"a", `"a"`}, + {[]byte("aaa"), "YWFh"}, + {true, "true"}, + {false, "false"}, + {epochTime, "0"}, + {time.Duration(100), "100i"}, + {struct{}{}, ""}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + val, err := appendProtocolValue(nil, test.value) + if err != nil && test.expect != "" { + require.Nil(t, err.Error()) + } + require.Equal(t, test.expect, string(val)) + }) + } +} + +func Test_appendConverted(t *testing.T) { + var tests = []struct { + dataType string + value string + expect string + }{ + {"", "1", "1"}, + {"long", "a", ""}, + {"dateTime", "a", ""}, + {"dateTime:number", "a", ""}, + {"string", "a", `"a"`}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + column := &CsvTableColumn{} + setupDataType(column, test.dataType) + val, err := appendConverted(nil, test.value, column) + if err != nil && test.expect != "" { + require.Nil(t, err.Error()) + } + require.Equal(t, test.expect, string(val)) + }) + } +} + +// Test_IsTypeSupported +func Test_IsTypeSupported(t *testing.T) { + require.True(t, IsTypeSupported(stringDatatype), true) + require.True(t, IsTypeSupported(doubleDatatype), true) + require.True(t, IsTypeSupported(boolDatatype), true) + require.True(t, IsTypeSupported(longDatatype), true) + require.True(t, IsTypeSupported(uLongDatatype), true) + require.True(t, IsTypeSupported(durationDatatype), true) + require.True(t, IsTypeSupported(base64BinaryDataType), true) + require.True(t, IsTypeSupported(dateTimeDatatype), true) + require.True(t, IsTypeSupported(""), true) + require.False(t, IsTypeSupported(" "), false) + // time format is not part of data type + require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339)) + require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339Nano)) + require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatNumber)) +} + +// Test_normalizeNumberString +func Test_normalizeNumberString(t *testing.T) { + var tests = []struct { + value string + format string + removeFraction bool + expect string + }{ + {"123", "", true, "123"}, + {"123", ".", true, "123"}, + {"123.456", ".", true, "123"}, + {"123.456", ".", false, "123.456"}, + {"1 2.3,456", ",. 
", false, "123.456"}, + {" 1 2\t3.456 \r\n", "", false, "123.456"}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + require.Equal(t, test.expect, normalizeNumberString(test.value, test.format, test.removeFraction)) + }) + } +} + +// TestCreateDecoder tests the decoding reader factory +func TestCreateDecoder(t *testing.T) { + decoder, err := CreateDecoder("UTF-8") + toUtf8 := func(in []byte) string { + s, _ := ioutil.ReadAll(decoder(bytes.NewReader(in))) + return string(s) + } + require.NotNil(t, decoder) + require.Nil(t, err) + require.Equal(t, "\u2318", toUtf8([]byte{226, 140, 152})) + decoder, err = CreateDecoder("windows-1250") + require.NotNil(t, decoder) + require.Nil(t, err) + require.Equal(t, "\u0160", toUtf8([]byte{0x8A})) + decoder, err = CreateDecoder("whateveritis") + require.NotNil(t, err) + require.Nil(t, decoder) + // we can have valid IANA names that are not supported by golang/x/text + decoder, err = CreateDecoder("US-ASCII") + log.Printf("US-ASCII encoding support: %v,%v", decoder != nil, err) +} + +func Test_createBoolParseFn(t *testing.T) { + type pairT struct { + value string + expect string + } + var tests = []struct { + format string + pair []pairT + }{ + {"t,y,1:f,n,0", []pairT{ + {"y", "true"}, + {"0", "false"}, + {"T", "unsupported"}, + }}, + {"true", []pairT{ + {"true", "unsupported"}, + {"false", "unsupported"}, + }}, + {"true:", []pairT{ + {"true", "true"}, + {"other", "false"}, + }}, + {":false", []pairT{ + {"false", "false"}, + {"other", "true"}, + }}, + } + + for i, test := range tests { + fn := createBoolParseFn(test.format) + for j, pair := range test.pair { + t.Run(fmt.Sprint(i)+"_"+fmt.Sprint(j), func(t *testing.T) { + result, err := fn(pair.value) + switch pair.expect { + case "true": + require.Equal(t, true, result) + case "false": + require.Equal(t, false, result) + default: + require.NotNil(t, err) + require.True(t, strings.Contains(fmt.Sprintf("%v", err), pair.expect)) + } + }) + } + } +} diff --git a/pkg/csv2lp/examples_test.go b/pkg/csv2lp/examples_test.go new file mode 100644 index 0000000000..97084e3dbf --- /dev/null +++ b/pkg/csv2lp/examples_test.go @@ -0,0 +1,191 @@ +package csv2lp + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +type csvExample struct { + name string + csv string + lp string +} + +var examples []csvExample = []csvExample{ + { + "fluxQueryResult", + ` +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod + +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod +`, + ` 
+
+type csvExample struct {
+	name string
+	csv  string
+	lp   string
+}
+
+var examples = []csvExample{
+	{
+		"fluxQueryResult",
+		`
+#group,false,false,true,true,false,false,true,true,true,true
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
+#default,_result,,,,,,,,,
+,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
+
+#group,false,false,true,true,false,false,true,true,true,true
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
+#default,_result,,,,,,,,,
+,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
+`,
+		`
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
+cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
+cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000
+`,
+	},
+	{
+		"annotatedSimple",
+		`
+#datatype measurement,tag,tag,double,double,ignored,dateTime:number
+m,cpu,host,time_steal,usage_user,nothing,time
+cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
+cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
+`,
+		`
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
+`,
+	},
+	{
+		"annotatedSimple_labels",
+		`
+m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number
+cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
+cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
+`,
+		`
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
+`,
+	},
+	{
+		"annotatedDatatype",
+		`
+#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
+#default test,annotatedDatatypes,,,,,,
+m,name,s,d,b,l,ul,dur,time
+,,str1,1.0,true,1,1,1ms,1
+,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
+`,
+		`
+test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
+test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
+`,
+	},
+	{
+		"annotatedDatatype_labels",
+		`
+m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime
+,,str1,1.0,true,1,1,1ms,1
+,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z`,
+		`
+test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
+test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
+`,
+	},
+	{
+		"datetypeFormats",
+		`
+#constant measurement,test
+#constant tag,name,datetypeFormats
+#timezone -0500
+#datatype dateTime:2006-01-02|1970-01-02,"double:,. ","boolean:y,Y:n,N|y"
+t,d,b
+1970-01-01,"123.456,78",
+,"123 456,78",Y
+`,
+		`
+test,name=datetypeFormats d=123456.78,b=true 18000000000000
+test,name=datetypeFormats d=123456.78,b=true 104400000000000
+`,
+	},
+	{
+		"datetypeFormats_labels",
+		`
+#constant measurement,test
+#constant tag,name,datetypeFormats
+#timezone -0500
+t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
+1970-01-01,"123.456,78",
+,"123 456,78",Y
+`,
+		`
+test,name=datetypeFormats d=123456.78,b=true 18000000000000
+test,name=datetypeFormats d=123456.78,b=true 104400000000000
+`,
+	},
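+	// The remaining examples exercise label-based annotation overrides,
+	// skipping of rows that fail conversion, and the sep=; column separator
+	// directive.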
","b|boolean:y,Y:n,N|y" +1970-01-01,"123.456,78", +,"123 456,78",Y +`, + ` +test,name=datetypeFormats d=123456.78,b=true 18000000000000 +test,name=datetypeFormats d=123456.78,b=true 104400000000000 +`, + }, + { + "datetypeFormats_labels_override", + ` +#constant measurement,test2 +t|dateTime:2006-01-02,_|ignored,s|string|unknown +1970-01-01,"123.456,78", +,"123 456,78",Y +`, + ` +test2 s="unknown" 0 +test2 s="Y" +`, + }, + { + "datetypeFormats_labels_override", + ` +m|measurement,usage_user|double +cpu,2.7 +cpu,nil +cpu, +,2.9 +`, + ` +cpu usage_user=2.7 +`, + }, + { + "columnSeparator", + ` +sep=; +m|measurement;available|boolean:y,Y:|n;dt|dateTime:number +test;nil;1 +test;N;2 +test;";";3 +test;;4 +test;Y;5 +`, + ` +test available=false 1 +test available=false 2 +test available=false 3 +test available=false 4 +test available=true 5 +`, + }, +} + +func (example *csvExample) normalize() { + for len(example.lp) > 0 && example.lp[0] == '\n' { + example.lp = example.lp[1:] + } +} + +// TestExamples validates documentation examples +func TestExamples(t *testing.T) { + for _, example := range examples { + example.normalize() + t.Run(example.name, func(t *testing.T) { + transformer := CsvToProtocolLines(strings.NewReader(example.csv)) + transformer.SkipRowOnError(true) + result, err := ioutil.ReadAll(transformer) + if err != nil { + require.Nil(t, fmt.Sprintf("%s", err)) + } + require.Equal(t, example.lp, string(result)) + }) + } +}