Merge pull request #32 from influxdata/js-transpiler-error-handling

feat(http): perform error handling in the transpiler and the query service
pull/10616/head
Jonathan A. Sternberg 2018-05-24 17:18:01 -05:00 committed by GitHub
commit ea6e96bd09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 200 additions and 219 deletions

View File

@ -63,6 +63,7 @@ func transpileF(cmd *cobra.Command, logger *zap.Logger, args []string) error {
transpileHandler.QueryService = &http.QueryService{
Addr: hosts[0],
}
transpileHandler.Logger = logger
//TODO(nathanielc): Add health checks

View File

@ -190,6 +190,12 @@ func (s *QueryService) QueryWithCompile(ctx context.Context, orgID platform.ID,
}
func (s *QueryService) processResponse(resp *http.Response) (query.ResultIterator, error) {
if err := CheckError(resp); err != nil {
return nil, err
}
// TODO(jsternberg): Handle a 204 response?
var decoder query.MultiResultDecoder
switch resp.Header.Get("Content-Type") {
case "text/csv":

View File

@ -9,11 +9,13 @@ import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
)
type TranspilerQueryHandler struct {
*httprouter.Router
Logger *zap.Logger
QueryService query.QueryService
}
@ -21,6 +23,7 @@ type TranspilerQueryHandler struct {
func NewTranspilerQueryHandler() *TranspilerQueryHandler {
h := &TranspilerQueryHandler{
Router: httprouter.New(),
Logger: zap.NewNop(),
}
h.HandlerFunc("POST", "/v1/transpiler/query", h.handlePostQuery)
@ -100,9 +103,10 @@ func (h *TranspilerQueryHandler) handlePostInfluxQL(w http.ResponseWriter, r *ht
return
}
err = encodeResult(w, results, ce.contentType, ce.encoder)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
// Once we have reached this stage, it is the encoder's job to encode any
// errors in the encoded format.
if err := encodeResult(w, results, ce.contentType, ce.encoder); err != nil {
h.Logger.Info("Unable to encode result", zap.Error(err))
return
}
}

View File

@ -80,6 +80,9 @@ type MultiResultDecoder struct {
// NewMultiResultDecoder creates a new MultiResultDecoder.
func NewMultiResultDecoder(c ResultDecoderConfig) *MultiResultDecoder {
if c.MaxBufferCount == 0 {
c.MaxBufferCount = defaultMaxBufferCount
}
return &MultiResultDecoder{
c: c,
}

View File

@ -1,6 +1,8 @@
package executetest
import "github.com/influxdata/platform/query"
import (
"github.com/influxdata/platform/query"
)
type Result struct {
Blks []*Block

View File

@ -1,72 +0,0 @@
package influxql
import (
"testing"
"time"
"bytes"
"strings"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
)
var epoch = time.Unix(0, 0)
// TestMultiResultEncoder_Encode verifies that MultiResultEncoder renders
// result blocks as InfluxQL-style JSON responses.
func TestMultiResultEncoder_Encode(t *testing.T) {
	cases := []struct {
		name   string
		query  string
		blocks map[string][]*executetest.Block
		output string
	}{
		{
			query: "",
			name:  "one result one row",
			blocks: map[string][]*executetest.Block{
				"0": {{
					KeyCols: []string{"_measurement", "_field", "host"},
					ColMeta: []query.ColMeta{
						{Label: "_time", Type: query.TTime},
						{Label: "_measurement", Type: query.TString},
						{Label: "_field", Type: query.TString},
						{Label: "host", Type: query.TString},
						{Label: execute.DefaultValueColLabel, Type: query.TFloat},
					},
					Data: [][]interface{}{
						{execute.Time(5), "cpu", "max", "localhost", 98.9},
					},
				}},
			},
			output: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"localhost"},"columns":["time","max"],"values":[["1970-01-01T00:00:00Z",98.9]]}]}]}`,
		},
	}
	for _, c := range cases {
		c := c // capture the range variable for the subtest closure
		t.Run(c.name, func(t *testing.T) {
			// Wrap the fixture blocks in a result iterator keyed by name.
			byName := map[string]query.Result{}
			for name, blks := range c.blocks {
				byName[name] = executetest.NewResult(blks)
			}
			iter := query.NewMapResultIterator(byName)

			var buf bytes.Buffer
			var enc MultiResultEncoder
			if err := enc.Encode(&buf, iter); err != nil {
				t.Error("error writing to buffer: ", err)
			}
			actual := strings.TrimSpace(buf.String())
			if !cmp.Equal(actual, c.output) {
				t.Error("unexpected results -want/+got", cmp.Diff(c.output, actual))
			}
		})
	}
}

View File

@ -1,10 +1,5 @@
package influxql
import (
"encoding/json"
"errors"
)
// all of this code is copied more or less verbatim from the influxdb repo.
// we copy instead of sharing because we want to prevent inadvertent breaking
// changes introduced by the transpiler vs the actual InfluxQL engine.
@ -12,8 +7,13 @@ import (
// results generated by the transpiler diverge from InfluxQL.
type Response struct {
Results []Result `json:"results"`
Err string `json:"err,omitempty"`
Results []Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
}
func (r *Response) error(err error) {
r.Results = nil
r.Err = err.Error()
}
// Message represents a user-facing message to be included with the result.
@ -27,56 +27,18 @@ type Message struct {
type Result struct {
// StatementID is just the statement's position in the query. It's used
// to combine statement results if they're being buffered in memory.
StatementID int
Series Rows
Messages []*Message
Partial bool
Err error
StatementID int `json:"statement_id"`
Series []*Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Partial bool `json:"partial,omitempty"`
Err string `json:"error,omitempty"`
}
// MarshalJSON encodes the result into JSON.
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
StatementID int `json:"statement_id"`
Series []*Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Partial bool `json:"partial,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
o.StatementID = r.StatementID
o.Series = r.Series
o.Messages = r.Messages
o.Partial = r.Partial
if r.Err != nil {
o.Err = r.Err.Error()
}
return json.Marshal(&o)
}
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
StatementID int `json:"statement_id"`
Series []*Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Partial bool `json:"partial,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
if err != nil {
return err
}
r.StatementID = o.StatementID
r.Series = o.Series
r.Messages = o.Messages
r.Partial = o.Partial
if o.Err != "" {
r.Err = errors.New(o.Err)
}
return nil
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}

View File

@ -9,7 +9,6 @@ import (
"time"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
)
// MultiResultEncoder encodes results as InfluxQL JSON format.
@ -17,13 +16,16 @@ type MultiResultEncoder struct{}
// Encode writes a collection of results to the influxdb 1.X http response format.
// Expectations/Assumptions:
// 1. Each result will be published as a 'statement' in the top-level list of results. The 'statementID'
// will be interpreted as an integer, and will return an error otherwise.
// 2. If the _field name is present in the tags, and a _value column is present, the _value column will
// be renamed to the value of the _field tag
// 3. If the _measurement name is present in the tags, it will be used as the row.Name for all rows.
// Otherwise, we'll use the column value, which _must_ be present in that case.
// 1. Each result will be published as a 'statement' in the top-level list of results. The result name
// will be interpreted as an integer and used as the statement id.
// 2. If the _measurement name is present in the partition key, it will be used as the result name instead
// of as a normal tag.
// 3. All columns in the partition key must be strings and they will be used as tags. There is no current way
// to have a tag and field be the same name in the results.
// TODO(jsternberg): For full compatibility, the above must be possible.
// 4. All other columns are fields and will be output in the order they are found.
// TODO(jsternberg): This function currently requires the first column to be a time field, but this isn't
// a strict requirement and will be lifted when we begin to work on transpiling meta queries.
func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) error {
resp := Response{}
@ -31,17 +33,17 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e
name, r := results.Next()
id, err := strconv.Atoi(name)
if err != nil {
return fmt.Errorf("unable to parse statement id from result name: %s", err)
resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err))
results.Cancel()
break
}
blocks := r.Blocks()
result := Result{StatementID: id}
err = blocks.Do(func(b query.Block) error {
r := NewRow()
if err := blocks.Do(func(b query.Block) error {
var r Row
fieldName := ""
measurementVaries := -1
for j, c := range b.Key().Cols() {
if c.Type != query.TString {
return fmt.Errorf("partition column %q is not a string type", c.Label)
@ -49,89 +51,97 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e
v := b.Key().Value(j).(string)
if c.Label == "_measurement" {
r.Name = v
} else if c.Label == "_field" {
fieldName = v
} else {
if r.Tags == nil {
r.Tags = make(map[string]string)
}
r.Tags[c.Label] = v
}
}
for i, c := range b.Cols() {
if c.Label == "_time" {
for _, c := range b.Cols() {
if c.Label == "time" {
r.Columns = append(r.Columns, "time")
} else if c.Label == "_value" && fieldName != "" {
r.Columns = append(r.Columns, fieldName)
} else if !b.Key().HasCol(c.Label) {
r.Columns = append(r.Columns, c.Label)
if r.Name == "" && c.Label == "_measurement" {
measurementVaries = i
}
}
}
if r.Name == "" && measurementVaries == -1 {
return fmt.Errorf("no Measurement name found in result blocks for result: %s", name)
}
timeIdx := execute.ColIdx(execute.DefaultTimeColLabel, b.Cols())
if timeIdx < 0 {
return errors.New("table must have an _time column")
}
if typ := b.Cols()[timeIdx].Type; typ != query.TTime {
return fmt.Errorf("column _time must be of type Time got %v", typ)
}
err := b.Do(func(cr query.ColReader) error {
ts := cr.Times(timeIdx)
for i := range ts {
var v []interface{}
for j, c := range cr.Cols() {
if cr.Key().HasCol(c.Label) {
continue
}
if j == measurementVaries {
if c.Type != query.TString {
return errors.New("unexpected type, _measurement is not a string")
}
r.Name = cr.Strings(j)[i]
continue
}
if err := b.Do(func(cr query.ColReader) error {
var values [][]interface{}
j := 0
for idx, c := range b.Cols() {
if cr.Key().HasCol(c.Label) {
continue
}
// Use the first column, usually time, to pre-generate all of the value containers.
if j == 0 {
switch c.Type {
case query.TFloat:
v = append(v, cr.Floats(j)[i])
case query.TInt:
v = append(v, cr.Ints(j)[i])
case query.TString:
v = append(v, cr.Strings(j)[i])
case query.TUInt:
v = append(v, cr.UInts(j)[i])
case query.TBool:
v = append(v, cr.Bools(j)[i])
case query.TTime:
v = append(v, cr.Times(j)[i].Time().Format(time.RFC3339))
values = make([][]interface{}, len(cr.Times(0)))
default:
v = append(v, "unknown")
// TODO(jsternberg): Support using other columns. This will
// mostly be necessary for meta queries.
return errors.New("first column must be time")
}
for j := range values {
values[j] = make([]interface{}, len(r.Columns))
}
}
r.Values = append(r.Values, v)
// Fill in the values for each column.
switch c.Type {
case query.TFloat:
for i, v := range cr.Floats(idx) {
values[i][j] = v
}
case query.TInt:
for i, v := range cr.Ints(idx) {
values[i][j] = v
}
case query.TString:
for i, v := range cr.Strings(idx) {
values[i][j] = v
}
case query.TUInt:
for i, v := range cr.UInts(idx) {
values[i][j] = v
}
case query.TBool:
for i, v := range cr.Bools(idx) {
values[i][j] = v
}
case query.TTime:
for i, v := range cr.Times(idx) {
values[i][j] = v.Time().Format(time.RFC3339)
}
default:
return fmt.Errorf("unsupported column type: %s", c.Type)
}
j++
}
r.Values = append(r.Values, values...)
return nil
})
if err != nil {
}); err != nil {
return err
}
result.Series = append(result.Series, r)
result.Series = append(result.Series, &r)
return nil
})
if err != nil {
return fmt.Errorf("error iterating through results: %s", err)
}); err != nil {
resp.error(err)
results.Cancel()
break
}
resp.Results = append(resp.Results, result)
}
if err := results.Err(); err != nil {
resp.error(err)
}
return json.NewEncoder(w).Encode(resp)
}
func NewMultiResultEncoder() *MultiResultEncoder {

View File

@ -0,0 +1,83 @@
package influxql_test
import (
"bytes"
"errors"
"strings"
"testing"
"time"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/influxql"
)
// TestMultiResultEncoder_Encode checks both the happy path and the
// error-reporting path of the InfluxQL multi-result encoder.
func TestMultiResultEncoder_Encode(t *testing.T) {
	testCases := []struct {
		name string
		in   query.ResultIterator
		out  string
	}{
		{
			name: "Default",
			in: query.NewMapResultIterator(
				map[string]query.Result{
					"0": &executetest.Result{
						Blks: []*executetest.Block{{
							KeyCols: []string{"_measurement", "host"},
							ColMeta: []query.ColMeta{
								{Label: "time", Type: query.TTime},
								{Label: "_measurement", Type: query.TString},
								{Label: "host", Type: query.TString},
								{Label: "value", Type: query.TFloat},
							},
							Data: [][]interface{}{
								{mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)},
							},
						}},
					},
				},
			),
			out: `{"results":[{"statement_id":0,"series":[{"name":"m0","tags":{"host":"server01"},"columns":["time","value"],"values":[["2018-05-24T09:00:00Z",2]]}]}]}`,
		},
		{
			name: "Error",
			in:   &resultErrorIterator{Error: "expected"},
			out:  `{"error":"expected"}`,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Encode into an in-memory buffer and compare the trimmed output.
			var w bytes.Buffer
			if err := influxql.NewMultiResultEncoder().Encode(&w, tc.in); err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
			got := strings.TrimSpace(w.String())
			if exp := tc.out; got != exp {
				t.Fatalf("unexpected output:\nexp=%s\ngot=%s", exp, got)
			}
		})
	}
}
// resultErrorIterator is a query.ResultIterator stub that yields no
// results and reports a fixed error string from Err.
type resultErrorIterator struct {
	Error string
}

// Cancel is a no-op; there is nothing in flight to stop.
func (*resultErrorIterator) Cancel() {}

// More always reports false, so callers proceed directly to Err.
func (*resultErrorIterator) More() bool { return false }

// Next is never reached in practice because More returns false.
func (*resultErrorIterator) Next() (string, query.Result) { return "", nil }

// Err surfaces the configured message as an error value.
func (ri *resultErrorIterator) Err() error {
	return errors.New(ri.Error)
}
// mustParseTime converts an RFC3339 timestamp string into an
// execute.Time, panicking on malformed input. Test helper only.
func mustParseTime(s string) execute.Time {
	parsed, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return execute.Time(parsed.UnixNano())
}

View File

@ -1,18 +0,0 @@
package influxql
// Row represents a single row returned from the execution of a statement.
type Row struct {
	Name    string            `json:"name,omitempty"`    // series name, typically the measurement
	Tags    map[string]string `json:"tags,omitempty"`    // tag key/value pairs for this series
	Columns []string          `json:"columns,omitempty"` // ordered column labels for Values
	Values  [][]interface{}   `json:"values,omitempty"`  // row data; each entry aligns with Columns
	Partial bool              `json:"partial,omitempty"` // true when the series was truncated
}
// Rows is a collection of Row pointers.
type Rows []*Row
// NewRow constructs an empty Row whose Tags map is pre-initialized so
// callers can assign tag values without a nil check.
func NewRow() *Row {
	r := &Row{}
	r.Tags = make(map[string]string)
	return r
}