Merge pull request #1349 from influxdata/integration-tests
Add end-to-end write/query integration testing.pull/10616/head
commit
f2677caf68
|
@ -3,6 +3,8 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
nethttp "net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
|
@ -10,9 +12,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/snowflake"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/bolt"
|
||||
"github.com/influxdata/platform/chronograf/server"
|
||||
|
@ -26,6 +25,7 @@ import (
|
|||
"github.com/influxdata/platform/nats"
|
||||
"github.com/influxdata/platform/query"
|
||||
_ "github.com/influxdata/platform/query/builtin"
|
||||
"github.com/influxdata/platform/snowflake"
|
||||
"github.com/influxdata/platform/source"
|
||||
"github.com/influxdata/platform/storage"
|
||||
"github.com/influxdata/platform/storage/readservice"
|
||||
|
@ -37,54 +37,143 @@ import (
|
|||
_ "github.com/influxdata/platform/tsdb/tsi1"
|
||||
_ "github.com/influxdata/platform/tsdb/tsm1"
|
||||
pzap "github.com/influxdata/platform/zap"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// exit with SIGINT and SIGTERM
|
||||
ctx := context.Background()
|
||||
ctx = signals.WithStandardSignals(ctx)
|
||||
|
||||
m := NewMain()
|
||||
if err := m.Run(ctx, os.Args[1:]...); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
// Attempt clean shutdown.
|
||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
m.Shutdown(ctx)
|
||||
}
|
||||
|
||||
// Main represents the main program execution.
|
||||
type Main struct {
|
||||
wg sync.WaitGroup
|
||||
cancel func()
|
||||
|
||||
logLevel string
|
||||
httpBindAddress string
|
||||
boltPath string
|
||||
natsPath string
|
||||
developerMode bool
|
||||
enginePath string
|
||||
|
||||
boltClient *bolt.Client
|
||||
engine *storage.Engine
|
||||
|
||||
httpPort int
|
||||
httpServer *nethttp.Server
|
||||
|
||||
natsServer *nats.Server
|
||||
|
||||
scheduler *taskbackend.TickScheduler
|
||||
|
||||
logger *zap.Logger
|
||||
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
}
|
||||
|
||||
// NewMain returns a new instance of Main connected to standard in/out/err.
|
||||
func NewMain() *Main {
|
||||
return &Main{
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
}
|
||||
|
||||
// URL returns the URL to connect to the HTTP server.
|
||||
func (m *Main) URL() string {
|
||||
return fmt.Sprintf("http://localhost:%d", m.httpPort)
|
||||
}
|
||||
|
||||
// Shutdown shuts down the HTTP server and waits for all services to clean up.
|
||||
func (m *Main) Shutdown(ctx context.Context) {
|
||||
m.cancel()
|
||||
m.httpServer.Shutdown(ctx)
|
||||
|
||||
m.logger.Info("Stopping", zap.String("service", "task"))
|
||||
m.scheduler.Stop()
|
||||
|
||||
m.logger.Info("Stopping", zap.String("service", "nats"))
|
||||
m.natsServer.Close()
|
||||
|
||||
m.logger.Info("Stopping", zap.String("service", "bolt"))
|
||||
if err := m.boltClient.Close(); err != nil {
|
||||
m.logger.Info("failed closing bolt", zap.Error(err))
|
||||
}
|
||||
|
||||
m.logger.Info("Stopping", zap.String("service", "storage-engine"))
|
||||
if err := m.engine.Close(); err != nil {
|
||||
m.logger.Error("failed to close engine", zap.Error(err))
|
||||
}
|
||||
|
||||
m.wg.Wait()
|
||||
|
||||
m.logger.Sync()
|
||||
}
|
||||
|
||||
// Run executes the program with the given CLI arguments.
|
||||
func (m *Main) Run(ctx context.Context, args ...string) error {
|
||||
dir, err := fs.InfluxDir()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Failed to determine influx directory: %v", err)
|
||||
os.Exit(1)
|
||||
return fmt.Errorf("Failed to determine influx directory: %v", err)
|
||||
}
|
||||
|
||||
prog := &cli.Program{
|
||||
Name: "influxd",
|
||||
Run: run,
|
||||
Run: func() error { return m.run(ctx) },
|
||||
Opts: []cli.Opt{
|
||||
{
|
||||
DestP: &logLevel,
|
||||
DestP: &m.logLevel,
|
||||
Flag: "log-level",
|
||||
Default: "info",
|
||||
Desc: "supported log levels are debug, info, and error",
|
||||
},
|
||||
{
|
||||
DestP: &httpBindAddress,
|
||||
DestP: &m.httpBindAddress,
|
||||
Flag: "http-bind-address",
|
||||
Default: ":9999",
|
||||
Desc: "bind address for the REST HTTP API",
|
||||
},
|
||||
{
|
||||
DestP: &boltPath,
|
||||
DestP: &m.boltPath,
|
||||
Flag: "bolt-path",
|
||||
Default: filepath.Join(dir, "influxd.bolt"),
|
||||
Desc: "path to boltdb database",
|
||||
},
|
||||
{
|
||||
DestP: &developerMode,
|
||||
DestP: &m.developerMode,
|
||||
Flag: "developer-mode",
|
||||
Default: false,
|
||||
Desc: "serve assets from the local filesystem in developer mode",
|
||||
},
|
||||
{
|
||||
DestP: &natsPath,
|
||||
DestP: &m.natsPath,
|
||||
Flag: "nats-path",
|
||||
Default: filepath.Join(dir, "nats"),
|
||||
Desc: "path to NATS queue for scraping tasks",
|
||||
},
|
||||
{
|
||||
DestP: &enginePath,
|
||||
DestP: &m.enginePath,
|
||||
Flag: "engine-path",
|
||||
Default: filepath.Join(dir, "engine"),
|
||||
Desc: "path to persistent engine files",
|
||||
|
@ -93,28 +182,15 @@ func main() {
|
|||
}
|
||||
|
||||
cmd := cli.NewCommand(prog)
|
||||
if err := cmd.Execute(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
cmd.SetArgs(args)
|
||||
return cmd.Execute()
|
||||
}
|
||||
|
||||
var (
|
||||
logLevel string
|
||||
httpBindAddress string
|
||||
boltPath string
|
||||
natsPath string
|
||||
developerMode bool
|
||||
enginePath string
|
||||
)
|
||||
|
||||
func run() error {
|
||||
ctx := context.Background()
|
||||
// exit with SIGINT and SIGTERM
|
||||
ctx = signals.WithStandardSignals(ctx)
|
||||
func (m *Main) run(ctx context.Context) (err error) {
|
||||
ctx, m.cancel = context.WithCancel(ctx)
|
||||
|
||||
var lvl zapcore.Level
|
||||
if err := lvl.Set(logLevel); err != nil {
|
||||
if err := lvl.Set(m.logLevel); err != nil {
|
||||
return fmt.Errorf("unknown log level; supported levels are debug, info, and error")
|
||||
}
|
||||
|
||||
|
@ -123,93 +199,77 @@ func run() error {
|
|||
Format: "auto",
|
||||
Level: lvl,
|
||||
}
|
||||
logger, err := logconf.New(os.Stdout)
|
||||
m.logger, err = logconf.New(m.Stdout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
// set tracing
|
||||
tracer := new(pzap.Tracer)
|
||||
tracer.Logger = logger
|
||||
tracer.Logger = m.logger
|
||||
tracer.IDGenerator = snowflake.NewIDGenerator()
|
||||
opentracing.SetGlobalTracer(tracer)
|
||||
|
||||
reg := prom.NewRegistry()
|
||||
reg.MustRegister(prometheus.NewGoCollector())
|
||||
reg.WithLogger(logger)
|
||||
reg.WithLogger(m.logger)
|
||||
|
||||
c := bolt.NewClient()
|
||||
c.Path = boltPath
|
||||
c.WithLogger(logger.With(zap.String("service", "bolt")))
|
||||
m.boltClient = bolt.NewClient()
|
||||
m.boltClient.Path = m.boltPath
|
||||
m.boltClient.WithLogger(m.logger.With(zap.String("service", "bolt")))
|
||||
|
||||
if err := c.Open(ctx); err != nil {
|
||||
logger.Error("failed opening bolt", zap.Error(err))
|
||||
if err := m.boltClient.Open(ctx); err != nil {
|
||||
m.logger.Error("failed opening bolt", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer func(logger *zap.Logger) {
|
||||
logger = logger.With(zap.String("service", "bolt"))
|
||||
logger.Info("Stopping")
|
||||
if err := c.Close(); err != nil {
|
||||
logger.Info("failed closing bolt", zap.String("service", "bolt"))
|
||||
}
|
||||
}(logger)
|
||||
|
||||
var (
|
||||
orgSvc platform.OrganizationService = c
|
||||
authSvc platform.AuthorizationService = c
|
||||
userSvc platform.UserService = c
|
||||
viewSvc platform.ViewService = c
|
||||
macroSvc platform.MacroService = c
|
||||
bucketSvc platform.BucketService = c
|
||||
sourceSvc platform.SourceService = c
|
||||
sessionSvc platform.SessionService = c
|
||||
basicAuthSvc platform.BasicAuthService = c
|
||||
dashboardSvc platform.DashboardService = c
|
||||
dashboardLogSvc platform.DashboardOperationLogService = c
|
||||
userLogSvc platform.UserOperationLogService = c
|
||||
bucketLogSvc platform.BucketOperationLogService = c
|
||||
orgLogSvc platform.OrganizationOperationLogService = c
|
||||
onboardingSvc platform.OnboardingService = c
|
||||
userResourceSvc platform.UserResourceMappingService = c
|
||||
scraperTargetSvc platform.ScraperTargetStoreService = c
|
||||
orgSvc platform.OrganizationService = m.boltClient
|
||||
authSvc platform.AuthorizationService = m.boltClient
|
||||
userSvc platform.UserService = m.boltClient
|
||||
viewSvc platform.ViewService = m.boltClient
|
||||
macroSvc platform.MacroService = m.boltClient
|
||||
bucketSvc platform.BucketService = m.boltClient
|
||||
sourceSvc platform.SourceService = m.boltClient
|
||||
sessionSvc platform.SessionService = m.boltClient
|
||||
basicAuthSvc platform.BasicAuthService = m.boltClient
|
||||
dashboardSvc platform.DashboardService = m.boltClient
|
||||
dashboardLogSvc platform.DashboardOperationLogService = m.boltClient
|
||||
userLogSvc platform.UserOperationLogService = m.boltClient
|
||||
bucketLogSvc platform.BucketOperationLogService = m.boltClient
|
||||
orgLogSvc platform.OrganizationOperationLogService = m.boltClient
|
||||
onboardingSvc platform.OnboardingService = m.boltClient
|
||||
scraperTargetSvc platform.ScraperTargetStoreService = m.boltClient
|
||||
telegrafSvc platform.TelegrafConfigStore = m.boltClient
|
||||
userResourceSvc platform.UserResourceMappingService = m.boltClient
|
||||
)
|
||||
|
||||
chronografSvc, err := server.NewServiceV2(ctx, c.DB())
|
||||
chronografSvc, err := server.NewServiceV2(ctx, m.boltClient.DB())
|
||||
if err != nil {
|
||||
logger.Error("failed creating chronograf service", zap.Error(err))
|
||||
m.logger.Error("failed creating chronograf service", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var storageQueryService query.ProxyQueryService
|
||||
|
||||
var telegrafSvc platform.TelegrafConfigStore = c
|
||||
|
||||
var pointsWriter storage.PointsWriter
|
||||
{
|
||||
config := storage.NewConfig()
|
||||
|
||||
engine := storage.NewEngine(enginePath, config, storage.WithRetentionEnforcer(bucketSvc))
|
||||
engine.WithLogger(logger)
|
||||
reg.MustRegister(engine.PrometheusCollectors()...)
|
||||
m.engine = storage.NewEngine(m.enginePath, config, storage.WithRetentionEnforcer(bucketSvc))
|
||||
m.engine.WithLogger(m.logger)
|
||||
reg.MustRegister(m.engine.PrometheusCollectors()...)
|
||||
|
||||
if err := engine.Open(); err != nil {
|
||||
logger.Error("failed to open engine", zap.Error(err))
|
||||
if err := m.engine.Open(); err != nil {
|
||||
m.logger.Error("failed to open engine", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
logger.Info("Stopping", zap.String("service", "storage-engine"))
|
||||
if err := engine.Close(); err != nil {
|
||||
logger.Error("failed to close engine", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
pointsWriter = engine
|
||||
pointsWriter = m.engine
|
||||
|
||||
service, err := readservice.NewProxyQueryService(
|
||||
engine, bucketSvc, orgSvc, logger.With(zap.String("service", "storage-reads")))
|
||||
m.engine, bucketSvc, orgSvc, m.logger.With(zap.String("service", "storage-reads")))
|
||||
if err != nil {
|
||||
logger.Error("failed to create query service", zap.Error(err))
|
||||
m.logger.Error("failed to create query service", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -219,76 +279,67 @@ func run() error {
|
|||
var queryService query.QueryService = storageQueryService.(query.ProxyQueryServiceBridge).QueryService
|
||||
var taskSvc platform.TaskService
|
||||
{
|
||||
boltStore, err := taskbolt.New(c.DB(), "tasks")
|
||||
boltStore, err := taskbolt.New(m.boltClient.DB(), "tasks")
|
||||
if err != nil {
|
||||
logger.Error("failed opening task bolt", zap.Error(err))
|
||||
m.logger.Error("failed opening task bolt", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
executor := taskexecutor.NewQueryServiceExecutor(logger.With(zap.String("service", "task-executor")), queryService, boltStore)
|
||||
executor := taskexecutor.NewQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), queryService, boltStore)
|
||||
|
||||
// TODO(lh): Replace NopLogWriter with real log writer
|
||||
scheduler := taskbackend.NewScheduler(boltStore, executor, taskbackend.NopLogWriter{}, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, time.Second), taskbackend.WithLogger(logger))
|
||||
scheduler.Start(ctx)
|
||||
defer func() {
|
||||
logger.Info("Stopping", zap.String("service", "task"))
|
||||
scheduler.Stop()
|
||||
}()
|
||||
reg.MustRegister(scheduler.PrometheusCollectors()...)
|
||||
m.scheduler = taskbackend.NewScheduler(boltStore, executor, taskbackend.NopLogWriter{}, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, time.Second), taskbackend.WithLogger(m.logger))
|
||||
m.scheduler.Start(ctx)
|
||||
reg.MustRegister(m.scheduler.PrometheusCollectors()...)
|
||||
|
||||
// TODO(lh): Replace NopLogReader with real log reader
|
||||
taskSvc = task.PlatformAdapter(coordinator.New(logger.With(zap.String("service", "task-coordinator")), scheduler, boltStore), taskbackend.NopLogReader{}, scheduler)
|
||||
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), taskbackend.NopLogReader{}, m.scheduler)
|
||||
// TODO(lh): Add in `taskSvc = task.NewValidator(taskSvc)` once we have Authentication coming in the context.
|
||||
// see issue #563
|
||||
}
|
||||
|
||||
// NATS streaming server
|
||||
natsServer := nats.NewServer(nats.Config{FilestoreDir: natsPath})
|
||||
if err := natsServer.Open(); err != nil {
|
||||
logger.Error("failed to start nats streaming server", zap.Error(err))
|
||||
m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath})
|
||||
if err := m.natsServer.Open(); err != nil {
|
||||
m.logger.Error("failed to start nats streaming server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
logger.Info("Stopping", zap.String("service", "nats"))
|
||||
natsServer.Close()
|
||||
}()
|
||||
|
||||
publisher := nats.NewAsyncPublisher("nats-publisher")
|
||||
if err := publisher.Open(); err != nil {
|
||||
logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
m.logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
|
||||
subscriber := nats.NewQueueSubscriber("nats-subscriber")
|
||||
if err := subscriber.Open(); err != nil {
|
||||
logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
m.logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
scraperScheduler, err := gather.NewScheduler(10, logger, scraperTargetSvc, publisher, subscriber, 0, 0)
|
||||
scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0)
|
||||
if err != nil {
|
||||
logger.Error("failed to create scraper subscriber", zap.Error(err))
|
||||
m.logger.Error("failed to create scraper subscriber", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
m.wg.Add(1)
|
||||
go func(logger *zap.Logger) {
|
||||
defer wg.Done()
|
||||
defer m.wg.Done()
|
||||
logger = logger.With(zap.String("service", "scraper"))
|
||||
if err := scraperScheduler.Run(ctx); err != nil {
|
||||
logger.Error("failed scraper service", zap.Error(err))
|
||||
}
|
||||
logger.Info("Stopping")
|
||||
}(logger)
|
||||
}(m.logger)
|
||||
|
||||
httpServer := &nethttp.Server{
|
||||
Addr: httpBindAddress,
|
||||
m.httpServer = &nethttp.Server{
|
||||
Addr: m.httpBindAddress,
|
||||
}
|
||||
|
||||
handlerConfig := &http.APIBackend{
|
||||
Logger: logger,
|
||||
Logger: m.logger,
|
||||
NewBucketService: source.NewBucketService,
|
||||
NewQueryService: source.NewQueryService,
|
||||
PointsWriter: pointsWriter,
|
||||
|
@ -316,30 +367,38 @@ func run() error {
|
|||
}
|
||||
|
||||
// HTTP server
|
||||
wg.Add(1)
|
||||
httpLogger := m.logger.With(zap.String("service", "http"))
|
||||
platformHandler := http.NewPlatformHandler(handlerConfig)
|
||||
reg.MustRegister(platformHandler.PrometheusCollectors()...)
|
||||
|
||||
h := http.NewHandlerFromRegistry("platform", reg)
|
||||
h.Handler = platformHandler
|
||||
h.Logger = httpLogger
|
||||
h.Tracer = opentracing.GlobalTracer()
|
||||
|
||||
m.httpServer.Handler = h
|
||||
|
||||
ln, err := net.Listen("tcp", m.httpBindAddress)
|
||||
if err != nil {
|
||||
httpLogger.Error("failed http listener", zap.Error(err))
|
||||
httpLogger.Info("Stopping")
|
||||
return err
|
||||
}
|
||||
|
||||
if addr, ok := ln.Addr().(*net.TCPAddr); ok {
|
||||
m.httpPort = addr.Port
|
||||
}
|
||||
|
||||
m.wg.Add(1)
|
||||
go func(logger *zap.Logger) {
|
||||
defer wg.Done()
|
||||
logger = logger.With(zap.String("service", "http"))
|
||||
platformHandler := http.NewPlatformHandler(handlerConfig)
|
||||
reg.MustRegister(platformHandler.PrometheusCollectors()...)
|
||||
defer m.wg.Done()
|
||||
logger.Info("Listening", zap.String("transport", "http"), zap.String("addr", m.httpBindAddress), zap.Int("port", m.httpPort))
|
||||
|
||||
h := http.NewHandlerFromRegistry("platform", reg)
|
||||
h.Handler = platformHandler
|
||||
h.Logger = logger
|
||||
h.Tracer = opentracing.GlobalTracer()
|
||||
|
||||
httpServer.Handler = h
|
||||
logger.Info("Listening", zap.String("transport", "http"), zap.String("addr", httpBindAddress))
|
||||
if err := httpServer.ListenAndServe(); err != nethttp.ErrServerClosed {
|
||||
if err := m.httpServer.Serve(ln); err != nethttp.ErrServerClosed {
|
||||
logger.Error("failed http service", zap.Error(err))
|
||||
}
|
||||
logger.Info("Stopping")
|
||||
}(logger)
|
||||
}(httpLogger)
|
||||
|
||||
<-ctx.Done()
|
||||
cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
httpServer.Shutdown(cctx)
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
package main_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
nethttp "net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/cmd/influxd"
|
||||
"github.com/influxdata/platform/http"
|
||||
)
|
||||
|
||||
// Default context.
|
||||
var ctx = context.Background()
|
||||
|
||||
func TestMain_Setup(t *testing.T) {
|
||||
m := NewMain()
|
||||
if err := m.Run(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer m.Shutdown(ctx)
|
||||
|
||||
svc := &http.SetupService{Addr: m.URL()}
|
||||
if results, err := svc.Generate(ctx, &platform.OnboardingRequest{
|
||||
User: "USER",
|
||||
Password: "PASSWORD",
|
||||
Org: "ORG",
|
||||
Bucket: "BUCKET",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if results.User.ID == 0 {
|
||||
t.Fatal("expected user id")
|
||||
} else if results.Org.ID == 0 {
|
||||
t.Fatal("expected org id")
|
||||
} else if results.Bucket.ID == 0 {
|
||||
t.Fatal("expected bucket id")
|
||||
} else if results.Auth.Token == "" {
|
||||
t.Fatal("expected auth token")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain_Write(t *testing.T) {
|
||||
m := RunMainOrFail(t, ctx)
|
||||
m.SetupOrFail(t)
|
||||
defer m.ShutdownOrFail(t, ctx)
|
||||
|
||||
// Execute single write against the server.
|
||||
if resp, err := nethttp.DefaultClient.Do(m.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", m.Org.ID, m.Bucket.ID), `m,k=v f=0i 946684800000000000`)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := resp.Body.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if resp.StatusCode != nethttp.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// Query server to ensure write persists.
|
||||
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
|
||||
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
|
||||
`,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,0,f,m,v` + "\r\n\r\n"
|
||||
|
||||
var buf bytes.Buffer
|
||||
req := (http.QueryRequest{Query: qs, Org: m.Org}).WithDefaults()
|
||||
if preq, err := req.ProxyRequest(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := m.FluxService().Query(ctx, &buf, preq); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(buf.String(), exp); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Main is a test wrapper for main.Main.
|
||||
type Main struct {
|
||||
*main.Main
|
||||
|
||||
// Root temporary directory for all data.
|
||||
Path string
|
||||
|
||||
// Initialized after calling the Setup() helper.
|
||||
User *platform.User
|
||||
Org *platform.Organization
|
||||
Bucket *platform.Bucket
|
||||
Auth *platform.Authorization
|
||||
|
||||
// Standard in/out/err buffers.
|
||||
Stdin bytes.Buffer
|
||||
Stdout bytes.Buffer
|
||||
Stderr bytes.Buffer
|
||||
}
|
||||
|
||||
// NewMain returns a new instance of Main.
|
||||
func NewMain() *Main {
|
||||
m := &Main{Main: main.NewMain()}
|
||||
m.Main.Stdin = &m.Stdin
|
||||
m.Main.Stdout = &m.Stdout
|
||||
m.Main.Stderr = &m.Stderr
|
||||
if testing.Verbose() {
|
||||
m.Main.Stdout = io.MultiWriter(m.Main.Stdout, os.Stdout)
|
||||
m.Main.Stderr = io.MultiWriter(m.Main.Stderr, os.Stderr)
|
||||
}
|
||||
|
||||
path, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m.Path = path
|
||||
return m
|
||||
}
|
||||
|
||||
// RunMainOrFail initializes and starts the server.
|
||||
func RunMainOrFail(tb testing.TB, ctx context.Context, args ...string) *Main {
|
||||
tb.Helper()
|
||||
m := NewMain()
|
||||
if err := m.Run(ctx, args...); err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Run executes the program with additional arguments to set paths and ports.
|
||||
func (m *Main) Run(ctx context.Context, args ...string) error {
|
||||
args = append(args, "--bolt-path", filepath.Join(m.Path, "influxd.bolt"))
|
||||
args = append(args, "--engine-path", filepath.Join(m.Path, "engine"))
|
||||
args = append(args, "--nats-path", filepath.Join(m.Path, "nats"))
|
||||
args = append(args, "--http-bind-address", "127.0.0.1:0")
|
||||
args = append(args, "--log-level", "debug")
|
||||
return m.Main.Run(ctx, args...)
|
||||
}
|
||||
|
||||
// Shutdown stops the program and cleans up temporary paths.
|
||||
func (m *Main) Shutdown(ctx context.Context) error {
|
||||
m.Main.Shutdown(ctx)
|
||||
return os.RemoveAll(m.Path)
|
||||
}
|
||||
|
||||
// ShutdownOrFail stops the program and cleans up temporary paths. Fail on error.
|
||||
func (m *Main) ShutdownOrFail(tb testing.TB, ctx context.Context) {
|
||||
tb.Helper()
|
||||
if err := m.Shutdown(ctx); err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetupOrFail creates a new user, bucket, org, and auth token. Fail on error.
|
||||
func (m *Main) SetupOrFail(tb testing.TB) {
|
||||
svc := &http.SetupService{Addr: m.URL()}
|
||||
results, err := svc.Generate(ctx, &platform.OnboardingRequest{
|
||||
User: "USER",
|
||||
Password: "PASSWORD",
|
||||
Org: "ORG",
|
||||
Bucket: "BUCKET",
|
||||
})
|
||||
if err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
|
||||
m.User = results.User
|
||||
m.Org = results.Org
|
||||
m.Bucket = results.Bucket
|
||||
m.Auth = results.Auth
|
||||
}
|
||||
|
||||
func (m *Main) FluxService() *http.FluxService {
|
||||
return &http.FluxService{Addr: m.URL(), Token: m.Auth.Token}
|
||||
}
|
||||
|
||||
// MustNewHTTPRequest returns a new nethttp.Request with base URL and auth attached. Fail on error.
|
||||
func (m *Main) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request {
|
||||
req, err := nethttp.NewRequest(method, m.URL()+rawurl, strings.NewReader(body))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Token "+m.Auth.Token)
|
||||
return req
|
||||
}
|
|
@ -27,7 +27,7 @@ type QueryRequest struct {
|
|||
Type string `json:"type"`
|
||||
Dialect QueryDialect `json:"dialect"`
|
||||
|
||||
org *platform.Organization
|
||||
Org *platform.Organization `json:"-"`
|
||||
}
|
||||
|
||||
// QueryDialect is the formatting options for the query response.
|
||||
|
@ -164,10 +164,10 @@ func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, e
|
|||
// once they are supported.
|
||||
return &query.ProxyRequest{
|
||||
Request: query.Request{
|
||||
OrganizationID: r.org.ID,
|
||||
OrganizationID: r.Org.ID,
|
||||
Compiler: compiler,
|
||||
},
|
||||
Dialect: csv.Dialect{
|
||||
Dialect: &csv.Dialect{
|
||||
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||
NoHeader: noHeader,
|
||||
Delimiter: delimiter,
|
||||
|
@ -218,7 +218,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ
|
|||
return nil, err
|
||||
}
|
||||
|
||||
req.org, err = queryOrganization(ctx, r, svc)
|
||||
req.Org, err = queryOrganization(ctx, r, svc)
|
||||
return &req, err
|
||||
}
|
||||
|
||||
|
|
|
@ -281,7 +281,7 @@ func TestFluxHandler_postFluxSpec(t *testing.T) {
|
|||
|
||||
if got := tt.w.Code; got != tt.status {
|
||||
t.Errorf("http.postFluxSpec = got %d\nwant %d", got, tt.status)
|
||||
t.Log(tt.w.HeaderMap)
|
||||
t.Log(tt.w.Header())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -331,7 +331,7 @@ func TestFluxHandler_postFluxPlan(t *testing.T) {
|
|||
|
||||
if got := tt.w.Code; got != tt.status {
|
||||
t.Errorf("http.postFluxPlan = got %d\nwant %d", got, tt.status)
|
||||
t.Log(tt.w.HeaderMap)
|
||||
t.Log(tt.w.Header())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ func TestQueryRequest_WithDefaults(t *testing.T) {
|
|||
Query: tt.fields.Query,
|
||||
Type: tt.fields.Type,
|
||||
Dialect: tt.fields.Dialect,
|
||||
org: tt.fields.org,
|
||||
Org: tt.fields.org,
|
||||
}
|
||||
if got := r.WithDefaults(); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("QueryRequest.WithDefaults() = %v, want %v", got, tt.want)
|
||||
|
@ -168,7 +168,7 @@ func TestQueryRequest_Validate(t *testing.T) {
|
|||
Query: tt.fields.Query,
|
||||
Type: tt.fields.Type,
|
||||
Dialect: tt.fields.Dialect,
|
||||
org: tt.fields.org,
|
||||
Org: tt.fields.org,
|
||||
}
|
||||
if err := r.Validate(); (err != nil) != tt.wantErr {
|
||||
t.Errorf("QueryRequest.Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
|
@ -266,7 +266,7 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
|
|||
Query: "howdy",
|
||||
},
|
||||
},
|
||||
Dialect: csv.Dialect{
|
||||
Dialect: &csv.Dialect{
|
||||
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||
NoHeader: false,
|
||||
Delimiter: ',',
|
||||
|
@ -294,7 +294,7 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Dialect: csv.Dialect{
|
||||
Dialect: &csv.Dialect{
|
||||
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||
NoHeader: false,
|
||||
Delimiter: ',',
|
||||
|
@ -323,7 +323,7 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Dialect: csv.Dialect{
|
||||
Dialect: &csv.Dialect{
|
||||
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||
NoHeader: false,
|
||||
Delimiter: ',',
|
||||
|
@ -340,7 +340,7 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
|
|||
Query: tt.fields.Query,
|
||||
Type: tt.fields.Type,
|
||||
Dialect: tt.fields.Dialect,
|
||||
org: tt.fields.org,
|
||||
Org: tt.fields.org,
|
||||
}
|
||||
got, err := r.proxyRequest(tt.now)
|
||||
if (err != nil) != tt.wantErr {
|
||||
|
@ -348,7 +348,7 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
|
|||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("QueryRequest.ProxyRequest() = %v, want %v", got, tt.want)
|
||||
t.Errorf("QueryRequest.ProxyRequest() = %#v, want %#v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -386,7 +386,7 @@ func Test_decodeQueryRequest(t *testing.T) {
|
|||
DateTimeFormat: "RFC3339",
|
||||
Header: func(x bool) *bool { return &x }(true),
|
||||
},
|
||||
org: &platform.Organization{
|
||||
Org: &platform.Organization{
|
||||
ID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(),
|
||||
},
|
||||
},
|
||||
|
@ -452,7 +452,7 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
|
|||
Query: "from()",
|
||||
},
|
||||
},
|
||||
Dialect: csv.Dialect{
|
||||
Dialect: &csv.Dialect{
|
||||
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||
NoHeader: false,
|
||||
Delimiter: ',',
|
||||
|
|
Loading…
Reference in New Issue