add the ability for retention policy context in cli with use command
parent
f55995f9be
commit
0cbdea531a
|
@ -17,6 +17,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
|
|||
- [#7356](https://github.com/influxdata/influxdb/issues/7356): Use X-Forwarded-For IP address in HTTP logger if present.
|
||||
- [#7066](https://github.com/influxdata/influxdb/issues/7066): Add support for secure transmission via collectd.
|
||||
- [#7036](https://github.com/influxdata/influxdb/issues/7036): Switch logging to use structured logging everywhere.
|
||||
- [#3188](https://github.com/influxdata/influxdb/issues/3188): [CLI feature request] USE retention policy for queries.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client"
|
||||
"github.com/influxdata/influxdb/importer/v8"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/peterh/liner"
|
||||
)
|
||||
|
@ -348,17 +349,40 @@ func (c *CommandLine) use(cmd string) {
|
|||
fmt.Printf("Could not parse database name from %q.\n", cmd)
|
||||
return
|
||||
}
|
||||
d := args[1]
|
||||
|
||||
stmt := args[1]
|
||||
db, rp, err := parseDatabaseAndRetentionPolicy([]byte(stmt))
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to parse database or retention policy from %s", stmt)
|
||||
return
|
||||
}
|
||||
|
||||
if !c.databaseExists(db) {
|
||||
return
|
||||
}
|
||||
|
||||
c.Database = db
|
||||
fmt.Printf("Using database %s\n", db)
|
||||
|
||||
if rp != "" {
|
||||
if !c.retentionPolicyExists(db, rp) {
|
||||
return
|
||||
}
|
||||
c.RetentionPolicy = rp
|
||||
fmt.Printf("Using retention policy %s\n", rp)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CommandLine) databaseExists(db string) bool {
|
||||
// Validate if specified database exists
|
||||
response, err := c.Client.Query(client.Query{Command: "SHOW DATABASES"})
|
||||
if err != nil {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return
|
||||
return false
|
||||
} else if err := response.Error(); err != nil {
|
||||
if c.ClientConfig.Username == "" {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return
|
||||
return false
|
||||
}
|
||||
// TODO(jsternberg): Fix SHOW DATABASES to be user-aware #6397.
|
||||
// If we are unable to run SHOW DATABASES, display a warning and use the
|
||||
|
@ -373,7 +397,7 @@ func (c *CommandLine) use(cmd string) {
|
|||
if row.Name == "databases" {
|
||||
for _, values := range row.Values {
|
||||
for _, database := range values {
|
||||
if database == d {
|
||||
if database == db {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -383,13 +407,49 @@ func (c *CommandLine) use(cmd string) {
|
|||
}
|
||||
return false
|
||||
}(); !databaseExists {
|
||||
fmt.Printf("ERR: Database %s doesn't exist. Run SHOW DATABASES for a list of existing databases.\n", d)
|
||||
return
|
||||
fmt.Printf("ERR: Database %s doesn't exist. Run SHOW DATABASES for a list of existing databases.\n", db)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
c.Database = d
|
||||
fmt.Printf("Using database %s\n", d)
|
||||
func (c *CommandLine) retentionPolicyExists(db, rp string) bool {
|
||||
// Validate if specified database exists
|
||||
response, err := c.Client.Query(client.Query{Command: fmt.Sprintf("SHOW RETENTION POLICIES ON %q", db)})
|
||||
if err != nil {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return false
|
||||
} else if err := response.Error(); err != nil {
|
||||
if c.ClientConfig.Username == "" {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return false
|
||||
}
|
||||
fmt.Printf("WARN: %s\n", err)
|
||||
} else {
|
||||
// Verify the provided database exists
|
||||
if retentionPolicyExists := func() bool {
|
||||
for _, result := range response.Results {
|
||||
for _, row := range result.Series {
|
||||
for _, values := range row.Values {
|
||||
for i, v := range values {
|
||||
if i != 0 {
|
||||
continue
|
||||
}
|
||||
if v == rp {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}(); !retentionPolicyExists {
|
||||
fmt.Printf("ERR: RETENTION POLICY %s doesn't exist. Run SHOW RETENTION POLICIES ON %q for a list of existing retention polices.\n", rp, db)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// SetPrecision sets client precision
|
||||
|
@ -559,17 +619,40 @@ func (c *CommandLine) Insert(stmt string) error {
|
|||
}
|
||||
|
||||
// query creates a query struct to be used with the client.
|
||||
func (c *CommandLine) query(query string, database string) client.Query {
|
||||
func (c *CommandLine) query(query string) client.Query {
|
||||
return client.Query{
|
||||
Command: query,
|
||||
Database: database,
|
||||
Database: c.Database,
|
||||
Chunked: true,
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteQuery runs any query statement
|
||||
func (c *CommandLine) ExecuteQuery(query string) error {
|
||||
response, err := c.Client.Query(c.query(query, c.Database))
|
||||
// If we have a retention policy, we need to rewrite the statement sources
|
||||
if c.RetentionPolicy != "" {
|
||||
pq, err := influxql.NewParser(strings.NewReader(query)).ParseQuery()
|
||||
if err != nil {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return err
|
||||
}
|
||||
for _, stmt := range pq.Statements {
|
||||
if selectStatement, ok := stmt.(*influxql.SelectStatement); ok {
|
||||
influxql.WalkFunc(selectStatement.Sources, func(n influxql.Node) {
|
||||
if t, ok := n.(*influxql.Measurement); ok {
|
||||
if t.Database == "" && c.Database != "" {
|
||||
t.Database = c.Database
|
||||
}
|
||||
if t.RetentionPolicy == "" && c.RetentionPolicy != "" {
|
||||
t.RetentionPolicy = c.RetentionPolicy
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
query = pq.String()
|
||||
}
|
||||
response, err := c.Client.Query(c.query(query))
|
||||
if err != nil {
|
||||
fmt.Printf("ERR: %s\n", err)
|
||||
return err
|
||||
|
@ -588,7 +671,7 @@ func (c *CommandLine) ExecuteQuery(query string) error {
|
|||
|
||||
// DatabaseToken retrieves database token
|
||||
func (c *CommandLine) DatabaseToken() (string, error) {
|
||||
response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", ""))
|
||||
response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -764,7 +847,9 @@ func interfaceToString(v interface{}) string {
|
|||
// Settings prints current settings
|
||||
func (c *CommandLine) Settings() {
|
||||
w := new(tabwriter.Writer)
|
||||
w.Init(os.Stdout, 0, 1, 1, '\t', 0)
|
||||
w.Init(os.Stdout, 0, 1, 1, ' ', 0)
|
||||
fmt.Fprintln(w, "Setting\tValue")
|
||||
fmt.Fprintln(w, "--------\t--------")
|
||||
if c.Port > 0 {
|
||||
fmt.Fprintf(w, "Host\t%s:%d\n", c.Host, c.Port)
|
||||
} else {
|
||||
|
@ -772,6 +857,7 @@ func (c *CommandLine) Settings() {
|
|||
}
|
||||
fmt.Fprintf(w, "Username\t%s\n", c.ClientConfig.Username)
|
||||
fmt.Fprintf(w, "Database\t%s\n", c.Database)
|
||||
fmt.Fprintf(w, "RetentionPolicy\t%s\n", c.RetentionPolicy)
|
||||
fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty)
|
||||
fmt.Fprintf(w, "Format\t%s\n", c.Format)
|
||||
fmt.Fprintf(w, "Write Consistency\t%s\n", c.ClientConfig.WriteConsistency)
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func parseDatabaseAndRetentionPolicy(stmt []byte) (string, string, error) {
|
||||
var db, rp []byte
|
||||
var quoted bool
|
||||
var seperatorCount int
|
||||
|
||||
stmt = bytes.TrimSpace(stmt)
|
||||
|
||||
for _, b := range stmt {
|
||||
if b == '"' {
|
||||
quoted = !quoted
|
||||
continue
|
||||
}
|
||||
if b == '.' && !quoted {
|
||||
seperatorCount++
|
||||
if seperatorCount > 1 {
|
||||
return "", "", fmt.Errorf("unable to parse database and retention policy from %s", string(stmt))
|
||||
}
|
||||
continue
|
||||
}
|
||||
if seperatorCount == 1 {
|
||||
rp = append(rp, b)
|
||||
continue
|
||||
}
|
||||
db = append(db, b)
|
||||
}
|
||||
return string(db), string(rp), nil
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_parseDatabaseAndretentionPolicy(t *testing.T) {
|
||||
tests := []struct {
|
||||
stmt string
|
||||
db string
|
||||
rp string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
stmt: `foo`,
|
||||
db: "foo",
|
||||
},
|
||||
{
|
||||
stmt: `"foo.bar"`,
|
||||
db: "foo.bar",
|
||||
},
|
||||
{
|
||||
stmt: `"foo.bar".`,
|
||||
db: "foo.bar",
|
||||
},
|
||||
{
|
||||
stmt: `."foo.bar"`,
|
||||
rp: "foo.bar",
|
||||
},
|
||||
{
|
||||
stmt: `foo.bar`,
|
||||
db: "foo",
|
||||
rp: "bar",
|
||||
},
|
||||
{
|
||||
stmt: `"foo".bar`,
|
||||
db: "foo",
|
||||
rp: "bar",
|
||||
},
|
||||
{
|
||||
stmt: `"foo"."bar"`,
|
||||
db: "foo",
|
||||
rp: "bar",
|
||||
},
|
||||
{
|
||||
stmt: `"foo.bin"."bar"`,
|
||||
db: "foo.bin",
|
||||
rp: "bar",
|
||||
},
|
||||
{
|
||||
stmt: `"foo.bin"."bar.baz...."`,
|
||||
db: "foo.bin",
|
||||
rp: "bar.baz....",
|
||||
},
|
||||
{
|
||||
stmt: ` "foo.bin"."bar.baz...." `,
|
||||
db: "foo.bin",
|
||||
rp: "bar.baz....",
|
||||
},
|
||||
|
||||
{
|
||||
stmt: `"foo.bin"."bar".boom`,
|
||||
err: errors.New("foo"),
|
||||
},
|
||||
{
|
||||
stmt: "foo.bar.",
|
||||
err: errors.New("foo"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
db, rp, err := parseDatabaseAndRetentionPolicy([]byte(test.stmt))
|
||||
if err != nil && test.err == nil {
|
||||
t.Errorf("unexpected error: got %s", err)
|
||||
continue
|
||||
}
|
||||
if test.err != nil && err == nil {
|
||||
t.Errorf("expected err: got: nil, exp: %s", test.err)
|
||||
continue
|
||||
}
|
||||
if db != test.db {
|
||||
t.Errorf("unexpected database: got: %s, exp: %s", db, test.db)
|
||||
}
|
||||
if rp != test.rp {
|
||||
t.Errorf("unexpected retention policy: got: %s, exp: %s", rp, test.rp)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue