2018-05-15 20:11:32 +00:00
|
|
|
package influxql
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
|
2018-05-21 23:02:42 +00:00
|
|
|
"github.com/influxdata/platform/query"
|
2018-05-15 20:11:32 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// MultiResultEncoder encodes results as InfluxQL JSON format.
|
|
|
|
type MultiResultEncoder struct{}
|
|
|
|
|
|
|
|
// Encode writes a collection of results to the influxdb 1.X http response format.
|
|
|
|
// Expectations/Assumptions:
|
2018-05-24 22:14:16 +00:00
|
|
|
// 1. Each result will be published as a 'statement' in the top-level list of results. The result name
|
|
|
|
// will be interpreted as an integer and used as the statement id.
|
|
|
|
// 2. If the _measurement name is present in the partition key, it will be used as the result name instead
|
|
|
|
// of as a normal tag.
|
|
|
|
// 3. All columns in the partition key must be strings and they will be used as tags. There is no current way
|
|
|
|
// to have a tag and field be the same name in the results.
|
|
|
|
// TODO(jsternberg): For full compatibility, the above must be possible.
|
|
|
|
// 4. All other columns are fields and will be output in the order they are found.
|
|
|
|
// TODO(jsternberg): This function currently requires the first column to be a time field, but this isn't
|
|
|
|
// a strict requirement and will be lifted when we begin to work on transpiling meta queries.
|
2018-05-21 23:02:42 +00:00
|
|
|
func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) error {
|
2018-05-15 20:11:32 +00:00
|
|
|
resp := Response{}
|
|
|
|
|
|
|
|
for results.More() {
|
2018-05-24 17:29:36 +00:00
|
|
|
r := results.Next()
|
|
|
|
name := r.Name()
|
2018-05-24 21:33:51 +00:00
|
|
|
id, err := strconv.Atoi(name)
|
2018-05-15 20:11:32 +00:00
|
|
|
if err != nil {
|
2018-05-24 22:14:16 +00:00
|
|
|
resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err))
|
|
|
|
results.Cancel()
|
|
|
|
break
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
|
|
|
|
2018-05-24 21:33:51 +00:00
|
|
|
blocks := r.Blocks()
|
|
|
|
|
|
|
|
result := Result{StatementID: id}
|
2018-05-24 22:14:16 +00:00
|
|
|
if err := blocks.Do(func(b query.Block) error {
|
|
|
|
var r Row
|
2018-05-15 20:11:32 +00:00
|
|
|
|
2018-05-19 21:28:37 +00:00
|
|
|
for j, c := range b.Key().Cols() {
|
2018-05-21 23:02:42 +00:00
|
|
|
if c.Type != query.TString {
|
2018-05-19 21:28:37 +00:00
|
|
|
return fmt.Errorf("partition column %q is not a string type", c.Label)
|
|
|
|
}
|
2018-05-24 15:08:32 +00:00
|
|
|
v := b.Key().Value(j).Str()
|
2018-05-19 21:28:37 +00:00
|
|
|
if c.Label == "_measurement" {
|
2018-05-15 20:11:32 +00:00
|
|
|
r.Name = v
|
|
|
|
} else {
|
2018-05-24 22:14:16 +00:00
|
|
|
if r.Tags == nil {
|
|
|
|
r.Tags = make(map[string]string)
|
|
|
|
}
|
2018-05-19 21:28:37 +00:00
|
|
|
r.Tags[c.Label] = v
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
for _, c := range b.Cols() {
|
|
|
|
if c.Label == "time" {
|
2018-05-15 20:11:32 +00:00
|
|
|
r.Columns = append(r.Columns, "time")
|
2018-05-19 21:28:37 +00:00
|
|
|
} else if !b.Key().HasCol(c.Label) {
|
2018-05-15 20:11:32 +00:00
|
|
|
r.Columns = append(r.Columns, c.Label)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
if err := b.Do(func(cr query.ColReader) error {
|
|
|
|
var values [][]interface{}
|
|
|
|
j := 0
|
|
|
|
for idx, c := range b.Cols() {
|
|
|
|
if cr.Key().HasCol(c.Label) {
|
|
|
|
continue
|
|
|
|
}
|
2018-05-15 20:11:32 +00:00
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
// Use the first column, usually time, to pre-generate all of the value containers.
|
|
|
|
if j == 0 {
|
2018-05-15 20:11:32 +00:00
|
|
|
switch c.Type {
|
2018-05-21 23:02:42 +00:00
|
|
|
case query.TTime:
|
2018-05-24 22:14:16 +00:00
|
|
|
values = make([][]interface{}, len(cr.Times(0)))
|
2018-05-15 20:11:32 +00:00
|
|
|
default:
|
2018-05-24 22:14:16 +00:00
|
|
|
// TODO(jsternberg): Support using other columns. This will
|
|
|
|
// mostly be necessary for meta queries.
|
|
|
|
return errors.New("first column must be time")
|
|
|
|
}
|
|
|
|
|
|
|
|
for j := range values {
|
|
|
|
values[j] = make([]interface{}, len(r.Columns))
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
// Fill in the values for each column.
|
|
|
|
switch c.Type {
|
|
|
|
case query.TFloat:
|
|
|
|
for i, v := range cr.Floats(idx) {
|
|
|
|
values[i][j] = v
|
|
|
|
}
|
|
|
|
case query.TInt:
|
|
|
|
for i, v := range cr.Ints(idx) {
|
|
|
|
values[i][j] = v
|
|
|
|
}
|
|
|
|
case query.TString:
|
|
|
|
for i, v := range cr.Strings(idx) {
|
|
|
|
values[i][j] = v
|
|
|
|
}
|
|
|
|
case query.TUInt:
|
|
|
|
for i, v := range cr.UInts(idx) {
|
|
|
|
values[i][j] = v
|
|
|
|
}
|
|
|
|
case query.TBool:
|
|
|
|
for i, v := range cr.Bools(idx) {
|
|
|
|
values[i][j] = v
|
|
|
|
}
|
|
|
|
case query.TTime:
|
|
|
|
for i, v := range cr.Times(idx) {
|
|
|
|
values[i][j] = v.Time().Format(time.RFC3339)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("unsupported column type: %s", c.Type)
|
|
|
|
}
|
|
|
|
j++
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
2018-05-24 22:14:16 +00:00
|
|
|
r.Values = append(r.Values, values...)
|
2018-05-19 21:28:37 +00:00
|
|
|
return nil
|
2018-05-24 22:14:16 +00:00
|
|
|
}); err != nil {
|
2018-05-19 21:28:37 +00:00
|
|
|
return err
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
result.Series = append(result.Series, &r)
|
2018-05-15 20:11:32 +00:00
|
|
|
return nil
|
2018-05-24 22:14:16 +00:00
|
|
|
}); err != nil {
|
|
|
|
resp.error(err)
|
|
|
|
results.Cancel()
|
|
|
|
break
|
2018-05-15 20:11:32 +00:00
|
|
|
}
|
|
|
|
resp.Results = append(resp.Results, result)
|
|
|
|
}
|
|
|
|
|
2018-05-24 22:14:16 +00:00
|
|
|
if err := results.Err(); err != nil {
|
|
|
|
resp.error(err)
|
|
|
|
}
|
|
|
|
|
2018-05-15 20:11:32 +00:00
|
|
|
return json.NewEncoder(w).Encode(resp)
|
|
|
|
}
|
|
|
|
func NewMultiResultEncoder() *MultiResultEncoder {
|
|
|
|
return new(MultiResultEncoder)
|
|
|
|
}
|