fix: Add back flux CLI (#20889)

Closes:
https://github.com/influxdata/influxdb/issues/18947
https://github.com/influxdata/influxdb/issues/20852

As in 2.x, the flux CLI now supports only the -execute argument
or scripts supplied on stdin. Examples:

```
➜ echo 'import "influxdata/influxdb/v1" v1.databases()' | ./influx -type flux
Result: _result
Table: keys: [organizationID]
 organizationID:string     databaseName:string  retentionPolicy:string         retentionPeriod:int  default:bool         bucketId:string
----------------------  ----------------------  ----------------------  --------------------------  ------------  ----------------------
                                     _internal                 monitor             604800000000000          true
                                      telegraf                 autogen                           0          true
                                            db                 autogen                           0          true

➜ ./influx -type flux -execute 'import "influxdata/influxdb/v1" v1.databases()'
Result: _result
Table: keys: [organizationID]
 organizationID:string     databaseName:string  retentionPolicy:string         retentionPeriod:int  default:bool         bucketId:string
----------------------  ----------------------  ----------------------  --------------------------  ------------  ----------------------
                                     _internal                 monitor             604800000000000          true
                                      telegraf                 autogen                           0          true
                                            db                 autogen                           0          true

➜ ./influx -type flux -execute 'import "influxdata/influxdb/v1"'
Error (500): error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment
```

Note that interactive TTY input is explicitly forbidden, unlike in 2.x:

```
➜ ./influx -type flux
Connected to http://localhost:8086 version unknown
InfluxDB shell version: unknown
Interactive flux is not supported. Provide your flux script via stdin or the -execute argument.
```
pull/20903/head
Sam Arnold 2021-03-09 09:56:52 -05:00 committed by GitHub
parent 7a992dac77
commit bf7dddaec5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 522 additions and 14 deletions

63
client/errors.go Normal file
View File

@ -0,0 +1,63 @@
package client
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"net/http"
"strings"
)
// CheckError reads the http.Response and returns an error if one exists.
// It will automatically recognize the errors returned by Influx services
// and decode the error into an internal error type. If the error cannot
// be determined in that way, it will create a generic error message.
//
// If there is no error, then this returns nil.
func CheckError(resp *http.Response) error {
	switch resp.StatusCode / 100 {
	case 2:
		// Success; nothing to report.
		return nil
	case 4, 5:
		// Client/server error: parsed below.
	default:
		// TODO(jsternberg): Figure out what to do here?
		return fmt.Errorf("unexpected status code: %d %s", resp.StatusCode, resp.Status)
	}

	if resp.StatusCode == http.StatusUnsupportedMediaType {
		return fmt.Errorf("invalid media type (%d): %q", resp.StatusCode, resp.Header.Get("Content-Type"))
	}

	contentType := resp.Header.Get("Content-Type")
	if contentType == "" {
		// Assume JSON if there is no content-type.
		contentType = "application/json"
	}
	mediatype, _, _ := mime.ParseMediaType(contentType)

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, resp.Body); err != nil {
		return fmt.Errorf("failed to read error response: %w", err)
	}

	if mediatype == "application/json" {
		var errResponse Response
		if err := json.Unmarshal(buf.Bytes(), &errResponse); err != nil {
			return fmt.Errorf("Attempted to unmarshal error (%d) but failed: %w", resp.StatusCode, err)
		}
		if errResponse.Err == nil {
			return fmt.Errorf("Missing error in http response (%d)", resp.StatusCode)
		}
		return fmt.Errorf("Error (%d): %w", resp.StatusCode, errResponse.Err)
	}

	// Non-JSON payload: report the first line of the body as the error.
	firstLine, _ := buf.ReadString('\n')
	return errors.New(strings.TrimSuffix(firstLine, "\n"))
}

View File

@ -19,6 +19,9 @@ import (
"strings"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
fluxClient "github.com/influxdata/influxdb/flux/client"
"github.com/influxdata/influxdb/models"
)
@ -303,6 +306,42 @@ func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error) {
return &response, nil
}
// QueryFlux sends a Flux query to the server's /api/v2/query endpoint and
// returns an iterator over the decoded CSV results.
// It uses a context that can be cancelled by the command line client.
//
// On success the returned flux.ResultIterator owns the response body and the
// caller must call Release on it; on any error path the body is closed here.
func (c *Client) QueryFlux(ctx context.Context, query *fluxClient.QueryRequest) (flux.ResultIterator, error) {
	u := c.url
	u.Path = path.Join(u.Path, "api/v2/query")

	body, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	// NewRequestWithContext attaches ctx directly instead of a separate
	// WithContext call on the built request.
	req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewBuffer(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", c.userAgent)
	req.Header.Set("Content-Type", "application/json")
	if c.username != "" {
		req.SetBasicAuth(c.username, c.password)
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	if err := CheckError(resp); err != nil {
		resp.Body.Close()
		return nil, err
	}

	dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
	results, err := dec.Decode(resp.Body)
	if err != nil {
		// The iterator was never handed to the caller, so close the body
		// here to avoid leaking the connection.
		resp.Body.Close()
		return nil, err
	}
	return results, nil
}
// Write takes BatchPoints and allows for writing of multiple points with defaults
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.

View File

@ -23,13 +23,16 @@ import (
"syscall"
"text/tabwriter"
"golang.org/x/crypto/ssh/terminal"
"github.com/influxdata/flux"
csv2 "github.com/influxdata/flux/csv"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/client"
fluxClient "github.com/influxdata/influxdb/flux/client"
v8 "github.com/influxdata/influxdb/importer/v8"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
"github.com/peterh/liner"
"golang.org/x/crypto/ssh/terminal"
)
// ErrBlankCommand is returned when a parsed command is empty.
@ -153,7 +156,7 @@ func (c *CommandLine) Run() error {
if c.Execute != "" {
switch c.Type {
case QueryLanguageFlux:
return c.ExecuteFluxQuery(c.Execute)
return c.ExecuteFluxQuery(os.Stdout, c.Execute)
default:
// Make the non-interactive mode send everything through the CLI's parser
// the same way the interactive mode works
@ -190,7 +193,7 @@ func (c *CommandLine) Run() error {
switch c.Type {
case QueryLanguageFlux:
return c.ExecuteFluxQuery(string(cmd))
return c.ExecuteFluxQuery(os.Stdout, string(cmd))
default:
return c.ExecuteQuery(string(cmd))
}
@ -211,8 +214,9 @@ func (c *CommandLine) Run() error {
c.Version()
if c.Type == QueryLanguageFlux {
// TODO(lesam): steal 2.x flux client
return fmt.Errorf("ERROR: flux repl missing due to flux upgrade")
// Until 1.8 we supported a flux REPL here. See https://github.com/influxdata/influxdb/issues/19038
// for an explanation of why this was removed.
return fmt.Errorf("Interactive flux is not supported. Provide your flux script via stdin or the -execute argument.")
}
c.Line = liner.NewLiner()
@ -1162,7 +1166,7 @@ func (c *CommandLine) exit() {
c.Line = nil
}
func (c *CommandLine) ExecuteFluxQuery(query string) error {
func (c *CommandLine) ExecuteFluxQuery(w io.Writer, query string) error {
ctx := context.Background()
if !c.IgnoreSignals {
done := make(chan struct{})
@ -1178,8 +1182,30 @@ func (c *CommandLine) ExecuteFluxQuery(query string) error {
}
}()
}
// TODO(lesam): steal 2.x flux client
return fmt.Errorf("ERROR: flux repl missing due to flux upgrade")
request := fluxClient.QueryRequest{}.WithDefaults()
request.Query = query
request.Dialect.Annotations = csv2.DefaultDialect().Annotations
results, err := c.Client.QueryFlux(ctx, &request)
if err != nil {
return err
}
defer results.Release()
for results.More() {
res := results.Next()
fmt.Fprintln(w, "Result:", res.Name())
if err := res.Tables().Do(func(tbl flux.Table) error {
_, err := newFormatter(tbl).WriteTo(w)
return err
}); err != nil {
return err
}
}
// It is safe and appropriate to call Release multiple times and must be
// called before checking the error on the next line.
results.Release()
return results.Err()
}
type QueryLanguage uint8
@ -1210,3 +1236,282 @@ func (l *QueryLanguage) String() string {
}
return fmt.Sprintf("QueryLanguage(%d)", uint8(*l))
}
// Below is a copy and trimmed version of the execute/format.go file from flux.
// It is copied here to avoid requiring a dependency on the execute package which
// may pull in the flux runtime as a dependency.
// In the future, the formatters and other primitives such as the csv parser should
// probably be separated out into user libraries anyway.

// fixedWidthTimeFmt is the reference layout used to render time values at a
// constant width (nanosecond precision with a trailing "Z").
const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z"

// formatter writes a table to a Writer.
type formatter struct {
	tbl       flux.Table // table being rendered
	widths    []int      // current display width of each column
	maxWidth  int        // widest cell seen so far; sizes pad/dash buffers
	newWidths []int      // widths recomputed while streaming rows
	pad       []byte     // buffer of spaces, sliced for cell padding
	dash      []byte     // buffer of dashes, sliced for the header rule
	// fmtBuf is used to format values
	fmtBuf [64]byte
	cols   orderedCols // display ordering of the table's columns
}

// eol is the line terminator written after the header and after each row.
var eol = []byte{'\n'}
// newFormatter creates a formatter for a given table.
// The remaining fields are populated lazily by WriteTo.
func newFormatter(tbl flux.Table) *formatter {
	f := new(formatter)
	f.tbl = tbl
	return f
}
type writeToHelper struct {
w io.Writer
n int64
err error
}
func (w *writeToHelper) write(data []byte) {
if w.err != nil {
return
}
n, err := w.w.Write(data)
w.n += int64(n)
w.err = err
}
// minWidthsByType sets a floor on each column's display width by value type,
// so that narrow values still produce readable, roughly aligned columns.
// TTime matches the fixed-width time layout exactly.
var minWidthsByType = map[flux.ColType]int{
	flux.TBool:    12,
	flux.TInt:     26,
	flux.TUInt:    27,
	flux.TFloat:   28,
	flux.TString:  22,
	flux.TTime:    len(fixedWidthTimeFmt),
	flux.TInvalid: 10,
}
// WriteTo writes the formatted table data to w.
// It returns the number of bytes written and the first error encountered,
// implementing the io.WriterTo contract.
func (f *formatter) WriteTo(out io.Writer) (int64, error) {
	w := &writeToHelper{w: out}

	// Sort cols: group-key columns first (in key order), then the rest in
	// their original order.
	cols := f.tbl.Cols()
	f.cols = newOrderedCols(cols, f.tbl.Key())
	sort.Sort(f.cols)

	// Compute header widths
	f.widths = make([]int, len(cols))
	for j, c := range cols {
		// Column header is "<label>:<type>"
		l := len(c.Label) + len(c.Type.String()) + 1
		min := minWidthsByType[c.Type]
		if min > l {
			l = min
		}
		if l > f.widths[j] {
			f.widths[j] = l
		}
		if l > f.maxWidth {
			f.maxWidth = l
		}
	}

	// Write table header
	w.write([]byte("Table: keys: ["))
	labels := make([]string, len(f.tbl.Key().Cols()))
	for i, c := range f.tbl.Key().Cols() {
		labels[i] = c.Label
	}
	w.write([]byte(strings.Join(labels, ", ")))
	w.write([]byte("]"))
	w.write(eol)

	// Check err and return early
	if w.err != nil {
		return w.n, w.err
	}

	// Write rows
	r := 0
	w.err = f.tbl.Do(func(cr flux.ColReader) error {
		if r == 0 {
			// First buffer: widen columns to fit its values before the
			// header row is emitted, then freeze the padding buffers.
			l := cr.Len()
			for i := 0; i < l; i++ {
				for oj, c := range f.cols.cols {
					j := f.cols.Idx(oj)
					buf := f.valueBuf(i, j, c.Type, cr)
					l := len(buf)
					if l > f.widths[j] {
						f.widths[j] = l
					}
					if l > f.maxWidth {
						f.maxWidth = l
					}
				}
			}
			f.makePaddingBuffers()
			f.writeHeader(w)
			f.writeHeaderSeparator(w)
			// newWidths starts as a copy of widths and tracks the widths
			// actually observed while streaming rows.
			f.newWidths = make([]int, len(f.widths))
			copy(f.newWidths, f.widths)
		}
		l := cr.Len()
		for i := 0; i < l; i++ {
			for oj, c := range f.cols.cols {
				j := f.cols.Idx(oj)
				buf := f.valueBuf(i, j, c.Type, cr)
				l := len(buf)
				padding := f.widths[j] - l
				if padding >= 0 {
					// Right-align the value within the column width.
					w.write(f.pad[:padding])
					w.write(buf)
				} else {
					// Value wider than the column: truncate with "...".
					//TODO make unicode friendly
					w.write(buf[:f.widths[j]-3])
					w.write([]byte{'.', '.', '.'})
				}
				// Two-space gutter between columns.
				w.write(f.pad[:2])
				if l > f.newWidths[j] {
					f.newWidths[j] = l
				}
				if l > f.maxWidth {
					f.maxWidth = l
				}
			}
			w.write(eol)
			r++
		}
		// Propagate any sticky write error to stop iteration.
		return w.err
	})
	return w.n, w.err
}
// makePaddingBuffers (re)allocates the space and dash buffers whenever
// maxWidth has changed, so slices of them can pad any cell or underline
// any header.
func (f *formatter) makePaddingBuffers() {
	if len(f.pad) != f.maxWidth {
		f.pad = make([]byte, f.maxWidth)
		for i := 0; i < f.maxWidth; i++ {
			f.pad[i] = ' '
		}
	}
	if len(f.dash) != f.maxWidth {
		f.dash = make([]byte, f.maxWidth)
		for i := 0; i < f.maxWidth; i++ {
			f.dash[i] = '-'
		}
	}
}
// writeHeader emits one right-aligned "<label>:<type>" cell per column,
// each followed by a two-space gutter, then a newline.
func (f *formatter) writeHeader(w *writeToHelper) {
	for oj, c := range f.cols.cols {
		j := f.cols.Idx(oj)
		header := []byte(c.Label + ":" + c.Type.String())
		w.write(f.pad[:f.widths[j]-len(header)])
		w.write(header)
		w.write(f.pad[:2])
	}
	w.write(eol)
}
// writeHeaderSeparator emits a run of dashes under each column header,
// followed by the same two-space gutter, then a newline.
func (f *formatter) writeHeaderSeparator(w *writeToHelper) {
	for oj := range f.cols.cols {
		width := f.widths[f.cols.Idx(oj)]
		w.write(f.dash[:width])
		w.write(f.pad[:2])
	}
	w.write(eol)
}
// valueBuf renders the value at row i, column j of cr as display bytes.
// Null values (and unknown column types) render as an empty slice.
// Numeric and boolean values reuse f.fmtBuf to avoid allocations.
func (f *formatter) valueBuf(i, j int, typ flux.ColType, cr flux.ColReader) []byte {
	switch typ {
	case flux.TBool:
		if col := cr.Bools(j); col.IsValid(i) {
			return strconv.AppendBool(f.fmtBuf[0:0], col.Value(i))
		}
	case flux.TInt:
		if col := cr.Ints(j); col.IsValid(i) {
			return strconv.AppendInt(f.fmtBuf[0:0], col.Value(i), 10)
		}
	case flux.TUInt:
		if col := cr.UInts(j); col.IsValid(i) {
			return strconv.AppendUint(f.fmtBuf[0:0], col.Value(i), 10)
		}
	case flux.TFloat:
		// TODO allow specifying format and precision
		if col := cr.Floats(j); col.IsValid(i) {
			return strconv.AppendFloat(f.fmtBuf[0:0], col.Value(i), 'f', -1, 64)
		}
	case flux.TString:
		if col := cr.Strings(j); col.IsValid(i) {
			return []byte(col.ValueString(i))
		}
	case flux.TTime:
		if col := cr.Times(j); col.IsValid(i) {
			return []byte(values.Time(col.Value(i)).String())
		}
	}
	return []byte("")
}
// orderedCols sorts a list of columns:
//
//  * time
//  * common tags sorted by label
//  * other tags sorted by label
//  * value
//
// It keeps indexMap in lockstep with cols so that a sorted (display)
// position can be mapped back to the original column index.
type orderedCols struct {
	indexMap []int
	cols     []flux.ColMeta
	key      flux.GroupKey
}

// newOrderedCols builds an orderedCols over a private copy of cols with an
// identity index map; sorting then permutes both slices together.
func newOrderedCols(cols []flux.ColMeta, key flux.GroupKey) orderedCols {
	o := orderedCols{
		indexMap: make([]int, len(cols)),
		cols:     make([]flux.ColMeta, len(cols)),
		key:      key,
	}
	copy(o.cols, cols)
	for i := range o.indexMap {
		o.indexMap[i] = i
	}
	return o
}

// Idx maps an ordered (display) position back to the original column index.
func (o orderedCols) Idx(oj int) int {
	return o.indexMap[oj]
}

func (o orderedCols) Len() int { return len(o.cols) }

func (o orderedCols) Swap(i int, j int) {
	o.cols[i], o.cols[j] = o.cols[j], o.cols[i]
	o.indexMap[i], o.indexMap[j] = o.indexMap[j], o.indexMap[i]
}

// Less orders group-key columns first, by their position within the key;
// all remaining columns keep their original relative order.
func (o orderedCols) Less(i int, j int) bool {
	ki := colIdx(o.cols[i].Label, o.key.Cols())
	kj := colIdx(o.cols[j].Label, o.key.Cols())
	switch {
	case ki >= 0 && kj >= 0:
		// Both are key columns: order by key position.
		return ki < kj
	case ki >= 0:
		return true
	case kj >= 0:
		return false
	default:
		// Neither is a key column: preserve original order.
		return i < j
	}
}

// colIdx returns the position of label within cols, or -1 if absent.
func colIdx(label string, cols []flux.ColMeta) int {
	for j := range cols {
		if cols[j].Label == label {
			return j
		}
	}
	return -1
}

View File

@ -72,11 +72,6 @@ func match(ah accept, sct supportedContentType) bool {
(ah.SubType == "*" || ah.SubType == sct.acceptSubType)
}
// WriteError is a convenience function for writing an error response to the ResponseWriter.
func WriteError(w ResponseWriter, err error) (int, error) {
return w.WriteResponse(Response{Err: err})
}
// responseWriter is an implementation of ResponseWriter.
type responseWriter struct {
formatter interface {

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/stdlib"
"github.com/influxdata/influxdb/cmd/influx/cli"
"github.com/influxdata/influxdb/coordinator"
fluxClient "github.com/influxdata/influxdb/flux/client"
"github.com/influxdata/influxdb/models"
@ -9857,6 +9858,111 @@ func TestServer_Prometheus_Write(t *testing.T) {
}
}
// TestCliEndToEnd drives the CLI's flux support against a live test server:
// a single-result query, a multi-yield query whose result ordering is not
// guaranteed, and a failing query whose error carries both the HTTP status
// code and the underlying flux message.
func TestCliEndToEnd(t *testing.T) {
	config := NewConfig()
	config.HTTPD.FluxEnabled = true
	s := OpenServer(config)
	defer s.Close()

	// Some test data
	s.CreateDatabase(t.Name())
	defer s.DropDatabase(t.Name())
	points, err := models.ParsePointsString(`
m0,k=k0 f=0i 0
m0,k=k0 f=1i 1
m0,k=k0 f=2i 2
m0,k=k0 f=3i 3
m0,k=k0 f=4i 4
m0,k=k1 f=5i 5
m0,k=k1 f=6i 6
m1,k=k0 f=5i 7
m1,k=k2 f=0i 8
m1,k=k0 f=6i 9
m1,k=k1 f=6i 10
m1,k=k0 f=7i 11
m1,k=k0 f=5i 12
m1,k=k1 f=8i 13
m1,k=k2 f=9i 14
m1,k=k3 f=5i 15`)
	assert.NoError(t, err)
	assert.NoError(t, s.WritePoints(t.Name(), "autogen", models.ConsistencyLevelAny, nil, points))

	// Point a fresh CLI at the test server.
	u, err := url.Parse(s.URL())
	assert.NoError(t, err)
	cmd := "connect " + u.Host
	c := cli.CommandLine{}
	assert.NoError(t, c.ParseCommand(cmd))

	// Test a successful flux query
	buf := &bytes.Buffer{}
	assert.NoError(t, c.ExecuteFluxQuery(buf, `import "influxdata/influxdb/v1" v1.databases()`))
	assert.Equal(t,
		`Result: _result
Table: keys: [organizationID]
organizationID:string databaseName:string retentionPolicy:string retentionPeriod:int default:bool bucketId:string
---------------------- ---------------------- ---------------------- -------------------------- ------------ ----------------------
TestCliEndToEnd autogen 0 true
`,
		buf.String())

	// Test a bigger successful flux query
	buf.Reset()
	assert.NoError(t, c.ExecuteFluxQuery(buf, `import "influxdata/influxdb/v1"
v1.databases() |> yield(name: "show_the_dbs")
from(bucket:"TestCliEndToEnd") |> range(start: 0) |> drop(columns:["_start","_stop"]) |> yield(name: "show_the_data")`))
	// Expected output for each yield; the two may arrive in either order.
	dbTable := `Result: show_the_dbs
Table: keys: [organizationID]
organizationID:string databaseName:string retentionPolicy:string retentionPeriod:int default:bool bucketId:string
---------------------- ---------------------- ---------------------- -------------------------- ------------ ----------------------
TestCliEndToEnd autogen 0 true
`
	dataTable := `Result: show_the_data
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m0 k0 1970-01-01T00:00:00.000000000Z 0
f m0 k0 1970-01-01T00:00:00.000000001Z 1
f m0 k0 1970-01-01T00:00:00.000000002Z 2
f m0 k0 1970-01-01T00:00:00.000000003Z 3
f m0 k0 1970-01-01T00:00:00.000000004Z 4
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m0 k1 1970-01-01T00:00:00.000000005Z 5
f m0 k1 1970-01-01T00:00:00.000000006Z 6
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m1 k0 1970-01-01T00:00:00.000000007Z 5
f m1 k0 1970-01-01T00:00:00.000000009Z 6
f m1 k0 1970-01-01T00:00:00.000000011Z 7
f m1 k0 1970-01-01T00:00:00.000000012Z 5
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m1 k1 1970-01-01T00:00:00.000000010Z 6
f m1 k1 1970-01-01T00:00:00.000000013Z 8
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m1 k2 1970-01-01T00:00:00.000000008Z 0
f m1 k2 1970-01-01T00:00:00.000000014Z 9
Table: keys: [_field, _measurement, k]
_field:string _measurement:string k:string _time:time _value:int
---------------------- ---------------------- ---------------------- ------------------------------ --------------------------
f m1 k3 1970-01-01T00:00:00.000000015Z 5
`
	// No guaranteed order for results to come back in
	assert.Contains(t, []string{dbTable + dataTable, dataTable + dbTable}, buf.String())

	// Test a broken flux query - we get back both the http status code and the underlying flux error
	buf.Reset()
	assert.EqualError(t, c.ExecuteFluxQuery(buf, `v1.databases()`), `Error (500): error @1:1-1:3: undefined identifier v1`)
	assert.Equal(t, "", buf.String())
}
func TestFluxBasicEndToEnd(t *testing.T) {
config := NewConfig()
config.HTTPD.FluxEnabled = true