chore(pkg/csv2lp): apply code-review comments
parent
6e5aca1d8c
commit
8cdde7ac17
|
@ -54,6 +54,7 @@ func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
|
|||
return state
|
||||
}
|
||||
|
||||
// Read implements io.Reader that returns protocol lines
|
||||
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
|
||||
// state1: finished
|
||||
if state.finished != nil {
|
||||
|
|
|
@ -8,12 +8,12 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// annotationComment represents parsed CSV annotation
|
||||
// annotationComment describes CSV annotation
|
||||
type annotationComment struct {
|
||||
// prefix in a CSV row that recognizes this annotation
|
||||
prefix string
|
||||
// flag is 0 to represent an annotation that is used for all data rows
|
||||
// or a unique bit (>0) that that is unique to the annotation
|
||||
// or a unique bit (>0) between supported annotation prefixes
|
||||
flag uint8
|
||||
// setupColumn setups metadata that drives the way of how column data
|
||||
// are parsed, mandatory when flag > 0
|
||||
|
@ -23,147 +23,114 @@ type annotationComment struct {
|
|||
setupTable func(table *CsvTable, row []string) error
|
||||
}
|
||||
|
||||
func (a *annotationComment) isTableAnnotation() bool {
|
||||
// isTableAnnotation returns true for a table-wide annotation, false for column-based annotations
|
||||
func (a annotationComment) isTableAnnotation() bool {
|
||||
return a.setupTable != nil
|
||||
}
|
||||
|
||||
func (a *annotationComment) matches(comment string) bool {
|
||||
// matches tests whether an annotationComment can process the CSV comment row
|
||||
func (a annotationComment) matches(comment string) bool {
|
||||
return strings.HasPrefix(strings.ToLower(comment), a.prefix)
|
||||
}
|
||||
|
||||
var supportedAnnotations = []annotationComment{
|
||||
{"#group", 1, func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
if strings.HasSuffix(value, "true") {
|
||||
column.LinePart = linePartTag
|
||||
// constantSetupTable setups the supplied CSV table from #constant annotation
|
||||
func constantSetupTable(table *CsvTable, row []string) error {
|
||||
// adds a virtual column with contsant value to all data rows
|
||||
// supported types of constant annotation rows are:
|
||||
// 1. "#constant,datatype,label,defaultValue"
|
||||
// 2. "#constant,measurement,value"
|
||||
// 3. "#constant,dateTime,value"
|
||||
// 4. "#constant datatype,label,defaultValue"
|
||||
// 5. "#constant measurement,value"
|
||||
// 6. "#constant dateTime,value"
|
||||
// defaultValue is optional, additional columns are ignored
|
||||
col := CsvTableColumn{}
|
||||
col.Index = -1 // this is a virtual column that never extracts data from data rows
|
||||
// setup column data type
|
||||
col.setupDataType(row[0])
|
||||
var dataTypeIndex int
|
||||
if len(col.DataType) == 0 && col.LinePart == 0 {
|
||||
// type 1,2,3
|
||||
dataTypeIndex = 1
|
||||
if len(row) > 1 {
|
||||
col.setupDataType(row[1])
|
||||
}
|
||||
}, nil},
|
||||
{"#datatype", 2, func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
setupDataType(column, value)
|
||||
}, nil},
|
||||
{"#default", 4, func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
column.DefaultValue = ignoreLeadingComment(value)
|
||||
}, nil},
|
||||
{"#constant", 0, nil, func(table *CsvTable, row []string) error {
|
||||
// adds a virtual column with contsant value to all data rows
|
||||
// supported types of constant annotation rows are:
|
||||
// 1. "#constant,datatype,label,defaultValue"
|
||||
// 2. "#constant,measurement,value"
|
||||
// 3. "#constant,dateTime,value"
|
||||
// 4. "#constant datatype,label,defaultValue"
|
||||
// 5. "#constant measurement,value"
|
||||
// 6. "#constant dateTime,value"
|
||||
// defaultValue is optional, additional columns are ignored
|
||||
col := CsvTableColumn{}
|
||||
col.Index = -1 // this is a virtual column that never extracts data from data rows
|
||||
// setup column data type
|
||||
setupDataType(&col, row[0])
|
||||
var dataTypeIndex int
|
||||
if len(col.DataType) == 0 && col.LinePart == 0 {
|
||||
// type 1,2,3
|
||||
dataTypeIndex = 1
|
||||
if len(row) > 1 {
|
||||
setupDataType(&col, row[1])
|
||||
}
|
||||
} else {
|
||||
// type 4,5,6
|
||||
dataTypeIndex = 0
|
||||
} else {
|
||||
// type 4,5,6
|
||||
dataTypeIndex = 0
|
||||
}
|
||||
// setup label if available
|
||||
if len(row) > dataTypeIndex+1 {
|
||||
col.Label = row[dataTypeIndex+1]
|
||||
}
|
||||
// setup defaultValue if available
|
||||
if len(row) > dataTypeIndex+2 {
|
||||
col.DefaultValue = row[dataTypeIndex+2]
|
||||
}
|
||||
// support type 2,3,5,6 syntax for measurement and timestamp
|
||||
if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
|
||||
if col.DefaultValue == "" && col.Label != "" {
|
||||
// type 2,3,5,6
|
||||
col.DefaultValue = col.Label
|
||||
col.Label = "#constant " + col.DataType
|
||||
} else if col.Label == "" {
|
||||
// setup a label if no label is supplied fo focused error messages
|
||||
col.Label = "#constant " + col.DataType
|
||||
}
|
||||
// setup label if available
|
||||
if len(row) > dataTypeIndex+1 {
|
||||
col.Label = row[dataTypeIndex+1]
|
||||
}
|
||||
// setup defaultValue if available
|
||||
if len(row) > dataTypeIndex+2 {
|
||||
col.DefaultValue = row[dataTypeIndex+2]
|
||||
}
|
||||
// support type 2,3,5,6 syntax for measurement and timestamp
|
||||
if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
|
||||
if col.DefaultValue == "" && col.Label != "" {
|
||||
// type 2,3,5,6
|
||||
col.DefaultValue = col.Label
|
||||
col.Label = "#constant " + col.DataType
|
||||
} else if col.Label == "" {
|
||||
// setup a label if no label is supplied fo focused error messages
|
||||
col.Label = "#constant " + col.DataType
|
||||
}
|
||||
}
|
||||
// add a virtual column to the table
|
||||
table.extraColumns = append(table.extraColumns, col)
|
||||
return nil
|
||||
}},
|
||||
{"#timezone", 0, nil, func(table *CsvTable, row []string) error {
|
||||
// setup timezone for parsing timestamps, UTC by default
|
||||
val := ignoreLeadingComment(row[0])
|
||||
if val == "" && len(row) > 1 {
|
||||
val = row[1] // #timezone,Local
|
||||
}
|
||||
tz, err := parseTimeZone(val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("#timezone annotation: %v", err)
|
||||
}
|
||||
table.timeZone = tz
|
||||
return nil
|
||||
}},
|
||||
}
|
||||
// add a virtual column to the table
|
||||
table.extraColumns = append(table.extraColumns, &col)
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupDataType setups data type from a column value
|
||||
func setupDataType(column *CsvTableColumn, columnValue string) {
|
||||
// columnValue contains typeName and possibly additional column metadata,
|
||||
// it can be
|
||||
// 1. typeName
|
||||
// 2. typeName:format
|
||||
// 3. typeName|defaultValue
|
||||
// 4. typeName:format|defaultValue
|
||||
// 5. #anycomment (all options above)
|
||||
|
||||
// ignoreLeadingComment is also required to specify datatype together with CSV annotation
|
||||
// in #constant annotation
|
||||
columnValue = ignoreLeadingComment(columnValue)
|
||||
|
||||
// | adds a default value to column
|
||||
pipeIndex := strings.Index(columnValue, "|")
|
||||
if pipeIndex > 1 {
|
||||
if column.DefaultValue == "" {
|
||||
column.DefaultValue = columnValue[pipeIndex+1:]
|
||||
columnValue = columnValue[:pipeIndex]
|
||||
}
|
||||
}
|
||||
// setup column format
|
||||
colonIndex := strings.Index(columnValue, ":")
|
||||
if colonIndex > 1 {
|
||||
column.DataFormat = columnValue[colonIndex+1:]
|
||||
columnValue = columnValue[:colonIndex]
|
||||
}
|
||||
|
||||
// setup column linePart depending dataType
|
||||
switch {
|
||||
case columnValue == "tag":
|
||||
column.LinePart = linePartTag
|
||||
case strings.HasPrefix(columnValue, "ignore"):
|
||||
// ignore or ignored
|
||||
column.LinePart = linePartIgnored
|
||||
case columnValue == "dateTime":
|
||||
// dateTime field is used at most once in a protocol line
|
||||
column.LinePart = linePartTime
|
||||
case columnValue == "measurement":
|
||||
column.LinePart = linePartMeasurement
|
||||
case columnValue == "field":
|
||||
column.LinePart = linePartField
|
||||
columnValue = "" // this a generic field without a data type specified
|
||||
case columnValue == "time": // time is an alias for dateTime
|
||||
column.LinePart = linePartTime
|
||||
columnValue = dateTimeDatatype
|
||||
}
|
||||
// setup column data type
|
||||
column.DataType = columnValue
|
||||
|
||||
// setup custom parsing of bool data type
|
||||
if column.DataType == boolDatatype && column.DataFormat != "" {
|
||||
column.ParseF = createBoolParseFn(column.DataFormat)
|
||||
}
|
||||
// supportedAnnotations contains all supported CSV annotations comments
|
||||
var supportedAnnotations = []annotationComment{
|
||||
{
|
||||
prefix: "#group",
|
||||
flag: 1,
|
||||
setupColumn: func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
if strings.HasSuffix(value, "true") {
|
||||
column.LinePart = linePartTag
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
prefix: "#datatype",
|
||||
flag: 2,
|
||||
setupColumn: func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
column.setupDataType(value)
|
||||
},
|
||||
},
|
||||
{
|
||||
prefix: "#default",
|
||||
flag: 4,
|
||||
setupColumn: func(column *CsvTableColumn, value string) {
|
||||
// standard flux query result annotation
|
||||
column.DefaultValue = ignoreLeadingComment(value)
|
||||
},
|
||||
},
|
||||
{
|
||||
prefix: "#constant",
|
||||
setupTable: constantSetupTable,
|
||||
},
|
||||
{
|
||||
prefix: "#timezone",
|
||||
setupTable: func(table *CsvTable, row []string) error {
|
||||
// setup timezone for parsing timestamps, UTC by default
|
||||
val := ignoreLeadingComment(row[0])
|
||||
if val == "" && len(row) > 1 {
|
||||
val = row[1] // #timezone,Local
|
||||
}
|
||||
tz, err := parseTimeZone(val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("#timezone annotation: %v", err)
|
||||
}
|
||||
table.timeZone = tz
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// ignoreLeadingComment returns a value without '#anyComment ' prefix
|
||||
|
@ -178,7 +145,13 @@ func ignoreLeadingComment(value string) string {
|
|||
return value
|
||||
}
|
||||
|
||||
// parseTimeZone tries to parse the supplied timezone indicator as a Location or returns an error
|
||||
// parseTimeZone parses the supplied timezone from a string into a time.Location
|
||||
//
|
||||
// parseTimeZone("") // time.UTC
|
||||
// parseTimeZone("local") // time.Local
|
||||
// parseTimeZone("-0500") // time.FixedZone(-5*3600 + 0*60)
|
||||
// parseTimeZone("+0200") // time.FixedZone(2*3600 + 0*60)
|
||||
// parseTimeZone("EST") // time.LoadLocation("EST")
|
||||
func parseTimeZone(val string) (*time.Location, error) {
|
||||
switch {
|
||||
case val == "":
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"unsafe"
|
||||
)
|
||||
|
||||
// column labels using in flux CSV result
|
||||
// column labels used in flux CSV result
|
||||
const (
|
||||
labelFieldName = "_field"
|
||||
labelFieldValue = "_value"
|
||||
|
@ -20,7 +20,7 @@ const (
|
|||
labelMeasurement = "_measurement"
|
||||
)
|
||||
|
||||
// types of column with respect to line protocol
|
||||
// types of columns with respect to line protocol
|
||||
const (
|
||||
linePartIgnored = iota + 1 // ignored in line protocol
|
||||
linePartMeasurement
|
||||
|
@ -31,24 +31,24 @@ const (
|
|||
|
||||
// CsvTableColumn represents processing metadata about a csv column
|
||||
type CsvTableColumn struct {
|
||||
// label such as "_start", "_stop", "_time"
|
||||
// Label is a column label from the header row, such as "_start", "_stop", "_time"
|
||||
Label string
|
||||
// "string", "long", "dateTime" ...
|
||||
// DataType such as "string", "long", "dateTime" ...
|
||||
DataType string
|
||||
// "RFC3339", "2006-01-02"
|
||||
// DataFormat is a format of DataType, such as "RFC3339", "2006-01-02"
|
||||
DataFormat string
|
||||
// column's line part (0 means not determined yet), see linePart constants
|
||||
// LinePart is a line part of the column (0 means not determined yet), see linePart constants
|
||||
LinePart int
|
||||
// default value to be used for rows where value is an empty string.
|
||||
// DefaultValue is used when column's value is an empty string.
|
||||
DefaultValue string
|
||||
// index of this column in the table row, -1 indicates a virtual column
|
||||
// Index of this column when reading rows, -1 indicates a virtual column with DefaultValue data
|
||||
Index int
|
||||
// TimeZone of dateTime column, applied when parsing dateTime without timeZone in the format
|
||||
// TimeZone of dateTime column, applied when parsing dateTime DataType
|
||||
TimeZone *time.Location
|
||||
// parse function, when set, is used to convert column's string value to interface{}
|
||||
// ParseF is an optional function used to convert column's string value to interface{}
|
||||
ParseF func(string) (interface{}, error)
|
||||
|
||||
// escaped label for line protocol
|
||||
// escapedLabel contains escaped label that can be directly used in line protocol
|
||||
escapedLabel string
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (c *CsvTableColumn) LineLabel() string {
|
|||
return c.Label
|
||||
}
|
||||
|
||||
// Value returns the value of the column for the supplied row supplied
|
||||
// Value returns the value of the column for the supplied row
|
||||
func (c *CsvTableColumn) Value(row []string) string {
|
||||
if c.Index < 0 || c.Index >= len(row) {
|
||||
return c.DefaultValue
|
||||
|
@ -72,49 +72,116 @@ func (c *CsvTableColumn) Value(row []string) string {
|
|||
return c.DefaultValue
|
||||
}
|
||||
|
||||
// setupDataType setups data type from the value supplied
|
||||
//
|
||||
// columnValue contains typeName and possibly additional column metadata,
|
||||
// it can be
|
||||
// 1. typeName
|
||||
// 2. typeName:format
|
||||
// 3. typeName|defaultValue
|
||||
// 4. typeName:format|defaultValue
|
||||
// 5. #anycomment (all options above)
|
||||
func (c *CsvTableColumn) setupDataType(columnValue string) {
|
||||
// ignoreLeadingComment is required to specify datatype together with CSV annotation
|
||||
// in annotations (such as #constant)
|
||||
columnValue = ignoreLeadingComment(columnValue)
|
||||
|
||||
// | adds a default value to column
|
||||
pipeIndex := strings.Index(columnValue, "|")
|
||||
if pipeIndex > 1 {
|
||||
if c.DefaultValue == "" {
|
||||
c.DefaultValue = columnValue[pipeIndex+1:]
|
||||
columnValue = columnValue[:pipeIndex]
|
||||
}
|
||||
}
|
||||
// setup column format
|
||||
colonIndex := strings.Index(columnValue, ":")
|
||||
if colonIndex > 1 {
|
||||
c.DataFormat = columnValue[colonIndex+1:]
|
||||
columnValue = columnValue[:colonIndex]
|
||||
}
|
||||
|
||||
// setup column linePart depending dataType
|
||||
switch {
|
||||
case columnValue == "tag":
|
||||
c.LinePart = linePartTag
|
||||
case strings.HasPrefix(columnValue, "ignore"):
|
||||
// ignore or ignored
|
||||
c.LinePart = linePartIgnored
|
||||
case columnValue == "dateTime":
|
||||
// dateTime field is used at most once in a protocol line
|
||||
c.LinePart = linePartTime
|
||||
case columnValue == "measurement":
|
||||
c.LinePart = linePartMeasurement
|
||||
case columnValue == "field":
|
||||
c.LinePart = linePartField
|
||||
columnValue = "" // this a generic field without a data type specified
|
||||
case columnValue == "time": // time is an alias for dateTime
|
||||
c.LinePart = linePartTime
|
||||
columnValue = dateTimeDatatype
|
||||
default:
|
||||
// nothing to do since we don't know the linePart yet
|
||||
// the line part is decided in recomputeLineProtocolColumns
|
||||
}
|
||||
// setup column data type
|
||||
c.DataType = columnValue
|
||||
|
||||
// setup custom parsing of bool data type
|
||||
if c.DataType == boolDatatype && c.DataFormat != "" {
|
||||
c.ParseF = createBoolParseFn(c.DataFormat)
|
||||
}
|
||||
}
|
||||
|
||||
// CsvColumnError indicates conversion error in a specific column
|
||||
type CsvColumnError struct {
|
||||
Column string
|
||||
Err error
|
||||
}
|
||||
|
||||
// Error interface implementation
|
||||
func (e CsvColumnError) Error() string {
|
||||
return fmt.Sprintf("column '%s': %v", e.Column, e.Err)
|
||||
}
|
||||
|
||||
// CsvTable contains metadata about columns and a state of the CSV processing
|
||||
type CsvTable struct {
|
||||
// table columns that extract value from data row
|
||||
columns []CsvTableColumn
|
||||
// columns contains columns that extract values from data rows
|
||||
columns []*CsvTableColumn
|
||||
// partBits is a bitmap that is used to remember that a particular column annotation
|
||||
// (#group, #datatype and #default) was already processed for the table;
|
||||
// it is used to detect start of a new table in CSV flux results, a repeated annotation
|
||||
// is detected and a new CsvTable can be then created
|
||||
partBits uint8
|
||||
// indicates that the table is ready to read table data, which
|
||||
// readTableData indicates that the table is ready to read table data, which
|
||||
// is after reading annotation and header rows
|
||||
readTableData bool
|
||||
// indicates whether line protocol columns must be re-computed
|
||||
lpColumnsCached bool
|
||||
// extra columns are added by table-wide annotations, such as #constant
|
||||
extraColumns []CsvTableColumn
|
||||
// true to skip parsing of data type in column name
|
||||
// lpColumnsValid indicates whether line protocol columns are valid or must be re-calculated from columns
|
||||
lpColumnsValid bool
|
||||
// extraColumns are added by table-wide annotations, such as #constant
|
||||
extraColumns []*CsvTableColumn
|
||||
// ignoreDataTypeInColumnName is true to skip parsing of data type as a part a column name
|
||||
ignoreDataTypeInColumnName bool
|
||||
// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
|
||||
timeZone *time.Location
|
||||
|
||||
/* cached columns are initialized before reading the data rows */
|
||||
/* cached columns are initialized before reading the data rows using the computeLineProtocolColumns fn */
|
||||
// cachedMeasurement is a required column that read (line protocol) measurement
|
||||
cachedMeasurement *CsvTableColumn
|
||||
cachedTime *CsvTableColumn
|
||||
cachedFieldName *CsvTableColumn
|
||||
cachedFieldValue *CsvTableColumn
|
||||
cachedFields []CsvTableColumn
|
||||
cachedTags []CsvTableColumn
|
||||
// cachedTime is an optional column that reads timestamp of lp row
|
||||
cachedTime *CsvTableColumn
|
||||
// cachedFieldName is an optional column that reads a field name to add to the protocol line
|
||||
cachedFieldName *CsvTableColumn
|
||||
// cachedFieldValue is an optional column that reads a field value to add to the protocol line
|
||||
cachedFieldValue *CsvTableColumn
|
||||
// cachedFields are columns that read field values, a field name is taken from a column label
|
||||
cachedFields []*CsvTableColumn
|
||||
// cachedTags are columns that read tag values, a tag name is taken from a column label
|
||||
cachedTags []*CsvTableColumn
|
||||
}
|
||||
|
||||
// IgnoreDataTypeInColumnName sets a flag that that can ignores dataType parsing column names.
|
||||
// IgnoreDataTypeInColumnName sets a flag that can ignore dataType parsing in column names.
|
||||
// When true, column names can then contain '|'. By default, column name can also contain datatype
|
||||
// and default value when named `name|datatype` or `name|datatype|default`,
|
||||
// and a default value when named `name|datatype` or `name|datatype|default`,
|
||||
// for example `ready|boolean|true`
|
||||
func (t *CsvTable) IgnoreDataTypeInColumnName(val bool) {
|
||||
t.ignoreDataTypeInColumnName = val
|
||||
|
@ -144,34 +211,42 @@ func (t *CsvTable) DataColumnsInfo() string {
|
|||
func (t *CsvTable) NextTable() {
|
||||
t.partBits = 0 // no column annotations parsed yet
|
||||
t.readTableData = false
|
||||
t.columns = []CsvTableColumn{}
|
||||
t.extraColumns = []CsvTableColumn{}
|
||||
t.columns = []*CsvTableColumn{}
|
||||
t.extraColumns = []*CsvTableColumn{}
|
||||
}
|
||||
|
||||
// AddRow updates the state of the state of table with a new header, annotation or data row.
|
||||
// createColumns create a slice of CsvTableColumn for the supplied rowSize
|
||||
func createColumns(rowSize int) []*CsvTableColumn {
|
||||
retVal := make([]*CsvTableColumn, rowSize)
|
||||
for i := 0; i < rowSize; i++ {
|
||||
retVal[i] = &CsvTableColumn{
|
||||
Index: i,
|
||||
}
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
// AddRow updates the state of the CSV table with a new header, annotation or data row.
|
||||
// Returns true if the row is a data row.
|
||||
func (t *CsvTable) AddRow(row []string) bool {
|
||||
// detect data row or table header row
|
||||
if len(row[0]) == 0 || row[0][0] != '#' {
|
||||
if !t.readTableData {
|
||||
// row must a header row now
|
||||
t.lpColumnsCached = false // line protocol columns change
|
||||
t.lpColumnsValid = false // line protocol columns change
|
||||
if t.partBits == 0 {
|
||||
// create columns since no column anotations were processed
|
||||
t.columns = make([]CsvTableColumn, len(row))
|
||||
for i := 0; i < len(row); i++ {
|
||||
t.columns[i].Index = i
|
||||
}
|
||||
t.columns = createColumns(len(row))
|
||||
}
|
||||
// assign column labels for the header row
|
||||
for i := 0; i < len(t.columns); i++ {
|
||||
col := &t.columns[i]
|
||||
col := t.columns[i]
|
||||
if len(col.Label) == 0 && col.Index < len(row) {
|
||||
col.Label = row[col.Index]
|
||||
// assign column data type if possible
|
||||
if len(col.DataType) == 0 && !t.ignoreDataTypeInColumnName {
|
||||
if idx := strings.IndexByte(col.Label, '|'); idx != -1 {
|
||||
setupDataType(col, col.Label[idx+1:])
|
||||
col.setupDataType(col.Label[idx+1:])
|
||||
col.Label = col.Label[:idx]
|
||||
}
|
||||
}
|
||||
|
@ -186,12 +261,12 @@ func (t *CsvTable) AddRow(row []string) bool {
|
|||
|
||||
// process all supported annotations
|
||||
for i := 0; i < len(supportedAnnotations); i++ {
|
||||
supportedAnnotation := &supportedAnnotations[i]
|
||||
supportedAnnotation := supportedAnnotations[i]
|
||||
if supportedAnnotation.matches(row[0]) {
|
||||
if len(row[0]) > len(supportedAnnotation.prefix) && row[0][len(supportedAnnotation.prefix)] != ' ' {
|
||||
continue // ignoring, not a supported annotation
|
||||
}
|
||||
t.lpColumnsCached = false // line protocol columns change
|
||||
t.lpColumnsValid = false // line protocol columns change
|
||||
if supportedAnnotation.isTableAnnotation() {
|
||||
// process table-level annotation
|
||||
if err := supportedAnnotation.setupTable(t, row); err != nil {
|
||||
|
@ -207,16 +282,13 @@ func (t *CsvTable) AddRow(row []string) bool {
|
|||
// create new columns upon new or repeated column annotation
|
||||
if t.partBits == 0 || t.partBits&supportedAnnotation.flag == 1 {
|
||||
t.partBits = supportedAnnotation.flag
|
||||
t.columns = make([]CsvTableColumn, len(row))
|
||||
for i := 0; i < len(row); i++ {
|
||||
t.columns[i].Index = i
|
||||
}
|
||||
t.columns = createColumns(len(row))
|
||||
} else {
|
||||
t.partBits = t.partBits | supportedAnnotation.flag
|
||||
}
|
||||
// setup columns according to column annotation
|
||||
for j := 0; j < len(t.columns); j++ {
|
||||
col := &t.columns[j]
|
||||
col := t.columns[j]
|
||||
if col.Index >= len(row) {
|
||||
continue // missing value
|
||||
} else {
|
||||
|
@ -233,14 +305,21 @@ func (t *CsvTable) AddRow(row []string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// computeLineProtocolColumns computes columns that are
|
||||
// used to create line protocol rows when required to do so
|
||||
//
|
||||
// returns true if new columns were initialized or false if there
|
||||
// was no change in line protocol columns
|
||||
func (t *CsvTable) computeLineProtocolColumns() bool {
|
||||
if !t.lpColumnsCached {
|
||||
if !t.lpColumnsValid {
|
||||
t.recomputeLineProtocolColumns()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// recomputeLineProtocolColumns always computes the columns that are
|
||||
// used to create line protocol rows
|
||||
func (t *CsvTable) recomputeLineProtocolColumns() {
|
||||
// reset results
|
||||
t.cachedMeasurement = nil
|
||||
|
@ -253,23 +332,26 @@ func (t *CsvTable) recomputeLineProtocolColumns() {
|
|||
// having a _field column indicates fields without a line type are ignored
|
||||
defaultIsField := t.Column(labelFieldName) == nil
|
||||
|
||||
columns := append(append([]CsvTableColumn{}, t.columns...), t.extraColumns...)
|
||||
// go over columns + extra columns
|
||||
columns := make([]*CsvTableColumn, len(t.columns)+len(t.extraColumns))
|
||||
copy(columns, t.columns)
|
||||
copy(columns[len(t.columns):], t.extraColumns)
|
||||
for i := 0; i < len(columns); i++ {
|
||||
col := columns[i]
|
||||
switch {
|
||||
case col.Label == labelMeasurement || col.LinePart == linePartMeasurement:
|
||||
t.cachedMeasurement = &col
|
||||
t.cachedMeasurement = col
|
||||
case col.Label == labelTime || col.LinePart == linePartTime:
|
||||
if t.cachedTime != nil && t.cachedTime.Label != labelStart && t.cachedTime.Label != labelStop {
|
||||
log.Printf("WARNING: at most one dateTime column is expected, '%s' column is ignored\n", t.cachedTime.Label)
|
||||
}
|
||||
t.cachedTime = &col
|
||||
t.cachedTime = col
|
||||
case len(strings.TrimSpace(col.Label)) == 0 || col.LinePart == linePartIgnored:
|
||||
// ignored columns that are marked to be ignored or without a label
|
||||
case col.Label == labelFieldName:
|
||||
t.cachedFieldName = &col
|
||||
t.cachedFieldName = col
|
||||
case col.Label == labelFieldValue:
|
||||
t.cachedFieldValue = &col
|
||||
t.cachedFieldValue = col
|
||||
case col.LinePart == linePartTag:
|
||||
col.escapedLabel = escapeTag(col.Label)
|
||||
t.cachedTags = append(t.cachedTags, col)
|
||||
|
@ -294,7 +376,7 @@ func (t *CsvTable) recomputeLineProtocolColumns() {
|
|||
t.cachedTime.TimeZone = t.timeZone
|
||||
}
|
||||
|
||||
t.lpColumnsCached = true // line protocol columns are now fresh
|
||||
t.lpColumnsValid = true // line protocol columns are now fresh
|
||||
}
|
||||
|
||||
// CreateLine produces a protocol line out of the supplied row or returns error
|
||||
|
@ -307,7 +389,7 @@ func (t *CsvTable) CreateLine(row []string) (line string, err error) {
|
|||
return *(*string)(unsafe.Pointer(&buffer)), nil
|
||||
}
|
||||
|
||||
// AppendLine appends a protocol line to the supplied buffer and returns appended buffer or an error if any
|
||||
// AppendLine appends a protocol line to the supplied buffer using a CSV row and returns appended buffer or an error if any
|
||||
func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
|
||||
if t.computeLineProtocolColumns() {
|
||||
// validate column data types
|
||||
|
@ -377,7 +459,7 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
|
|||
buffer = append(buffer, field.LineLabel()...)
|
||||
buffer = append(buffer, '=')
|
||||
var err error
|
||||
buffer, err = appendConverted(buffer, value, &field)
|
||||
buffer, err = appendConverted(buffer, value, field)
|
||||
if err != nil {
|
||||
return buffer, CsvColumnError{
|
||||
field.Label,
|
||||
|
@ -416,14 +498,14 @@ func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
|
|||
func (t *CsvTable) Column(label string) *CsvTableColumn {
|
||||
for i := 0; i < len(t.columns); i++ {
|
||||
if t.columns[i].Label == label {
|
||||
return &t.columns[i]
|
||||
return t.columns[i]
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Columns returns available columns
|
||||
func (t *CsvTable) Columns() []CsvTableColumn {
|
||||
func (t *CsvTable) Columns() []*CsvTableColumn {
|
||||
return t.columns
|
||||
}
|
||||
|
||||
|
@ -452,13 +534,13 @@ func (t *CsvTable) FieldValue() *CsvTableColumn {
|
|||
}
|
||||
|
||||
// Tags returns tags
|
||||
func (t *CsvTable) Tags() []CsvTableColumn {
|
||||
func (t *CsvTable) Tags() []*CsvTableColumn {
|
||||
t.computeLineProtocolColumns()
|
||||
return t.cachedTags
|
||||
}
|
||||
|
||||
// Fields returns fields
|
||||
func (t *CsvTable) Fields() []CsvTableColumn {
|
||||
func (t *CsvTable) Fields() []*CsvTableColumn {
|
||||
t.computeLineProtocolColumns()
|
||||
return t.cachedFields
|
||||
}
|
||||
|
|
|
@ -465,16 +465,22 @@ func Test_CsvTable_DataColumnsInfo(t *testing.T) {
|
|||
require.False(t, table.AddRow(row))
|
||||
}
|
||||
table.computeLineProtocolColumns()
|
||||
columnInfo := "CsvTable{ dataColumns: 2 constantColumns: 5\n" +
|
||||
" measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}\n" +
|
||||
" tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:cpu}\n" +
|
||||
" tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:xpu}\n" +
|
||||
" field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF:<nil> escapedLabel:x}\n" +
|
||||
" field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF:<nil> escapedLabel:y}\n" +
|
||||
" field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:of}\n" +
|
||||
" time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}" +
|
||||
"\n}"
|
||||
require.Equal(t, columnInfo, table.DataColumnsInfo())
|
||||
// expected result is something like this:
|
||||
// "CsvTable{ dataColumns: 2 constantColumns: 5\n" +
|
||||
// " measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}\n" +
|
||||
// " tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:cpu}\n" +
|
||||
// " tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:xpu}\n" +
|
||||
// " field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF:<nil> escapedLabel:x}\n" +
|
||||
// " field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF:<nil> escapedLabel:y}\n" +
|
||||
// " field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:of}\n" +
|
||||
// " time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}" +
|
||||
// "\n}"
|
||||
result := table.DataColumnsInfo()
|
||||
require.Equal(t, 1, strings.Count(result, "measurement:"))
|
||||
require.Equal(t, 2, strings.Count(result, "tag:"))
|
||||
require.Equal(t, 3, strings.Count(result, "field:"))
|
||||
require.Equal(t, 1, strings.Count(result, "time:"))
|
||||
|
||||
var table2 *CsvTable
|
||||
require.Equal(t, "<nil>", table2.DataColumnsInfo())
|
||||
}
|
||||
|
|
|
@ -15,17 +15,21 @@ import (
|
|||
|
||||
// see https://v2.docs.influxdata.com/v2.0/reference/syntax/annotated-csv/#valid-data-types
|
||||
const (
|
||||
stringDatatype = "string"
|
||||
doubleDatatype = "double"
|
||||
boolDatatype = "boolean"
|
||||
longDatatype = "long"
|
||||
uLongDatatype = "unsignedLong"
|
||||
durationDatatype = "duration"
|
||||
base64BinaryDataType = "base64Binary"
|
||||
dateTimeDatatype = "dateTime"
|
||||
dateTimeDataFormatRFC3339 = "RFC3339"
|
||||
dateTimeDataFormatRFC3339Nano = "RFC3339Nano"
|
||||
dateTimeDataFormatNumber = "number" //the same as long, but serialized without i suffix, used for timestamps
|
||||
stringDatatype = "string"
|
||||
doubleDatatype = "double"
|
||||
boolDatatype = "boolean"
|
||||
longDatatype = "long"
|
||||
uLongDatatype = "unsignedLong"
|
||||
durationDatatype = "duration"
|
||||
base64BinaryDataType = "base64Binary"
|
||||
dateTimeDatatype = "dateTime"
|
||||
)
|
||||
|
||||
// predefined dateTime formats
|
||||
const (
|
||||
RFC3339 = "RFC3339"
|
||||
RFC3339Nano = "RFC3339Nano"
|
||||
dataFormatNumber = "number" //the same as long, but serialized without i suffix, used for timestamps
|
||||
)
|
||||
|
||||
var supportedDataTypes map[string]struct{}
|
||||
|
@ -136,11 +140,11 @@ func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) {
|
|||
return time.Parse(time.RFC3339, val)
|
||||
}
|
||||
return time.Unix(0, t).UTC(), nil
|
||||
case dateTimeDataFormatRFC3339:
|
||||
case RFC3339:
|
||||
return time.Parse(time.RFC3339, val)
|
||||
case dateTimeDataFormatRFC3339Nano:
|
||||
case RFC3339Nano:
|
||||
return time.Parse(time.RFC3339Nano, val)
|
||||
case dateTimeDataFormatNumber:
|
||||
case dataFormatNumber:
|
||||
t, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -113,7 +113,7 @@ func Test_ToTypedValue(t *testing.T) {
|
|||
for i, test := range tests {
|
||||
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
|
||||
column := &CsvTableColumn{}
|
||||
setupDataType(column, test.dataType)
|
||||
column.setupDataType(test.dataType)
|
||||
val, err := toTypedValue(test.value, column)
|
||||
if err != nil && test.expect != nil {
|
||||
require.Nil(t, err.Error())
|
||||
|
@ -142,7 +142,7 @@ func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
|
|||
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
|
||||
column := &CsvTableColumn{}
|
||||
column.TimeZone = tz
|
||||
setupDataType(column, test.dataType)
|
||||
column.setupDataType(test.dataType)
|
||||
val, err := toTypedValue(test.value, column)
|
||||
if err != nil && test.expect != nil {
|
||||
require.Nil(t, err.Error())
|
||||
|
@ -158,7 +158,7 @@ func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test_AppendProtocolValue tests appendProtocolValue function
|
||||
// Test_WriteProtocolValue tests writeProtocolValue function
|
||||
func Test_AppendProtocolValue(t *testing.T) {
|
||||
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
|
||||
var tests = []struct {
|
||||
|
@ -211,7 +211,7 @@ func Test_AppendConverted(t *testing.T) {
|
|||
for i, test := range tests {
|
||||
t.Run(fmt.Sprint(i), func(t *testing.T) {
|
||||
column := &CsvTableColumn{}
|
||||
setupDataType(column, test.dataType)
|
||||
column.setupDataType(test.dataType)
|
||||
val, err := appendConverted(nil, test.value, column)
|
||||
if err != nil && test.expect != "" {
|
||||
require.Nil(t, err.Error())
|
||||
|
@ -234,9 +234,9 @@ func Test_IsTypeSupported(t *testing.T) {
|
|||
require.True(t, IsTypeSupported(""), true)
|
||||
require.False(t, IsTypeSupported(" "), false)
|
||||
// time format is not part of data type
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339))
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339Nano))
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatNumber))
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339))
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339Nano))
|
||||
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dataFormatNumber))
|
||||
}
|
||||
|
||||
// Test_NormalizeNumberString tests normalizeNumberString function
|
||||
|
|
Loading…
Reference in New Issue