Merge pull request #19466 from influxdata/19452/csv2lp
fix(pkg/csv2lp): do not override existing line part in group annotationpull/19503/head
commit
2c32938a78
|
@ -40,6 +40,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
|
||||||
1. [19331](https://github.com/influxdata/influxdb/pull/19331): Add description to auth influx command outputs.
|
1. [19331](https://github.com/influxdata/influxdb/pull/19331): Add description to auth influx command outputs.
|
||||||
1. [19392](https://github.com/influxdata/influxdb/pull/19392): Include the edge of the boundary we are observing.
|
1. [19392](https://github.com/influxdata/influxdb/pull/19392): Include the edge of the boundary we are observing.
|
||||||
1. [19453](https://github.com/influxdata/influxdb/pull/19453): Warn about duplicate tag names during influx write csv.
|
1. [19453](https://github.com/influxdata/influxdb/pull/19453): Warn about duplicate tag names during influx write csv.
|
||||||
|
1. [19466](https://github.com/influxdata/influxdb/pull/19466): Do not override existing line part in group annotation.
|
||||||
|
|
||||||
## v2.0.0-beta.16 [2020-08-07]
|
## v2.0.0-beta.16 [2020-08-07]
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test_CsvToLineProtocol tests conversion of annotated CSV data to line protocol data
|
// Test_CsvToLineProtocol_variousBufferSize tests conversion of annotated CSV data to line protocol data on various buffer sizes
|
||||||
func Test_CsvToLineProtocol(t *testing.T) {
|
func Test_CsvToLineProtocol_variousBufferSize(t *testing.T) {
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
name string
|
name string
|
||||||
csv string
|
csv string
|
||||||
|
@ -117,6 +117,68 @@ func Test_CsvToLineProtocol(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test_CsvToLineProtocol_samples tests conversion of annotated CSV data to line protocol data
|
||||||
|
func Test_CsvToLineProtocol_samples(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
csv string
|
||||||
|
lines string
|
||||||
|
err string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"queryResult_19452", // https://github.com/influxdata/influxdb/issues/19452
|
||||||
|
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
|
||||||
|
"#group,false,false,true,true,false,false,true,true,true\n" +
|
||||||
|
"#default,_result,,,,,,,,\n" +
|
||||||
|
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
|
||||||
|
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
|
||||||
|
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
|
||||||
|
"", // no error
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"queryResult_19452_group_first", // issue 19452, but with group annotation first
|
||||||
|
"#group,false,false,true,true,false,false,true,true,true\n" +
|
||||||
|
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
|
||||||
|
"#default,_result,,,,,,,,\n" +
|
||||||
|
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
|
||||||
|
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
|
||||||
|
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
|
||||||
|
"", // no error
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
reader := CsvToLineProtocol(strings.NewReader(test.csv))
|
||||||
|
buffer := make([]byte, 100)
|
||||||
|
lines := make([]byte, 0, 100)
|
||||||
|
for {
|
||||||
|
n, err := reader.Read(buffer)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if test.err != "" {
|
||||||
|
// fmt.Println(err)
|
||||||
|
if err := err.Error(); !strings.Contains(err, test.err) {
|
||||||
|
require.Equal(t, err, test.err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.Nil(t, err.Error())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
lines = append(lines, buffer[:n]...)
|
||||||
|
}
|
||||||
|
if test.err == "" {
|
||||||
|
require.Equal(t, test.lines, string(lines))
|
||||||
|
} else {
|
||||||
|
require.Fail(t, "error message with '"+test.err+"' expected")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test_CsvToLineProtocol_LogTableColumns checks correct logging of table columns
|
// Test_CsvToLineProtocol_LogTableColumns checks correct logging of table columns
|
||||||
func Test_CsvToLineProtocol_LogTableColumns(t *testing.T) {
|
func Test_CsvToLineProtocol_LogTableColumns(t *testing.T) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
|
|
@ -135,8 +135,11 @@ var supportedAnnotations = []annotationComment{
|
||||||
setupColumn: func(column *CsvTableColumn, value string) {
|
setupColumn: func(column *CsvTableColumn, value string) {
|
||||||
// standard flux query result annotation
|
// standard flux query result annotation
|
||||||
if strings.HasSuffix(value, "true") {
|
if strings.HasSuffix(value, "true") {
|
||||||
|
// setup column's line part unless it is already set (#19452)
|
||||||
|
if column.LinePart == 0 {
|
||||||
column.LinePart = linePartTag
|
column.LinePart = linePartTag
|
||||||
}
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -249,7 +249,7 @@ func (t *CsvTable) AddRow(row []string) bool {
|
||||||
// detect data row or table header row
|
// detect data row or table header row
|
||||||
if len(row[0]) == 0 || row[0][0] != '#' {
|
if len(row[0]) == 0 || row[0][0] != '#' {
|
||||||
if !t.readTableData {
|
if !t.readTableData {
|
||||||
// row must a header row now
|
// expect a header row
|
||||||
t.lpColumnsValid = false // line protocol columns change
|
t.lpColumnsValid = false // line protocol columns change
|
||||||
if t.partBits == 0 {
|
if t.partBits == 0 {
|
||||||
// create columns since no column anotations were processed
|
// create columns since no column anotations were processed
|
||||||
|
|
Loading…
Reference in New Issue