feat(pkg/csv2lp): add csv to line protocol conversion library
@ -98,6 +98,7 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0
google.golang.org/api v0.7.0
google.golang.org/api v0.7.0
@ -0,0 +1,198 @@
package csv2lp
import (
// annotationComment describes one supported CSV annotation (a row whose first cell
// starts with '#') together with the callback that applies it to a column or to a table
type annotationComment struct {
// prefix at the start of a CSV row that identifies this annotation; matches() compares
// it with the lower-cased row text, so it has to be written in lower case
prefix string
// flag is 0 to represent an annotation that is used for all data rows
// or a bit (>0) that is unique to the annotation
flag uint8
// setupColumn sets up the metadata that drives how column data
// are parsed, mandatory when flag > 0
setupColumn func(column *CsvTableColumn, columnValue string)
// setupTable sets up the metadata that drives how the table data
// are parsed, mandatory when flag == 0
setupTable func(table *CsvTable, row []string) error
func (a *annotationComment) isTableAnnotation() bool {
return a.setupTable != nil
func (a *annotationComment) matches(comment string) bool {
return strings.HasPrefix(strings.ToLower(comment), a.prefix)
var supportedAnnotations = []annotationComment{
{"#group", 1, func(column *CsvTableColumn, value string) {
// standard flux query result annotation
if strings.HasSuffix(value, "true") {
column.LinePart = linePartTag
}, nil},
{"#datatype", 2, func(column *CsvTableColumn, value string) {
// standard flux query result annotation
setupDataType(column, value)
}, nil},
{"#default", 4, func(column *CsvTableColumn, value string) {
// standard flux query result annotation
column.DefaultValue = ignoreLeadingComment(value)
}, nil},
{"#constant", 0, nil, func(table *CsvTable, row []string) error {
// adds a virtual column with contsant value to all data rows
// supported types of constant annotation rows are:
// 1. "#constant,datatype,label,defaultValue"
// 2. "#constant,measurement,value"
// 3. "#constant,dateTime,value"
// 4. "#constant datatype,label,defaultValue"
// 5. "#constant measurement,value"
// 6. "#constant dateTime,value"
// defaultValue is optional, additional columns are ignored
col := CsvTableColumn{}
col.Index = -1 // this is a virtual column that never extracts data from data rows
// setup column data type
setupDataType(&col, row[0])
var dataTypeIndex int
if len(col.DataType) == 0 && col.LinePart == 0 {
// type 1,2,3
dataTypeIndex = 1
if len(row) > 1 {
setupDataType(&col, row[1])
} else {
// type 4,5,6
dataTypeIndex = 0
// setup label if available
if len(row) > dataTypeIndex+1 {
col.Label = row[dataTypeIndex+1]
// setup defaultValue if available
if len(row) > dataTypeIndex+2 {
col.DefaultValue = row[dataTypeIndex+2]
// support type 2,3,5,6 syntax for measurement and timestamp
if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
if col.DefaultValue == "" && col.Label != "" {
// type 2,3,5,6
col.DefaultValue = col.Label
col.Label = "#constant " + col.DataType
} else if col.Label == "" {
// setup a label if no label is supplied fo focused error messages
col.Label = "#constant " + col.DataType
// add a virtual column to the table
table.extraColumns = append(table.extraColumns, col)
return nil
{"#timezone", 0, nil, func(table *CsvTable, row []string) error {
// setup timezone for parsing timestamps, UTC by default
val := ignoreLeadingComment(row[0])
if val == "" && len(row) > 1 {
val = row[1] // #timezone,Local
tz, err := parseTimeZone(val)
if err != nil {
return fmt.Errorf("#timezone annotation: %v", err)
table.timeZone = tz
return nil
// setupDataType sets up the column's data type, format, default value and line part
// from an annotation or header value.
//
// It writes column.DataType on every call and may also set column.DefaultValue,
// column.DataFormat, column.LinePart and column.ParseF.
func setupDataType(column *CsvTableColumn, columnValue string) {
// columnValue contains typeName and possibly additional column metadata,
// it can be
// 1. typeName
// 2. typeName:format
// 3. typeName|defaultValue
// 4. typeName:format|defaultValue
// 5. #anycomment (all options above)
// stripping the leading comment is also what lets the #constant annotation
// carry its data type in the same cell as the annotation name
columnValue = ignoreLeadingComment(columnValue)
// | adds a default value to column; the pipe is honoured only from byte offset 2 on,
// and an already assigned default value is not overwritten
pipeIndex := strings.Index(columnValue, "|")
if pipeIndex > 1 {
if column.DefaultValue == "" {
column.DefaultValue = columnValue[pipeIndex+1:]
columnValue = columnValue[:pipeIndex]
// setup column format; the colon is honoured only from byte offset 2 on,
// everything after the first colon is the format
colonIndex := strings.Index(columnValue, ":")
if colonIndex > 1 {
column.DataFormat = columnValue[colonIndex+1:]
columnValue = columnValue[:colonIndex]
// setup column linePart depending on dataType; a type name that matches
// no case leaves LinePart untouched
switch {
case columnValue == "tag":
column.LinePart = linePartTag
case strings.HasPrefix(columnValue, "ignore"):
// any type name starting with "ignore", such as ignore or ignored
column.LinePart = linePartIgnored
case columnValue == "dateTime":
// dateTime field is used at most once in a protocol line
column.LinePart = linePartTime
case columnValue == "measurement":
column.LinePart = linePartMeasurement
case columnValue == "field":
column.LinePart = linePartField
columnValue = "" // this is a generic field without a data type specified
case columnValue == "time": // time is an alias for dateTime
column.LinePart = linePartTime
columnValue = dateTimeDatatype
// setup column data type with what remained after metadata were stripped
column.DataType = columnValue
// setup custom parsing of bool data type, the format then drives the parsing
if column.DataType == boolDatatype && column.DataFormat != "" {
column.ParseF = createBoolParseFn(column.DataFormat)
// ignoreLeadingComment strips a leading '#anyComment ' prefix from the value.
// Values that do not start with '#' are returned unchanged; a comment without
// any following space yields an empty string. Spaces that follow the first
// separating space are removed as well.
func ignoreLeadingComment(value string) string {
	if !strings.HasPrefix(value, "#") {
		return value
	}
	spaceAt := strings.IndexByte(value, ' ')
	if spaceAt <= 0 {
		return ""
	}
	return strings.TrimLeft(value[spaceAt+1:], " ")
}
// timeZoneOffsetPattern matches a complete +hhmm or -hhmm offset and nothing else.
var timeZoneOffsetPattern = regexp.MustCompile(`^[+-][0-9]{4}$`)

// parseTimeZone parses the supplied time zone indicator into a Location.
//
// Accepted values are:
//   - "" which means UTC,
//   - "local" in any letter case, which means time.Local,
//   - a fixed offset written exactly as +hhmm or -hhmm,
//   - anything else is passed to time.LoadLocation.
//
// An error is returned when the offset is malformed or the location is unknown.
func parseTimeZone(val string) (*time.Location, error) {
	switch {
	case val == "":
		return time.UTC, nil
	case strings.ToLower(val) == "local":
		return time.Local, nil
	case val[0] == '-' || val[0] == '+':
		// the pattern is anchored, so trailing garbage such as "+0100abc" or an
		// extra digit is rejected instead of silently producing a wrong offset
		if !timeZoneOffsetPattern.MatchString(val) {
			return nil, fmt.Errorf("timezone '%s' is not +hhmm or -hhmm", val)
		}
		intVal, err := strconv.Atoi(val)
		if err != nil {
			return nil, fmt.Errorf("timezone '%s' is not +hhmm or -hhmm", val)
		}
		// intVal keeps the sign, so hours and minutes are both negative for -hhmm
		offset := (intVal/100)*3600 + (intVal%100)*60
		return time.FixedZone(val, offset), nil
	default:
		return time.LoadLocation(val)
	}
}
@ -0,0 +1,215 @@
package csv2lp
import (
func annotation(name string) annotationComment {
for _, a := range supportedAnnotations {
if a.prefix == name {
return a
panic("no annotation named " + name + " found!")
// TestGroupAnnotation tests #group annotation
func TestGroupAnnotation(t *testing.T) {
subject := annotation("#group")
require.True(t, subject.matches("#Group"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expect int
{"#group true", linePartTag},
{"#group false", 0},
{"false", 0},
{"unknown", 0},
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
require.Equal(t, test.expect, col.LinePart)
// TestDefaultAnnotation tests #default annotation
func TestDefaultAnnotation(t *testing.T) {
subject := annotation("#default")
require.True(t, subject.matches("#Default"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expect string
{"#default 1", "1"},
{"#default ", ""},
{"whatever", "whatever"},
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
require.Equal(t, test.expect, col.DefaultValue)
// TestDatatypeAnnotation tests #datatype annotation
func TestDatatypeAnnotation(t *testing.T) {
subject := annotation("#datatype")
require.True(t, subject.matches("#dataType"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expectType string
expectFormat string
expectLinePart int
{"#datatype long", "long", "", 0},
{"#datatype ", "", "", 0},
{"#datatype measurement", "_", "", linePartMeasurement},
{"#datatype tag", "_", "", linePartTag},
{"#datatype field", "", "", linePartField},
{"dateTime", "dateTime", "", linePartTime},
{"dateTime:RFC3339", "dateTime", "RFC3339", linePartTime},
{"#datatype dateTime:RFC3339", "dateTime", "RFC3339", linePartTime},
{"whatever:format", "whatever", "format", 0},
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
if test.expectType != "_" {
require.Equal(t, test.expectType, col.DataType)
require.Equal(t, test.expectFormat, col.DataFormat)
// TestConstantAnnotation tests #constant annotation
func TestConstantAnnotation(t *testing.T) {
subject := annotation("#constant")
require.True(t, subject.matches("#Constant"))
require.True(t, subject.isTableAnnotation())
var tests = []struct {
value []string
expectLabel string
expectDefault string
expectLinePart int
{[]string{"#constant "}, "", "", 0}, // means literally nothing
{[]string{"#constant measurement", "a"}, "_", "a", linePartMeasurement},
{[]string{"#constant measurement", "a", "b"}, "_", "b", linePartMeasurement},
{[]string{"#constant measurement", "a", ""}, "_", "a", linePartMeasurement},
{[]string{"#constant tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#constant", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#constant field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"#constant", "field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"dateTime", "1"}, "_", "1", linePartTime},
{[]string{"dateTime", "1", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "3", ""}, "_", "3", linePartTime},
{[]string{"long", "fN", "fV"}, "fN", "fV", 0},
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
table := &CsvTable{}
subject.setupTable(table, test.value)
require.Equal(t, 1, len(table.extraColumns))
col := table.extraColumns[0]
require.Equal(t, test.expectLinePart, col.LinePart)
require.Greater(t, 0, col.Index)
if test.expectLabel != "_" {
require.Equal(t, test.expectLabel, col.Label)
} else {
require.NotEqual(t, "", col.Label)
require.Equal(t, test.expectDefault, col.DefaultValue)
// TestTimeZoneAnnotation tests #timezone annotation
func TestTimeZoneAnnotation(t *testing.T) {
subject := annotation("#timezone")
require.True(t, subject.matches("#timeZone"))
require.True(t, subject.isTableAnnotation())
var tests = []struct {
value string
err string
{"#timezone ", ""},
{"#timezone EST", ""},
{"#timezone,EST", ""},
{"#timezone,+0100", ""},
{"#timezone,whatever", "#timezone annotation"},
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
table := &CsvTable{}
err := subject.setupTable(table, strings.Split(test.value, ","))
if test.err == "" {
require.Nil(t, err)
require.NotNil(t, table.timeZone != nil)
} else {
require.NotNil(t, err)
require.True(t, strings.Contains(fmt.Sprintf("%v", err), test.err))
// Test_parseTimeZone tests parseTimeZone fn.
// A negative expected offset marks an input for which an error is expected.
func Test_parseTimeZone(t *testing.T) {
now := time.Now()
// the offset of the local zone depends on the machine that runs the test
_, localOffset := now.Zone()
var tests = []struct {
value string
offset int
{"local", localOffset},
{"Local", localOffset},
{"-0000", 0},
{"+0000", 0},
{"-0100", -3600},
{"+0100", 3600},
{"+0130", 3600 + 3600/2},
{"", 0},
{"-01", -1},
{"0000", -1},
{"UTC", 0},
{"EST", -5 * 3600},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
tz, err := parseTimeZone(test.value)
require.NotEqual(t, tz, err) // both cannot be nil
if err != nil {
require.Nil(t, tz)
// fmt.Println(err)
// an error is acceptable only for the inputs with a negative expected offset
if test.offset >= 0 {
require.Fail(t, "offset expected")
require.NotNil(t, tz)
// parse today's date in the returned location and compare the zone offset
testDate := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day())
result, err := time.ParseInLocation("2006-01-02", testDate, tz)
require.Nil(t, err)
_, offset := result.Zone()
require.Equal(t, test.offset, offset)
@ -0,0 +1,464 @@
package csv2lp
import (
// column labels used in flux CSV results
const (
labelFieldName = "_field" // column that holds the field name
labelFieldValue = "_value" // column that holds the field value
labelTime = "_time"
labelStart = "_start"
labelStop = "_stop"
labelMeasurement = "_measurement"
// types of column with respect to line protocol; the values start at 1
// because CsvTableColumn.LinePart uses 0 for "not determined yet"
const (
linePartIgnored = iota + 1 // ignored in line protocol
// CsvTableColumn represents processing metadata about a csv column
type CsvTableColumn struct {
// label such as "_start", "_stop", "_time"
Label string
// "string", "long", "dateTime" ...
DataType string
// "RFC3339", "2006-01-02"
DataFormat string
// column's line part (0 means not determined yet), see linePart constants
LinePart int
// default value to be used for rows where value is an empty string
// or where the row has no cell for this column
DefaultValue string
// index of this column in the table row, -1 indicates a virtual column
Index int
// TimeZone of dateTime column, applied when parsing dateTime without timeZone in the format
TimeZone *time.Location
// parse function, when set, is used to convert column's string value to interface{}
ParseF func(string) (interface{}, error)
// escaped label for line protocol, LineLabel falls back to Label when it is empty
escapedLabel string
// LineLabel returns escaped name of the column so it can be then used as a tag name or field name in line protocol
func (c *CsvTableColumn) LineLabel() string {
if len(c.escapedLabel) > 0 {
return c.escapedLabel
return c.Label
// Value returns the value of the column for the supplied row supplied
func (c *CsvTableColumn) Value(row []string) string {
if c.Index < 0 || c.Index >= len(row) {
return c.DefaultValue
val := row[c.Index]
if len(val) > 0 {
return val
return c.DefaultValue
// CsvColumnError indicates conversion error in a specific column
type CsvColumnError struct {
	// Column is the label of the column in which the error occurred
	Column string
	// Err is the underlying cause
	Err error
}

// Error returns the column label followed by the message of the underlying error.
func (e CsvColumnError) Error() string {
	return fmt.Sprintf("column '%s': %v", e.Column, e.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause of a column error.
func (e CsvColumnError) Unwrap() error {
	return e.Err
}
// CsvTable contains metadata about columns and a state of the CSV processing
type CsvTable struct {
// table columns that extract value from data row
columns []CsvTableColumn
// partBits is a bitmap that is used to remember that a particular column annotation
// (#group, #datatype and #default) was already processed for the table;
// it is used to detect start of a new table in CSV flux results, a repeated annotation
// is detected and a new CsvTable can be then created
partBits uint8
// indicates that the table is ready to read table data, which
// is after reading annotation and header rows
readTableData bool
// indicates that the cached line protocol columns are up to date;
// it is reset to false whenever a header or annotation row changes the columns
lpColumnsCached bool
// extra columns are added by table-wide annotations, such as #constant
extraColumns []CsvTableColumn
// true to skip parsing of data type in column name
ignoreDataTypeInColumnName bool
// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
timeZone *time.Location
/* cached columns are computed from columns and extraColumns, see recomputeLineProtocolColumns */
cachedMeasurement *CsvTableColumn
cachedTime *CsvTableColumn
cachedFieldName *CsvTableColumn
cachedFieldValue *CsvTableColumn
cachedFields []CsvTableColumn
cachedTags []CsvTableColumn
// IgnoreDataTypeInColumnName sets a flag that controls whether a data type is parsed out of column names.
// When val is true, the labels of the header row are used as they are, so column names can contain '|'.
// By default, a column name can also contain datatype
// and default value when named `name|datatype` or `name|datatype|default`,
// for example `ready|boolean|true`
func (t *CsvTable) IgnoreDataTypeInColumnName(val bool) {
t.ignoreDataTypeInColumnName = val
// DataColumnsInfo returns a string representation of columns that are used to process CSV data.
// It reports the number of data and constant columns followed by the cached measurement,
// tag, field and time columns, one per line; a nil table is reported as "<nil>".
// The cached columns are printed as they are, this function does not recompute them.
func (t *CsvTable) DataColumnsInfo() string {
if t == nil {
return "<nil>"
var builder = strings.Builder{}
builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
// cachedMeasurement and cachedTime are pointers, %+v prints them with a leading '&'
builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
for _, col := range t.cachedTags {
builder.WriteString(fmt.Sprintf(" tag: %+v\n", col))
for _, col := range t.cachedFields {
builder.WriteString(fmt.Sprintf(" field: %+v\n", col))
builder.WriteString(fmt.Sprintf(" time: %+v\n", t.cachedTime))
return builder.String()
// NextTable resets the table to a state in which it expects annotations and header rows
func (t *CsvTable) NextTable() {
t.partBits = 0 // no column annotations parsed yet
t.readTableData = false
t.columns = []CsvTableColumn{}
t.extraColumns = []CsvTableColumn{}
// AddRow updates the state of the table with a new header, annotation or data row.
// Returns true if the row is a data row, false for header, annotation and comment rows.
// The first cell of the row is inspected without a length check, the row must not be empty.
func (t *CsvTable) AddRow(row []string) bool {
// detect data row or table header row, everything that does not start with '#'
if len(row[0]) == 0 || row[0][0] != '#' {
if !t.readTableData {
// row must be a header row now
t.lpColumnsCached = false // line protocol columns change
if t.partBits == 0 {
// create columns since no column annotations were processed
t.columns = make([]CsvTableColumn, len(row))
for i := 0; i < len(row); i++ {
t.columns[i].Index = i
// assign column labels for the header row
for i := 0; i < len(t.columns); i++ {
col := &t.columns[i]
if len(col.Label) == 0 && col.Index < len(row) {
col.Label = row[col.Index]
// assign column data type if possible, the label can be `name|datatype...`
if len(col.DataType) == 0 && !t.ignoreDataTypeInColumnName {
if idx := strings.IndexByte(col.Label, '|'); idx != -1 {
setupDataType(col, col.Label[idx+1:])
col.Label = col.Label[:idx]
// header row is read, now expect data rows
t.readTableData = true
return false
return true
// process all supported annotations
for i := 0; i < len(supportedAnnotations); i++ {
supportedAnnotation := &supportedAnnotations[i]
if supportedAnnotation.matches(row[0]) {
// the prefix must be the whole cell or be followed by a space,
// so that for example "#datatypea" is not taken for "#datatype"
if len(row[0]) > len(supportedAnnotation.prefix) && row[0][len(supportedAnnotation.prefix)] != ' ' {
continue // ignoring, not a supported annotation
t.lpColumnsCached = false // line protocol columns change
if supportedAnnotation.isTableAnnotation() {
// process table-level annotation, a failure is only logged
if err := supportedAnnotation.setupTable(t, row); err != nil {
log.Println("WARNING: ", err)
return false
// invariant: !supportedAnnotation.isTableAnnotation()
if t.readTableData {
// any column annotation stops reading of data rows
// create new columns upon new or repeated column annotation
// NOTE(review): '&' binds tighter than '==', so this reads (partBits & flag) == 1;
// with the flags 1, 2 and 4 that is true only for a repeated #group, a repeated
// #datatype or #default goes to the else branch and is merged into the existing
// columns - confirm whether `!= 0` was intended
if t.partBits == 0 || t.partBits&supportedAnnotation.flag == 1 {
t.partBits = supportedAnnotation.flag
t.columns = make([]CsvTableColumn, len(row))
for i := 0; i < len(row); i++ {
t.columns[i].Index = i
} else {
t.partBits = t.partBits | supportedAnnotation.flag
// setup columns according to column annotation
for j := 0; j < len(t.columns); j++ {
col := &t.columns[j]
if col.Index >= len(row) {
continue // missing value
} else {
supportedAnnotation.setupColumn(col, row[col.Index])
return false
// warn about unsupported annotation unless a comment row, which starts with "# "
if !strings.HasPrefix(row[0], "# ") {
log.Println("WARNING: unsupported annotation: ", row[0])
return false
// computeLineProtocolColumns reports whether the cached line protocol columns
// were out of date (lpColumnsCached was false) when it was called;
// AppendLine validates the column data types only when it returns true.
func (t *CsvTable) computeLineProtocolColumns() bool {
if !t.lpColumnsCached {
return true
return false
// recomputeLineProtocolColumns rebuilds the cached measurement, time, field name,
// field value, tag and field columns from the data columns and the extra (constant)
// columns, and marks the cache as fresh. The cached columns are copies, changing
// them does not change t.columns or t.extraColumns.
func (t *CsvTable) recomputeLineProtocolColumns() {
// reset results
t.cachedMeasurement = nil
t.cachedTime = nil
t.cachedFieldName = nil
t.cachedFieldValue = nil
t.cachedTags = nil
t.cachedFields = nil
// having a _field column indicates fields without a line type are ignored
defaultIsField := t.Column(labelFieldName) == nil
// data columns come first, extra columns are processed after them
columns := append(append([]CsvTableColumn{}, t.columns...), t.extraColumns...)
for i := 0; i < len(columns); i++ {
// col is a fresh copy in every iteration, so taking its address below is safe
col := columns[i]
// the order of the cases matters, the first matching case wins
switch {
case col.Label == labelMeasurement || col.LinePart == linePartMeasurement:
t.cachedMeasurement = &col
case col.Label == labelTime || col.LinePart == linePartTime:
// a previously found time column is replaced; the replacement is reported
// unless the replaced column is _start or _stop
if t.cachedTime != nil && t.cachedTime.Label != labelStart && t.cachedTime.Label != labelStop {
log.Printf("WARNING: at most one dateTime column is expected, '%s' column is ignored\n", t.cachedTime.Label)
t.cachedTime = &col
case len(strings.TrimSpace(col.Label)) == 0 || col.LinePart == linePartIgnored:
// ignored columns that are marked to be ignored or without a label
case col.Label == labelFieldName:
t.cachedFieldName = &col
case col.Label == labelFieldValue:
t.cachedFieldValue = &col
case col.LinePart == linePartTag:
col.escapedLabel = escapeTag(col.Label)
t.cachedTags = append(t.cachedTags, col)
case col.LinePart == linePartField:
col.escapedLabel = escapeTag(col.Label)
t.cachedFields = append(t.cachedFields, col)
// a column without a line part becomes a field only when the table has no _field column
if defaultIsField {
col.escapedLabel = escapeTag(col.Label)
t.cachedFields = append(t.cachedFields, col)
// line protocol requires sorted tags, they are sorted by their unescaped label
if t.cachedTags != nil && len(t.cachedTags) > 0 {
sort.Slice(t.cachedTags, func(i, j int) bool {
return t.cachedTags[i].Label < t.cachedTags[j].Label
// setup timezone for timestamp column, a zone already set on the column is kept
if t.cachedTime != nil && t.cachedTime.TimeZone == nil {
t.cachedTime.TimeZone = t.timeZone
t.lpColumnsCached = true // line protocol columns are now fresh
// CreateLine produces a protocol line out of the supplied row or returns error
func (t *CsvTable) CreateLine(row []string) (line string, err error) {
buffer := make([]byte, 100)[:0]
buffer, err = t.AppendLine(buffer, row)
if err != nil {
return "", err
return *(*string)(unsafe.Pointer(&buffer)), nil
// AppendLine appends a protocol line to the supplied buffer and returns appended buffer or an error if any.
// The line is written in the order measurement, tags, fields and an optional timestamp.
// When an error is returned, the returned buffer can already contain a part of the line.
func (t *CsvTable) AppendLine(buffer []byte, row []string) ([]byte, error) {
if t.computeLineProtocolColumns() {
// validate column data types, this runs only when the cached columns were out of date
if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) {
return buffer, CsvColumnError{
fmt.Errorf("data type '%s' is not supported", t.cachedFieldValue.DataType),
for _, c := range t.cachedFields {
if !IsTypeSupported(c.DataType) {
return buffer, CsvColumnError{
fmt.Errorf("data type '%s' is not supported", c.DataType),
// measurement is mandatory
if t.cachedMeasurement == nil {
return buffer, errors.New("no measurement column found")
measurement := t.cachedMeasurement.Value(row)
if measurement == "" {
return buffer, CsvColumnError{
errors.New("no measurement supplied"),
buffer = append(buffer, escapeMeasurement(measurement)...)
// tags, a tag without a value is left out
for _, tag := range t.cachedTags {
value := tag.Value(row)
if tag.Index < len(row) && len(value) > 0 {
buffer = append(buffer, ',')
buffer = append(buffer, tag.LineLabel()...)
buffer = append(buffer, '=')
buffer = append(buffer, escapeTag(value)...)
buffer = append(buffer, ' ')
fieldAdded := false
// the field that is described by the _field and _value columns comes first
if t.cachedFieldName != nil && t.cachedFieldValue != nil {
field := t.cachedFieldName.Value(row)
value := t.cachedFieldValue.Value(row)
if len(value) > 0 && len(field) > 0 {
buffer = append(buffer, escapeTag(field)...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, t.cachedFieldValue)
if err != nil {
return buffer, CsvColumnError{
fieldAdded = true
// then the remaining field columns, a field without a value is left out
for _, field := range t.cachedFields {
value := field.Value(row)
if len(value) > 0 {
if !fieldAdded {
fieldAdded = true
} else {
buffer = append(buffer, ',')
buffer = append(buffer, field.LineLabel()...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, &field)
if err != nil {
return buffer, CsvColumnError{
// at least one field is mandatory
if !fieldAdded {
return buffer, errors.New("no field data found")
// timestamp is optional
if t.cachedTime != nil && t.cachedTime.Index < len(row) {
timeVal := t.cachedTime.Value(row)
if len(timeVal) > 0 {
if len(t.cachedTime.DataType) == 0 {
// assume dateTime data type (number or RFC3339)
t.cachedTime.DataType = dateTimeDatatype
t.cachedTime.DataFormat = ""
buffer = append(buffer, ' ')
var err error
buffer, err = appendConverted(buffer, timeVal, t.cachedTime)
if err != nil {
return buffer, CsvColumnError{
return buffer, nil
// Column returns the first column of the supplied label or nil
func (t *CsvTable) Column(label string) *CsvTableColumn {
for i := 0; i < len(t.columns); i++ {
if t.columns[i].Label == label {
return &t.columns[i]
return nil
// Columns returns the data columns of the table; the table's own slice is returned, not a copy
func (t *CsvTable) Columns() []CsvTableColumn {
return t.columns
// Measurement returns the cached measurement column or nil
func (t *CsvTable) Measurement() *CsvTableColumn {
return t.cachedMeasurement
// Time returns the cached time column or nil
func (t *CsvTable) Time() *CsvTableColumn {
return t.cachedTime
// FieldName returns the cached field name column or nil
func (t *CsvTable) FieldName() *CsvTableColumn {
return t.cachedFieldName
// FieldValue returns the cached field value column or nil
func (t *CsvTable) FieldValue() *CsvTableColumn {
return t.cachedFieldValue
// Tags returns the cached tag columns
func (t *CsvTable) Tags() []CsvTableColumn {
return t.cachedTags
// Fields returns the cached field columns
func (t *CsvTable) Fields() []CsvTableColumn {
return t.cachedFields
@ -0,0 +1,477 @@
package csv2lp
import (
// readCsv parses the supplied CSV data and returns its rows;
// rows are allowed to have a different number of fields
func readCsv(t *testing.T, data string) [][]string {
reader := csv.NewReader(strings.NewReader(data))
var rows [][]string
for {
row, err := reader.Read()
// reset after every read, otherwise the reader requires the field count of the first row
reader.FieldsPerRecord = 0 // every row can have different number of fields
if err == io.EOF {
if err != nil {
t.Log("row: ", row)
rows = append(rows, row)
return rows
// TestQueryResult validates construction of table columns from Query CSV result.
// The checks on i%6 assume that every table of the input takes 6 rows, of which the
// first 4 are not data rows and the remaining 2 are converted to protocol lines.
func TestQueryResult(t *testing.T) {
const csvQueryResult = `
var lineProtocolQueryResult = []string{
"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000",
"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000",
"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000",
"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000",
table := CsvTable{}
rows := readCsv(t, csvQueryResult)
lineProtocolIndex := 0
for i, row := range rows {
rowProcessed := table.AddRow(row)
if i%6 < 4 {
require.Equal(t, rowProcessed, false, "row %d", i)
} else {
require.Equal(t, rowProcessed, true, "row %d", i)
line, _ := table.CreateLine(row)
require.Equal(t, lineProtocolQueryResult[lineProtocolIndex], line)
if i%6 == 4 {
// verify table after its first data row
require.GreaterOrEqual(t, len(table.columns), 10)
require.Equal(t, table.columns, table.Columns())
// rows i-1, i-2 and i-3 are compared with the labels, default values and data types
for j, col := range table.columns {
if j > 0 {
require.Equal(t, col.Index, j)
require.Equal(t, col.Label, rows[i-1][j])
if len(rows[i-2]) > j {
require.Equal(t, col.DefaultValue, rows[i-2][j])
} else {
// some trailing data are missing
require.Equal(t, col.DefaultValue, "")
types := strings.Split(rows[i-3][j], ":")
require.Equal(t, types[0], col.DataType, "row %d, col %d", i-3, j)
if len(types) > 1 {
require.Equal(t, types[1], col.DataFormat, "row %d, col %d", i-3, j)
// verify cached values
require.Equal(t, table.Column("_measurement"), table.cachedMeasurement)
require.Nil(t, table.Column("_no"))
require.NotNil(t, table.cachedMeasurement)
require.NotNil(t, table.cachedFieldName)
require.NotNil(t, table.cachedFieldValue)
require.NotNil(t, table.cachedTime)
require.NotNil(t, table.cachedTags)
require.Equal(t, table.Measurement().Label, "_measurement")
require.Equal(t, table.FieldName().Label, "_field")
require.Equal(t, table.FieldValue().Label, "_value")
require.Equal(t, table.Time().Label, "_time")
require.Equal(t, len(table.Tags()), 2)
require.Equal(t, table.Tags()[0].Label, "cpu")
require.Equal(t, table.Tags()[1].Label, "host")
require.Equal(t, len(table.Fields()), 0)
// Test_ignoreLeadingComment checks that a leading '#comment ' prefix is removed
// and that values not starting with '#' are returned unchanged
func Test_ignoreLeadingComment(t *testing.T) {
var tests = []struct {
value string
expect string
{"", ""},
{"a", "a"},
{" #whatever", " #whatever"},
{"#whatever", ""},
{"#whatever ", ""},
{"#whatever a b ", "a b "},
{"#whatever a b ", "a b "},
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
require.Equal(t, test.expect, ignoreLeadingComment(test.value))
// TestCsvData checks data that are written in an annotated CSV file.
// Every case supplies CSV input and the single protocol line that is expected
// from it; an empty expected line marks a case whose conversion fails.
func TestCsvData(t *testing.T) {
var tests = []struct {
name string
csv string
line string
"cpu a=1,b=1",
"cpu a=1,b=1",
"", // no fields present
"", // no measurement present
"#datatype measurement,,\nmeasurement,a,b\ncpu,1,2",
"cpu a=1,b=2",
"#datatype measurement,tag,field\nmeasurement,a,b\ncpu,1,2",
"cpu,a=1 b=2",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3 2",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10Z,3",
"cpu,a=1 time=3 1578651010000000000",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10.0Z,3",
"cpu,a=1 time=3 1578651010000000000",
"#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3",
"#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3",
"#datatype measurement,tag,ignore,field\n" +
"#datatypea tag,tag,\n" + // this must be ignored since it is not a supported annotation
"cpu,a=1 time=3",
"#datatype measurement,dateTime,\nmeasurement,a,b\ncpu,2020-01-10T10:10:10.0Z,2",
"cpu b=2 1578651010000000000",
"#datatype measurement,,,field\nmeasurement,_field,_value,other\ncpu,a,1,2",
"cpu a=1,other=2",
"#datatype measurement,tag,tag,time,field\nmeasurement,b,a,c,time\ncpu,1,2,3,4",
"cpu,a=2,b=1 time=4 3",
"#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" +
"m,s,d,b,l,ul,dur,by,d1,d2,time\n" +
"cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1",
"#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" +
"m,s,d,b,l,ul,dur,by,d1,d2,time\n" +
"cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1",
"#datatype ,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime:RFC3339,dateTime:RFC3339Nano,\n" +
"_measurement,s,d,b,l,ul,dur,by,d1,d2,_time\n" +
"cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1",
"#datatype measurement,tag,field,ignored,dateTime\n" +
"m,t,f,i,dt\n" +
"cpu,t=myTag f=0 1",
"#datatype ,string,string,,,,\n" +
`_measurement,s1,s2,"a,","b ",c=` + "\n" +
`"cpu, ","""",\,a,b,c`,
`cpu\,\ s1="\"",s2="\\",a\,=a,b\ =b,c\==c`,
"#default cpu,yes,0,1\n#datatype ,tag,,\n_measurement,test,col1,_time\n,,,",
"cpu,test=yes col1=0 1",
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
// an error is tolerated only when no line is expected
if err != nil && test.line != "" {
// the error text is never nil, so this fails and reports the message
require.Nil(t, err.Error())
lines = append(lines, line)
require.Equal(t, []string{test.line}, lines)
// TestConstantAnnotations checks protocol lines that are created from CSV data
// with #constant annotations, which supply values that are not present in data rows
func TestConstantAnnotations(t *testing.T) {
var tests = []struct {
name string
csv string
line string
"#constant measurement,cpu\n" +
"a,b\n" +
"cpu a=1,b=1",
"#constant,measurement,,cpu\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,0\n" +
"#constant,dateTime,,2\n" +
"a,b\n" +
"cpu,cpu=cpu1 a=1,b=1,of=0i 2",
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
// an error is tolerated only when no line is expected
if err != nil && test.line != "" {
// the error text is never nil, so this fails and reports the message
require.Nil(t, err.Error())
lines = append(lines, line)
require.Equal(t, []string{test.line}, lines)
// TestDataTypeInColumnName checks header rows whose column names carry a data type,
// a format and a default value in the `name|datatype:format|default` form;
// the ignoreDataTypeInColumnName field of a case is meant to turn that parsing off
func TestDataTypeInColumnName(t *testing.T) {
var tests = []struct {
csv string
line string
ignoreDataTypeInColumnName bool
"m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
`cpu c=true`,
"m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
`cpu a=true,b=false,c=true,d=false`,
"#constant measurement,cpu\n" +
"a|long,b|string\n" +
`cpu a=1i,b="1"`,
"#constant measurement,cpu\n" +
"a|long,b|string\n" +
`cpu a|long=1,b|string=1`,
"#constant measurement,cpu\n" +
"#datatype long,string\n" +
"a|long,b|string\n" +
`cpu a|long=1i,b|string="1"`,
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
// an error is tolerated only when no line is expected
if err != nil && test.line != "" {
// the error text is never nil, so this fails and reports the message
require.Nil(t, err.Error())
lines = append(lines, line)
require.Equal(t, []string{test.line}, lines)
// TestCsvData_dataErrors validates table data errors
func TestCsvData_dataErrors(t *testing.T) {
var tests = []struct {
name string
csv string
"#datatype measurement,,\n#datatype ,dateTime:RFC3339,\nmeasurement,a,b\ncpu,1,2",
"#datatype measurement,,\n#datatype ,long,\nmeasurement,_value,_field\ncpu,a,count",
"#datatype measurement,,\n#datatype ,long,\nmeasurement,a,b\ncpu,a,2",
"#datatype measurement,tag,time,field\nmeasurement,a,b,time\ncpu,1,2020-10,3",
"#datatype ,\ncol1,col2\n1,2",
"#datatype ,whatever\n_measurement,col2\na,2",
"#datatype ,,whatever\n_measurement,_field,_value\na,1,2",
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var errors []error
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
_, err := table.CreateLine(row)
if err != nil {
errors = append(errors, err)
require.Equal(t, 1, len(errors))
// fmt.Println(errors[0])
require.NotNil(t, errors[0].Error())
// LineLabel is the same as Label in all test columns
for _, col := range table.Columns() {
require.Equal(t, col.Label, col.LineLabel())
func TestCsvTable_ColumnInfo(t *testing.T) {
data := "#constant,measurement,cpu\n" +
"#constant,tag,xpu,xpu1\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,100\n" +
"#constant,dateTime,2\n" +
table := CsvTable{}
for _, row := range readCsv(t, data) {
require.False(t, table.AddRow(row))
columnInfo := "CsvTable{ dataColumns: 2 constantColumns: 5\n" +
" measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}\n" +
" tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:cpu}\n" +
" tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:xpu}\n" +
" field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF:<nil> escapedLabel:x}\n" +
" field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF:<nil> escapedLabel:y}\n" +
" field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:of}\n" +
" time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}" +
require.Equal(t, columnInfo, table.DataColumnsInfo())
var table2 *CsvTable
require.Equal(t, "<nil>", table2.DataColumnsInfo())
@ -0,0 +1,132 @@
// Package csv2lp transforms CSV data to influxDB line protocol
package csv2lp
import (
// CsvLineError is returned for csv conversion errors. It pairs the cause
// of the failure with the number of the CSV line that triggered it.
type CsvLineError struct {
	// Line is the number of the CSV line, 1 is the first line
	Line int
	// Err is the cause of the failure
	Err error
}

// Error formats the error as "line <Line>: <Err>".
func (e CsvLineError) Error() string {
	return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}

// Unwrap returns the cause of the failure so that errors.Is and errors.As
// can look through the line wrapper.
func (e CsvLineError) Unwrap() error {
	return e.Err
}
// CsvToLineReader represents state of transformation from csv data to line protocol reader
type CsvToLineReader struct {
// csv reading
csv *csv.Reader
// Table collects information about used columns
Table CsvTable
// LineNumber represents line number of csv.Reader, 1 is the first
LineNumber int
// when true, log table data columns before reading data rows
logTableDataColumns bool
// state variable that indicates whether any data row was read
dataRowAdded bool
// log CSV data errors to stderr and continue with CSV processing
skipRowOnError bool
// reader results
// buffer holds converted lines that Read has not copied out yet
buffer []byte
// lineBuffer is scratch space for a single converted line, reused between rows
lineBuffer []byte
// index is the offset of the first byte in buffer that Read has not copied out yet
index int
// finished, once set, is the error returned by every further Read call
finished error
// LogTableColumns turns on/off logging of table data columns before reading data rows.
// It returns the receiver so that the call can be chained.
func (state *CsvToLineReader) LogTableColumns(val bool) *CsvToLineReader {
state.logTableDataColumns = val
return state
// SkipRowOnError controls whether to fail on every CSV conversion error (false) or to log the error and continue (true).
// It returns the receiver so that the call can be chained.
func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
state.skipRowOnError = val
return state
// Read copies converted line protocol data into p, its signature matches io.Reader.
// It works as a small state machine: a stored terminal error is returned first,
// then pending bytes of the internal buffer are copied out, and only when the
// buffer is drained further CSV rows are read and converted.
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
// state1: finished
if state.finished != nil {
return 0, state.finished
// state2: some data are in the buffer to copy
if len(state.buffer) > state.index {
// we have remaining bytes to copy
if len(state.buffer)-state.index > len(p) {
// copy a part of the buffer
copy(p, state.buffer[state.index:state.index+len(p)])
state.index += len(p)
return len(p), nil
// copy the entire buffer
n = len(state.buffer) - state.index
copy(p[:n], state.buffer[state.index:])
// the buffer is drained, keep its capacity for the next lines
state.buffer = state.buffer[:0]
state.index = 0
return n, nil
// state3: fill buffer with data to read from
for {
// Read each record from csv
row, err := state.csv.Read()
if parseError, ok := err.(*csv.ParseError); ok && parseError.Err == csv.ErrFieldCount {
// every row can have different number of columns
err = nil
if err != nil {
// remember the error of the csv reader, the recursive call returns it through state1
state.finished = err
return state.Read(p)
if state.LineNumber == 1 && len(row) == 1 && strings.HasPrefix(row[0], "sep=") && len(row[0]) > 4 {
// separator specified in the first line
// NOTE(review): only the single byte at offset 4 is converted to a rune,
// a multi-byte separator character is not decoded here
state.csv.Comma = rune(row[0][4])
if state.Table.AddRow(row) {
var err error
state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row)
if !state.dataRowAdded && state.logTableDataColumns {
state.dataRowAdded = true
if err != nil {
// attach the current line number to the conversion error
lineError := CsvLineError{state.LineNumber, err}
if state.skipRowOnError {
state.finished = lineError
return state.Read(p)
// the line was converted, queue it terminated by a new line
state.buffer = append(state.buffer, state.lineBuffer...)
state.buffer = append(state.buffer, '\n')
} else {
state.dataRowAdded = false
// re-enter the state machine to hand the buffered data (or the stored error) to the caller
return state.Read(p)
// CsvToProtocolLines transforms csv data into line protocol data
func CsvToProtocolLines(reader io.Reader) *CsvToLineReader {
csv := csv.NewReader(reader)
csv.ReuseRecord = true
return &CsvToLineReader{
csv: csv,
@ -0,0 +1,225 @@
package csv2lp
import (
// Test_CsvToProtocolLines tests conversion of annotated CSV data to line protocol
func Test_CsvToProtocolLines(t *testing.T) {
var tests = []struct {
name string
csv string
lines string
err string
"cpu a=1,b=1\ncpu a=b2\n",
"cpu a=1,b=1\ncpu a=b2\n",
"no field data",
"_time", // error in _time column
"#constant,measurement,,cpu\n" +
"#constant,tag,xpu,xpu1\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,100\n" +
"#constant,dateTime,,2\n" +
"x,y\n" +
"1,2\n" +
"cpu,cpu=cpu1,xpu=xpu1 x=1,y=2,of=100i 2\n" +
"cpu,cpu=cpu1,xpu=xpu1 x=3,y=4,of=100i 2\n",
"", // no error
"#timezone,-0100\n" +
"#constant,measurement,cpu\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
"x,y\n" +
"cpu x=1,y=2 3600000000000\n",
"", // no error
"#timezone,EST\n" +
"#constant,measurement,cpu\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
"x,y\n" +
"cpu x=1,y=2 18000000000000\n",
"", // no error
bufferSizes := []int{40, 7, 3, 1}
for _, test := range tests {
for _, bufferSize := range bufferSizes {
t.Run(test.name+"_"+strconv.Itoa(bufferSize), func(t *testing.T) {
reader := CsvToProtocolLines(strings.NewReader(test.csv))
buffer := make([]byte, bufferSize)
lines := make([]byte, 0, 100)
for {
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
if test.err != "" {
// fmt.Println(err)
if err := err.Error(); !strings.Contains(err, test.err) {
require.Equal(t, err, test.err)
require.Nil(t, err.Error())
lines = append(lines, buffer[:n]...)
if test.err == "" {
require.Equal(t, test.lines, string(lines))
} else {
require.Fail(t, "error message with '"+test.err+"' expected")
// TestCsvData_LogTableColumns checks correct logging
func TestCsvData_LogTableColumns(t *testing.T) {
var buf bytes.Buffer
oldFlags := log.Flags()
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
defer func() {
csv := "_measurement,a,b\ncpu,1,1\ncpu,b2\n"
reader := CsvToProtocolLines(strings.NewReader(csv)).LogTableColumns(true)
require.False(t, reader.skipRowOnError)
require.True(t, reader.logTableDataColumns)
// read all the data
out := buf.String()
// fmt.Println(out)
messages := strings.Count(out, prefix)
require.Equal(t, messages, 1)
// TestCsvData_LogTimeZoneWarning checks correct logging of the time zone warning
func TestCsvData_LogTimeZoneWarning(t *testing.T) {
var buf bytes.Buffer
oldFlags := log.Flags()
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
defer func() {
csv := "#timezone 1\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
reader := CsvToProtocolLines(strings.NewReader(csv))
bytes, _ := ioutil.ReadAll(reader)
out := buf.String()
// fmt.Println(out)
messages := strings.Count(out, prefix)
require.Equal(t, messages, 1)
require.Equal(t, string(bytes), "cpu a=1,b=1 0\n")
// TestCsvData_SkipRowOnError checks correct logging when rows with errors are skipped
func TestCsvData_SkipRowOnError(t *testing.T) {
var buf bytes.Buffer
oldFlags := log.Flags()
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
defer func() {
csv := "_measurement,a,_time\n,1,1\ncpu,2,2\ncpu,3,3a\n"
reader := CsvToProtocolLines(strings.NewReader(csv)).SkipRowOnError(true)
require.Equal(t, reader.skipRowOnError, true)
require.Equal(t, reader.logTableDataColumns, false)
// read all the data
out := buf.String()
// fmt.Println(out)
messages := strings.Count(out, prefix)
require.Equal(t, messages, 2)
// Test_CsvLineError checks formatting of line errors
func Test_CsvLineError(t *testing.T) {
var tests = []struct {
err CsvLineError
value string
CsvLineError{Line: 1, Err: errors.New("cause")},
"line 1: cause",
CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
"line 2: column 'a': cause",
for _, test := range tests {
require.Equal(t, test.value, test.err.Error())
@ -0,0 +1,300 @@
package csv2lp
import (
// see https://v2.docs.influxdata.com/v2.0/reference/syntax/annotated-csv/#valid-data-types
const (
	stringDatatype                = "string"
	doubleDatatype                = "double"
	boolDatatype                  = "boolean"
	longDatatype                  = "long"
	uLongDatatype                 = "unsignedLong"
	durationDatatype              = "duration"
	base64BinaryDataType          = "base64Binary"
	dateTimeDatatype              = "dateTime"
	dateTimeDataFormatRFC3339     = "RFC3339"
	dateTimeDataFormatRFC3339Nano = "RFC3339Nano"
	dateTimeDataFormatNumber      = "number" //the same as long, but serialized without i suffix, used for timestamps
)

// supportedDataTypes is the set of accepted data type names,
// the empty name is accepted as well.
var supportedDataTypes = map[string]struct{}{
	stringDatatype:       {},
	doubleDatatype:       {},
	boolDatatype:         {},
	longDatatype:         {},
	uLongDatatype:        {},
	durationDatatype:     {},
	base64BinaryDataType: {},
	dateTimeDatatype:     {},
	"":                   {},
}

// IsTypeSupported returns true if the data type is supported
func IsTypeSupported(dataType string) bool {
	_, known := supportedDataTypes[dataType]
	return known
}
// Replacers that put a backslash in front of the characters
// escaped by escapeMeasurement, escapeTag and escapeString.
var (
	replaceMeasurement = strings.NewReplacer(",", `\,`, " ", `\ `)
	replaceTag         = strings.NewReplacer(",", `\,`, " ", `\ `, "=", `\=`)
	replaceQuoted      = strings.NewReplacer(`"`, `\"`, `\`, `\\`)
)

// escapeMeasurement backslash-escapes commas and spaces in val.
func escapeMeasurement(val string) string {
	if !strings.ContainsAny(val, ", ") {
		// nothing to escape, hand the input back untouched
		return val
	}
	return replaceMeasurement.Replace(val)
}

// escapeTag backslash-escapes commas, spaces and equal signs in val.
func escapeTag(val string) string {
	if !strings.ContainsAny(val, ", =") {
		// nothing to escape, hand the input back untouched
		return val
	}
	return replaceTag.Replace(val)
}

// escapeString backslash-escapes double quotes and backslashes in val.
func escapeString(val string) string {
	if !strings.ContainsAny(val, `"\`) {
		// nothing to escape, hand the input back untouched
		return val
	}
	return replaceQuoted.Replace(val)
}
// normalizeNumberString normalizes the supplied value with the help of the format supplied.
// This normalization is intended to convert number strings of different locales to a strconv-parseable value.
// The format's first character is a fraction delimiter character. Next characters in the format
// are simply removed, they are typically used to visually separate groups in large numbers.
// An empty format is the same as ". \n\t\r_".
// The removeFraction parameter controls whether the returned value can contain also the fraction part,
// when true, the result ends right before the first fraction delimiter.
// For example, to get a strconv-parseable float from a Spanish value '3.494.826.157,123', use format ",." .
func normalizeNumberString(value string, format string, removeFraction bool) string {
	if len(format) == 0 {
		format = ". \n\t\r_"
	}
	if !strings.ContainsAny(value, format) {
		// no delimiter and no ignored character, the value is used as it is
		return value
	}
	formatRunes := []rune(format)
	fractionRune := formatRunes[0]
	ignored := string(formatRunes[1:])

	var retVal strings.Builder
	retVal.Grow(len(value))
	for _, c := range value {
		switch {
		case strings.ContainsRune(ignored, c):
			// skip ignored characters, they win over the fraction delimiter
		case c == fractionRune:
			if removeFraction {
				// everything from the fraction delimiter on is dropped
				return retVal.String()
			}
			// strconv expects '.' regardless of the locale of the value
			retVal.WriteByte('.')
		default:
			retVal.WriteRune(c)
		}
	}
	return retVal.String()
}
// toTypedValue converts the string val to a typed value according to the
// DataType and DataFormat of the supplied column, or returns an error when
// the conversion fails or the data type is not handled.
func toTypedValue(val string, column *CsvTableColumn) (interface{}, error) {
dataType := column.DataType
dataFormat := column.DataFormat
// a parse function set on the column takes precedence over the built-in conversions
if column.ParseF != nil {
return column.ParseF(val)
switch dataType {
case stringDatatype:
return val, nil
case dateTimeDatatype:
switch dataFormat {
case "": // number or time.RFC3339
// an integer is taken as nanoseconds since the Unix epoch
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return time.Parse(time.RFC3339, val)
return time.Unix(0, t).UTC(), nil
case dateTimeDataFormatRFC3339:
return time.Parse(time.RFC3339, val)
case dateTimeDataFormatRFC3339Nano:
return time.Parse(time.RFC3339Nano, val)
case dateTimeDataFormatNumber:
// nanoseconds since the Unix epoch, no fallback to a textual format
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, err
return time.Unix(0, t).UTC(), nil
// the data format is used as a time layout, parsed in the column's time zone when it is set
if column.TimeZone != nil {
return time.ParseInLocation(dataFormat, val, column.TimeZone)
return time.Parse(dataFormat, val)
case durationDatatype:
return time.ParseDuration(val)
case doubleDatatype:
// the data format describes the number format, the fraction part is kept
return strconv.ParseFloat(normalizeNumberString(val, dataFormat, false), 64)
case boolDatatype:
// only the first character of the value decides
switch {
case len(val) == 0:
return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'")
case val[0] == 't' || val[0] == 'T' || val[0] == 'y' || val[0] == 'Y' || val[0] == '1':
return true, nil
case val[0] == 'f' || val[0] == 'F' || val[0] == 'n' || val[0] == 'N' || val[0] == '0':
return false, nil
return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'")
case longDatatype:
// the data format describes the number format, the fraction part is removed
return strconv.ParseInt(normalizeNumberString(val, dataFormat, true), 10, 64)
case uLongDatatype:
// the data format describes the number format, the fraction part is removed
return strconv.ParseUint(normalizeNumberString(val, dataFormat, true), 10, 64)
case base64BinaryDataType:
return base64.StdEncoding.DecodeString(val)
return nil, fmt.Errorf("unsupported data type '%s'", dataType)
func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) {
switch v := value.(type) {
case uint64:
return append(strconv.AppendUint(buffer, v, 10), 'u'), nil
case int64:
return append(strconv.AppendInt(buffer, v, 10), 'i'), nil
case int:
return append(strconv.AppendInt(buffer, int64(v), 10), 'i'), nil
case float64:
if math.IsNaN(v) {
return buffer, errors.New("value is NaN")
if math.IsInf(v, 0) {
return buffer, errors.New("value is Infinite")
return strconv.AppendFloat(buffer, v, 'f', -1, 64), nil
case float32:
v32 := float64(v)
if math.IsNaN(v32) {
return buffer, errors.New("value is NaN")
if math.IsInf(v32, 0) {
return buffer, errors.New("value is Infinite")
return strconv.AppendFloat(buffer, v32, 'f', -1, 64), nil
case string:
buffer = append(buffer, '"')
buffer = append(buffer, escapeString(v)...)
buffer = append(buffer, '"')
return buffer, nil
case []byte:
buf := make([]byte, base64.StdEncoding.EncodedLen(len(v)))
base64.StdEncoding.Encode(buf, v)
return append(buffer, buf...), nil
case bool:
if v {
return append(buffer, "true"...), nil
return append(buffer, "false"...), nil
case time.Time:
return strconv.AppendInt(buffer, v.UnixNano(), 10), nil
case time.Duration:
return append(strconv.AppendInt(buffer, v.Nanoseconds(), 10), 'i'), nil
return buffer, fmt.Errorf("unsupported value type: %T", v)
func appendConverted(buffer []byte, val string, column *CsvTableColumn) ([]byte, error) {
if len(column.DataType) == 0 { // keep the value as it is
return append(buffer, val...), nil
typedVal, err := toTypedValue(val, column)
if err != nil {
return buffer, err
return appendProtocolValue(buffer, typedVal)
// decodeNop returns the supplied reader unchanged, no decoding is applied
func decodeNop(reader io.Reader) io.Reader {
return reader
// CreateDecoder creates a decoding reader from the supplied encoding to UTF-8, or returns an error
func CreateDecoder(encoding string) (func(io.Reader) io.Reader, error) {
if len(encoding) > 0 && encoding != "UTF-8" {
enc, err := ianaindex.IANA.Encoding(encoding)
if err != nil {
return nil, fmt.Errorf("%v, see https://www.iana.org/assignments/character-sets/character-sets.xhtml", err)
if enc == nil {
return nil, fmt.Errorf("unsupported encoding: %s", encoding)
return enc.NewDecoder().Reader, nil
return decodeNop, nil
// createBoolParseFn returns a function that converts a string value to boolean according to format "true,yes,1:false,no,0".
// The comma-separated values in front of the first ':' stand for true, the ones behind it for false.
// When one of the two lists is empty, every value that is not in the other list maps to the boolean
// of the empty list. A format without ':' yields a function that always returns an error.
func createBoolParseFn(format string) func(string) (interface{}, error) {
	colon := strings.IndexByte(format, ':')
	if colon < 0 {
		formatErr := fmt.Errorf("unsupported boolean format: %s should be in 'true,yes,1:false,no,0' format, but no ':' is present", format)
		return func(string) (interface{}, error) {
			return nil, formatErr
		}
	}

	// split turns a comma-separated list into its items, an empty list has no items
	split := func(list string) []string {
		if list == "" {
			return []string{}
		}
		return strings.Split(list, ",")
	}
	// contains reports whether val is one of the items
	contains := func(items []string, val string) bool {
		for _, item := range items {
			if item == val {
				return true
			}
		}
		return false
	}

	truthy := split(format[:colon])
	falsy := split(format[colon+1:])
	return func(val string) (interface{}, error) {
		switch {
		case contains(falsy, val):
			// falsy values are checked first
			return false, nil
		case contains(truthy, val):
			return true, nil
		case len(falsy) == 0:
			// no falsy values declared, anything that is not truthy is false
			return false, nil
		case len(truthy) == 0:
			// no truthy values declared, anything that is not falsy is true
			return true, nil
		}
		return nil, fmt.Errorf("unsupported boolean value: %s must one of %v or one of %v", val, truthy, falsy)
	}
}
@ -0,0 +1,325 @@
package csv2lp
import (
func Test_escapeMeasurement(t *testing.T) {
var tests = []struct {
value string
expect string
{"a", "a"}, {"", ""},
{"a,", `a\,`},
{"a ", `a\ `},
{"a=", `a=`},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeMeasurement(test.value))
func Test_escapeTag(t *testing.T) {
var tests = []struct {
value string
expect string
{"a", "a"}, {"", ""},
{"a,", `a\,`},
{"a ", `a\ `},
{"a=", `a\=`},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeTag(test.value))
func Test_escapeString(t *testing.T) {
var tests = []struct {
value string
expect string
{"a", `a`}, {"", ``},
{`a"`, `a\"`},
{`a\`, `a\\`},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeString(test.value))
func Test_toTypedValue(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
var tests = []struct {
dataType string
value string
expect interface{}
{"string", "a", "a"},
{"double", "1.0", float64(1.0)},
{"boolean", "true", true},
{"boolean", "True", true},
{"boolean", "y", true},
{"boolean", "Yes", true},
{"boolean", "1", true},
{"boolean", "false", false},
{"boolean", "False", false},
{"boolean", "n", false},
{"boolean", "No", false},
{"boolean", "0", false},
{"boolean", "", nil},
{"boolean", "?", nil},
{"long", "1", int64(1)},
{"unsignedLong", "1", uint64(1)},
{"duration", "1ns", time.Duration(1)},
{"base64Binary", "YWFh", []byte("aaa")},
{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
{"dateTime:RFC3339", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.000000002Z", epochTime.Add(time.Duration(2))},
{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
{"dateTime", "4", epochTime.Add(time.Duration(4))},
{"dateTime:2006-01-02", "1970-01-01", epochTime},
{"dateTime", "1970-01-01T00:00:00Z", epochTime},
{"dateTime", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
{"double:, .", "200 100.299,0", float64(200100299.0)},
{"long:, .", "200 100.299,0", int64(200100299)},
{"unsignedLong:, .", "200 100.299,0", uint64(200100299)},
{"u.type", "", nil},
for i, test := range tests {
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
column := &CsvTableColumn{}
setupDataType(column, test.dataType)
val, err := toTypedValue(test.value, column)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
require.Equal(t, test.expect, val)
func Test_toTypedValue_dateTimeCustomTimeZone(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
tz, _ := parseTimeZone("-0100")
var tests = []struct {
dataType string
value string
expect interface{}
{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
{"dateTime:2006-01-02", "1970-01-01", epochTime.Add(time.Hour)},
for i, test := range tests {
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
column := &CsvTableColumn{}
column.TimeZone = tz
setupDataType(column, test.dataType)
val, err := toTypedValue(test.value, column)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
if test.expect == nil {
require.Equal(t, test.expect, val)
} else {
expectTime := test.expect.(time.Time)
time := val.(time.Time)
require.True(t, expectTime.Equal(time))
func Test_toLineProtocolValue(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
var tests = []struct {
value interface{}
expect string
{uint64(1), "1u"},
{int64(1), "1i"},
{int(1), "1i"},
{float64(1.1), "1.1"},
{math.NaN(), ""},
{math.Inf(1), ""},
{float32(1), "1"},
{float32(math.NaN()), ""},
{float32(math.Inf(1)), ""},
{"a", `"a"`},
{[]byte("aaa"), "YWFh"},
{true, "true"},
{false, "false"},
{epochTime, "0"},
{time.Duration(100), "100i"},
{struct{}{}, ""},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
val, err := appendProtocolValue(nil, test.value)
if err != nil && test.expect != "" {
require.Nil(t, err.Error())
require.Equal(t, test.expect, string(val))
func Test_appendConverted(t *testing.T) {
var tests = []struct {
dataType string
value string
expect string
{"", "1", "1"},
{"long", "a", ""},
{"dateTime", "a", ""},
{"dateTime:number", "a", ""},
{"string", "a", `"a"`},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
column := &CsvTableColumn{}
setupDataType(column, test.dataType)
val, err := appendConverted(nil, test.value, column)
if err != nil && test.expect != "" {
require.Nil(t, err.Error())
require.Equal(t, test.expect, string(val))
// Test_IsTypeSupported
func Test_IsTypeSupported(t *testing.T) {
require.True(t, IsTypeSupported(stringDatatype), true)
require.True(t, IsTypeSupported(doubleDatatype), true)
require.True(t, IsTypeSupported(boolDatatype), true)
require.True(t, IsTypeSupported(longDatatype), true)
require.True(t, IsTypeSupported(uLongDatatype), true)
require.True(t, IsTypeSupported(durationDatatype), true)
require.True(t, IsTypeSupported(base64BinaryDataType), true)
require.True(t, IsTypeSupported(dateTimeDatatype), true)
require.True(t, IsTypeSupported(""), true)
require.False(t, IsTypeSupported(" "), false)
// time format is not part of data type
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339))
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatRFC3339Nano))
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dateTimeDataFormatNumber))
// Test_normalizeNumberString
func Test_normalizeNumberString(t *testing.T) {
var tests = []struct {
value string
format string
removeFraction bool
expect string
{"123", "", true, "123"},
{"123", ".", true, "123"},
{"123.456", ".", true, "123"},
{"123.456", ".", false, "123.456"},
{"1 2.3,456", ",. ", false, "123.456"},
{" 1 2\t3.456 \r\n", "", false, "123.456"},
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, normalizeNumberString(test.value, test.format, test.removeFraction))
// TestCreateDecoder tests the decoding reader factory
func TestCreateDecoder(t *testing.T) {
decoder, err := CreateDecoder("UTF-8")
toUtf8 := func(in []byte) string {
s, _ := ioutil.ReadAll(decoder(bytes.NewReader(in)))
return string(s)
require.NotNil(t, decoder)
require.Nil(t, err)
require.Equal(t, "\u2318", toUtf8([]byte{226, 140, 152}))
decoder, err = CreateDecoder("windows-1250")
require.NotNil(t, decoder)
require.Nil(t, err)
require.Equal(t, "\u0160", toUtf8([]byte{0x8A}))
decoder, err = CreateDecoder("whateveritis")
require.NotNil(t, err)
require.Nil(t, decoder)
// we can have valid IANA names that are not supported by golang/x/text
decoder, err = CreateDecoder("US-ASCII")
log.Printf("US-ASCII encoding support: %v,%v", decoder != nil, err)
func Test_createBoolParseFn(t *testing.T) {
type pairT struct {
value string
expect string
var tests = []struct {
format string
pair []pairT
{"t,y,1:f,n,0", []pairT{
{"y", "true"},
{"0", "false"},
{"T", "unsupported"},
{"true", []pairT{
{"true", "unsupported"},
{"false", "unsupported"},
{"true:", []pairT{
{"true", "true"},
{"other", "false"},
{":false", []pairT{
{"false", "false"},
{"other", "true"},
for i, test := range tests {
fn := createBoolParseFn(test.format)
for j, pair := range test.pair {
t.Run(fmt.Sprint(i)+"_"+fmt.Sprint(j), func(t *testing.T) {
result, err := fn(pair.value)
switch pair.expect {
case "true":
require.Equal(t, true, result)
case "false":
require.Equal(t, false, result)
require.NotNil(t, err)
require.True(t, strings.Contains(fmt.Sprintf("%v", err), pair.expect))
@ -0,0 +1,191 @@
package csv2lp
import (
type csvExample struct {
name string
csv string
lp string
var examples []csvExample = []csvExample{
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000
#datatype measurement,tag,tag,double,double,ignored,dateTime:number
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
#default test,annotatedDatatypes,,,,,,
test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
#datatype dateTime:2006-01-02|1970-01-02,"double:,. ","boolean:y,Y:n,N|y"
,"123 456,78",Y
test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
,"123 456,78",Y
test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
#constant measurement,test2
,"123 456,78",Y
test2 s="unknown" 0
test2 s="Y"
cpu usage_user=2.7
test available=false 1
test available=false 2
test available=false 3
test available=false 4
test available=true 5
// normalize removes the newline characters that the expected line protocol text starts with
func (example *csvExample) normalize() {
// drop one leading '\n' per iteration until the text is empty or starts with another character
for len(example.lp) > 0 && example.lp[0] == '\n' {
example.lp = example.lp[1:]
// TestExamples validates documentation examples
func TestExamples(t *testing.T) {
for _, example := range examples {
t.Run(example.name, func(t *testing.T) {
transformer := CsvToProtocolLines(strings.NewReader(example.csv))
result, err := ioutil.ReadAll(transformer)
if err != nil {
require.Nil(t, fmt.Sprintf("%s", err))
require.Equal(t, example.lp, string(result))
Reference in New Issue