Merged pull request #379 from influxdata/nc-bucket-id
feat(query): Add support for using bucketID in frompull/10616/head
commit
01ad9e99a7
|
@ -16,9 +16,10 @@ import (
|
|||
const FromKind = "from"
|
||||
|
||||
type FromOpSpec struct {
|
||||
Database string `json:"db"`
|
||||
Bucket string `json:"bucket"`
|
||||
Hosts []string `json:"hosts"`
|
||||
Database string `json:"db,omitempty"`
|
||||
Bucket string `json:"bucket,omitempty"`
|
||||
BucketID platform.ID `json:"bucket_id,omitempty"`
|
||||
Hosts []string `json:"hosts"`
|
||||
}
|
||||
|
||||
var fromSignature = semantic.FunctionSignature{
|
||||
|
@ -50,10 +51,19 @@ func createFromOpSpec(args query.Arguments, a *query.Administration) (query.Oper
|
|||
spec.Bucket = bucket
|
||||
}
|
||||
|
||||
if spec.Database == "" && spec.Bucket == "" {
|
||||
if bucketID, ok, err := args.GetString("bucketID"); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
err := spec.BucketID.DecodeFromString(bucketID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "invalid bucket ID")
|
||||
}
|
||||
}
|
||||
|
||||
if spec.Database == "" && spec.Bucket == "" && len(spec.BucketID) == 0 {
|
||||
return nil, errors.New("must specify one of db or bucket")
|
||||
}
|
||||
if spec.Database != "" && spec.Bucket != "" {
|
||||
if spec.Database != "" && spec.Bucket != "" && len(spec.BucketID) == 0 {
|
||||
return nil, errors.New("must specify only one of db or bucket")
|
||||
}
|
||||
|
||||
|
@ -80,6 +90,7 @@ func (s *FromOpSpec) Kind() query.OperationKind {
|
|||
type FromProcedureSpec struct {
|
||||
Database string
|
||||
Bucket string
|
||||
BucketID platform.ID
|
||||
Hosts []string
|
||||
|
||||
BoundsSet bool
|
||||
|
@ -132,6 +143,10 @@ func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
|
|||
|
||||
ns.Database = s.Database
|
||||
ns.Bucket = s.Bucket
|
||||
if len(s.BucketID) > 0 {
|
||||
ns.BucketID = make(platform.ID, len(s.BucketID))
|
||||
copy(ns.BucketID, s.BucketID)
|
||||
}
|
||||
|
||||
if len(s.Hosts) > 0 {
|
||||
ns.Hosts = make([]string, len(s.Hosts))
|
||||
|
@ -190,14 +205,19 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
|
|||
orgID := a.OrganizationID()
|
||||
|
||||
var bucketID platform.ID
|
||||
if spec.Database == "" {
|
||||
// Determine bucketID
|
||||
switch {
|
||||
case spec.Database != "":
|
||||
// The bucket ID will be treated as the database name
|
||||
bucketID = platform.ID(spec.Database)
|
||||
case len(spec.BucketID) != 0:
|
||||
bucketID = spec.BucketID
|
||||
case spec.Bucket != "":
|
||||
b, ok := deps.BucketLookup.Lookup(orgID, spec.Bucket)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("could not find bucket %q", spec.Bucket)
|
||||
}
|
||||
bucketID = b
|
||||
} else {
|
||||
bucketID = platform.ID(spec.Database)
|
||||
}
|
||||
|
||||
return storage.NewSource(
|
||||
|
|
|
@ -4,9 +4,10 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/querytest"
|
||||
)
|
||||
|
||||
|
@ -32,6 +33,25 @@ func TestFrom_NewQuery(t *testing.T) {
|
|||
Raw: `from(db:"telegraf", chicken:"what is this?")`,
|
||||
WantErr: true,
|
||||
},
|
||||
{
|
||||
Name: "from bucket invalid ID",
|
||||
Raw: `from(bucketID:"invalid")`,
|
||||
WantErr: true,
|
||||
},
|
||||
{
|
||||
Name: "from bucket ID",
|
||||
Raw: `from(bucketID:"aaaaaaaa")`,
|
||||
Want: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
BucketID: platform.ID{170, 170, 170, 170},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "from with database",
|
||||
Raw: `from(db:"mydb") |> range(start:-4h, stop:-2h) |> sum()`,
|
||||
|
|
Loading…
Reference in New Issue