fix(query/csv): Encode result iterator errors

This change also adds Name() to the Result interface to allow for
correct MultiResultEncoding.
pull/10616/head
Nathaniel Cook 2018-05-24 11:29:36 -06:00
parent ea6e96bd09
commit 99f4e4a8b4
13 changed files with 658 additions and 369 deletions

View File

@ -32,11 +32,8 @@ type QueryHandler struct {
// NewQueryHandler returns a new instance of QueryHandler.
func NewQueryHandler() *QueryHandler {
h := &QueryHandler{
Router: httprouter.New(),
csvEncoder: &query.DelimitedMultiResultEncoder{
Delimiter: []byte{'\n'},
Encoder: csv.NewResultEncoder(csv.DefaultEncoderConfig()),
},
Router: httprouter.New(),
csvEncoder: csv.NewMultiResultEncoder(csv.DefaultEncoderConfig()),
}
h.HandlerFunc("POST", queryPath, h.handlePostQuery)

View File

@ -68,7 +68,7 @@ type ResultDecoderConfig struct {
}
func (d *ResultDecoder) Decode(r io.Reader) (query.Result, error) {
return newResultDecoder(r, d.c, nil), nil
return newResultDecoder(r, d.c, nil)
}
// MultiResultDecoder reads multiple results from a single csv file.
@ -109,14 +109,14 @@ func (r *resultIterator) More() bool {
if r.next != nil {
extraMeta = r.next.extraMeta
}
r.next = newResultDecoder(r.r, r.c, extraMeta)
r.next, r.err = newResultDecoder(r.r, r.c, extraMeta)
return true
}
return false
}
func (r *resultIterator) Next() (string, query.Result) {
return r.next.id, r.next
func (r *resultIterator) Next() query.Result {
return r.next
}
func (r *resultIterator) Cancel() {
@ -131,17 +131,33 @@ type resultDecoder struct {
r io.Reader
c ResultDecoderConfig
cr *csv.Reader
extraMeta *tableMetadata
eof bool
}
func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) *resultDecoder {
return &resultDecoder{
func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) (*resultDecoder, error) {
d := &resultDecoder{
r: r,
c: c,
cr: newCSVReader(r),
extraMeta: extraMeta,
}
// We need to know the result ID before we return
if extraMeta == nil {
tm, err := readMetadata(d.cr, c, nil)
if err != nil {
if err == io.EOF {
d.eof = true
}
return nil, err
}
d.extraMeta = &tm
}
d.id = d.extraMeta.ResultID
return d, nil
}
func newCSVReader(r io.Reader) *csv.Reader {
@ -152,6 +168,10 @@ func newCSVReader(r io.Reader) *csv.Reader {
return csvr
}
func (r *resultDecoder) Name() string {
return r.id
}
func (r *resultDecoder) Blocks() query.BlockIterator {
return r
}
@ -161,11 +181,8 @@ func (r *resultDecoder) Abort(error) {
}
func (r *resultDecoder) Do(f func(query.Block) error) error {
cr := newCSVReader(r.r)
var extraLine []string
var meta tableMetadata
newMeta := true
for !r.eof {
if newMeta {
@ -173,7 +190,7 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
meta = *r.extraMeta
r.extraMeta = nil
} else {
tm, err := readMetadata(cr, r.c, extraLine)
tm, err := readMetadata(r.cr, r.c, extraLine)
if err != nil {
if err == io.EOF {
r.eof = true
@ -184,9 +201,6 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
meta = tm
extraLine = nil
}
if r.id == "" {
r.id = meta.ResultID
}
if meta.ResultID != r.id {
r.extraMeta = &meta
@ -195,7 +209,7 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
}
// create new block
b, err := newBlock(cr, r.c, meta, extraLine)
b, err := newBlock(r.cr, r.c, meta, extraLine)
if err != nil {
return err
}
@ -576,7 +590,8 @@ func newUnlimitedAllocator() *execute.Allocator {
}
type ResultEncoder struct {
c ResultEncoderConfig
c ResultEncoderConfig
written bool
}
// ResultEncoderConfig are options that can be specified on the ResultEncoder.
@ -605,6 +620,15 @@ func NewResultEncoder(c ResultEncoderConfig) *ResultEncoder {
}
}
func (e *ResultEncoder) csvWriter(w io.Writer) *csv.Writer {
writer := csv.NewWriter(w)
if e.c.Delimiter != 0 {
writer.Comma = e.c.Delimiter
}
writer.UseCRLF = true
return writer
}
func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
tableID := 0
tableIDStr := "0"
@ -613,16 +637,13 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
{ColMeta: query.ColMeta{Label: resultLabel, Type: query.TString}},
{ColMeta: query.ColMeta{Label: tableLabel, Type: query.TInt}},
}
writer := csv.NewWriter(w)
if e.c.Delimiter != 0 {
writer.Comma = e.c.Delimiter
}
writer.UseCRLF = true
writer := e.csvWriter(w)
var lastCols []colMeta
var lastEmpty bool
return result.Blocks().Do(func(b query.Block) error {
e.written = true
// Update cols with block cols
cols := metaCols
for _, c := range b.Cols() {
@ -643,7 +664,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
writer.Write(nil)
}
if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), tableIDStr); err != nil {
if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), result.Name(), tableIDStr); err != nil {
return err
}
}
@ -692,14 +713,27 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
})
}
func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, tableID string) error {
func (e *ResultEncoder) EncodeError(w io.Writer, err error) error {
writer := e.csvWriter(w)
if e.written {
// Write out empty line
writer.Write(nil)
}
writer.Write([]string{"error", "reference"})
// TODO: Add referenced code
writer.Write([]string{err.Error(), ""})
writer.Flush()
return writer.Error()
}
func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, resultName, tableID string) error {
defaults := make([]string, len(row))
for j, c := range cols {
switch j {
case annotationIdx:
case resultIdx:
// TODO use real result name
defaults[j] = "_result"
defaults[j] = resultName
case tableIdx:
if useKeyDefaults {
defaults[j] = tableID
@ -962,3 +996,10 @@ func equalCols(a, b []colMeta) bool {
}
return true
}
func NewMultiResultEncoder(c ResultEncoderConfig) query.MultiResultEncoder {
return &query.DelimitedMultiResultEncoder{
Delimiter: []byte("\r\n"),
Encoder: NewResultEncoder(c),
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/platform/query/csv"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/values"
"github.com/pkg/errors"
)
type TestCase struct {
@ -34,76 +35,9 @@ var symetricalTestCases = []TestCase{
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
`),
result: &executetest.Result{Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
}}},
},
{
name: "single empty table",
encoderConfig: csv.DefaultEncoderConfig(),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
,result,table,_start,_stop,_time,_measurement,host,_value
`),
result: &executetest.Result{Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
KeyValues: []interface{}{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
"cpu",
"A",
},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
}}},
},
{
name: "multiple tables",
encoderConfig: csv.DefaultEncoderConfig(),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_measurement,host,_value
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,mem,A,52
,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53
`),
result: &executetest.Result{Blks: []*executetest.Block{
{
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
@ -131,9 +65,27 @@ var symetricalTestCases = []TestCase{
43.0,
},
},
},
{
}},
},
},
{
name: "single empty table",
encoderConfig: csv.DefaultEncoderConfig(),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
,result,table,_start,_stop,_time,_measurement,host,_value
`),
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
KeyValues: []interface{}{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
"cpu",
"A",
},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
@ -142,26 +94,84 @@ var symetricalTestCases = []TestCase{
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
}},
},
},
{
name: "multiple tables",
encoderConfig: csv.DefaultEncoderConfig(),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_measurement,host,_value
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,mem,A,52
,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53
`),
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
},
},
},
},
}},
},
},
{
name: "multiple tables with differing schemas",
@ -184,130 +194,133 @@ var symetricalTestCases = []TestCase{
,,3,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:06:00Z,Europe,4623,52,89.3
,,3,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,Europe,3163,53,55.6
`),
result: &executetest.Result{Blks: []*executetest.Block{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "location"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "location", Type: query.TString},
{Label: "device", Type: query.TString},
{Label: "min", Type: query.TFloat},
{Label: "max", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"USA",
"1563",
42.0,
67.9,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"USA",
"1414",
43.0,
44.7,
},
},
},
{
KeyCols: []string{"_start", "_stop", "location"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "location", Type: query.TString},
{Label: "device", Type: query.TString},
{Label: "min", Type: query.TFloat},
{Label: "max", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"Europe",
"4623",
52.0,
89.3,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"Europe",
"3163",
53.0,
55.6,
},
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "location"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "location", Type: query.TString},
{Label: "device", Type: query.TString},
{Label: "min", Type: query.TFloat},
{Label: "max", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"USA",
"1563",
42.0,
67.9,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"USA",
"1414",
43.0,
44.7,
},
},
},
{
KeyCols: []string{"_start", "_stop", "location"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "location", Type: query.TString},
{Label: "device", Type: query.TString},
{Label: "min", Type: query.TFloat},
{Label: "max", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"Europe",
"4623",
52.0,
89.3,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"Europe",
"3163",
53.0,
55.6,
},
},
},
}},
},
},
{
name: "multiple tables with one empty",
@ -326,83 +339,86 @@ var symetricalTestCases = []TestCase{
#default,_result,2,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
,result,table,_start,_stop,_time,_measurement,host,_value
`),
result: &executetest.Result{Blks: []*executetest.Block{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
},
Data: [][]interface{}{
{
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
KeyValues: []interface{}{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 6, 0, 0, time.UTC)),
"mem",
"A",
52.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 10, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 7, 1, 0, time.UTC)),
"mem",
"A",
53.0,
},
},
},
{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
KeyValues: []interface{}{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
"cpu",
"A",
},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
},
}},
},
},
}
@ -418,35 +434,38 @@ func TestResultDecoder(t *testing.T) {
,,,,,2018-04-17T00:00:00Z,cpu,A,42.0
,,,,,2018-04-17T00:00:01Z,cpu,A,43.0
`),
result: &executetest.Result{Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
result: &executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
},
}}},
}},
},
},
}
testCases = append(testCases, symetricalTestCases...)
@ -461,7 +480,9 @@ func TestResultDecoder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got := new(executetest.Result)
got := &executetest.Result{
Nm: result.Name(),
}
if err := result.Blocks().Do(func(b query.Block) error {
cb, err := executetest.ConvertBlock(b)
if err != nil {
@ -506,7 +527,169 @@ func TestResultEncoder(t *testing.T) {
}
})
}
}
func TestMutliResultEncoder(t *testing.T) {
testCases := []struct {
name string
results query.ResultIterator
encoded []byte
config csv.ResultEncoderConfig
}{
{
name: "single result",
config: csv.DefaultEncoderConfig(),
results: query.NewSliceResultIterator([]query.Result{&executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
}},
}}),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_measurement,host,_value
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
`),
},
{
name: "two results",
config: csv.DefaultEncoderConfig(),
results: query.NewSliceResultIterator([]query.Result{
&executetest.Result{
Nm: "_result",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
42.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
43.0,
},
},
}},
},
&executetest.Result{
Nm: "mean",
Blks: []*executetest.Block{{
KeyCols: []string{"_start", "_stop", "_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "_start", Type: query.TTime},
{Label: "_stop", Type: query.TTime},
{Label: "_time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
"cpu",
"A",
40.0,
},
{
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 5, 0, 0, time.UTC)),
values.ConvertTime(time.Date(2018, 4, 17, 0, 0, 1, 0, time.UTC)),
"cpu",
"A",
40.1,
},
},
}},
},
}),
encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_measurement,host,_value
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#partition,false,false,true,true,false,true,true,false
#default,mean,,,,,,,
,result,table,_start,_stop,_time,_measurement,host,_value
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,40
,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,40.1
`),
},
{
name: "error results",
config: csv.DefaultEncoderConfig(),
results: errorResultIterator{
Error: errors.New("test error"),
},
encoded: toCRLF(`error,reference
test error,
`),
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
encoder := csv.NewMultiResultEncoder(tc.config)
var got bytes.Buffer
err := encoder.Encode(&got, tc.results)
if err != nil {
t.Fatal(err)
}
if g, w := got.String(), string(tc.encoded); g != w {
t.Errorf("unexpected encoding -want/+got:\n%s", diff.LineDiff(w, g))
}
})
}
}
var crlfPattern = regexp.MustCompile(`\r?\n`)
@ -514,3 +697,22 @@ var crlfPattern = regexp.MustCompile(`\r?\n`)
func toCRLF(data string) []byte {
return []byte(crlfPattern.ReplaceAllString(data, "\r\n"))
}
type errorResultIterator struct {
Error error
}
func (r errorResultIterator) More() bool {
return false
}
func (r errorResultIterator) Next() query.Result {
panic("no results")
}
func (r errorResultIterator) Cancel() {
}
func (r errorResultIterator) Err() error {
return r.Error
}

View File

@ -5,6 +5,7 @@ import (
)
type Result struct {
Nm string
Blks []*Block
}
@ -12,6 +13,10 @@ func NewResult(blocks []*Block) *Result {
return &Result{Blks: blocks}
}
func (r *Result) Name() string {
return r.Nm
}
func (r *Result) Blocks() query.BlockIterator {
return &BlockIterator{
r.Blks,

View File

@ -88,7 +88,7 @@ func (e *executor) createExecutionState(ctx context.Context, orgID id.ID, p *pla
if err != nil {
return nil, err
}
r := newResult(yield)
r := newResult(name, yield)
ds.AddTransformation(r)
es.results[name] = r
}

View File

@ -10,6 +10,8 @@ import (
// result implements both the Transformation and Result interfaces,
// mapping the pushed based Transformation API to the pull based Result interface.
type result struct {
name string
mu sync.Mutex
blocks chan resultMessage
@ -22,8 +24,9 @@ type resultMessage struct {
err error
}
func newResult(plan.YieldSpec) *result {
func newResult(name string, spec plan.YieldSpec) *result {
return &result{
name: name,
// TODO(nathanielc): Currently this buffer needs to be big enough hold all result blocks :(
blocks: make(chan resultMessage, 1000),
abortErr: make(chan error, 1),
@ -31,6 +34,9 @@ func newResult(plan.YieldSpec) *result {
}
}
func (s *result) Name() string {
return s.name
}
func (s *result) RetractBlock(DatasetID, query.PartitionKey) error {
//TODO implement
return nil

View File

@ -30,7 +30,8 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e
resp := Response{}
for results.More() {
name, r := results.Next()
r := results.Next()
name := r.Name()
id, err := strconv.Atoi(name)
if err != nil {
resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err))

View File

@ -22,23 +22,22 @@ func TestMultiResultEncoder_Encode(t *testing.T) {
}{
{
name: "Default",
in: query.NewMapResultIterator(
map[string]query.Result{
"0": &executetest.Result{
Blks: []*executetest.Block{{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "value", Type: query.TFloat},
},
Data: [][]interface{}{
{mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)},
},
}},
},
},
in: query.NewSliceResultIterator(
[]query.Result{&executetest.Result{
Nm: "0",
Blks: []*executetest.Block{{
KeyCols: []string{"_measurement", "host"},
ColMeta: []query.ColMeta{
{Label: "time", Type: query.TTime},
{Label: "_measurement", Type: query.TString},
{Label: "host", Type: query.TString},
{Label: "value", Type: query.TFloat},
},
Data: [][]interface{}{
{mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)},
},
}},
}},
),
out: `{"results":[{"statement_id":0,"series":[{"name":"m0","tags":{"host":"server01"},"columns":["time","value"],"values":[["2018-05-24T09:00:00Z",2]]}]}]}`,
},
@ -66,9 +65,9 @@ type resultErrorIterator struct {
Error string
}
func (*resultErrorIterator) Cancel() {}
func (*resultErrorIterator) More() bool { return false }
func (*resultErrorIterator) Next() (string, query.Result) { return "", nil }
func (*resultErrorIterator) Cancel() {}
func (*resultErrorIterator) More() bool { return false }
func (*resultErrorIterator) Next() query.Result { panic("no results") }
func (ri *resultErrorIterator) Err() error {
return errors.New(ri.Error)

View File

@ -21,9 +21,9 @@ type ResultIterator interface {
// More must be called until it returns false in order to free all resources.
More() bool
// Next returns the next name and results.
// Next returns the next result.
// If More is false, Next panics.
Next() (string, Result)
Next() Result
// Cancel discards the remaining results.
// If not all results are going to be read, Cancel must be called to free resources.
@ -125,7 +125,7 @@ DONE:
return false
}
func (r *resultIterator) Next() (string, Result) {
func (r *resultIterator) Next() Result {
return r.results.Next()
}
@ -163,10 +163,10 @@ func (r *MapResultIterator) More() bool {
return len(r.order) > 0
}
func (r *MapResultIterator) Next() (string, Result) {
func (r *MapResultIterator) Next() Result {
next := r.order[0]
r.order = r.order[1:]
return next, r.results[next]
return r.results[next]
}
func (r *MapResultIterator) Cancel() {
@ -176,3 +176,30 @@ func (r *MapResultIterator) Cancel() {
func (r *MapResultIterator) Err() error {
return nil
}
type SliceResultIterator struct {
results []Result
}
func NewSliceResultIterator(results []Result) *SliceResultIterator {
return &SliceResultIterator{
results: results,
}
}
func (r *SliceResultIterator) More() bool {
return len(r.results) > 0
}
func (r *SliceResultIterator) Next() Result {
next := r.results[0]
r.results = r.results[1:]
return next
}
func (r *SliceResultIterator) Cancel() {
}
func (r *SliceResultIterator) Err() error {
return nil
}

View File

@ -124,7 +124,6 @@ func Test_QueryEndToEnd(t *testing.T) {
t.Errorf("failed to run influxql query spec for test case %s. error=%s", prefix, err)
}
}
}
}
@ -163,7 +162,7 @@ func QueryTestCheckSpec(t *testing.T, qs query.QueryServiceBridge, spec *query.S
buf := new(bytes.Buffer)
// we are only expecting one result, for now
for results.More() {
_, res := results.Next()
res := results.Next()
err := enc.Encode(buf, res)
if err != nil {

View File

@ -3,3 +3,4 @@ from(db:"test")
|> group(by:["_measurement"])
|> max()
|> map(fn: (r) => {_time:r._time, _measurement:r._measurement, _field: r._field, max:r._value})
|> yield(name:"0")

View File

@ -1,5 +1,5 @@
#datatype,string,long,string,string,dateTime:RFC3339,double
#partition,false,false,false,true,false,false
#default,_result,,,,,
#default,0,,,,,
,result,table,_field,_measurement,_time,max
,,0,f1,m1,2018-04-17T00:00:01Z,43

1 #datatype string long string string dateTime:RFC3339 double
2 #partition false false false true false false
3 #default _result 0
4 result table _field _measurement _time max
5 0 f1 m1 2018-04-17T00:00:01Z 43

View File

@ -7,6 +7,7 @@ import (
)
type Result interface {
Name() string
// Blocks returns a BlockIterator for iterating through results
Blocks() BlockIterator
}
@ -138,10 +139,17 @@ type MultiResultEncoder interface {
// DelimitedMultiResultEncoder encodes multiple results using a trailing delimiter.
// The delimiter is written after every result.
//
// If an error is encountered when iterating EncodeError will be called on the Encoder.
//
// If the io.Writer implements flusher, it will be flushed after each delimiter.
type DelimitedMultiResultEncoder struct {
Delimiter []byte
Encoder ResultEncoder
Encoder interface {
ResultEncoder
// EncodeError encodes an error on the writer.
EncodeError(w io.Writer, err error) error
}
}
type flusher interface {
@ -150,18 +158,21 @@ type flusher interface {
func (e *DelimitedMultiResultEncoder) Encode(w io.Writer, results ResultIterator) error {
for results.More() {
//TODO(nathanielc): Make the result name a property of a result.
_, result := results.Next()
result := results.Next()
if err := e.Encoder.Encode(w, result); err != nil {
return err
}
if _, err := w.Write(e.Delimiter); err != nil {
return nil
return err
}
// Flush the writer after each result
if f, ok := w.(flusher); ok {
f.Flush()
}
}
return results.Err()
err := results.Err()
if err != nil {
return e.Encoder.EncodeError(w, err)
}
return nil
}