fix(flux): return internal influxdb error for cursor conflict (#17697)
* fix(flux): return internal influxdb error for cursor conflict Closes https://github.com/influxdata/influxdb/issues/17680. * refactor: update error code from EInternal to EInvalidpull/17701/head
parent
67ccbd4490
commit
5fe5da4ec7
|
@ -17,6 +17,32 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
// GroupCursorError is returned when two different cursor types
|
||||
// are read for the same table.
|
||||
type GroupCursorError struct {
|
||||
typ string
|
||||
cursor cursors.Cursor
|
||||
}
|
||||
|
||||
func (err *GroupCursorError) Error() string {
|
||||
var got string
|
||||
switch err.cursor.(type) {
|
||||
case cursors.FloatArrayCursor:
|
||||
got = "float"
|
||||
case cursors.IntegerArrayCursor:
|
||||
got = "integer"
|
||||
case cursors.UnsignedArrayCursor:
|
||||
got = "unsigned"
|
||||
case cursors.StringArrayCursor:
|
||||
got = "string"
|
||||
case cursors.BooleanArrayCursor:
|
||||
got = "boolean"
|
||||
default:
|
||||
got = "invalid"
|
||||
}
|
||||
return fmt.Sprintf("schema collision: cannot group %s and %s types together", err.typ, got)
|
||||
}
|
||||
|
||||
type storageTable interface {
|
||||
flux.Table
|
||||
Close()
|
||||
|
|
|
@ -13,10 +13,10 @@ import (
|
|||
"github.com/influxdata/flux/arrow"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
storage "github.com/influxdata/influxdb/v2/storage/reads"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
//
|
||||
|
@ -182,7 +182,13 @@ func (t *floatGroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.FloatArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected float cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError{
|
||||
typ: "float",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
@ -367,7 +373,13 @@ func (t *integerGroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.IntegerArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected integer cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError{
|
||||
typ: "integer",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
@ -552,7 +564,13 @@ func (t *unsignedGroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.UnsignedArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected unsigned cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError{
|
||||
typ: "unsigned",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
@ -737,7 +755,13 @@ func (t *stringGroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.StringArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected string cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError{
|
||||
typ: "string",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
@ -922,7 +946,13 @@ func (t *booleanGroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.BooleanArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected boolean cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError{
|
||||
typ: "boolean",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
|
|
@ -7,10 +7,10 @@ import (
|
|||
"github.com/influxdata/flux/arrow"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
storage "github.com/influxdata/influxdb/v2/storage/reads"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
{{range .}}
|
||||
//
|
||||
|
@ -176,7 +176,13 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
|
|||
if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok {
|
||||
// TODO(sgc): error or skip?
|
||||
cur.Close()
|
||||
t.err = errors.Errorf("expected {{.name}} cursor type, got %T", cur)
|
||||
t.err = &influxdb.Error {
|
||||
Code: influxdb.EInvalid,
|
||||
Err: &GroupCursorError {
|
||||
typ: "{{.name}}",
|
||||
cursor: cur,
|
||||
},
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
t.readTags(t.gc.Tags())
|
||||
|
|
Loading…
Reference in New Issue