feat: Port pipeline tests package from Cloud

This package provides essentially the same API as the Cloud tests
package, utilising the `TestLauncher` type.

Additional With functional options were added to the `Launcher`
in order to expose type-safe configuration of InfluxQL configuration.
Additional With options may be added in the future as the need arises.
pull/20017/head
Stuart Carnie 2020-11-13 10:35:35 +11:00
parent e8c068d330
commit 1cd9d0b04a
13 changed files with 1482 additions and 1 deletions

View File

@ -1583,3 +1583,11 @@ func (m *Launcher) CheckService() platform.CheckService {
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
}
func (m *Launcher) DBRPMappingServiceV2() platform.DBRPMappingServiceV2 {
return m.apibackend.DBRPService
}
func (m *Launcher) SessionService() platform.SessionService {
return m.apibackend.SessionService
}

View File

@ -26,6 +26,24 @@ func WithViper(v *viper.Viper) Option {
}
}
// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement.
func WithInfluxQLMaxSelectSeriesN(n int) Option {
return &launcherOption{
applyConfigFn: func(l *Launcher) {
l.CoordinatorConfig.MaxSelectSeriesN = n
},
}
}
// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement.
func WithInfluxQLMaxSelectBucketsN(n int) Option {
return &launcherOption{
applyConfigFn: func(l *Launcher) {
l.CoordinatorConfig.MaxSelectBucketsN = n
},
}
}
type launcherOption struct {
applyInitFn func(*Launcher)
applyConfigFn func(*Launcher)

2
go.mod
View File

@ -109,7 +109,7 @@ require (
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a
google.golang.org/api v0.17.0
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71
honnef.co/go/tools v0.0.1-2020.1.4
istio.io/pkg v0.0.0-20200606170016-70c5172b9cdf

38
tests/auth_helpers.go Normal file
View File

@ -0,0 +1,38 @@
package tests
import (
"github.com/influxdata/influxdb/v2"
)
func mergePerms(orgID influxdb.ID, in ...[]influxdb.Permission) []influxdb.Permission {
var perms []influxdb.Permission
for i := range in {
perms = append(perms, in[i]...)
}
for i := range perms {
perms[i].Resource.OrgID = &orgID
}
return perms
}
func MakeBucketPerm(bucketID influxdb.ID, actions ...influxdb.Action) []influxdb.Permission {
var perms []influxdb.Permission
for i := range actions {
perms = append(perms, influxdb.Permission{Action: actions[i], Resource: influxdb.Resource{ID: &bucketID, Type: influxdb.BucketsResourceType}})
}
return perms
}
func MakeBucketRWPerm(bucketID influxdb.ID) []influxdb.Permission {
return MakeBucketPerm(bucketID, []influxdb.Action{influxdb.ReadAction, influxdb.WriteAction}...)
}
func MakeAuthorization(org, userID influxdb.ID, perms ...[]influxdb.Permission) *influxdb.Authorization {
return &influxdb.Authorization{
OrgID: org,
UserID: userID,
Permissions: mergePerms(org, perms...),
Description: "foo user auth",
Status: influxdb.Active,
}
}

736
tests/client.go Normal file
View File

@ -0,0 +1,736 @@
package tests
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"github.com/influxdata/flux/csv"
"github.com/influxdata/influxdb/v2"
influxhttp "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/pkg/httpc"
)
type ClientConfig struct {
UserID influxdb.ID
OrgID influxdb.ID
BucketID influxdb.ID
DocumentsNamespace string
// If Session is provided, Token is ignored.
Token string
Session *influxdb.Session
}
// Client provides an API for writing, querying, and interacting with
// gateway's resources like authorizations, buckets, and organizations.
type Client struct {
Client *httpc.Client
*influxhttp.Service
ClientConfig
}
// NewClient initialises a new Client which is ready to write points to the Gateway write endpoint.
func NewClient(gatewayURL string, config ClientConfig) (*Client, error) {
opts := make([]httpc.ClientOptFn, 0)
if config.Session != nil {
config.Token = ""
opts = append(opts, httpc.WithSessionCookie(config.Session.Key))
}
hc, err := influxhttp.NewHTTPClient(gatewayURL, config.Token, false, opts...)
if err != nil {
return nil, err
}
svc, err := influxhttp.NewService(hc, gatewayURL, config.Token)
if err != nil {
return nil, err
}
return &Client{
Client: hc,
Service: svc,
ClientConfig: config,
}, nil
}
// Open opens the client
func (c *Client) Open() error { return nil }
// Close closes the client
func (c *Client) Close() error { return nil }
// MustWriteBatch calls WriteBatch, panicking if an error is encountered.
func (c *Client) MustWriteBatch(points string) {
if err := c.WriteBatch(points); err != nil {
panic(err)
}
}
// WriteBatch writes the current batch of points to the Gateway HTTP endpoint.
func (c *Client) WriteBatch(points string) error {
return c.WriteService.Write(context.Background(), c.OrgID, c.BucketID, strings.NewReader(points))
}
// WriteLegacyBatch writes the current batch of points to the legacy influx 1.x
// /write endpoint. Used to mimic our cloud 1 to cloud 2 go replay forwarding.
func (c *Client) WriteLegacyBatch(org, db, rp, points string) error {
writeFn := func(w io.Writer) (string, string, error) {
_, err := fmt.Fprint(w, points)
return "", "", err
}
req := c.Client.Post(writeFn, WritePathLegacy).
QueryParams([2]string{"db", db}).
QueryParams([2]string{"rp", rp}).
QueryParams([2]string{"org", org}).
StatusFn(httpc.StatusIn(http.StatusNoContent))
return req.Do(context.Background())
}
// Query returns the CSV response from a flux query to gateway.
//
// This also remove all the \r to make it easier to write tests.
func (c *Client) QueryFlux(org, query string) (string, error) {
var csv string
csvResp := func(resp *http.Response) error {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
// remove the \r to simplify testing against a body of CSV.
body = bytes.ReplaceAll(body, []byte("\r"), nil)
csv = string(body)
return nil
}
qr := QueryRequestBody(query)
err := c.Client.PostJSON(qr, fluxPath).
QueryParams([2]string{"org", org}).
Accept("text/csv").
RespFn(csvResp).
StatusFn(httpc.StatusIn(http.StatusOK)).
Do(context.Background())
return csv, err
}
const (
fluxPath = "/api/v2/query"
// This is the only namespace for documents present after init.
DefaultDocumentsNamespace = "templates"
)
// QueryRequestBody creates a body for a flux query using common CSV output params.
// Headers are included, but, annotations are not.
func QueryRequestBody(flux string) *influxhttp.QueryRequest {
header := true
return &influxhttp.QueryRequest{
Type: "flux",
Query: flux,
Dialect: influxhttp.QueryDialect{
Header: &header,
Delimiter: ",",
CommentPrefix: "#",
DateTimeFormat: "RFC3339",
Annotations: csv.DefaultEncoderConfig().Annotations,
},
}
}
// MustCreateAuth creates an auth or is a fatal error.
// Used in tests where the content of the bucket does not matter.
//
// This authorization token is an operator token for the default
// organization for the default user.
func (c *Client) MustCreateAuth(t *testing.T) influxdb.ID {
t.Helper()
perms := influxdb.OperPermissions()
auth := &influxdb.Authorization{
OrgID: c.OrgID,
UserID: c.UserID,
Permissions: perms,
}
err := c.CreateAuthorization(context.Background(), auth)
if err != nil {
t.Fatalf("unable to create auth: %v", err)
}
return auth.ID
}
// MustCreateBucket creates a bucket or is a fatal error.
// Used in tests where the content of the bucket does not matter.
func (c *Client) MustCreateBucket(t *testing.T) influxdb.ID {
t.Helper()
bucket := &influxdb.Bucket{OrgID: c.OrgID, Name: "n1"}
err := c.CreateBucket(context.Background(), bucket)
if err != nil {
t.Fatalf("unable to create bucket: %v", err)
}
return bucket.ID
}
// MustCreateOrg creates an org or is a fatal error.
// Used in tests where the content of the org does not matter.
func (c *Client) MustCreateOrg(t *testing.T) influxdb.ID {
t.Helper()
org := &influxdb.Organization{Name: "n1"}
err := c.CreateOrganization(context.Background(), org)
if err != nil {
t.Fatalf("unable to create org: %v", err)
}
return org.ID
}
// MustCreateLabel creates a label or is a fatal error.
// Used in tests where the content of the label does not matter.
func (c *Client) MustCreateLabel(t *testing.T) influxdb.ID {
t.Helper()
l := &influxdb.Label{OrgID: c.OrgID, Name: "n1"}
err := c.CreateLabel(context.Background(), l)
if err != nil {
t.Fatalf("unable to create label: %v", err)
}
return l.ID
}
// MustCreateDocument creates a document or is a fatal error.
// Used in tests where the content of the document does not matter.
func (c *Client) MustCreateDocument(t *testing.T) influxdb.ID {
t.Helper()
d := &influxdb.Document{
ID: influxdb.ID(12123434),
Meta: influxdb.DocumentMeta{
Name: "n1",
Type: "t1",
},
Content: "howdy",
}
err := c.CreateDocument(context.Background(), c.DocumentsNamespace, c.OrgID, d)
if err != nil {
t.Fatalf("unable to create document: %v", err)
}
return d.ID
}
// MustCreateCheck creates a check or is a fatal error.
// Used in tests where the content of the check does not matter.
func (c *Client) MustCreateCheck(t *testing.T) influxdb.ID {
t.Helper()
chk, err := c.CreateCheck(context.Background(), MockCheck("c", c.OrgID, c.UserID))
if err != nil {
t.Fatalf("unable to create check: %v", err)
}
return chk.ID
}
// MustCreateTelegraf creates a telegraf config or is a fatal error.
// Used in tests where the content of the telegraf config does not matter.
func (c *Client) MustCreateTelegraf(t *testing.T) influxdb.ID {
t.Helper()
tc := &influxdb.TelegrafConfig{
OrgID: c.OrgID,
Name: "n1",
Description: "d1",
Config: "[[howdy]]",
}
unused := influxdb.ID(1) /* this id is not used in the API */
err := c.CreateTelegrafConfig(context.Background(), tc, unused)
if err != nil {
t.Fatalf("unable to create telegraf config: %v", err)
}
return tc.ID
}
// MustCreateUser creates a user or is a fatal error.
// Used in tests where the content of the user does not matter.
func (c *Client) MustCreateUser(t *testing.T) influxdb.ID {
t.Helper()
u := &influxdb.User{Name: "n1"}
err := c.CreateUser(context.Background(), u)
if err != nil {
t.Fatalf("unable to create user: %v", err)
}
return u.ID
}
// MustCreateVariable creates a variable or is a fatal error.
// Used in tests where the content of the variable does not matter.
func (c *Client) MustCreateVariable(t *testing.T) influxdb.ID {
t.Helper()
v := &influxdb.Variable{
OrganizationID: c.OrgID,
Name: "n1",
Arguments: &influxdb.VariableArguments{
Type: "constant",
Values: influxdb.VariableConstantValues{"v1", "v2"},
},
}
err := c.CreateVariable(context.Background(), v)
if err != nil {
t.Fatalf("unable to create variable: %v", err)
}
return v.ID
}
// MustCreateNotificationEndpoint creates a notification endpoint or is a fatal error.
// Used in tests where the content of the notification endpoint does not matter.
func (c *Client) MustCreateNotificationEndpoint(t *testing.T) influxdb.ID {
t.Helper()
ne := ValidNotificationEndpoint(c.OrgID)
err := c.CreateNotificationEndpoint(context.Background(), ne, c.UserID)
if err != nil {
t.Fatalf("unable to create notification endpoint: %v", err)
}
return ne.GetID()
}
// MustCreateNotificationRule creates a Notification Rule or is a fatal error
// Used in tests where the content of the notification rule does not matter
func (c *Client) MustCreateNotificationRule(t *testing.T) influxdb.ID {
t.Helper()
ctx := context.Background()
ne := ValidCustomNotificationEndpoint(c.OrgID, time.Now().String())
err := c.CreateNotificationEndpoint(ctx, ne, c.UserID)
if err != nil {
t.Fatalf("unable to create notification endpoint: %v", err)
}
endpointID := ne.GetID()
r := ValidNotificationRule(c.OrgID, endpointID)
rc := influxdb.NotificationRuleCreate{NotificationRule: r, Status: influxdb.Active}
err = c.CreateNotificationRule(ctx, rc, c.UserID)
if err != nil {
t.Fatalf("unable to create notification rule: %v", err)
}
// we don't need this endpoint, so delete it to be compatible with other tests
_, _, err = c.DeleteNotificationEndpoint(ctx, endpointID)
if err != nil {
t.Fatalf("unable to delete notification endpoint: %v", err)
}
return r.GetID()
}
// MustCreateDBRPMapping creates a DBRP Mapping or is a fatal error.
// Used in tests where the content of the mapping does not matter.
// The created mapping points to the user's default bucket.
func (c *Client) MustCreateDBRPMapping(t *testing.T) influxdb.ID {
t.Helper()
ctx := context.Background()
m := &influxdb.DBRPMappingV2{
Database: "db",
RetentionPolicy: "rp",
OrganizationID: c.OrgID,
BucketID: c.BucketID,
}
if err := c.DBRPMappingServiceV2.Create(ctx, m); err != nil {
t.Fatalf("unable to create DBRP mapping: %v", err)
}
return m.ID
}
// MustCreateResource will create a generic resource via the API.
// Used in tests where the content of the resource does not matter.
//
// // Create one of each org resource
// for _, r := range influxdb.OrgResourceTypes {
// client.MustCreateResource(t, r)
// }
//
//
// // Create a variable:
// id := client.MustCreateResource(t, influxdb.VariablesResourceType)
// defer client.MustDeleteResource(t, influxdb.VariablesResourceType, id)
func (c *Client) MustCreateResource(t *testing.T, r influxdb.ResourceType) influxdb.ID {
t.Helper()
switch r {
case influxdb.AuthorizationsResourceType: // 0
return c.MustCreateAuth(t)
case influxdb.BucketsResourceType: // 1
return c.MustCreateBucket(t)
case influxdb.OrgsResourceType: // 3
return c.MustCreateOrg(t)
case influxdb.SourcesResourceType: // 4
t.Skip("I think sources are going to be removed right?")
case influxdb.TasksResourceType: // 5
t.Skip("Task go client is not yet created")
case influxdb.TelegrafsResourceType: // 6
return c.MustCreateTelegraf(t)
case influxdb.UsersResourceType: // 7
return c.MustCreateUser(t)
case influxdb.VariablesResourceType: // 8
return c.MustCreateVariable(t)
case influxdb.ScraperResourceType: // 9
t.Skip("Scraper go client is not yet created")
case influxdb.SecretsResourceType: // 10
t.Skip("Secrets go client is not yet created")
case influxdb.LabelsResourceType: // 11
return c.MustCreateLabel(t)
case influxdb.ViewsResourceType: // 12
t.Skip("Are views still a thing?")
case influxdb.DocumentsResourceType: // 13
return c.MustCreateDocument(t)
case influxdb.NotificationRuleResourceType: // 14
return c.MustCreateNotificationRule(t)
case influxdb.NotificationEndpointResourceType: // 15
return c.MustCreateNotificationEndpoint(t)
case influxdb.ChecksResourceType: // 16
return c.MustCreateCheck(t)
case influxdb.DBRPResourceType: // 17
return c.MustCreateDBRPMapping(t)
}
return 0
}
// DeleteResource will remove a resource using the API.
func (c *Client) DeleteResource(t *testing.T, r influxdb.ResourceType, id influxdb.ID) error {
t.Helper()
ctx := context.Background()
switch r {
case influxdb.AuthorizationsResourceType: // 0
return c.DeleteAuthorization(ctx, id)
case influxdb.BucketsResourceType: // 1
return c.DeleteBucket(context.Background(), id)
case influxdb.OrgsResourceType: // 3
return c.DeleteOrganization(ctx, id)
case influxdb.SourcesResourceType: // 4
t.Skip("I think sources are going to be removed right?")
case influxdb.TasksResourceType: // 5
t.Skip("Task go client is not yet created")
case influxdb.TelegrafsResourceType: // 6
return c.DeleteTelegrafConfig(ctx, id)
case influxdb.UsersResourceType: // 7
return c.DeleteUser(ctx, id)
case influxdb.VariablesResourceType: // 8
return c.DeleteVariable(ctx, id)
case influxdb.ScraperResourceType: // 9
t.Skip("Scraper go client is not yet created")
case influxdb.SecretsResourceType: // 10
t.Skip("Secrets go client is not yet created")
case influxdb.LabelsResourceType: // 11
return c.DeleteLabel(ctx, id)
case influxdb.ViewsResourceType: // 12
t.Skip("Are views still a thing?")
case influxdb.DocumentsResourceType: // 13
return c.DeleteDocument(ctx, c.DocumentsNamespace, id)
case influxdb.NotificationRuleResourceType: // 14
return c.DeleteNotificationRule(ctx, id)
case influxdb.NotificationEndpointResourceType: // 15
// Ignore the other results as suggested by goDoc.
_, _, err := c.DeleteNotificationEndpoint(ctx, id)
return err
case influxdb.ChecksResourceType: // 16
return c.DeleteCheck(ctx, id)
case influxdb.DBRPResourceType: // 17
return c.DBRPMappingServiceV2.Delete(ctx, c.OrgID, id)
}
return nil
}
// MustDeleteResource requires no error when deleting a resource.
func (c *Client) MustDeleteResource(t *testing.T, r influxdb.ResourceType, id influxdb.ID) {
t.Helper()
if err := c.DeleteResource(t, r, id); err != nil {
t.Fatalf("unable to delete resource %v %v: %v", r, id, err)
}
}
// FindAll returns all the IDs of a specific resource type.
func (c *Client) FindAll(t *testing.T, r influxdb.ResourceType) ([]influxdb.ID, error) {
t.Helper()
var ids []influxdb.ID
ctx := context.Background()
switch r {
case influxdb.AuthorizationsResourceType: // 0
rs, _, err := c.FindAuthorizations(ctx, influxdb.AuthorizationFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.BucketsResourceType: // 1
rs, _, err := c.FindBuckets(ctx, influxdb.BucketFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.OrgsResourceType: // 3
rs, _, err := c.FindOrganizations(ctx, influxdb.OrganizationFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.SourcesResourceType: // 4
t.Skip("I think sources are going to be removed right?")
case influxdb.TasksResourceType: // 5
t.Skip("Task go client is not yet created")
case influxdb.TelegrafsResourceType: // 6
rs, _, err := c.FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.UsersResourceType: // 7
rs, _, err := c.FindUsers(ctx, influxdb.UserFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.VariablesResourceType: // 8
rs, err := c.FindVariables(ctx, influxdb.VariableFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.ScraperResourceType: // 9
t.Skip("Scraper go client is not yet created")
case influxdb.SecretsResourceType: // 10
t.Skip("Secrets go client is not yet created")
case influxdb.LabelsResourceType: // 11
rs, err := c.FindLabels(ctx, influxdb.LabelFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.ViewsResourceType: // 12
t.Skip("Are views still a thing?")
case influxdb.DocumentsResourceType: // 13
rs, err := c.GetDocuments(ctx, c.DocumentsNamespace, c.OrgID)
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.NotificationRuleResourceType: // 14
rs, _, err := c.FindNotificationRules(ctx, influxdb.NotificationRuleFilter{OrgID: &c.OrgID})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.GetID())
}
return ids, nil
case influxdb.NotificationEndpointResourceType: // 15
rs, _, err := c.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{OrgID: &c.OrgID})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.GetID())
}
case influxdb.ChecksResourceType: // 16
rs, _, err := c.FindChecks(ctx, influxdb.CheckFilter{})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
case influxdb.DBRPResourceType: // 17
rs, _, err := c.DBRPMappingServiceV2.FindMany(ctx, influxdb.DBRPMappingFilterV2{OrgID: &c.OrgID})
if err != nil {
return nil, err
}
for _, r := range rs {
ids = append(ids, r.ID)
}
}
return ids, nil
}
// MustFindAll returns all the IDs of a specific resource type; any error
// is fatal.
func (c *Client) MustFindAll(t *testing.T, r influxdb.ResourceType) []influxdb.ID {
t.Helper()
ids, err := c.FindAll(t, r)
if err != nil {
t.Fatalf("unexpected error finding resources %v: %v", r, err)
}
return ids
}
func (c *Client) AddURM(u influxdb.ID, typ influxdb.UserType, r influxdb.ResourceType, id influxdb.ID) error {
access := &influxdb.UserResourceMapping{
UserID: u,
UserType: typ,
MappingType: influxdb.UserMappingType,
ResourceType: r,
ResourceID: id,
}
return c.CreateUserResourceMapping(
context.Background(),
access,
)
}
// AddOwner associates the user as owner of the resource.
func (c *Client) AddOwner(user influxdb.ID, r influxdb.ResourceType, id influxdb.ID) error {
return c.AddURM(user, influxdb.Owner, r, id)
}
// MustAddOwner requires that the user is associated with the resource
// or the test will be stopped fatally.
func (c *Client) MustAddOwner(t *testing.T, user influxdb.ID, r influxdb.ResourceType, id influxdb.ID) {
t.Helper()
if err := c.AddOwner(user, r, id); err != nil {
t.Fatalf("unexpected error adding owner %v to %v: %v", user, id, err)
}
}
// AddMember associates the user as member of the resource.
func (c *Client) AddMember(user influxdb.ID, r influxdb.ResourceType, id influxdb.ID) error {
return c.AddURM(user, influxdb.Member, r, id)
}
// MustAddMember requires that the user is associated with the resource
// or the test will be stopped fatally.
func (c *Client) MustAddMember(t *testing.T, user influxdb.ID, r influxdb.ResourceType, id influxdb.ID) {
t.Helper()
if err := c.AddMember(user, r, id); err != nil {
t.Fatalf("unexpected error adding member %v to %v", user, id)
}
}
// RemoveURM removes association of the user to the resource.
// Interestingly the URM service does not make difference on the user type.
// I.e. removing an URM from a user to a resource, will delete every URM of every type
// from that user to that resource.
// Or, put in another way, there can only be one resource mapping from a user to a
// resource at a time: either you are a member, or an owner (in that case you are a member too).
func (c *Client) RemoveURM(user, id influxdb.ID) error {
return c.DeleteUserResourceMapping(context.Background(), id, user)
}
// RemoveSpecificURM gets around a client issue where deletes doesn't have enough context to remove a urm from
// a specific resource type
func (c *Client) RemoveSpecificURM(rt influxdb.ResourceType, ut influxdb.UserType, user, id influxdb.ID) error {
return c.SpecificURMSvc(rt, ut).DeleteUserResourceMapping(context.Background(), id, user)
}
// MustRemoveURM requires that the user is removed as owner/member from the resource.
func (c *Client) MustRemoveURM(t *testing.T, user, id influxdb.ID) {
t.Helper()
if err := c.RemoveURM(user, id); err != nil {
t.Fatalf("unexpected error removing org/resource mapping: %v", err)
}
}
// CreateLabelMapping creates a label mapping for label `l` to the resource with `id`.
func (c *Client) CreateLabelMapping(l influxdb.ID, r influxdb.ResourceType, id influxdb.ID) error {
mapping := &influxdb.LabelMapping{
LabelID: l,
ResourceType: r,
ResourceID: id,
}
return c.LabelService.CreateLabelMapping(
context.Background(),
mapping,
)
}
// MustCreateLabelMapping requires that the label is associated with the resource
// or the test will be stopped fatally.
func (c *Client) MustCreateLabelMapping(t *testing.T, l influxdb.ID, r influxdb.ResourceType, id influxdb.ID) {
t.Helper()
if err := c.CreateLabelMapping(l, r, id); err != nil {
t.Fatalf("unexpected error attaching label %v to %v: %v", l, id, err)
}
}
// FindLabelMappings finds the labels for the specified resource.
func (c *Client) FindLabelMappings(r influxdb.ResourceType, id influxdb.ID) ([]influxdb.ID, error) {
filter := influxdb.LabelMappingFilter{
ResourceType: r,
ResourceID: id,
}
ls, err := c.LabelService.FindResourceLabels(
context.Background(),
filter,
)
if err != nil {
return nil, err
}
var ids []influxdb.ID
for _, r := range ls {
ids = append(ids, r.ID)
}
return ids, nil
}
// MustFindLabelMappings makes the test fail if an error is found.
func (c *Client) MustFindLabelMappings(t *testing.T, r influxdb.ResourceType, id influxdb.ID) []influxdb.ID {
t.Helper()
ls, err := c.FindLabelMappings(r, id)
if err != nil {
t.Fatalf("unexpected error finding label mappings: %v", err)
}
return ls
}
// DeleteLabelMapping deletes the label for the specified resource.
func (c *Client) DeleteLabelMapping(l influxdb.ID, r influxdb.ResourceType, id influxdb.ID) error {
m := &influxdb.LabelMapping{
ResourceType: r,
ResourceID: id,
LabelID: l,
}
return c.LabelService.DeleteLabelMapping(
context.Background(),
m,
)
}
// MustDeleteLabelMapping makes the test fail if an error is found.
func (c *Client) MustDeleteLabelMapping(t *testing.T, l influxdb.ID, r influxdb.ResourceType, id influxdb.ID) {
t.Helper()
if err := c.DeleteLabelMapping(l, r, id); err != nil {
t.Fatalf("unexpected error deleting label %v from %v", l, id)
}
}

7
tests/doc.go Normal file
View File

@ -0,0 +1,7 @@
/*
Package tests contains a set of integration tests, which run in-memory versions
of various 2.0 services. They're not intended to be full end-to-end tests,
but are a suitable place to write tests that need to flex the logic of
multiple 2.0 components.
*/
package tests

25
tests/gateway_helpers.go Normal file
View File

@ -0,0 +1,25 @@
package tests
// WritePath is the path to write influx 2.x points.
const WritePath = "/api/v2/write"
// WritePathLegacy is the path to write influx 1.x points.
const WritePathLegacy = "/write"
// Default values created when calling NewDefaultGatewayNode.
const (
DefaultOrgName = "myorg"
DefaultBucketName = "db/rp" // Since we can only write data via 1.x path we need to have a 1.x bucket name
DefaultUsername = "admin"
DefaultPassword = "password"
// DefaultToken has permissions to write to DefaultBucket
DefaultToken = "mytoken"
// OperToken has permissions to do anything.
OperToken = "opertoken"
DefaultDatabase = "db"
DefaultRetentionPolicy = "rp"
)

56
tests/helpers.go Normal file
View File

@ -0,0 +1,56 @@
package tests
import (
"context"
)
// VeryVerbose when set to true, will enable very verbose logging of services. Controlled via
// flag in tests_test package.
var VeryVerbose bool
// An OpenCloser can both open and close.
type OpenCloser interface {
Open() error
Close() error
}
// OpenClosers is a collection of OpenCloser objects.
type OpenClosers []OpenCloser
// OpenAll opens all the OpenClosers, returning the first error encountered,
// if any.
func (ocs OpenClosers) OpenAll() error {
for _, oc := range ocs {
if err := oc.Open(); err != nil {
return err
}
}
return nil
}
// MustOpenAll opens all the OpenClosers, panicking if any error is encountered.
func (ocs OpenClosers) MustOpenAll() {
if err := ocs.OpenAll(); err != nil {
panic(err)
}
}
// CloseAll closes all the OpenClosers, returning the first error encountered,
// if any.
func (ocs OpenClosers) CloseAll() error {
// Close in the reverse order that we opened,
// under the assumption that later ocs depend on earlier ones.
for i := range ocs {
if err := ocs[len(ocs)-i-1].Close(); err != nil && err != context.Canceled {
return err
}
}
return nil
}
// MustCloseAll closes all the OpenClosers, panicking if any error is encountered.
func (ocs OpenClosers) MustCloseAll() {
if err := ocs.CloseAll(); err != nil {
panic(err)
}
}

116
tests/mock.go Normal file
View File

@ -0,0 +1,116 @@
package tests
import (
"net/http"
"time"
"github.com/influxdata/influxdb/v2"
influxhttp "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/notification"
checks "github.com/influxdata/influxdb/v2/notification/check"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/rule"
)
// ValidCustomNotificationEndpoint creates a NotificationEndpoint with a custom name
func ValidCustomNotificationEndpoint(org influxdb.ID, name string) influxdb.NotificationEndpoint {
return &endpoint.HTTP{
Base: endpoint.Base{
Name: name,
OrgID: &org,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{},
},
URL: "https://howdy.com",
AuthMethod: "none",
Method: http.MethodGet,
}
}
// ValidNotificationEndpoint returns a valid notification endpoint.
// This is the easiest way of "mocking" a influxdb.NotificationEndpoint.
func ValidNotificationEndpoint(org influxdb.ID) influxdb.NotificationEndpoint {
return ValidCustomNotificationEndpoint(org, "howdy")
}
// ValidNotificationRule returns a valid Notification Rule of type HTTP for testing
func ValidNotificationRule(org, endpoint influxdb.ID) influxdb.NotificationRule {
d, _ := notification.FromTimeDuration(time.Second * 5)
return &rule.HTTP{
Base: rule.Base{
Name: "little rule",
EndpointID: endpoint,
OrgID: org,
Every: &d,
CRUDLog: influxdb.CRUDLog{},
StatusRules: []notification.StatusRule{
{
CurrentLevel: notification.Critical,
},
},
TagRules: []notification.TagRule{},
},
}
}
// MockCheck returns a valid check to be used in tests.
func MockCheck(name string, orgID, userID influxdb.ID) *influxhttp.Check {
return &influxhttp.Check{
ID: userID,
OwnerID: userID,
Type: "threshold",
Status: influxdb.Active,
Name: name,
Description: "pipeline test check",
OrgID: orgID,
Every: "1m",
Offset: "0m",
Level: "CRIT",
StatusMessageTemplate: "Check: ${ r._check_name } is: ${ r._level }",
Query: &influxhttp.CheckQuery{
Name: name,
Text: `from(bucket: "db/rp") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == "my_measurement") |> filter(fn: (r) => r._field == "my_field") |> count() |> yield(name: "count")`,
EditMode: "builder",
BuilderConfig: &influxhttp.CheckBuilderConfig{
Buckets: []string{"db/rp"},
Tags: []struct {
Key string `json:"key"`
Values []string `json:"values"`
AggregateFunctionType string `json:"aggregateFunctionType"`
}{
{
Key: "_measurement",
Values: []string{"my_measurement"},
AggregateFunctionType: "filter",
},
{
Key: "_field",
Values: []string{"my_field"},
AggregateFunctionType: "filter",
},
},
Functions: []struct {
Name string `json:"name"`
}{
{
Name: "count",
},
},
AggregateWindow: struct {
Period string `json:"period"`
}{
Period: "1m",
},
},
},
Thresholds: []*influxhttp.CheckThreshold{
{
Type: "greater",
Value: 9999,
ThresholdConfigBase: checks.ThresholdConfigBase{
Level: notification.Critical,
},
},
},
}
}

158
tests/pipeline/fixture.go Normal file
View File

@ -0,0 +1,158 @@
package pipeline
import (
"context"
"fmt"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tests"
)
type ClientTag string
const (
AdminTag ClientTag = "admin"
OwnerTag ClientTag = "owner"
MemberTag ClientTag = "member"
NoAccessTag ClientTag = "no_access"
)
var AllClientTags = []ClientTag{AdminTag, OwnerTag, MemberTag, NoAccessTag}
// BaseFixture is a Fixture with multiple users in the system.
type BaseFixture struct {
Admin *tests.Client
Owner *tests.Client
Member *tests.Client
NoAccess *tests.Client
}
// NewBaseFixture creates a BaseFixture with and admin, an org owner, a member, and an outsider
// for the given orgID and bucketID.
func NewBaseFixture(t *testing.T, p *tests.Pipeline, orgID, bucketID influxdb.ID) BaseFixture {
fx := BaseFixture{}
admin := p.MustNewAdminClient()
fx.Admin = admin
cli, id, err := p.BrowserFor(orgID, bucketID, "owner")
if err != nil {
t.Fatalf("error while creating browser client: %v", err)
}
admin.MustAddOwner(t, id, influxdb.OrgsResourceType, orgID)
cli.UserID = id
fx.Owner = cli
cli, id, err = p.BrowserFor(orgID, bucketID, "member")
if err != nil {
t.Fatalf("error while creating browser client: %v", err)
}
admin.MustAddMember(t, id, influxdb.OrgsResourceType, orgID)
cli.UserID = id
fx.Member = cli
cli, id, err = p.BrowserFor(orgID, bucketID, "no_access")
if err != nil {
t.Fatalf("error while creating browser client: %v", err)
}
cli.UserID = id
fx.NoAccess = cli
return fx
}
// GetClient returns the client associated with the given tag.
func (f BaseFixture) GetClient(tag ClientTag) *tests.Client {
switch tag {
case AdminTag:
return f.Admin
case OwnerTag:
return f.Owner
case MemberTag:
return f.Member
case NoAccessTag:
return f.NoAccess
default:
panic(fmt.Sprintf("unknown tag %s", tag))
}
}
// NewDefaultBaseFixture creates a BaseFixture with the default ids.
func NewDefaultBaseFixture(t *testing.T, p *tests.Pipeline) BaseFixture {
return NewBaseFixture(t, p, p.DefaultOrgID, p.DefaultBucketID)
}
// ResourceFixture is a general purpose fixture.
// It contains a resource, users for every role, and a label.
// Users of the fixture can use it to create URMs from users to the resource or else.
// The label is there to create label mappings.
type ResourceFixture struct {
BaseFixture
ResourceType influxdb.ResourceType
ResourceID influxdb.ID
LabelID influxdb.ID
}
// NewResourceFixture creates a new ResourceFixture.
func NewResourceFixture(t *testing.T, p *tests.Pipeline, typ influxdb.ResourceType) ResourceFixture {
bfx := NewDefaultBaseFixture(t, p)
l := &influxdb.Label{
OrgID: bfx.Admin.OrgID,
Name: "map_me",
Properties: nil,
}
if err := bfx.Admin.CreateLabel(context.Background(), l); err != nil {
t.Fatalf("error in creating label as admin: %v", err)
}
rid := bfx.Admin.MustCreateResource(t, typ)
return ResourceFixture{
BaseFixture: bfx,
ResourceType: typ,
ResourceID: rid,
LabelID: l.ID,
}
}
// SubjectResourceFixture is a ResourceFixture associated with a subject.
// The subject is a client that performs the actions.
type SubjectResourceFixture struct {
ResourceFixture
SubjectTag ClientTag
}
// NewSubjectResourceFixture creates a new SubjectResourceFixture.
func NewSubjectResourceFixture(t *testing.T, p *tests.Pipeline, subj ClientTag, rt influxdb.ResourceType) SubjectResourceFixture {
return SubjectResourceFixture{
ResourceFixture: NewResourceFixture(t, p, rt),
SubjectTag: subj,
}
}
func (f SubjectResourceFixture) Subject() *tests.Client {
return f.GetClient(f.SubjectTag)
}
// ResourceSubjectIteration is the iteration on all the resources and subjects given.
// One can specify what do to for an iteration step by passing a ResourceSubjectIterationStepFn
// to `ResourceSubjectIteration.Do` or `ResourceSubjectIteration.ParDo`.
type ResourceSubjectIteration struct {
subjects []ClientTag
}
// Creates a ResourceSubjectIteration for applying a ResourceSubjectIterationStepFn to each
// resource-subject couple.
func ForEachResourceSubject(subjects ...ClientTag) ResourceSubjectIteration {
return ResourceSubjectIteration{
subjects: subjects,
}
}
// ResourceSubjectIterationStepFn is a function applied to each step of iteration on resources
// and subjects.
type ResourceSubjectIterationStepFn func(subj ClientTag, rt influxdb.ResourceType)
func (p ResourceSubjectIteration) Do(f ResourceSubjectIterationStepFn) {
for _, typ := range influxdb.OrgResourceTypes {
for _, subj := range p.subjects {
f(subj, typ)
}
}
}

53
tests/pipeline/helpers.go Normal file
View File

@ -0,0 +1,53 @@
package pipeline
import (
"fmt"
"strings"
"testing"
"github.com/influxdata/influxdb/v2"
)
func check(t *testing.T, gotErr error, want string, cmpErrFn func() error) {
t.Helper()
if len(want) > 0 {
if gotErr == nil {
t.Errorf("expected error got none")
} else {
if err := cmpErrFn(); err != nil {
t.Error(err)
}
}
} else {
if gotErr != nil {
t.Errorf("unexpected error: %v", gotErr)
}
}
}
// CheckErr checks if `errorMsgPart` is in the error message of `gotErr`.
// If `errorMsgPart` is an empty string, it fails if `gotErr != nil`.
func CheckErr(t *testing.T, gotErr error, errorMsgPart string) {
t.Helper()
check(t, gotErr, errorMsgPart, func() error {
if !strings.Contains(gotErr.Error(), errorMsgPart) {
return fmt.Errorf("unexpected error message: %v", gotErr)
}
return nil
})
}
// CheckHTTPErr checks if `gotErr` has the specified HTTP `code`.
// If `code` is an empty string, it fails if `gotErr != nil`.
func CheckHTTPErr(t *testing.T, gotErr error, code string) {
t.Helper()
check(t, gotErr, code, func() error {
if gotCode := influxdb.ErrorCode(gotErr); gotCode != code {
t.Errorf("unexpected error code: %v\n\tcomplete error: %v", gotCode, gotErr)
}
return nil
})
}

202
tests/pipeline_helpers.go Normal file
View File

@ -0,0 +1,202 @@
package tests
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
// A Pipeline is a collection of node types that act as a pipeline for writing
// and reading data.
type Pipeline struct {
Launcher *launcher.TestLauncher
DefaultOrgID influxdb.ID
DefaultBucketID influxdb.ID
DefaultUserID influxdb.ID
openClosers OpenClosers
}
// pipelineConfig tracks the pre-configuration for a pipeline.
// Because this struct is relatively complex, you should not instantiate it directly,
// but rather pass PipelineOption values to NewPipeline.
type pipelineConfig struct {
logger *zap.Logger
}
// NewDefaultPipeline creates a Pipeline with one storage node, one query node
// and one gateway node.
//
// Further, a single user, org, bucket and token are created. See
// NewDefaultGatewayNode for more details.
func NewDefaultPipeline(t *testing.T, opts ...PipelineOption) *DefaultPipeline {
opts = append(WithDefaults(), opts...)
return &DefaultPipeline{Pipeline: NewPipeline(t, opts...)}
}
// NewPipeline returns a pipeline with the given options applied to the configuration as appropriate.
// Each query node will be connected to the read service of all storage nodes.
//
// Each storage and query node share the same configs.
//
// A single user, org, bucket and token are created. See NewDefaultGatewayNode
// for more details.
func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline {
tb.Helper()
var conf pipelineConfig
for _, o := range opts {
o.applyConfig(&conf)
}
logger := conf.logger
if logger == nil {
// This is left here mainly for retro compatibility
var logLevel zaptest.LoggerOption
if VeryVerbose {
logLevel = zaptest.Level(zap.DebugLevel)
} else {
logLevel = zaptest.Level(zap.InfoLevel)
}
logger = zaptest.NewLogger(tb, logLevel).With(zap.String("test_name", tb.Name()))
}
launcherOptions := []launcher.Option{
launcher.WithLogger(logger),
}
for _, o := range opts {
if opt := o.makeLauncherOption(); opt != nil {
launcherOptions = append(launcherOptions, opt)
}
}
tl := launcher.NewTestLauncher(nil, launcherOptions...)
p := &Pipeline{
Launcher: tl,
}
err := tl.Run(context.Background())
require.NoError(tb, err)
// setup default operator
res := p.Launcher.OnBoardOrFail(tb, &influxdb.OnboardingRequest{
User: DefaultUsername,
Password: DefaultPassword,
Org: DefaultOrgName,
Bucket: DefaultBucketName,
RetentionPeriod: 0, // infinite retention period
Token: OperToken,
})
p.DefaultOrgID = res.Org.ID
p.DefaultUserID = res.User.ID
p.DefaultBucketID = res.Bucket.ID
return p
}
// Open opens all the components of the pipeline.
func (p *Pipeline) Open() error {
if err := p.openClosers.OpenAll(); err != nil {
return err
}
return nil
}
// MustOpen opens the pipeline, panicking if any error is encountered.
func (p *Pipeline) MustOpen() {
if err := p.Open(); err != nil {
panic(err)
}
}
// Close closes all the components of the pipeline.
func (p *Pipeline) Close() error {
_ = p.Launcher.Shutdown(context.Background())
return p.openClosers.CloseAll()
}
// MustClose closes the pipeline, panicking if any error is encountered.
func (p *Pipeline) MustClose() {
if err := p.Close(); err != nil {
panic(err)
}
}
// MustNewAdminClient returns a default client pointed at the gateway.
//
// operator token is authorized to do anything in the system.
func (p *Pipeline) MustNewAdminClient() *Client {
return p.MustNewClient(p.DefaultOrgID, p.DefaultBucketID, OperToken)
}
// MustNewClient returns a client pointed at the gateway.
func (p *Pipeline) MustNewClient(org, bucket influxdb.ID, token string) *Client {
config := ClientConfig{
UserID: p.DefaultUserID,
OrgID: org,
BucketID: bucket,
DocumentsNamespace: DefaultDocumentsNamespace,
Token: token,
}
svc, err := NewClient(p.Launcher.URL(), config)
if err != nil {
panic(err)
}
return svc
}
// NewBrowserClient returns a client pointed at the gateway with a cookie session.
func (p *Pipeline) NewBrowserClient(org, bucket influxdb.ID, session *influxdb.Session) (*Client, error) {
config := ClientConfig{
UserID: p.DefaultUserID,
OrgID: org,
BucketID: bucket,
DocumentsNamespace: DefaultDocumentsNamespace,
Session: session,
}
return NewClient(p.Launcher.URL(), config)
}
// BrowserFor will create a user, session, and browser client.
// The generated browser points to the given org and bucket.
//
// The user and session are inserted directly into the gateway backing store.
func (p *Pipeline) BrowserFor(org, bucket influxdb.ID, username string) (*Client, influxdb.ID, error) {
ctx := context.Background()
user := &influxdb.User{
Name: username,
}
err := p.Launcher.UserService().CreateUser(ctx, user)
if err != nil {
return nil, 0, err
}
session, err := p.Launcher.SessionService().CreateSession(ctx, username)
if err != nil {
return nil, 0, err
}
client, err := p.NewBrowserClient(org, bucket, session)
return client, user.ID, err
}
// Flush makes sure the PointsWriter has flushed all of the writes to kafka and the
// storage nodes have observed the writes.
func (p *Pipeline) Flush() {
}
// DefaultPipeline simplifies some of Pipeline method calls.
type DefaultPipeline struct {
*Pipeline
Writer *Client
}

64
tests/pipeline_option.go Normal file
View File

@ -0,0 +1,64 @@
package tests
import (
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"go.uber.org/zap"
)
// PipelineOption configures a pipeline.
type PipelineOption interface {
applyConfig(*pipelineConfig)
makeLauncherOption() launcher.Option
}
type pipelineOption struct {
applyConfigFn func(*pipelineConfig)
makeLauncherOptionFn func() launcher.Option
}
var _ PipelineOption = pipelineOption{}
func (o pipelineOption) applyConfig(pc *pipelineConfig) {
if o.applyConfigFn != nil {
o.applyConfigFn(pc)
}
}
func (o pipelineOption) makeLauncherOption() launcher.Option {
if o.makeLauncherOptionFn != nil {
return o.makeLauncherOptionFn()
}
return nil
}
// WithDefaults returns a slice of options for a default pipeline.
func WithDefaults() []PipelineOption {
return []PipelineOption{}
}
// WithReplicas sets the number of replicas in the pipeline.
func WithLogger(logger *zap.Logger) PipelineOption {
return pipelineOption{
applyConfigFn: func(pc *pipelineConfig) {
pc.logger = logger
},
}
}
// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement.
func WithInfluxQLMaxSelectSeriesN(n int) PipelineOption {
return pipelineOption{
makeLauncherOptionFn: func() launcher.Option {
return launcher.WithInfluxQLMaxSelectSeriesN(n)
},
}
}
// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement.
func WithInfluxQLMaxSelectBucketsN(n int) PipelineOption {
return pipelineOption{
makeLauncherOptionFn: func() launcher.Option {
return launcher.WithInfluxQLMaxSelectBucketsN(n)
},
}
}