// Source path: influxdb/query/influxql/result.go

package influxql
import (
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"time"
"github.com/influxdata/platform/query"
// NOTE(review): stray timestamp left over from the page export: 2018-05-21 21:20:06 +00:00
"github.com/influxdata/platform/query/execute"
)
// MultiResultEncoder encodes results as InfluxQL JSON format.
// The type has no fields; its Encode method does the actual work, and
// NewMultiResultEncoder returns a ready-to-use pointer to one.
type MultiResultEncoder struct{}
// Encode writes a collection of results to w in the InfluxDB 1.x HTTP
// response (JSON) format.
//
// Expectations/assumptions:
//  1. Each result is published as a "statement" in the top-level list of
//     results. The result name is parsed as the integer statement ID; Encode
//     returns an error if the name is not an integer.
//  2. If a _field column is present in the block key and the block has a
//     _value column, the _value column is renamed to the value of _field.
//  3. If a _measurement column is present in the block key, its value is used
//     as the name of the series. Otherwise the block must have a _measurement
//     string column; its value names the series and it is not emitted as an
//     output column.
//
// Every block-key column must be a string, and every block must have a _time
// column of type time. Columns that belong to the block key are reported as
// the series name or tags and never as output columns, so the column list and
// each row of values always line up. Time values are formatted as RFC 3339
// with nanosecond precision.
//
// The response is written only after every result has been processed, so
// nothing is written to w when an error is returned from iteration.
func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) error {
	resp := Response{}

	for results.More() {
		name, res := results.Next()
		id, err := strconv.Atoi(name)
		if err != nil {
			return fmt.Errorf("unable to parse statement id from result name: %s", err)
		}

		result := Result{StatementID: id}
		err = res.Blocks().Do(func(b query.Block) error {
			row := NewRow()
			key := b.Key()
			cols := b.Cols()

			// The block key supplies the series name, the field name and the tags.
			fieldName := ""
			for j, c := range key.Cols() {
				if c.Type != query.TString {
					return fmt.Errorf("partition column %q is not a string type", c.Label)
				}
				v := key.Value(j).(string)
				switch c.Label {
				case "_measurement":
					row.Name = v
				case "_field":
					fieldName = v
				default:
					row.Tags[c.Label] = v
				}
			}

			// Build the output column list. It must mirror exactly the set of
			// values appended per row below: key columns and the column that
			// supplies the series name are consumed, not emitted.
			measurementCol := -1
			for i, c := range cols {
				switch {
				case key.HasCol(c.Label):
					// Already reported through the series name or tags.
				case c.Label == "_time":
					row.Columns = append(row.Columns, "time")
				case c.Label == "_value" && fieldName != "":
					row.Columns = append(row.Columns, fieldName)
				case c.Label == "_measurement" && row.Name == "":
					// The measurement is not part of the key, so the series
					// name is taken from this column while reading rows.
					measurementCol = i
				default:
					row.Columns = append(row.Columns, c.Label)
				}
			}

			if row.Name == "" && measurementCol == -1 {
				return fmt.Errorf("no Measurement name found in result blocks for result: %s", name)
			}

			timeIdx := execute.ColIdx(execute.DefaultTimeColLabel, cols)
			if timeIdx < 0 {
				return errors.New("table must have an _time column")
			}
			if typ := cols[timeIdx].Type; typ != query.TTime {
				return fmt.Errorf("column _time must be of type Time got %v", typ)
			}

			if err := b.Do(func(cr query.ColReader) error {
				ts := cr.Times(timeIdx)
				for i := range ts {
					vals := make([]interface{}, 0, len(row.Columns))
					for j, c := range cr.Cols() {
						if cr.Key().HasCol(c.Label) {
							continue
						}
						if j == measurementCol {
							if c.Type != query.TString {
								return errors.New("unexpected type, _measurement is not a string")
							}
							// If the measurement differs between rows, the
							// value from the last row read wins.
							row.Name = cr.Strings(j)[i]
							continue
						}
						switch c.Type {
						case query.TFloat:
							vals = append(vals, cr.Floats(j)[i])
						case query.TInt:
							vals = append(vals, cr.Ints(j)[i])
						case query.TString:
							vals = append(vals, cr.Strings(j)[i])
						case query.TUInt:
							vals = append(vals, cr.UInts(j)[i])
						case query.TBool:
							vals = append(vals, cr.Bools(j)[i])
						case query.TTime:
							// RFC3339Nano keeps sub-second precision and is
							// identical to RFC3339 for whole-second values.
							vals = append(vals, cr.Times(j)[i].Time().Format(time.RFC3339Nano))
						default:
							vals = append(vals, "unknown")
						}
					}
					row.Values = append(row.Values, vals)
				}
				return nil
			}); err != nil {
				return err
			}

			result.Series = append(result.Series, row)
			return nil
		})
		if err != nil {
			return fmt.Errorf("error iterating through results: %s", err)
		}
		resp.Results = append(resp.Results, result)
	}

	return json.NewEncoder(w).Encode(resp)
}
// NewMultiResultEncoder returns a pointer to a newly allocated, zero-valued
// MultiResultEncoder.
func NewMultiResultEncoder() *MultiResultEncoder {
	return &MultiResultEncoder{}
}