chore: Lots of cleanup and response to PR feedback.

pull/20017/head
Stuart Carnie 2020-11-16 09:20:14 +11:00
parent 9e42444d8a
commit 91acebeab0
8 changed files with 40 additions and 284 deletions

View File

@ -66,17 +66,6 @@
"result": {
"description": "The expected results in table format",
"type": "string"
},
"billing": {
"description": "Optional billing results",
"type": "object",
"additionalProperties": false,
"properties": {
"point_count": {
"description": "The expected number of billing records",
"type": "integer"
}
}
}
}
}

View File

@ -3,8 +3,6 @@ package tests
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
@ -29,7 +27,7 @@ type ClientConfig struct {
}
// Client provides an API for writing, querying, and interacting with
// gateway's resources like authorizations, buckets, and organizations.
// resources like authorizations, buckets, and organizations.
type Client struct {
Client *httpc.Client
*influxhttp.Service
@ -37,19 +35,19 @@ type Client struct {
ClientConfig
}
// NewClient initialises a new Client which is ready to write points to the Gateway write endpoint.
func NewClient(gatewayURL string, config ClientConfig) (*Client, error) {
// NewClient initialises a new Client which is ready to write points to the HTTP write endpoint.
func NewClient(endpoint string, config ClientConfig) (*Client, error) {
opts := make([]httpc.ClientOptFn, 0)
if config.Session != nil {
config.Token = ""
opts = append(opts, httpc.WithSessionCookie(config.Session.Key))
}
hc, err := influxhttp.NewHTTPClient(gatewayURL, config.Token, false, opts...)
hc, err := influxhttp.NewHTTPClient(endpoint, config.Token, false, opts...)
if err != nil {
return nil, err
}
svc, err := influxhttp.NewService(hc, gatewayURL, config.Token)
svc, err := influxhttp.NewService(hc, endpoint, config.Token)
if err != nil {
return nil, err
}
@ -73,28 +71,12 @@ func (c *Client) MustWriteBatch(points string) {
}
}
// WriteBatch writes the current batch of points to the Gateway HTTP endpoint.
// WriteBatch writes the current batch of points to the HTTP endpoint.
func (c *Client) WriteBatch(points string) error {
return c.WriteService.Write(context.Background(), c.OrgID, c.BucketID, strings.NewReader(points))
}
// WriteLegacyBatch writes the current batch of points to the legacy influx 1.x
// /write endpoint. Used to mimic our cloud 1 to cloud 2 go replay forwarding.
func (c *Client) WriteLegacyBatch(org, db, rp, points string) error {
writeFn := func(w io.Writer) (string, string, error) {
_, err := fmt.Fprint(w, points)
return "", "", err
}
req := c.Client.Post(writeFn, WritePathLegacy).
QueryParams([2]string{"db", db}).
QueryParams([2]string{"rp", rp}).
QueryParams([2]string{"org", org}).
StatusFn(httpc.StatusIn(http.StatusNoContent))
return req.Do(context.Background())
}
// Query returns the CSV response from a flux query to gateway.
// Query returns the CSV response from a flux query to the HTTP API.
//
// This also removes all the \r to make it easier to write tests.
func (c *Client) QueryFlux(org, query string) (string, error) {

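For orientation, a minimal, hypothetical sketch of how the renamed client might be used follows. The endpoint URL, token value, import path, and the assumption that ClientConfig also carries the org and bucket IDs consumed by WriteBatch are illustrative, not taken from this commit.

package tests_test

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/v2/tests" // assumed import path for package tests
)

// clientUsageSketch is a hypothetical illustration, not part of this commit.
func clientUsageSketch() {
	// Token is a ClientConfig field referenced in the diff above; the org and
	// bucket IDs that WriteBatch uses are assumed to be set via ClientConfig too.
	client, err := tests.NewClient("http://127.0.0.1:8086", tests.ClientConfig{
		Token: "opertoken", // hypothetical token value
	})
	if err != nil {
		log.Fatal(err)
	}

	// Write a small batch of line-protocol points, then query it back as CSV.
	if err := client.WriteBatch("m,host=a f=1i 1"); err != nil {
		log.Fatal(err)
	}
	csv, err := client.QueryFlux("myorg", `from(bucket:"db/rp") |> range(start: 0)`)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(csv)
}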
tests/defaults.go (new file, 17 additions)
View File

@ -0,0 +1,17 @@
package tests
// Default values created when calling NewPipeline.
const (
DefaultOrgName = "myorg"
DefaultBucketName = "db/rp" // Since we can only write data via the 1.x path, we need a 1.x bucket name
DefaultUsername = "admin"
DefaultPassword = "password"
// OperToken has permissions to do anything.
OperToken = "opertoken"
)
// VeryVerbose, when set to true, enables very verbose logging of services.
var VeryVerbose bool
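As a small illustrative aside (not part of this commit), the db/rp form mirrors the usual 1.x mapping of database and retention policy to a bucket name, which is what the comment on DefaultBucketName alludes to; the example names below are hypothetical.

// Hypothetical illustration: a 1.x-compatible bucket name is simply
// "<database>/<retention policy>", which is why DefaultBucketName is "db/rp".
const (
	exampleDatabase        = "db"
	exampleRetentionPolicy = "rp"
)

var exampleBucketName = exampleDatabase + "/" + exampleRetentionPolicy // == DefaultBucketName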

View File

@ -1,25 +0,0 @@
package tests
// WritePath is the path to write influx 2.x points.
const WritePath = "/api/v2/write"
// WritePathLegacy is the path to write influx 1.x points.
const WritePathLegacy = "/write"
// Default values created when calling NewDefaultGatewayNode.
const (
DefaultOrgName = "myorg"
DefaultBucketName = "db/rp" // Since we can only write data via 1.x path we need to have a 1.x bucket name
DefaultUsername = "admin"
DefaultPassword = "password"
// DefaultToken has permissions to write to DefaultBucket
DefaultToken = "mytoken"
// OperToken has permissions to do anything.
OperToken = "opertoken"
DefaultDatabase = "db"
DefaultRetentionPolicy = "rp"
)

View File

@ -1,56 +0,0 @@
package tests
import (
"context"
)
// VeryVerbose when set to true, will enable very verbose logging of services. Controlled via
// flag in tests_test package.
var VeryVerbose bool
// An OpenCloser can both open and close.
type OpenCloser interface {
Open() error
Close() error
}
// OpenClosers is a collection of OpenCloser objects.
type OpenClosers []OpenCloser
// OpenAll opens all the OpenClosers, returning the first error encountered,
// if any.
func (ocs OpenClosers) OpenAll() error {
for _, oc := range ocs {
if err := oc.Open(); err != nil {
return err
}
}
return nil
}
// MustOpenAll opens all the OpenClosers, panicking if any error is encountered.
func (ocs OpenClosers) MustOpenAll() {
if err := ocs.OpenAll(); err != nil {
panic(err)
}
}
// CloseAll closes all the OpenClosers, returning the first error encountered,
// if any.
func (ocs OpenClosers) CloseAll() error {
// Close in the reverse order that we opened,
// under the assumption that later ocs depend on earlier ones.
for i := range ocs {
if err := ocs[len(ocs)-i-1].Close(); err != nil && err != context.Canceled {
return err
}
}
return nil
}
// MustCloseAll closes all the OpenClosers, panicking if any error is encountered.
func (ocs OpenClosers) MustCloseAll() {
if err := ocs.CloseAll(); err != nil {
panic(err)
}
}

View File

@ -1,7 +1,6 @@
package pipeline
import (
"context"
"fmt"
"testing"
@ -74,85 +73,3 @@ func (f BaseFixture) GetClient(tag ClientTag) *tests.Client {
panic(fmt.Sprintf("unknown tag %s", tag))
}
}
// NewDefaultBaseFixture creates a BaseFixture with the default ids.
func NewDefaultBaseFixture(t *testing.T, p *tests.Pipeline) BaseFixture {
return NewBaseFixture(t, p, p.DefaultOrgID, p.DefaultBucketID)
}
// ResourceFixture is a general purpose fixture.
// It contains a resource, users for every role, and a label.
// Users of the fixture can use it to create URMs from users to the resource, among other things.
// The label is there to create label mappings.
type ResourceFixture struct {
BaseFixture
ResourceType influxdb.ResourceType
ResourceID influxdb.ID
LabelID influxdb.ID
}
// NewResourceFixture creates a new ResourceFixture.
func NewResourceFixture(t *testing.T, p *tests.Pipeline, typ influxdb.ResourceType) ResourceFixture {
bfx := NewDefaultBaseFixture(t, p)
l := &influxdb.Label{
OrgID: bfx.Admin.OrgID,
Name: "map_me",
Properties: nil,
}
if err := bfx.Admin.CreateLabel(context.Background(), l); err != nil {
t.Fatalf("error in creating label as admin: %v", err)
}
rid := bfx.Admin.MustCreateResource(t, typ)
return ResourceFixture{
BaseFixture: bfx,
ResourceType: typ,
ResourceID: rid,
LabelID: l.ID,
}
}
// SubjectResourceFixture is a ResourceFixture associated with a subject.
// The subject is a client that performs the actions.
type SubjectResourceFixture struct {
ResourceFixture
SubjectTag ClientTag
}
// NewSubjectResourceFixture creates a new SubjectResourceFixture.
func NewSubjectResourceFixture(t *testing.T, p *tests.Pipeline, subj ClientTag, rt influxdb.ResourceType) SubjectResourceFixture {
return SubjectResourceFixture{
ResourceFixture: NewResourceFixture(t, p, rt),
SubjectTag: subj,
}
}
func (f SubjectResourceFixture) Subject() *tests.Client {
return f.GetClient(f.SubjectTag)
}
// ResourceSubjectIteration is the iteration on all the resources and subjects given.
// One can specify what to do for an iteration step by passing a ResourceSubjectIterationStepFn
// to `ResourceSubjectIteration.Do` or `ResourceSubjectIteration.ParDo`.
type ResourceSubjectIteration struct {
subjects []ClientTag
}
// Creates a ResourceSubjectIteration for applying a ResourceSubjectIterationStepFn to each
// resource-subject couple.
func ForEachResourceSubject(subjects ...ClientTag) ResourceSubjectIteration {
return ResourceSubjectIteration{
subjects: subjects,
}
}
// ResourceSubjectIterationStepFn is a function applied to each step of iteration on resources
// and subjects.
type ResourceSubjectIterationStepFn func(subj ClientTag, rt influxdb.ResourceType)
func (p ResourceSubjectIteration) Do(f ResourceSubjectIterationStepFn) {
for _, typ := range influxdb.OrgResourceTypes {
for _, subj := range p.subjects {
f(subj, typ)
}
}
}

View File

@ -1,53 +0,0 @@
package pipeline
import (
"fmt"
"strings"
"testing"
"github.com/influxdata/influxdb/v2"
)
func check(t *testing.T, gotErr error, want string, cmpErrFn func() error) {
t.Helper()
if len(want) > 0 {
if gotErr == nil {
t.Errorf("expected error got none")
} else {
if err := cmpErrFn(); err != nil {
t.Error(err)
}
}
} else {
if gotErr != nil {
t.Errorf("unexpected error: %v", gotErr)
}
}
}
// CheckErr checks if `errorMsgPart` is in the error message of `gotErr`.
// If `errorMsgPart` is an empty string, it fails if `gotErr != nil`.
func CheckErr(t *testing.T, gotErr error, errorMsgPart string) {
t.Helper()
check(t, gotErr, errorMsgPart, func() error {
if !strings.Contains(gotErr.Error(), errorMsgPart) {
return fmt.Errorf("unexpected error message: %v", gotErr)
}
return nil
})
}
// CheckHTTPErr checks if `gotErr` has the specified HTTP `code`.
// If `code` is an empty string, it fails if `gotErr != nil`.
func CheckHTTPErr(t *testing.T, gotErr error, code string) {
t.Helper()
check(t, gotErr, code, func() error {
if gotCode := influxdb.ErrorCode(gotErr); gotCode != code {
t.Errorf("unexpected error code: %v\n\tcomplete error: %v", gotCode, gotErr)
}
return nil
})
}

View File

@ -11,42 +11,34 @@ import (
"go.uber.org/zap/zaptest"
)
// A Pipeline is a collection of node types that act as a pipeline for writing
// and reading data.
// A Pipeline is responsible for configuring launcher.TestLauncher
// with default values so it may be used for end-to-end integration
// tests.
type Pipeline struct {
Launcher *launcher.TestLauncher
DefaultOrgID influxdb.ID
DefaultBucketID influxdb.ID
DefaultUserID influxdb.ID
openClosers OpenClosers
}
// pipelineConfig tracks the pre-configuration for a pipeline.
// Because this struct is relatively complex, you should not instantiate it directly,
// but rather pass PipelineOption values to NewPipeline.
type pipelineConfig struct {
logger *zap.Logger
}
// NewDefaultPipeline creates a Pipeline with one storage node, one query node
// and one gateway node.
// NewDefaultPipeline creates a Pipeline with default
// values.
//
// Further, a single user, org, bucket and token are created. See
// NewDefaultGatewayNode for more details.
// It is retained for compatibility with cloud tests.
func NewDefaultPipeline(t *testing.T, opts ...PipelineOption) *DefaultPipeline {
opts = append(WithDefaults(), opts...)
return &DefaultPipeline{Pipeline: NewPipeline(t, opts...)}
}
// NewPipeline returns a pipeline with the given options applied to the configuration as appropriate.
// Each query node will be connected to the read service of all storage nodes.
//
// Each storage and query node share the same configs.
//
// A single user, org, bucket and token are created. See NewDefaultGatewayNode
// for more details.
// A single user, org, bucket and token are created.
func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline {
tb.Helper()
@ -104,10 +96,6 @@ func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline {
// Open opens all the components of the pipeline.
func (p *Pipeline) Open() error {
if err := p.openClosers.OpenAll(); err != nil {
return err
}
return nil
}
@ -120,8 +108,7 @@ func (p *Pipeline) MustOpen() {
// Close closes all the components of the pipeline.
func (p *Pipeline) Close() error {
_ = p.Launcher.Shutdown(context.Background())
return p.openClosers.CloseAll()
return p.Launcher.Shutdown(context.Background())
}
// MustClose closes the pipeline, panicking if any error is encountered.
@ -131,14 +118,14 @@ func (p *Pipeline) MustClose() {
}
}
// MustNewAdminClient returns a default client pointed at the gateway.
// MustNewAdminClient returns a default client that will direct requests to Launcher.
//
// operator token is authorized to do anything in the system.
// The operator token is authorized to do anything in the system.
func (p *Pipeline) MustNewAdminClient() *Client {
return p.MustNewClient(p.DefaultOrgID, p.DefaultBucketID, OperToken)
}
// MustNewClient returns a client pointed at the gateway.
// MustNewClient returns a client that will direct requests to Launcher.
func (p *Pipeline) MustNewClient(org, bucket influxdb.ID, token string) *Client {
config := ClientConfig{
UserID: p.DefaultUserID,
@ -154,7 +141,7 @@ func (p *Pipeline) MustNewClient(org, bucket influxdb.ID, token string) *Client
return svc
}
// NewBrowserClient returns a client pointed at the gateway with a cookie session.
// NewBrowserClient returns a client with a cookie session that will direct requests to Launcher.
func (p *Pipeline) NewBrowserClient(org, bucket influxdb.ID, session *influxdb.Session) (*Client, error) {
config := ClientConfig{
UserID: p.DefaultUserID,
@ -169,7 +156,7 @@ func (p *Pipeline) NewBrowserClient(org, bucket influxdb.ID, session *influxdb.S
// BrowserFor will create a user, session, and browser client.
// The generated browser points to the given org and bucket.
//
// The user and session are inserted directly into the gateway backing store.
// The user and session are inserted directly into the backing store.
func (p *Pipeline) BrowserFor(org, bucket influxdb.ID, username string) (*Client, influxdb.ID, error) {
ctx := context.Background()
user := &influxdb.User{
@ -189,14 +176,12 @@ func (p *Pipeline) BrowserFor(org, bucket influxdb.ID, username string) (*Client
return client, user.ID, err
}
// Flush makes sure the PointsWriter has flushed all of the writes to kafka and the
// storage nodes have observed the writes.
// Flush is a no-op and retained for compatibility with tests from cloud.
func (p *Pipeline) Flush() {
}
// DefaultPipeline simplifies some of Pipeline method calls.
// DefaultPipeline is a wrapper for Pipeline and is retained
// for compatibility with cloud tests.
type DefaultPipeline struct {
*Pipeline
Writer *Client
}
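To tie the pieces together, a hypothetical end-to-end test against the simplified Pipeline might look like the sketch below. The import path and test name are assumptions; the calls mirror the Must* helpers shown in this diff.

package tests_test

import (
	"testing"

	"github.com/influxdata/influxdb/v2/tests" // assumed import path for package tests
)

func TestPipelineSketch(t *testing.T) {
	// NewDefaultPipeline wraps launcher.TestLauncher with a default org,
	// bucket, user, and operator token.
	p := tests.NewDefaultPipeline(t)
	p.MustOpen()
	defer p.MustClose()

	// The admin client authenticates with the operator token and sends
	// requests straight to the launcher's HTTP API.
	client := p.MustNewAdminClient()
	client.MustWriteBatch("m,host=a f=1i 1")

	// Flush is retained only for compatibility with cloud tests; it is a no-op here.
	p.Flush()
}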