feat(cmd/influx): modify the query cli to use the http api (#19076)

This modifies the query cli to remove the flux runtime dependency and
uses the public http api to execute the query. It then uses flux's csv
decoder to read the raw response.

This removes any dependency on the flux runtime and substantially
reduces the binary size and the startup execution time.
pull/19164/head
Jonathan A. Sternberg 2020-07-30 12:44:48 -05:00 committed by GitHub
parent 4a81b5dd79
commit 8dd8d1f79e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 343 additions and 39 deletions

View File

@ -1,22 +1,22 @@
package main
import (
"context"
"crypto/tls"
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/filesystem"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/repl"
"github.com/influxdata/flux/runtime"
_ "github.com/influxdata/flux/stdlib"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
_ "github.com/influxdata/influxdb/v2/query/stdlib"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/values"
ihttp "github.com/influxdata/influxdb/v2/http"
"github.com/spf13/cobra"
)
@ -81,42 +81,346 @@ func fluxQueryF(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to load query: %v", err)
}
plan.RegisterLogicalRules(
influxdb.DefaultFromAttributes{
Org: &influxdb.NameOrID{
ID: queryFlags.org.id,
Name: queryFlags.org.name,
},
Host: &flags.Host,
Token: &flags.Token,
},
)
runtime.FinalizeBuiltIns()
r, err := getFluxREPL(flags.skipVerify)
u, err := url.Parse(flags.Host)
if err != nil {
return fmt.Errorf("failed to get the flux REPL: %v", err)
return fmt.Errorf("unable to parse host: %s", err)
}
if err := r.Input(q); err != nil {
return fmt.Errorf("failed to execute query: %v", err)
if !strings.HasSuffix(u.Path, "/") {
u.Path += "/"
}
u.Path += "api/v2/query"
params := url.Values{}
if queryFlags.org.id != "" {
params.Set("orgID", queryFlags.org.id)
} else {
params.Set("org", queryFlags.org.name)
}
u.RawQuery = params.Encode()
body, _ := json.Marshal(map[string]interface{}{
"query": q,
"type": "flux",
"dialect": map[string]interface{}{
"annotations": []string{"datatype", "group", "default"},
"delimiter": ",",
"header": true,
},
})
req, _ := http.NewRequest("POST", u.String(), bytes.NewReader(body))
req.Header.Set("Authorization", "Token "+flags.Token)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if err := ihttp.CheckError(resp); err != nil {
return err
}
return nil
}
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
results, err := dec.Decode(resp.Body)
if err != nil {
return fmt.Errorf("query decode error: %s", err)
}
defer results.Release()
func getFluxREPL(skipVerify bool) (*repl.REPL, error) {
deps := flux.NewDefaultDependencies()
deps.Deps.FilesystemService = filesystem.SystemFS
if skipVerify {
deps.Deps.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
for results.More() {
res := results.Next()
fmt.Println("Result:", res.Name())
if err := res.Tables().Do(func(tbl flux.Table) error {
_, err := newFormatter(tbl).WriteTo(os.Stdout)
return err
}); err != nil {
return err
}
}
ctx := deps.Inject(context.Background())
return repl.New(ctx, deps), nil
results.Release()
return results.Err()
}
// Below is a copy and trimmed version of the execute/format.go file from flux.
// It is copied here to avoid requiring a dependency on the execute package which
// may pull in the flux runtime as a dependency.
// In the future, the formatters and other primitives such as the csv parser should
// probably be separated out into user libraries anyway.
const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z"
// formatter writes a table to a Writer.
type formatter struct {
tbl flux.Table
widths []int
maxWidth int
newWidths []int
pad []byte
dash []byte
// fmtBuf is used to format values
fmtBuf [64]byte
cols orderedCols
}
var eol = []byte{'\n'}
// newFormatter creates a formatter for a given table.
func newFormatter(tbl flux.Table) *formatter {
return &formatter{
tbl: tbl,
}
}
type writeToHelper struct {
w io.Writer
n int64
err error
}
func (w *writeToHelper) write(data []byte) {
if w.err != nil {
return
}
n, err := w.w.Write(data)
w.n += int64(n)
w.err = err
}
var minWidthsByType = map[flux.ColType]int{
flux.TBool: 12,
flux.TInt: 26,
flux.TUInt: 27,
flux.TFloat: 28,
flux.TString: 22,
flux.TTime: len(fixedWidthTimeFmt),
flux.TInvalid: 10,
}
// WriteTo writes the formatted table data to w.
func (f *formatter) WriteTo(out io.Writer) (int64, error) {
w := &writeToHelper{w: out}
// Sort cols
cols := f.tbl.Cols()
f.cols = newOrderedCols(cols, f.tbl.Key())
sort.Sort(f.cols)
// Compute header widths
f.widths = make([]int, len(cols))
for j, c := range cols {
// Column header is "<label>:<type>"
l := len(c.Label) + len(c.Type.String()) + 1
min := minWidthsByType[c.Type]
if min > l {
l = min
}
if l > f.widths[j] {
f.widths[j] = l
}
if l > f.maxWidth {
f.maxWidth = l
}
}
// Write table header
w.write([]byte("Table: keys: ["))
labels := make([]string, len(f.tbl.Key().Cols()))
for i, c := range f.tbl.Key().Cols() {
labels[i] = c.Label
}
w.write([]byte(strings.Join(labels, ", ")))
w.write([]byte("]"))
w.write(eol)
// Check err and return early
if w.err != nil {
return w.n, w.err
}
// Write rows
r := 0
w.err = f.tbl.Do(func(cr flux.ColReader) error {
if r == 0 {
l := cr.Len()
for i := 0; i < l; i++ {
for oj, c := range f.cols.cols {
j := f.cols.Idx(oj)
buf := f.valueBuf(i, j, c.Type, cr)
l := len(buf)
if l > f.widths[j] {
f.widths[j] = l
}
if l > f.maxWidth {
f.maxWidth = l
}
}
}
f.makePaddingBuffers()
f.writeHeader(w)
f.writeHeaderSeparator(w)
f.newWidths = make([]int, len(f.widths))
copy(f.newWidths, f.widths)
}
l := cr.Len()
for i := 0; i < l; i++ {
for oj, c := range f.cols.cols {
j := f.cols.Idx(oj)
buf := f.valueBuf(i, j, c.Type, cr)
l := len(buf)
padding := f.widths[j] - l
if padding >= 0 {
w.write(f.pad[:padding])
w.write(buf)
} else {
//TODO make unicode friendly
w.write(buf[:f.widths[j]-3])
w.write([]byte{'.', '.', '.'})
}
w.write(f.pad[:2])
if l > f.newWidths[j] {
f.newWidths[j] = l
}
if l > f.maxWidth {
f.maxWidth = l
}
}
w.write(eol)
r++
}
return w.err
})
return w.n, w.err
}
func (f *formatter) makePaddingBuffers() {
if len(f.pad) != f.maxWidth {
f.pad = make([]byte, f.maxWidth)
for i := range f.pad {
f.pad[i] = ' '
}
}
if len(f.dash) != f.maxWidth {
f.dash = make([]byte, f.maxWidth)
for i := range f.dash {
f.dash[i] = '-'
}
}
}
func (f *formatter) writeHeader(w *writeToHelper) {
for oj, c := range f.cols.cols {
j := f.cols.Idx(oj)
buf := append(append([]byte(c.Label), ':'), []byte(c.Type.String())...)
w.write(f.pad[:f.widths[j]-len(buf)])
w.write(buf)
w.write(f.pad[:2])
}
w.write(eol)
}
func (f *formatter) writeHeaderSeparator(w *writeToHelper) {
for oj := range f.cols.cols {
j := f.cols.Idx(oj)
w.write(f.dash[:f.widths[j]])
w.write(f.pad[:2])
}
w.write(eol)
}
func (f *formatter) valueBuf(i, j int, typ flux.ColType, cr flux.ColReader) []byte {
buf := []byte("")
switch typ {
case flux.TBool:
if cr.Bools(j).IsValid(i) {
buf = strconv.AppendBool(f.fmtBuf[0:0], cr.Bools(j).Value(i))
}
case flux.TInt:
if cr.Ints(j).IsValid(i) {
buf = strconv.AppendInt(f.fmtBuf[0:0], cr.Ints(j).Value(i), 10)
}
case flux.TUInt:
if cr.UInts(j).IsValid(i) {
buf = strconv.AppendUint(f.fmtBuf[0:0], cr.UInts(j).Value(i), 10)
}
case flux.TFloat:
if cr.Floats(j).IsValid(i) {
// TODO allow specifying format and precision
buf = strconv.AppendFloat(f.fmtBuf[0:0], cr.Floats(j).Value(i), 'f', -1, 64)
}
case flux.TString:
if cr.Strings(j).IsValid(i) {
buf = []byte(cr.Strings(j).ValueString(i))
}
case flux.TTime:
if cr.Times(j).IsValid(i) {
buf = []byte(values.Time(cr.Times(j).Value(i)).String())
}
}
return buf
}
// orderedCols sorts a list of columns:
//
// * time
// * common tags sorted by label
// * other tags sorted by label
// * value
//
type orderedCols struct {
indexMap []int
cols []flux.ColMeta
key flux.GroupKey
}
func newOrderedCols(cols []flux.ColMeta, key flux.GroupKey) orderedCols {
indexMap := make([]int, len(cols))
for i := range indexMap {
indexMap[i] = i
}
cpy := make([]flux.ColMeta, len(cols))
copy(cpy, cols)
return orderedCols{
indexMap: indexMap,
cols: cpy,
key: key,
}
}
func (o orderedCols) Idx(oj int) int {
return o.indexMap[oj]
}
func (o orderedCols) Len() int { return len(o.cols) }
func (o orderedCols) Swap(i int, j int) {
o.cols[i], o.cols[j] = o.cols[j], o.cols[i]
o.indexMap[i], o.indexMap[j] = o.indexMap[j], o.indexMap[i]
}
func (o orderedCols) Less(i int, j int) bool {
ki := colIdx(o.cols[i].Label, o.key.Cols())
kj := colIdx(o.cols[j].Label, o.key.Cols())
if ki >= 0 && kj >= 0 {
return ki < kj
} else if ki >= 0 {
return true
} else if kj >= 0 {
return false
}
return i < j
}
func colIdx(label string, cols []flux.ColMeta) int {
for j, c := range cols {
if c.Label == label {
return j
}
}
return -1
}