diff --git a/http/query_service.go b/http/query_service.go index 8c24112abb..c76c3fe486 100644 --- a/http/query_service.go +++ b/http/query_service.go @@ -32,11 +32,8 @@ type QueryHandler struct { // NewQueryHandler returns a new instance of QueryHandler. func NewQueryHandler() *QueryHandler { h := &QueryHandler{ - Router: httprouter.New(), - csvEncoder: &query.DelimitedMultiResultEncoder{ - Delimiter: []byte{'\n'}, - Encoder: csv.NewResultEncoder(csv.DefaultEncoderConfig()), - }, + Router: httprouter.New(), + csvEncoder: csv.NewMultiResultEncoder(csv.DefaultEncoderConfig()), } h.HandlerFunc("POST", queryPath, h.handlePostQuery) diff --git a/query/csv/result.go b/query/csv/result.go index 18cc3b5752..ad391f42b7 100644 --- a/query/csv/result.go +++ b/query/csv/result.go @@ -68,7 +68,7 @@ type ResultDecoderConfig struct { } func (d *ResultDecoder) Decode(r io.Reader) (query.Result, error) { - return newResultDecoder(r, d.c, nil), nil + return newResultDecoder(r, d.c, nil) } // MultiResultDecoder reads multiple results from a single csv file. @@ -109,14 +109,14 @@ func (r *resultIterator) More() bool { if r.next != nil { extraMeta = r.next.extraMeta } - r.next = newResultDecoder(r.r, r.c, extraMeta) + r.next, r.err = newResultDecoder(r.r, r.c, extraMeta) return true } return false } -func (r *resultIterator) Next() (string, query.Result) { - return r.next.id, r.next +func (r *resultIterator) Next() query.Result { + return r.next } func (r *resultIterator) Cancel() { @@ -131,17 +131,33 @@ type resultDecoder struct { r io.Reader c ResultDecoderConfig + cr *csv.Reader + extraMeta *tableMetadata eof bool } -func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) *resultDecoder { - return &resultDecoder{ +func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) (*resultDecoder, error) { + d := &resultDecoder{ r: r, c: c, + cr: newCSVReader(r), extraMeta: extraMeta, } + // We need to know the result ID before we return + if extraMeta == nil { + tm, err := readMetadata(d.cr, c, nil) + if err != nil { + if err == io.EOF { + d.eof = true + } + return nil, err + } + d.extraMeta = &tm + } + d.id = d.extraMeta.ResultID + return d, nil } func newCSVReader(r io.Reader) *csv.Reader { @@ -152,6 +168,10 @@ func newCSVReader(r io.Reader) *csv.Reader { return csvr } +func (r *resultDecoder) Name() string { + return r.id +} + func (r *resultDecoder) Blocks() query.BlockIterator { return r } @@ -161,11 +181,8 @@ func (r *resultDecoder) Abort(error) { } func (r *resultDecoder) Do(f func(query.Block) error) error { - cr := newCSVReader(r.r) - var extraLine []string var meta tableMetadata - newMeta := true for !r.eof { if newMeta { @@ -173,7 +190,7 @@ func (r *resultDecoder) Do(f func(query.Block) error) error { meta = *r.extraMeta r.extraMeta = nil } else { - tm, err := readMetadata(cr, r.c, extraLine) + tm, err := readMetadata(r.cr, r.c, extraLine) if err != nil { if err == io.EOF { r.eof = true @@ -184,9 +201,6 @@ func (r *resultDecoder) Do(f func(query.Block) error) error { meta = tm extraLine = nil } - if r.id == "" { - r.id = meta.ResultID - } if meta.ResultID != r.id { r.extraMeta = &meta @@ -195,7 +209,7 @@ func (r *resultDecoder) Do(f func(query.Block) error) error { } // create new block - b, err := newBlock(cr, r.c, meta, extraLine) + b, err := newBlock(r.cr, r.c, meta, extraLine) if err != nil { return err } @@ -576,7 +590,8 @@ func newUnlimitedAllocator() *execute.Allocator { } type ResultEncoder struct { - c ResultEncoderConfig + c ResultEncoderConfig + written bool } // ResultEncoderConfig are options that can be specified on the ResultEncoder. @@ -605,6 +620,15 @@ func NewResultEncoder(c ResultEncoderConfig) *ResultEncoder { } } +func (e *ResultEncoder) csvWriter(w io.Writer) *csv.Writer { + writer := csv.NewWriter(w) + if e.c.Delimiter != 0 { + writer.Comma = e.c.Delimiter + } + writer.UseCRLF = true + return writer +} + func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error { tableID := 0 tableIDStr := "0" @@ -613,16 +637,13 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error { {ColMeta: query.ColMeta{Label: resultLabel, Type: query.TString}}, {ColMeta: query.ColMeta{Label: tableLabel, Type: query.TInt}}, } - writer := csv.NewWriter(w) - if e.c.Delimiter != 0 { - writer.Comma = e.c.Delimiter - } - writer.UseCRLF = true + writer := e.csvWriter(w) var lastCols []colMeta var lastEmpty bool return result.Blocks().Do(func(b query.Block) error { + e.written = true // Update cols with block cols cols := metaCols for _, c := range b.Cols() { @@ -643,7 +664,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error { writer.Write(nil) } - if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), tableIDStr); err != nil { + if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), result.Name(), tableIDStr); err != nil { return err } } @@ -692,14 +713,27 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error { }) } -func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, tableID string) error { +func (e *ResultEncoder) EncodeError(w io.Writer, err error) error { + writer := e.csvWriter(w) + if e.written { + // Write out empty line + writer.Write(nil) + } + + writer.Write([]string{"error", "reference"}) + // TODO: Add referenced code + writer.Write([]string{err.Error(), ""}) + writer.Flush() + return writer.Error() +} + +func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, resultName, tableID string) error { defaults := make([]string, len(row)) for j, c := range cols { switch j { case annotationIdx: case resultIdx: - // TODO use real result name - defaults[j] = "_result" + defaults[j] = resultName case tableIdx: if useKeyDefaults { defaults[j] = tableID @@ -962,3 +996,10 @@ func equalCols(a, b []colMeta) bool { } return true } + +func NewMultiResultEncoder(c ResultEncoderConfig) query.MultiResultEncoder { + return &query.DelimitedMultiResultEncoder{ + Delimiter: []byte("\r\n"), + Encoder: NewResultEncoder(c), + } +} diff --git a/query/csv/result_test.go b/query/csv/result_test.go index f18a5b33dd..dd95b1ed7a 100644 --- a/query/csv/result_test.go +++ b/query/csv/result_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/platform/query/csv" "github.com/influxdata/platform/query/execute/executetest" "github.com/influxdata/platform/query/values" + "github.com/pkg/errors" ) type TestCase struct { @@ -34,76 +35,9 @@ var symetricalTestCases = []TestCase{ ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43 `), - result: &executetest.Result{Blks: []*executetest.Block{{ - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - "cpu", - "A", - 42.0, - }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), - "cpu", - "A", - 43.0, - }, - }, - }}}, - }, - { - name: "single empty table", - encoderConfig: csv.DefaultEncoderConfig(), - encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double -#partition,false,false,true,true,false,true,true,false -#default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A, -,result,table,_start,_stop,_time,_measurement,host,_value -`), - result: &executetest.Result{Blks: []*executetest.Block{{ - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - KeyValues: []interface{}{ - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - "cpu", - "A", - }, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - }}}, - }, - { - name: "multiple tables", - encoderConfig: csv.DefaultEncoderConfig(), - encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double -#partition,false,false,true,true,false,true,true,false -#default,_result,,,,,,, -,result,table,_start,_stop,_time,_measurement,host,_value -,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42 -,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43 -,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,mem,A,52 -,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53 -`), - result: &executetest.Result{Blks: []*executetest.Block{ - { + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{{ KeyCols: []string{"_start", "_stop", "_measurement", "host"}, ColMeta: []query.ColMeta{ {Label: "_start", Type: query.TTime}, @@ -131,9 +65,27 @@ var symetricalTestCases = []TestCase{ 43.0, }, }, - }, - { + }}, + }, + }, + { + name: "single empty table", + encoderConfig: csv.DefaultEncoderConfig(), + encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#partition,false,false,true,true,false,true,true,false +#default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A, +,result,table,_start,_stop,_time,_measurement,host,_value +`), + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{{ KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + KeyValues: []interface{}{ + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + "cpu", + "A", + }, ColMeta: []query.ColMeta{ {Label: "_start", Type: query.TTime}, {Label: "_stop", Type: query.TTime}, @@ -142,26 +94,84 @@ var symetricalTestCases = []TestCase{ {Label: "host", Type: query.TString}, {Label: "_value", Type: query.TFloat}, }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), - "mem", - "A", - 52.0, + }}, + }, + }, + { + name: "multiple tables", + encoderConfig: csv.DefaultEncoderConfig(), + encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#partition,false,false,true,true,false,true,true,false +#default,_result,,,,,,, +,result,table,_start,_stop,_time,_measurement,host,_value +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42 +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43 +,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,mem,A,52 +,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53 +`), + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{ + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), - "mem", - "A", - 53.0, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, + }, + }, + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), + "mem", + "A", + 52.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), + "mem", + "A", + 53.0, + }, }, }, }, - }}, + }, }, { name: "multiple tables with differing schemas", @@ -184,130 +194,133 @@ var symetricalTestCases = []TestCase{ ,,3,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,Europe,4623,52,89.3 ,,3,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,Europe,3163,53,55.6 `), - result: &executetest.Result{Blks: []*executetest.Block{ - { - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - "cpu", - "A", - 42.0, + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{ + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), - "cpu", - "A", - 43.0, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, + }, + }, + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), + "mem", + "A", + 52.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), + "mem", + "A", + 53.0, + }, + }, + }, + { + KeyCols: []string{"_start", "_stop", "location"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "location", Type: query.TString}, + {Label: "device", Type: query.TString}, + {Label: "min", Type: query.TFloat}, + {Label: "max", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "USA", + "1563", + 42.0, + 67.9, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "USA", + "1414", + 43.0, + 44.7, + }, + }, + }, + { + KeyCols: []string{"_start", "_stop", "location"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "location", Type: query.TString}, + {Label: "device", Type: query.TString}, + {Label: "min", Type: query.TFloat}, + {Label: "max", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), + "Europe", + "4623", + 52.0, + 89.3, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), + "Europe", + "3163", + 53.0, + 55.6, + }, }, }, }, - { - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), - "mem", - "A", - 52.0, - }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), - "mem", - "A", - 53.0, - }, - }, - }, - { - KeyCols: []string{"_start", "_stop", "location"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "location", Type: query.TString}, - {Label: "device", Type: query.TString}, - {Label: "min", Type: query.TFloat}, - {Label: "max", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - "USA", - "1563", - 42.0, - 67.9, - }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), - "USA", - "1414", - 43.0, - 44.7, - }, - }, - }, - { - KeyCols: []string{"_start", "_stop", "location"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "location", Type: query.TString}, - {Label: "device", Type: query.TString}, - {Label: "min", Type: query.TFloat}, - {Label: "max", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), - "Europe", - "4623", - 52.0, - 89.3, - }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), - "Europe", - "3163", - 53.0, - 55.6, - }, - }, - }, - }}, + }, }, { name: "multiple tables with one empty", @@ -326,83 +339,86 @@ var symetricalTestCases = []TestCase{ #default,_result,2,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A, ,result,table,_start,_stop,_time,_measurement,host,_value `), - result: &executetest.Result{Blks: []*executetest.Block{ - { - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{ + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, + }, }, - Data: [][]interface{}{ - { + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), + "mem", + "A", + 52.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), + "mem", + "A", + 53.0, + }, + }, + }, + { + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + KeyValues: []interface{}{ values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), "cpu", "A", - 42.0, }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), - "cpu", - "A", - 43.0, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, }, }, }, - { - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)), - "mem", - "A", - 52.0, - }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)), - "mem", - "A", - 53.0, - }, - }, - }, - { - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - KeyValues: []interface{}{ - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - "cpu", - "A", - }, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - }, - }}, + }, }, } @@ -418,35 +434,38 @@ func TestResultDecoder(t *testing.T) { ,,,,,2018-04-17T00:00:00Z,cpu,A,42.0 ,,,,,2018-04-17T00:00:01Z,cpu,A,43.0 `), - result: &executetest.Result{Blks: []*executetest.Block{{ - KeyCols: []string{"_start", "_stop", "_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_start", Type: query.TTime}, - {Label: "_stop", Type: query.TTime}, - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "_value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - "cpu", - "A", - 42.0, + result: &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{{ + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, }, - { - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), - values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), - "cpu", - "A", - 43.0, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, }, - }, - }}}, + }}, + }, }, } testCases = append(testCases, symetricalTestCases...) @@ -461,7 +480,9 @@ func TestResultDecoder(t *testing.T) { if err != nil { t.Fatal(err) } - got := new(executetest.Result) + got := &executetest.Result{ + Nm: result.Name(), + } if err := result.Blocks().Do(func(b query.Block) error { cb, err := executetest.ConvertBlock(b) if err != nil { @@ -506,7 +527,169 @@ func TestResultEncoder(t *testing.T) { } }) } +} +func TestMutliResultEncoder(t *testing.T) { + testCases := []struct { + name string + results query.ResultIterator + encoded []byte + config csv.ResultEncoderConfig + }{ + { + name: "single result", + config: csv.DefaultEncoderConfig(), + results: query.NewSliceResultIterator([]query.Result{&executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{{ + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, + }, + }}, + }}), + encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#partition,false,false,true,true,false,true,true,false +#default,_result,,,,,,, +,result,table,_start,_stop,_time,_measurement,host,_value +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42 +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43 +`), + }, + { + name: "two results", + config: csv.DefaultEncoderConfig(), + results: query.NewSliceResultIterator([]query.Result{ + &executetest.Result{ + Nm: "_result", + Blks: []*executetest.Block{{ + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 42.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 43.0, + }, + }, + }}, + }, + &executetest.Result{ + Nm: "mean", + Blks: []*executetest.Block{{ + KeyCols: []string{"_start", "_stop", "_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "_start", Type: query.TTime}, + {Label: "_stop", Type: query.TTime}, + {Label: "_time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "_value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + "cpu", + "A", + 40.0, + }, + { + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)), + values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)), + "cpu", + "A", + 40.1, + }, + }, + }}, + }, + }), + encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#partition,false,false,true,true,false,true,true,false +#default,_result,,,,,,, +,result,table,_start,_stop,_time,_measurement,host,_value +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42 +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43 + +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#partition,false,false,true,true,false,true,true,false +#default,mean,,,,,,, +,result,table,_start,_stop,_time,_measurement,host,_value +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,40 +,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,40.1 + +`), + }, + { + name: "error results", + config: csv.DefaultEncoderConfig(), + results: errorResultIterator{ + Error: errors.New("test error"), + }, + encoded: toCRLF(`error,reference +test error, +`), + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + encoder := csv.NewMultiResultEncoder(tc.config) + var got bytes.Buffer + err := encoder.Encode(&got, tc.results) + if err != nil { + t.Fatal(err) + } + + if g, w := got.String(), string(tc.encoded); g != w { + t.Errorf("unexpected encoding -want/+got:\n%s", diff.LineDiff(w, g)) + } + }) + } } var crlfPattern = regexp.MustCompile(`\r?\n`) @@ -514,3 +697,22 @@ var crlfPattern = regexp.MustCompile(`\r?\n`) func toCRLF(data string) []byte { return []byte(crlfPattern.ReplaceAllString(data, "\r\n")) } + +type errorResultIterator struct { + Error error +} + +func (r errorResultIterator) More() bool { + return false +} + +func (r errorResultIterator) Next() query.Result { + panic("no results") +} + +func (r errorResultIterator) Cancel() { +} + +func (r errorResultIterator) Err() error { + return r.Error +} diff --git a/query/execute/executetest/result.go b/query/execute/executetest/result.go index d8fc536fd6..1a35161c86 100644 --- a/query/execute/executetest/result.go +++ b/query/execute/executetest/result.go @@ -5,6 +5,7 @@ import ( ) type Result struct { + Nm string Blks []*Block } @@ -12,6 +13,10 @@ func NewResult(blocks []*Block) *Result { return &Result{Blks: blocks} } +func (r *Result) Name() string { + return r.Nm +} + func (r *Result) Blocks() query.BlockIterator { return &BlockIterator{ r.Blks, diff --git a/query/execute/executor.go b/query/execute/executor.go index aaa1c6fb79..815571a5d4 100644 --- a/query/execute/executor.go +++ b/query/execute/executor.go @@ -88,7 +88,7 @@ func (e *executor) createExecutionState(ctx context.Context, orgID id.ID, p *pla if err != nil { return nil, err } - r := newResult(yield) + r := newResult(name, yield) ds.AddTransformation(r) es.results[name] = r } diff --git a/query/execute/result.go b/query/execute/result.go index 0fc7ad0757..899d7fe322 100644 --- a/query/execute/result.go +++ b/query/execute/result.go @@ -10,6 +10,8 @@ import ( // result implements both the Transformation and Result interfaces, // mapping the pushed based Transformation API to the pull based Result interface. type result struct { + name string + mu sync.Mutex blocks chan resultMessage @@ -22,8 +24,9 @@ type resultMessage struct { err error } -func newResult(plan.YieldSpec) *result { +func newResult(name string, spec plan.YieldSpec) *result { return &result{ + name: name, // TODO(nathanielc): Currently this buffer needs to be big enough hold all result blocks :( blocks: make(chan resultMessage, 1000), abortErr: make(chan error, 1), @@ -31,6 +34,9 @@ func newResult(plan.YieldSpec) *result { } } +func (s *result) Name() string { + return s.name +} func (s *result) RetractBlock(DatasetID, query.PartitionKey) error { //TODO implement return nil diff --git a/query/influxql/result.go b/query/influxql/result.go index 9910907dc3..8e23a2c3bc 100644 --- a/query/influxql/result.go +++ b/query/influxql/result.go @@ -30,7 +30,8 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e resp := Response{} for results.More() { - name, r := results.Next() + r := results.Next() + name := r.Name() id, err := strconv.Atoi(name) if err != nil { resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err)) diff --git a/query/influxql/result_test.go b/query/influxql/result_test.go index 569a54a7c5..00e9fe48d4 100644 --- a/query/influxql/result_test.go +++ b/query/influxql/result_test.go @@ -22,23 +22,22 @@ func TestMultiResultEncoder_Encode(t *testing.T) { }{ { name: "Default", - in: query.NewMapResultIterator( - map[string]query.Result{ - "0": &executetest.Result{ - Blks: []*executetest.Block{{ - KeyCols: []string{"_measurement", "host"}, - ColMeta: []query.ColMeta{ - {Label: "time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: "value", Type: query.TFloat}, - }, - Data: [][]interface{}{ - {mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)}, - }, - }}, - }, - }, + in: query.NewSliceResultIterator( + []query.Result{&executetest.Result{ + Nm: "0", + Blks: []*executetest.Block{{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)}, + }, + }}, + }}, ), out: `{"results":[{"statement_id":0,"series":[{"name":"m0","tags":{"host":"server01"},"columns":["time","value"],"values":[["2018-05-24T09:00:00Z",2]]}]}]}`, }, @@ -66,9 +65,9 @@ type resultErrorIterator struct { Error string } -func (*resultErrorIterator) Cancel() {} -func (*resultErrorIterator) More() bool { return false } -func (*resultErrorIterator) Next() (string, query.Result) { return "", nil } +func (*resultErrorIterator) Cancel() {} +func (*resultErrorIterator) More() bool { return false } +func (*resultErrorIterator) Next() query.Result { panic("no results") } func (ri *resultErrorIterator) Err() error { return errors.New(ri.Error) diff --git a/query/query.go b/query/query.go index 59d508c52c..05336e464b 100644 --- a/query/query.go +++ b/query/query.go @@ -21,9 +21,9 @@ type ResultIterator interface { // More must be called until it returns false in order to free all resources. More() bool - // Next returns the next name and results. + // Next returns the next result. // If More is false, Next panics. - Next() (string, Result) + Next() Result // Cancel discards the remaining results. // If not all results are going to be read, Cancel must be called to free resources. @@ -125,7 +125,7 @@ DONE: return false } -func (r *resultIterator) Next() (string, Result) { +func (r *resultIterator) Next() Result { return r.results.Next() } @@ -163,10 +163,10 @@ func (r *MapResultIterator) More() bool { return len(r.order) > 0 } -func (r *MapResultIterator) Next() (string, Result) { +func (r *MapResultIterator) Next() Result { next := r.order[0] r.order = r.order[1:] - return next, r.results[next] + return r.results[next] } func (r *MapResultIterator) Cancel() { @@ -176,3 +176,30 @@ func (r *MapResultIterator) Cancel() { func (r *MapResultIterator) Err() error { return nil } + +type SliceResultIterator struct { + results []Result +} + +func NewSliceResultIterator(results []Result) *SliceResultIterator { + return &SliceResultIterator{ + results: results, + } +} + +func (r *SliceResultIterator) More() bool { + return len(r.results) > 0 +} + +func (r *SliceResultIterator) Next() Result { + next := r.results[0] + r.results = r.results[1:] + return next +} + +func (r *SliceResultIterator) Cancel() { +} + +func (r *SliceResultIterator) Err() error { + return nil +} diff --git a/query/query_test/query_test.go b/query/query_test/query_test.go index 2a9b9e8f25..bcac3cb5b5 100644 --- a/query/query_test/query_test.go +++ b/query/query_test/query_test.go @@ -124,7 +124,6 @@ func Test_QueryEndToEnd(t *testing.T) { t.Errorf("failed to run influxql query spec for test case %s. error=%s", prefix, err) } } - } } @@ -163,7 +162,7 @@ func QueryTestCheckSpec(t *testing.T, qs query.QueryServiceBridge, spec *query.S buf := new(bytes.Buffer) // we are only expecting one result, for now for results.More() { - _, res := results.Next() + res := results.Next() err := enc.Encode(buf, res) if err != nil { diff --git a/query/query_test/test_cases/simple_max.ifql b/query/query_test/test_cases/simple_max.ifql index 1f50eb7a78..c518cf4b1c 100644 --- a/query/query_test/test_cases/simple_max.ifql +++ b/query/query_test/test_cases/simple_max.ifql @@ -3,3 +3,4 @@ from(db:"test") |> group(by:["_measurement"]) |> max() |> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value}) + |> yield(name:"0") diff --git a/query/query_test/test_cases/simple_max.out.csv b/query/query_test/test_cases/simple_max.out.csv index d8262a74f7..2324ee63f8 100644 --- a/query/query_test/test_cases/simple_max.out.csv +++ b/query/query_test/test_cases/simple_max.out.csv @@ -1,5 +1,5 @@ #datatype,string,long,string,string,dateTime:RFC3339,double #partition,false,false,false,true,false,false -#default,_result,,,,, +#default,0,,,,, ,result,table,_field,_measurement,_time,max ,,0,f1,m1,2018-04-17T00:00:01Z,43 diff --git a/query/result.go b/query/result.go index 1ab23b0144..9ccdd0422f 100644 --- a/query/result.go +++ b/query/result.go @@ -7,6 +7,7 @@ import ( ) type Result interface { + Name() string // Blocks returns a BlockIterator for iterating through results Blocks() BlockIterator } @@ -138,10 +139,17 @@ type MultiResultEncoder interface { // DelimitedMultiResultEncoder encodes multiple results using a trailing delimiter. // The delimiter is written after every result. +// +// If an error is encountered when iterating EncodeError will be called on the Encoder. +// // If the io.Writer implements flusher, it will be flushed after each delimiter. type DelimitedMultiResultEncoder struct { Delimiter []byte - Encoder ResultEncoder + Encoder interface { + ResultEncoder + // EncodeError encodes an error on the writer. + EncodeError(w io.Writer, err error) error + } } type flusher interface { @@ -150,18 +158,21 @@ type flusher interface { func (e *DelimitedMultiResultEncoder) Encode(w io.Writer, results ResultIterator) error { for results.More() { - //TODO(nathanielc): Make the result name a property of a result. - _, result := results.Next() + result := results.Next() if err := e.Encoder.Encode(w, result); err != nil { return err } if _, err := w.Write(e.Delimiter); err != nil { - return nil + return err } // Flush the writer after each result if f, ok := w.(flusher); ok { f.Flush() } } - return results.Err() + err := results.Err() + if err != nil { + return e.Encoder.EncodeError(w, err) + } + return nil }