From 8dd8d1f79ed2b4dc7a9c7aa4da8a2311f93d2c87 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 30 Jul 2020 12:44:48 -0500 Subject: [PATCH] feat(cmd/influx): modify the query cli to use the http api (#19076) This modifies the query cli to remove the flux runtime dependency and uses the public http api to execute the query. It then uses flux's csv decoder to read the raw response. This removes any dependency on the flux runtime and substantially reduces the binary size and the startup execution time. --- cmd/influx/query.go | 382 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 343 insertions(+), 39 deletions(-) diff --git a/cmd/influx/query.go b/cmd/influx/query.go index eddcc7d693..701c3ce7be 100644 --- a/cmd/influx/query.go +++ b/cmd/influx/query.go @@ -1,22 +1,22 @@ package main import ( - "context" - "crypto/tls" + "bytes" + "encoding/json" "fmt" + "io" "io/ioutil" "net/http" + "net/url" "os" + "sort" + "strconv" "strings" "github.com/influxdata/flux" - "github.com/influxdata/flux/dependencies/filesystem" - "github.com/influxdata/flux/plan" - "github.com/influxdata/flux/repl" - "github.com/influxdata/flux/runtime" - _ "github.com/influxdata/flux/stdlib" - "github.com/influxdata/flux/stdlib/influxdata/influxdb" - _ "github.com/influxdata/influxdb/v2/query/stdlib" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/values" + ihttp "github.com/influxdata/influxdb/v2/http" "github.com/spf13/cobra" ) @@ -81,42 +81,346 @@ func fluxQueryF(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load query: %v", err) } - plan.RegisterLogicalRules( - influxdb.DefaultFromAttributes{ - Org: &influxdb.NameOrID{ - ID: queryFlags.org.id, - Name: queryFlags.org.name, - }, - Host: &flags.Host, - Token: &flags.Token, - }, - ) - runtime.FinalizeBuiltIns() - - r, err := getFluxREPL(flags.skipVerify) + u, err := url.Parse(flags.Host) if err != nil { - return fmt.Errorf("failed to get the flux REPL: %v", err) + return fmt.Errorf("unable to parse host: %s", err) } - if err := r.Input(q); err != nil { - return fmt.Errorf("failed to execute query: %v", err) + if !strings.HasSuffix(u.Path, "/") { + u.Path += "/" + } + u.Path += "api/v2/query" + + params := url.Values{} + if queryFlags.org.id != "" { + params.Set("orgID", queryFlags.org.id) + } else { + params.Set("org", queryFlags.org.name) + } + u.RawQuery = params.Encode() + + body, _ := json.Marshal(map[string]interface{}{ + "query": q, + "type": "flux", + "dialect": map[string]interface{}{ + "annotations": []string{"datatype", "group", "default"}, + "delimiter": ",", + "header": true, + }, + }) + + req, _ := http.NewRequest("POST", u.String(), bytes.NewReader(body)) + req.Header.Set("Authorization", "Token "+flags.Token) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept-Encoding", "gzip") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if err := ihttp.CheckError(resp); err != nil { + return err } - return nil -} + dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + results, err := dec.Decode(resp.Body) + if err != nil { + return fmt.Errorf("query decode error: %s", err) + } + defer results.Release() -func getFluxREPL(skipVerify bool) (*repl.REPL, error) { - deps := flux.NewDefaultDependencies() - deps.Deps.FilesystemService = filesystem.SystemFS - if skipVerify { - deps.Deps.HTTPClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, + for results.More() { + res := results.Next() + fmt.Println("Result:", res.Name()) + + if err := res.Tables().Do(func(tbl flux.Table) error { + _, err := newFormatter(tbl).WriteTo(os.Stdout) + return err + }); err != nil { + return err } } - ctx := deps.Inject(context.Background()) - return repl.New(ctx, deps), nil + results.Release() + return results.Err() +} + +// Below is a copy and trimmed version of the execute/format.go file from flux. +// It is copied here to avoid requiring a dependency on the execute package which +// may pull in the flux runtime as a dependency. +// In the future, the formatters and other primitives such as the csv parser should +// probably be separated out into user libraries anyway. + +const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z" + +// formatter writes a table to a Writer. +type formatter struct { + tbl flux.Table + widths []int + maxWidth int + newWidths []int + pad []byte + dash []byte + // fmtBuf is used to format values + fmtBuf [64]byte + + cols orderedCols +} + +var eol = []byte{'\n'} + +// newFormatter creates a formatter for a given table. +func newFormatter(tbl flux.Table) *formatter { + return &formatter{ + tbl: tbl, + } +} + +type writeToHelper struct { + w io.Writer + n int64 + err error +} + +func (w *writeToHelper) write(data []byte) { + if w.err != nil { + return + } + n, err := w.w.Write(data) + w.n += int64(n) + w.err = err +} + +var minWidthsByType = map[flux.ColType]int{ + flux.TBool: 12, + flux.TInt: 26, + flux.TUInt: 27, + flux.TFloat: 28, + flux.TString: 22, + flux.TTime: len(fixedWidthTimeFmt), + flux.TInvalid: 10, +} + +// WriteTo writes the formatted table data to w. +func (f *formatter) WriteTo(out io.Writer) (int64, error) { + w := &writeToHelper{w: out} + + // Sort cols + cols := f.tbl.Cols() + f.cols = newOrderedCols(cols, f.tbl.Key()) + sort.Sort(f.cols) + + // Compute header widths + f.widths = make([]int, len(cols)) + for j, c := range cols { + // Column header is "